Lumiera 0.pre.04
»edit your freedom«
Loading...
Searching...
No Matches
work-force-test.cpp
Go to the documentation of this file.
1/*
2 WorkForce(Test) - verify worker thread service
3
4 Copyright (C)
5 2023, Hermann Vosseler <Ichthyostega@web.de>
6
7  **Lumiera** is free software; you can redistribute it and/or modify it
8  under the terms of the GNU General Public License as published by the
9  Free Software Foundation; either version 2 of the License, or (at your
10  option) any later version. See the file COPYING for further details.
11
12* *****************************************************************/
13
19#include "lib/test/run.hpp"
21#include "lib/thread.hpp"
22#include "lib/sync.hpp"
23
24#include <functional>
25#include <thread>
26#include <chrono>
27#include <set>
28
29using test::Test;
30
31
32namespace vault{
33namespace gear {
34namespace test {
35
36 using std::this_thread::sleep_for;
37 using namespace std::chrono_literals;
38 using std::chrono::milliseconds;
39 using lib::Thread;
40
41
// File-local helpers used to configure the WorkForce instances under test.
42 namespace {
// std::function wrappers for the worker callback and the worker-exit hook signatures.
43 using WorkFun = std::function<work::SIG_WorkFun>;
44 using FinalFun = std::function<work::SIG_FinalHook>;
45
// Build a configuration object for a WorkForce, with `workFun` stored as the work functor.
// The returned object starts with a no-op final hook and the IDLE_WAIT / DISMISS_CYCLES
// values taken from work::Config; the chained builder calls below override them.
51 template<class FUN>
52 auto
53 setup (FUN&& workFun)
54 {
// NOTE(review): the listing jumps from line 55 to line 57; one source line of this
// struct head (possibly a base-class clause) appears to be lost -- confirm against the real file.
55 struct Setup
57 {
58 WorkFun doWork;
59 FinalFun finalHook = [](bool){ /*NOP*/ };
60
61 milliseconds IDLE_WAIT = work::Config::IDLE_WAIT;
62 size_t DISMISS_CYCLES = work::Config::DISMISS_CYCLES;
63
// FUN is already fixed by the enclosing setup() call, so this constructor accepts
// exactly the value category that setup() was invoked with.
64 Setup (FUN&& workFun)
65 : doWork{std::forward<FUN> (workFun)}
66 { }
67
// Builder: replace the final hook. Returns an rvalue reference to *this so that
// further builder calls can be chained on the temporary.
// NOTE(review): the unqualified move() calls rely on std::move being visible here -- confirm.
68 Setup&&
69 withFinalHook (FinalFun finalFun)
70 {
71 finalHook = move (finalFun);
72 return move(*this);
73 }
74
// Builder: override IDLE_WAIT with the given duration.
75 Setup&&
76 withSleepPeriod (std::chrono::milliseconds millis)
77 {
78 IDLE_WAIT = millis;
79 return move(*this);
80 }
81
// Builder: override DISMISS_CYCLES with the given count.
82 Setup&&
83 dismissAfter (size_t cycles)
84 {
85 DISMISS_CYCLES = cycles;
86 return move(*this);
87 }
88
89 };
90
91 return Setup{std::forward<FUN> (workFun)};
92 }
93 }
94
95
96
97
98 /*************************************************************************/
103 class WorkForce_test : public Test
104 {
105
106 virtual void
123
124
// Basic usage: the work functor is not called before activate(), and has been called
// at least once 20ms after activate().
// NOTE(review): the line carrying this method's name is absent from this listing.
127 void
129 {
130 atomic<uint> check{0};
131 WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
132 // ^^^ this is the doWork-λ
133 CHECK (0 == check);
134
135 wof.activate();
// allow the workers some time to start up and call the functor
136 sleep_for(20ms);
137
138 CHECK (0 < check); // λ invoked in the worker threads
139 }
140
141
142
// A worker added via incScale() keeps calling a functor that returns activity::PASS;
// the invocation count grows across successive samples and stops growing after awaitShutdown().
// NOTE(review): the line carrying this method's name is absent from this listing.
145 void
147 {
148 atomic<uint> check{0};
149 WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
150
151 CHECK (0 == check);
152
153 wof.incScale();
154 sleep_for(20ms);
155
// snapshot of the counter; each later sample must exceed the previous one
156 uint invocations = check;
157 CHECK (0 < invocations);
158
159 sleep_for(2ms);
160 CHECK (invocations < check);
161
162 invocations = check;
163 sleep_for(2ms);
164 CHECK (invocations < check);
165
166 wof.awaitShutdown();
167
// after shutdown the counter must stay unchanged
168 invocations = check;
169 sleep_for(2ms);
170 CHECK (invocations == check);
171 }
172
173
174
// The functor returns whatever the atomic `control` currently holds; once it is switched
// from activity::PASS to activity::HALT, the functor is no longer invoked.
// NOTE(review): the line carrying this method's name is absent from this listing.
177 void
179 {
180 atomic<uint> check{0};
181 atomic<activity::Proc> control{activity::PASS};
182 WorkForce wof{setup ([&]{ ++check; return activity::Proc(control); })};
183
184 wof.incScale();
185 sleep_for(1ms);
186
187 uint invocations = check;
188 CHECK (0 < invocations);
189
190 control = activity::HALT;
// grace period so an invocation already in flight can finish before sampling
191 sleep_for(1ms);
192
193 invocations = check;
194 sleep_for(10ms);
195 CHECK (invocations == check);
196 }
197
198
199
// A functor returning activity::WAIT is called once, then not again until the configured
// sleep period (10ms here) has passed.
// NOTE(review): the line carrying this method's name is absent from this listing.
202 void
204 {
205 atomic<uint> check{0};
206 WorkForce wof{setup ([&]{ ++check; return activity::WAIT; })
207 .withSleepPeriod (10ms)};
208
209 wof.incScale();
210 sleep_for(1ms);
211
212 CHECK (1 == check);
213
// still inside the first sleep period: no further invocation expected
214 sleep_for(10us);
215 CHECK (1 == check);
216
217 sleep_for(12ms); // after waiting one sleep-period...
218 CHECK (2 == check); // ...functor invoked again
219 }
220
221
222
// Compares how often a single worker calls the functor within 5ms when it returns
// activity::PASS versus activity::KICK; the KICK run must yield fewer calls, and fewer than 50.
// Each WorkForce lives in its own nested scope, so it is destroyed before the counter is read.
// NOTE(review): the line carrying this method's name is absent from this listing.
225 void
227 {
228 atomic<uint> check{0};
229 { // ▽▽▽▽ regular work-cycles without delay
230 WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
231 wof.incScale();
232 sleep_for(5ms);
233 }
234 uint cyclesPASS{check};
// reuse the same counter for the second measurement
235 check = 0;
236 { // ▽▽▽▽ signals »contention«
237 WorkForce wof{setup ([&]{ ++check; return activity::KICK; })};
238 wof.incScale();
239 sleep_for(5ms);
240 }
241 uint cyclesKICK{check};
242 CHECK (cyclesKICK < cyclesPASS);
243 CHECK (cyclesKICK < 50);
244 }
245
246
247
// With a 10ms sleep period and dismissAfter(5), a worker whose functor always returns
// activity::WAIT calls it exactly 5 times in total and the pool size then drops to 0.
// NOTE(review): the line carrying this method's name is absent from this listing.
251 void
253 {
254 atomic<uint> check{0};
255 WorkForce wof{setup ([&]{ ++check; return activity::WAIT; })
256 .withSleepPeriod (10ms)
257 .dismissAfter(5)};
258
259 wof.incScale();
260 sleep_for(1ms);
261
262 CHECK (1 == check);
263
264 sleep_for(12ms);
265 CHECK (2 == check); // after one wait cycle, one further invocation
266
// long enough for the remaining 10ms wait cycles to run out
267 sleep_for(100ms);
268 CHECK (5 == check); // only 5 invocations total...
269 CHECK (0 == wof.size()); // ...after that, the worker terminated
270 }
271
272
273
// The final hook installed via withFinalHook() is called once per worker that leaves the pool:
// after activate() the pool holds work::Config::COMPUTATION_CAPACITY workers and no exit is
// counted; after switching the functor to activity::HALT the pool is empty and the exit count
// equals COMPUTATION_CAPACITY.
// NOTE(review): the line carrying this method's name is absent from this listing.
276 void
278 {
279 atomic<uint> exited{0};
280 atomic<activity::Proc> control{activity::PASS};
281 WorkForce wof{setup([&]{ return activity::Proc(control); })
282 .withFinalHook([&](bool){ ++exited; })};
283
284 CHECK (0 == exited);
285
286 wof.activate();
287 sleep_for(10ms);
288 CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
289 CHECK (0 == exited);
290
291 control = activity::HALT;
292 sleep_for(10ms);
293 CHECK (0 == wof.size());
294 CHECK (exited == work::Config::COMPUTATION_CAPACITY);
295 }
296
297
298
// An exception thrown from the work functor removes only the affected worker: of three
// workers, two remain, and the final hook sees isFailure == true exactly once.
// NOTE(review): the line carrying this method's name is absent from this listing.
303 void
305 {
306 atomic<uint> check{0};
307 atomic<uint> errors{0};
308 WorkForce wof{setup ([&]{
// comparing the value returned by the increment means exactly one invocation throws
309 if (++check == 555)
310 throw error::State("evil");
311 return activity::PASS;
312 })
313 .withFinalHook([&](bool isFailure)
314 {
315 if (isFailure)
316 ++errors;
317 })};
318 CHECK (0 == check);
319 CHECK (0 == errors);
320
321 wof.incScale();
322 wof.incScale();
323 wof.incScale();
324
// sampled after only 10us, expecting the count to be still below 555 (no error yet)
325 sleep_for(10us);
326 CHECK (3 == wof.size());
327 CHECK (0 < check);
328 CHECK (0 == errors);
329
330 sleep_for(200ms); // wait for the programmed disaster
331 CHECK (2 == wof.size());
332 CHECK (1 == errors);
333 }
334
335
336
// A freshly constructed WorkForce has no workers; activate() without arguments brings the
// pool to work::Config::COMPUTATION_CAPACITY, which is expected to equal
// std::thread::hardware_concurrency().
// NOTE(review): the line carrying this method's name is absent from this listing.
341 void
343 {
344 atomic<uint> check{0};
345 WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
346
347 // after construction, the WorkForce is inactive
348 CHECK (0 == wof.size());
349 CHECK (0 == check);
350
351 wof.activate();
352 sleep_for(20ms);
353
354 CHECK (0 < check);
355 CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
// NOTE(review): hardware_concurrency() may return 0 when the value is not computable -- confirm
// how COMPUTATION_CAPACITY is derived in that case.
356 CHECK (work::Config::COMPUTATION_CAPACITY == std::thread::hardware_concurrency());
357 }
358
359
360
// Pool scaling: incScale() adds one worker at a time, and activate(fraction) is expected to
// size the pool to that fraction of `fullCnt`. The number of distinct worker threads is
// measured by collecting the thread-ids seen inside the work functor.
// NOTE(review): the line carrying this method's name is absent from this listing.
364 void
366 {
// Set of thread-ids; mark() and the size_t conversion each take a Lock on this object.
368 class UniqueCnt
369 : public std::set<std::thread::id>
370 , public lib::Sync<>
371 {
372 public:
373 void
374 mark (std::thread::id const& tID)
375 {
376 Lock guard{this};
377 this->insert(tID);
378 }
379
// allows writing `CHECK (n == uniqueCnt)` directly
380 operator size_t() const
381 {
382 Lock guard{this};
383 return this->size();
384 }
385 }
386 uniqueCnt;
387
388 WorkForce wof{setup ([&]{
389 uniqueCnt.mark(std::this_thread::get_id());
390 return activity::PASS;
391 })};
392
393 CHECK (0 == uniqueCnt);
394 CHECK (0 == wof.size());
395
396 wof.incScale();
397 sleep_for(1ms);
398 CHECK (1 == uniqueCnt);
399 CHECK (1 == wof.size());
400
401 wof.incScale();
402 sleep_for(1ms);
403 CHECK (2 == uniqueCnt);
404 CHECK (2 == wof.size());
405
406
// NOTE(review): listing line 407 is missing; `fullCnt` used below is presumably declared
// there -- confirm against the real file.
408
409 wof.activate (1.0);
410 sleep_for(5ms);
411 CHECK (fullCnt == uniqueCnt);
412 CHECK (fullCnt == wof.size());
413
414 wof.activate (2.0);
415 sleep_for(10ms);
416 CHECK (2*fullCnt == uniqueCnt);
417 CHECK (2*fullCnt == wof.size());
418
419 wof.awaitShutdown();
420 CHECK (0 == wof.size());
421
// set::clear() is called without taking the Lock; the pool size was just checked to be 0
422 uniqueCnt.clear();
423 sleep_for(5ms);
424 CHECK (0 == uniqueCnt);
425
426 wof.activate (0.5);
427 sleep_for(5ms);
428 CHECK (fullCnt/2 == uniqueCnt);
429 CHECK (fullCnt/2 == wof.size());
430 }
431
432
433
// Three workers share one atomic counter; the functor returns activity::HALT for the
// invocations that produce the counts 5'000 and 5'110, so exactly two workers drop out
// and one keeps running.
// NOTE(review): the line carrying this method's name is absent from this listing.
436 void
438 {
439 atomic<uint> check{0};
440 WorkForce wof{setup ([&]{
// Compare the value returned by the atomic increment itself. Re-reading `check` in a
// separate load could skip a target value or let two workers observe the same one;
// if all three workers halted that way, the polling loop below would never terminate.
441 uint const count = ++check;
442 if (count == 5'000 or count == 5'110)
443 return activity::HALT;
444 else
445 return activity::PASS;
446 })};
447
448 CHECK (0 == wof.size());
449
450 wof.incScale();
451 wof.incScale();
452 wof.incScale();
453 sleep_for(15us); // this may be fragile; must be sufficiently short
454
455 CHECK (3 == wof.size());
456
// poll until the surviving worker has counted past both HALT marks
457 while (check < 6'000)
458 sleep_for(15ms); // .....sufficiently long to count way beyond 6'000
// the loop exit guarantees only `>=`; the counter never decreases
459 CHECK (check >= 6'000);
460 CHECK (1 == wof.size());
461 }
462
463
464
// The WorkForce destructor does not return while workers are still inside the work functor.
// A separate "controller" thread creates and destroys the pool, while this thread observes
// the progress flags and finally releases the trapped workers.
// NOTE(review): the line carrying this method's name is absent from this listing.
472 void
474 {
475 atomic<bool> trapped{true};
// work functor that busy-waits until `trapped` is cleared
476 auto blockingWork = [&]{
477 while (trapped)
478 /* spin */;
479 return activity::PASS;
480 };
481
// progress markers set by the controller thread, read by this thread
482 atomic<bool> pool_scaled_up{false};
483 atomic<bool> shutdown_done{false};
484
485 Thread operate{"controller"
486 ,[&] {
487 {// nested scope...
488 WorkForce wof{setup (blockingWork)};
489
490 wof.activate();
491 sleep_for (10ms);
492 CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
493 pool_scaled_up = true;
494 } // WorkForce goes out of scope => dtor called
495
496 // when reaching this point, dtor has terminated
497 shutdown_done = true;
498 }};
499
500 CHECK (operate); // operate-thread is in running state
// give the controller time to build the pool and enter the destructor
501 sleep_for(100ms);
502
503 CHECK (pool_scaled_up);
504 CHECK (not shutdown_done); // all workers are trapped in the work-functor
505 // thus the destructor can't dismantle the pool
// release the workers so the pending destructor can complete
506 trapped = false;
507 sleep_for(40ms);
508 CHECK (shutdown_done);
509 CHECK (not operate); // operate-thread has detached and terminated
510 }
511 };
512
513
515 LAUNCHER (WorkForce_test, "unit engine");
516
517
518}}} // namespace vault::gear::test
Facility for monitor object based locking.
Definition sync.hpp:210
A thin convenience wrapper to simplify thread-handling.
Definition thread.hpp:650
Abstract Base Class for all testcases.
Definition run.hpp:54
Pool of worker threads for rendering.
unsigned int uint
Definition integral.hpp:29
Definition Setup.py:1
LumieraError< LERR_(STATE)> State
Definition error.hpp:209
Test runner and basic definitions for tests.
Proc
Result instruction from Activity activation.
Definition activity.hpp:140
@ PASS
pass on the activation down the chain
Definition activity.hpp:140
@ KICK
back pressure; get out of the way but be back soon
Definition activity.hpp:143
@ WAIT
nothing to do; wait and re-check for work later
Definition activity.hpp:142
@ HALT
abandon this play / render process
Definition activity.hpp:144
auto setup(FUN &&workFun)
Helper: setup a Worker-Pool configuration for the test.
Vault-Layer implementation namespace root.
Simplistic test class runner.
#define LAUNCHER(_TEST_CLASS_, _GROUPS_)
Definition run.hpp:116
Base for configuration of the worker pool.
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Object Monitor based synchronisation.
Convenience front-end to simplify and codify basic thread handling.
A pool of workers for multithreaded rendering.