Lumiera  0.pre.03
»edit your freedom«
call-queue-test.cpp
Go to the documentation of this file.
1 /*
2  CallQueue(Test) - verify queue based dispatch of bound function objects
3 
4  Copyright (C)
5  2017, Hermann Vosseler <Ichthyostega@web.de>
6 
7   **Lumiera** is free software; you can redistribute it and/or modify it
8   under the terms of the GNU General Public License as published by the
9   Free Software Foundation; either version 2 of the License, or (at your
10   option) any later version. See the file COPYING for further details.
11 
12 * *****************************************************************/
13 
19 #include "lib/test/run.hpp"
21 #include "lib/sync-barrier.hpp"
22 #include "lib/thread.hpp"
23 #include "lib/sync.hpp"
24 #include "lib/util.hpp"
25 
26 #include "lib/call-queue.hpp"
27 
28 #include <string>
29 
30 
31 
32 namespace lib {
33 namespace test{
34 
35  using lib::Sync;
36  using lib::SyncBarrier;
37  using lib::ThreadJoinable;
38 
39  using util::isnil;
40  using std::string;
41 
42 
43 
44  namespace { // test fixture
45 
46  // --------random-stress-test------
47  uint const NUM_OF_THREADS = 50;
48  uint const MAX_RAND_INCMT = 200;
49  uint const MAX_RAND_STEPS = 500;
50  uint const MAX_RAND_DELAY = 1000;
51  // --------random-stress-test------
52 
53 
54  uint calc_sum = 0;
55  uint ctor_sum = 0;
56  uint dtor_sum = 0;
57 
58  template<uint i>
59  struct Dummy
60  {
61  uint val_;
62 
63  Dummy()
64  : val_(i)
65  {
66  ctor_sum += (val_+1);
67  }
68 
69  ~Dummy()
70  {
71  dtor_sum += val_;
72  }
73 
74  int
75  operator++()
76  {
77  return ++val_;
78  }
79  };
80 
81  template<uint i>
82  void
83  increment (Dummy<i>&& dummy) //NOTE: dummy is consumed here
84  {
85  calc_sum += ++dummy;
86  }
87 
88  }//(End) test fixture
89 
90 
91 
92 
93  /**********************************************************************************/
102  class CallQueue_test : public Test
103  {
104 
105  virtual void
106  run (Arg)
107  {
108  verify_SimpleUse();
109  verify_Consistency();
110  verify_ThreadSafety();
111  }
112 
113 
114  void
115  verify_SimpleUse ()
116  {
117  CallQueue queue;
118  CHECK (isnil (queue));
119 
120  int val = 2;
121  queue.feed ([&]() { val = -1; });
122  CHECK (1 == queue.size());
123  CHECK (val == 2);
124 
125  queue.invoke();
126  CHECK (val == -1);
127  CHECK (0 == queue.size());
128 
129  queue.invoke();
130  CHECK (0 == queue.size());
131  }
132 
133 
144  void
146  {
147  calc_sum = 0;
148  ctor_sum = 0;
149  dtor_sum = 0;
150 
151  CallQueue queue;
152  queue.feed ([]() { increment(Dummy<0>{}); }); //NOTE: each lambda binds a different instantiation of the increment template
153  queue.feed ([]() { increment(Dummy<1>{}); }); // and each invocation closes over an anonymous rvalue instance
154  queue.feed ([]() { increment(Dummy<2>{}); });
155 
156  queue.invoke();
157  queue.invoke();
158  queue.feed ([]() { increment(Dummy<3>{}); });
159  queue.feed ([]() { increment(Dummy<4>{}); });
160 
161  queue.invoke();
162  queue.invoke();
163  queue.invoke();
164 
165  uint expected = (5+1)*5/2;
166  CHECK (calc_sum = expected);
167  CHECK (ctor_sum = expected);
168  CHECK (dtor_sum = expected);
169  }
170 
171 
172 
173  struct Worker
174  : ThreadJoinable<>
175  , Sync<>
176  {
177  uint64_t producerSum = 0;
178  uint64_t consumerSum = 0;
179 
180  SyncBarrier& trigger_;
181  Random rand_;
182 
183  void
184  countConsumerCall (uint increment)
185  {
186  Lock sync{this}; // NOTE: will be invoked from some random other thread
187  consumerSum += increment;
188  }
189 
190  Worker(CallQueue& queue, SyncBarrier& commonTrigger)
191  : ThreadJoinable{"CallQueue_test: concurrent dispatch"
192  , [&]() {
193  uint cnt = rand_.i(MAX_RAND_STEPS);
194  uint delay = rand_.i(MAX_RAND_DELAY);
195 
196  trigger_.sync(); // block until all threads are ready
197  for (uint i=0; i<cnt; ++i)
198  {
199  uint increment = rand_.i(MAX_RAND_INCMT);
200  queue.feed ([=]() { countConsumerCall(increment); });
201  producerSum += increment;
202  usleep (delay);
203  queue.invoke(); // NOTE: dequeue one functor added during our sleep
204  } // and thus belonging to some random other thread
205  }}
206  , trigger_{commonTrigger}
207  , rand_{defaultGen}
208  { }
209  };
210 
212 
213 
220  void
222  {
223  seedRand();
224  CallQueue queue;
225  SyncBarrier trigger{NUM_OF_THREADS + 1};
226 
227  // Start a bunch of threads with random access pattern
228  Workers workers{NUM_OF_THREADS,
229  [&](Workers::ElementHolder& storage)
230  {
231  storage.create<Worker> (queue, trigger);
232  }
233  };
234 
235  // unleash all worker functions
236  trigger.sync();
237 
238  // wait for termination of all threads and detect possible exceptions
239  bool allFine{true};
240  for (auto& worker : workers)
241  allFine &= worker.join().isValid();
242  CHECK (allFine);
243 
244  // collect the results of all worker threads
245  uint64_t globalProducerSum = 0;
246  uint64_t globalConsumerSum = 0;
247  for (auto& worker : workers)
248  {
249  globalProducerSum += worker.producerSum;
250  globalConsumerSum += worker.consumerSum;
251  }
252 
253  // VERIFY: locally recorded partial sums match total sum
254  CHECK (globalProducerSum == globalConsumerSum);
255  }
256  };
257 
258 
260  LAUNCHER (CallQueue_test, "unit common");
261 
262 
263 }} // namespace lib::test
Facility for monitor object based locking.
Definition: sync.hpp:209
Variant of the standard case, requiring to wait and join() on the termination of this thread...
Definition: thread.hpp:668
A fixed collection of non-copyable polymorphic objects.
A threadsafe queue for bound void(void) functors.
Definition: call-queue.hpp:49
Definition: run.hpp:40
int i(uint bound=_iBOUND())
drop-in replacement for rand() % bound
Definition: random.hpp:205
scoped guard to control the actual locking.
Definition: sync.hpp:226
Implementation namespace for support and library code.
Managing a collection of non-copyable polymorphic objects in compact storage.
Storage Frame to hold one Child object.
Object Monitor based synchronisation.
ThreadJoinable(string const &, FUN &&, ARGS &&...) -> ThreadJoinable< std::invoke_result_t< FUN, ARGS... >>
deduction guide: find out about result value to capture from a generic callable.
A Queue for function invocations, allowing them to be dispatched on demand.
Simplistic test class runner.
Tiny helper functions and shortcuts to be used everywhere Consider this header to be effectively incl...
lib::Result< RES > join()
put the caller into a blocking wait until this thread has terminated
Definition: thread.hpp:685
Convenience front-end to simplify and codify basic thread handling.
A Dummy object for tests.
A one time N-fold mutual synchronisation barrier.
Random defaultGen
a global default RandomSequencer for mundane purposes
Definition: random.cpp:70
A N-fold synchronisation latch using yield-wait until fulfilment.