Lumiera  0.pre.03
»edit your freedom«
work-force-test.cpp
Go to the documentation of this file.
1 /*
2  WorkForce(Test) - verify worker thread service
3 
4  Copyright (C)
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7   **Lumiera** is free software; you can redistribute it and/or modify it
8   under the terms of the GNU General Public License as published by the
9   Free Software Foundation; either version 2 of the License, or (at your
10   option) any later version. See the file COPYING for further details.
11 
12 * *****************************************************************/
13 
19 #include "lib/test/run.hpp"
21 #include "lib/thread.hpp"
22 #include "lib/sync.hpp"
23 
24 #include <functional>
25 #include <thread>
26 #include <chrono>
27 #include <set>
28 
29 using test::Test;
30 
31 
32 namespace vault{
33 namespace gear {
34 namespace test {
35 
36  using std::this_thread::sleep_for;
37  using namespace std::chrono_literals;
38  using std::chrono::milliseconds;
39  using lib::Thread;
40 
41 
42  namespace {
43  using WorkFun = std::function<work::SIG_WorkFun>;
44  using FinalFun = std::function<work::SIG_FinalHook>;
45 
51  template<class FUN>
52  auto
53  setup (FUN&& workFun)
54  {
55  struct Setup
56  : work::Config
57  {
58  WorkFun doWork;
59  FinalFun finalHook = [](bool){ /*NOP*/ };
60 
61  milliseconds IDLE_WAIT = work::Config::IDLE_WAIT;
63 
64  Setup (FUN&& workFun)
65  : doWork{std::forward<FUN> (workFun)}
66  { }
67 
68  Setup&&
69  withFinalHook (FinalFun finalFun)
70  {
71  finalHook = move (finalFun);
72  return move(*this);
73  }
74 
75  Setup&&
76  withSleepPeriod (std::chrono::milliseconds millis)
77  {
78  IDLE_WAIT = millis;
79  return move(*this);
80  }
81 
82  Setup&&
83  dismissAfter (size_t cycles)
84  {
85  DISMISS_CYCLES = cycles;
86  return move(*this);
87  }
88 
89  };
90 
91  return Setup{std::forward<FUN> (workFun)};
92  }
93  }
94 
95 
96 
97 
98  /*************************************************************************/
103  class WorkForce_test : public Test
104  {
105 
106  virtual void
107  run (Arg)
108  {
109  simpleUsage();
110 
111  verify_pullWork();
112  verify_workerHalt();
113  verify_workerSleep();
114  verify_workerRetard();
115  verify_workerDismiss();
116  verify_finalHook();
117  verify_detectError();
118  verify_defaultPool();
119  verify_scalePool();
120  verify_countActive();
121  verify_dtor_blocks();
122  }
123 
124 
127  void
129  {
130  atomic<uint> check{0};
131  WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
132  // ^^^ this is the doWork-λ
133  CHECK (0 == check);
134 
135  wof.activate();
136  sleep_for(20ms);
137 
138  CHECK (0 < check); // λ invoked in the worker threads
139  }
140 
141 
142 
145  void
147  {
148  atomic<uint> check{0};
149  WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
150 
151  CHECK (0 == check);
152 
153  wof.incScale();
154  sleep_for(20ms);
155 
156  uint invocations = check;
157  CHECK (0 < invocations);
158 
159  sleep_for(2ms);
160  CHECK (invocations < check);
161 
162  invocations = check;
163  sleep_for(2ms);
164  CHECK (invocations < check);
165 
166  wof.awaitShutdown();
167 
168  invocations = check;
169  sleep_for(2ms);
170  CHECK (invocations == check);
171  }
172 
173 
174 
177  void
179  {
180  atomic<uint> check{0};
181  atomic<activity::Proc> control{activity::PASS};
182  WorkForce wof{setup ([&]{ ++check; return activity::Proc(control); })};
183 
184  wof.incScale();
185  sleep_for(1ms);
186 
187  uint invocations = check;
188  CHECK (0 < invocations);
189 
190  control = activity::HALT;
191  sleep_for(1ms);
192 
193  invocations = check;
194  sleep_for(10ms);
195  CHECK (invocations == check);
196  }
197 
198 
199 
202  void
204  {
205  atomic<uint> check{0};
206  WorkForce wof{setup ([&]{ ++check; return activity::WAIT; })
207  .withSleepPeriod (10ms)};
208 
209  wof.incScale();
210  sleep_for(1ms);
211 
212  CHECK (1 == check);
213 
214  sleep_for(10us);
215  CHECK (1 == check);
216 
217  sleep_for(12ms); // after waiting one sleep-period...
218  CHECK (2 == check); // ...functor invoked again
219  }
220 
221 
222 
225  void
227  {
228  atomic<uint> check{0};
229  { // ▽▽▽▽ regular work-cycles without delay
230  WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
231  wof.incScale();
232  sleep_for(5ms);
233  }
234  uint cyclesPASS{check};
235  check = 0;
236  { // ▽▽▽▽ signals »contention«
237  WorkForce wof{setup ([&]{ ++check; return activity::KICK; })};
238  wof.incScale();
239  sleep_for(5ms);
240  }
241  uint cyclesKICK{check};
242  CHECK (cyclesKICK < cyclesPASS);
243  CHECK (cyclesKICK < 50);
244  }
245 
246 
247 
251  void
253  {
254  atomic<uint> check{0};
255  WorkForce wof{setup ([&]{ ++check; return activity::WAIT; })
256  .withSleepPeriod (10ms)
257  .dismissAfter(5)};
258 
259  wof.incScale();
260  sleep_for(1ms);
261 
262  CHECK (1 == check);
263 
264  sleep_for(12ms);
265  CHECK (2 == check); // after one wait cycle, one further invocation
266 
267  sleep_for(100ms);
268  CHECK (5 == check); // only 5 invocations total...
269  CHECK (0 == wof.size()); // ...after that, the worker terminated
270  }
271 
272 
273 
276  void
278  {
279  atomic<uint> exited{0};
280  atomic<activity::Proc> control{activity::PASS};
281  WorkForce wof{setup([&]{ return activity::Proc(control); })
282  .withFinalHook([&](bool){ ++exited; })};
283 
284  CHECK (0 == exited);
285 
286  wof.activate();
287  sleep_for(10ms);
288  CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
289  CHECK (0 == exited);
290 
291  control = activity::HALT;
292  sleep_for(10ms);
293  CHECK (0 == wof.size());
294  CHECK (exited == work::Config::COMPUTATION_CAPACITY);
295  }
296 
297 
298 
303  void
305  {
306  atomic<uint> check{0};
307  atomic<uint> errors{0};
308  WorkForce wof{setup ([&]{
309  if (++check == 555)
310  throw error::State("evil");
311  return activity::PASS;
312  })
313  .withFinalHook([&](bool isFailure)
314  {
315  if (isFailure)
316  ++errors;
317  })};
318  CHECK (0 == check);
319  CHECK (0 == errors);
320 
321  wof.incScale();
322  wof.incScale();
323  wof.incScale();
324 
325  sleep_for(10us);
326  CHECK (3 == wof.size());
327  CHECK (0 < check);
328  CHECK (0 == errors);
329 
330  sleep_for(200ms); // wait for the programmed disaster
331  CHECK (2 == wof.size());
332  CHECK (1 == errors);
333  }
334 
335 
336 
341  void
343  {
344  atomic<uint> check{0};
345  WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
346 
347  // after construction, the WorkForce is inactive
348  CHECK (0 == wof.size());
349  CHECK (0 == check);
350 
351  wof.activate();
352  sleep_for(20ms);
353 
354  CHECK (0 < check);
355  CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
356  CHECK (work::Config::COMPUTATION_CAPACITY == std::thread::hardware_concurrency());
357  }
358 
359 
360 
364  void
366  {
368  class UniqueCnt
369  : public std::set<std::thread::id>
370  , public lib::Sync<>
371  {
372  public:
373  void
374  mark (std::thread::id const& tID)
375  {
376  Lock guard{this};
377  this->insert(tID);
378  }
379 
380  operator size_t() const
381  {
382  Lock guard{this};
383  return this->size();
384  }
385  }
386  uniqueCnt;
387 
388  WorkForce wof{setup ([&]{
389  uniqueCnt.mark(std::this_thread::get_id());
390  return activity::PASS;
391  })};
392 
393  CHECK (0 == uniqueCnt);
394  CHECK (0 == wof.size());
395 
396  wof.incScale();
397  sleep_for(1ms);
398  CHECK (1 == uniqueCnt);
399  CHECK (1 == wof.size());
400 
401  wof.incScale();
402  sleep_for(1ms);
403  CHECK (2 == uniqueCnt);
404  CHECK (2 == wof.size());
405 
406 
407  auto fullCnt = work::Config::COMPUTATION_CAPACITY;
408 
409  wof.activate (1.0);
410  sleep_for(5ms);
411  CHECK (fullCnt == uniqueCnt);
412  CHECK (fullCnt == wof.size());
413 
414  wof.activate (2.0);
415  sleep_for(10ms);
416  CHECK (2*fullCnt == uniqueCnt);
417  CHECK (2*fullCnt == wof.size());
418 
419  wof.awaitShutdown();
420  CHECK (0 == wof.size());
421 
422  uniqueCnt.clear();
423  sleep_for(5ms);
424  CHECK (0 == uniqueCnt);
425 
426  wof.activate (0.5);
427  sleep_for(5ms);
428  CHECK (fullCnt/2 == uniqueCnt);
429  CHECK (fullCnt/2 == wof.size());
430  }
431 
432 
433 
436  void
438  {
439  atomic<uint> check{0};
440  WorkForce wof{setup ([&]{
441  ++check;
442  if (check == 5'000 or check == 5'110)
443  return activity::HALT;
444  else
445  return activity::PASS;
446  })};
447 
448  CHECK (0 == wof.size());
449 
450  wof.incScale();
451  wof.incScale();
452  wof.incScale();
453  sleep_for(10us); // this may be fragile; must be sufficiently short
454 
455  CHECK (3 == wof.size());
456 
457  while (check < 6'000)
458  sleep_for(15ms); // .....sufficiently long to count way beyond 10'000
459  CHECK (check > 6'000);
460  CHECK (1 == wof.size());
461  }
462 
463 
464 
472  void
473  verify_dtor_blocks()
474  {
475  atomic<bool> trapped{true};
476  auto blockingWork = [&]{
477  while (trapped)
478  /* spin */;
479  return activity::PASS;
480  };
481 
482  atomic<bool> pool_scaled_up{false};
483  atomic<bool> shutdown_done{false};
484 
485  Thread operate{"controller"
486  ,[&] {
487  {// nested scope...
488  WorkForce wof{setup (blockingWork)};
489 
490  wof.activate();
491  sleep_for (10ms);
492  CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
493  pool_scaled_up = true;
494  } // WorkForce goes out of scope => dtor called
495 
496  // when reaching this point, dtor has terminated
497  shutdown_done = true;
498  }};
499 
500  CHECK (operate); // operate-thread is in running state
501  sleep_for(100ms);
502 
503  CHECK (pool_scaled_up);
504  CHECK (not shutdown_done); // all workers are trapped in the work-functor
505  // thus the destructor can't dismantle the pool
506  trapped = false;
507  sleep_for(20ms);
508  CHECK (shutdown_done);
509  CHECK (not operate); // operate-thread has detached and terminated
510  }
511  };
512 
513 
/** Register this test class for invocation by the test runner (NOTE(review): the string presumably lists the test groups "unit" and "engine" — confirm against lib/test/run.hpp) */
515  LAUNCHER (WorkForce_test, "unit engine");
516 
517 
518 }}} // namespace vault::gear::test
Facility for monitor object based locking.
Definition: sync.hpp:209
Definition: Setup.py:1
Definition: run.hpp:40
const size_t DISMISS_CYCLES
number of wait cycles before an idle worker terminates completely
Definition: scheduler.hpp:129
Object Monitor based synchronisation.
Derived specific exceptions within Lumiera's exception hierarchy.
Definition: error.hpp:190
Abstract Base Class for all testcases.
Definition: run.hpp:53
const auto IDLE_WAIT
sleep-recheck cycle for workers deemed idle
Definition: scheduler.hpp:128
Simplistic test class runner.
Convenience front-end to simplify and codify basic thread handling.
const milliseconds IDLE_WAIT
wait period when a worker falls idle
Definition: work-force.hpp:108
const size_t DISMISS_CYCLES
number of idle cycles after which the worker terminates
Definition: work-force.hpp:109
auto setup(FUN &&workFun)
Helper: setup a Worker-Pool configuration for the test.
A thin convenience wrapper to simplify thread-handling.
Definition: thread.hpp:648
A pool of workers for multithreaded rendering.
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Definition: work-force.hpp:106
Proc
Result instruction from Activity activation.
Definition: activity.hpp:140
Pool of worker threads for rendering.
Definition: work-force.hpp:240
Base for configuration of the worker pool.
Definition: work-force.hpp:104
Vault-Layer implementation namespace root.