Lumiera 0.pre.04
»edit your freedom«
Loading...
Searching...
No Matches
scheduler-commutator-test.cpp
Go to the documentation of this file.
1/*
2 SchedulerCommutator(Test) - verify dependent activity processing in the scheduler
3
4 Copyright (C)
5 2023, Hermann Vosseler <Ichthyostega@web.de>
6
7  **Lumiera** is free software; you can redistribute it and/or modify it
8  under the terms of the GNU General Public License as published by the
9  Free Software Foundation; either version 2 of the License, or (at your
10  option) any later version. See the file COPYING for further details.
11
12* *****************************************************************/
13
19#include "lib/test/run.hpp"
20#include "activity-detector.hpp"
24#include "lib/format-cout.hpp"
25#include "lib/thread.hpp"
26#include "lib/util.hpp"
27
28#include <chrono>
29
30using test::Test;
32
33
34namespace vault{
35namespace gear {
36namespace test {
37
38 using lib::time::Time;
39 using lib::time::FSecs;
40 using std::atomic_bool;
44
45 using std::unique_ptr;
46 using std::make_unique;
47 using std::this_thread::yield;
48 using std::this_thread::sleep_for;
49 using std::chrono_literals::operator ""us;
50
51
52 namespace { // Load test parameters (file-local), used by the threadBenchmark<NUM_THREADS>(.., REPETITIONS) invocations below
53 const size_t NUM_THREADS = 20; // template argument of threadBenchmark: number of concurrent threads
54 const size_t REPETITIONS = 100; // repeat count handed to threadBenchmark
55 }
56
57
58
59
60
61 /******************************************************************/
82 {
83
84 virtual void
98
99
102 void
107 Activity activity;
108 Time when{3,4};
109 Time dead{5,6};
110
111 // use the ActivityDetector for test instrumentation...
112 ActivityDetector detector;
113 Time now = detector.executionCtx.getSchedTime();
114 CHECK (now < dead);
115
116 // prepare scenario: some activity is enqueued
117 queue.instruct ({activity, when, dead});
118
119 // retrieve one event from queue and dispatch it
120 ActivationEvent act = sched.findWork(queue,now);
122
123 CHECK (detector.verifyInvocation("CTX-tick").arg(now));
124 CHECK (queue.empty());
125
126// cout << detector.showLog()<<endl; // HINT: use this for investigation...
127 }
128
129
130
133 void
136
138
139 auto myself = std::this_thread::get_id();
140 CHECK (not sched.holdsGroomingToken (myself));
141
142 CHECK (sched.acquireGoomingToken());
143 CHECK ( sched.holdsGroomingToken (myself));
144
145 sched.dropGroomingToken();
146 CHECK (not sched.holdsGroomingToken (myself));
148 }
149
151 static void // sanity helper: verify the GroomingToken is free and can be acquired by the calling thread
153 {
154 auto myself = std::this_thread::get_id();
155 CHECK (not sched.holdsGroomingToken(myself)); // the calling thread must not hold the token at this point
156 CHECK (sched.acquireGoomingToken()); // acquisition must succeed right now ("Gooming" is the actual spelling of the API)
157 sched.dropGroomingToken(); // release again, so the token is not held when this helper returns
158 }
159
160
161
166 void
169
171
172 // Case-1: if a thread already holds the token....
173 CHECK (sched.acquireGoomingToken());
174 CHECK (sched.holdsGroomingToken (thisThread()));
175 {
176 auto guard = sched.requireGroomingTokenHere();
177 CHECK (sched.holdsGroomingToken (thisThread()));
178 }// leave scope -> nothing happens in this case
179 CHECK (sched.holdsGroomingToken (thisThread()));
180
181 // Case-2: when not holding the token...
182 sched.dropGroomingToken();
183 {
184 // acquire automatically (this may block)
185 auto guard = sched.requireGroomingTokenHere();
186 CHECK (sched.holdsGroomingToken (thisThread()));
187 }// leave scope -> dropped automatically
188 CHECK (not sched.holdsGroomingToken (thisThread()));
189
191 }
192
193
194
200 void
203
205
206 size_t checkSum{0}; // plain (non-atomic) accumulator, captured by reference in both lambdas
207 auto pause_and_sum = [&](size_t i) -> size_t
208 { // unprotected read-modify-write; the pause between read and write widens the window for lost updates
209 auto oldSum = checkSum;
210 sleep_for (500us);
211 checkSum = oldSum + i;
212 return 1;
213 };
214 auto protected_sum = [&](size_t i) -> size_t
215 { // the same update, but performed only while holding the GroomingToken
216 while (not sched.acquireGoomingToken())
217 yield(); // contend until getting exclusive access
218 pause_and_sum(i);
219 sched.dropGroomingToken();
220 return 1;
221 };
222
223 threadBenchmark<NUM_THREADS> (pause_and_sum, REPETITIONS); // reference run without any protection
224
225 size_t brokenSum = checkSum;
226 checkSum = 0;
227
228 threadBenchmark<NUM_THREADS> (protected_sum, REPETITIONS); // same load, serialised through the GroomingToken
229
230 CHECK (brokenSum < checkSum); // the unprotected run is expected to have lost updates
231 CHECK (checkSum == NUM_THREADS * REPETITIONS*(REPETITIONS-1)/2); // compare, not assign (an assignment here is always true and overwrites checkSum); assumes each thread feeds indices 0..REPETITIONS-1 -- TODO confirm against threadBenchmark
233 }
234
235
236
237 atomic_bool stopTheHog_{false}; // stop flag, polled by the "grooming-hog" thread; set by unblockGroomingToken()
238 unique_ptr<ThreadHookable> groomingHog_; // handle of the thread currently hogging the GroomingToken; empty when no such thread is active
239 using Launch = ThreadHookable::Launch; // shorthand for the launch configuration used to start the hog thread
240
242 void // start a helper thread which acquires the GroomingToken and keeps holding it until stopTheHog_ is set
244 {
245 REQUIRE (not groomingHog_); // only one hog thread at a time
246 if (sched.holdsGroomingToken(std::this_thread::get_id())) // the calling thread must not hold the token itself...
247 sched.dropGroomingToken(); // ...otherwise the hog thread could not acquire it
248
249 stopTheHog_ = false;
250 groomingHog_ = make_unique<ThreadHookable>(
251 Launch{[&]{ // thread function: grab the token, then idle
252 CHECK (sched.acquireGoomingToken());
253 do sleep_for (100us); // keep holding the token, re-checking the stop flag every 100us
254 while (not stopTheHog_);
255 sched.dropGroomingToken();
256 }}
257 .atExit([&](ThreadWrapper& handle) // hook, invoked when the thread function terminates
258 {
260 groomingHog_.reset(); // clear the handle; unblockGroomingToken() spins until it is empty
261 })
262 .threadID("grooming-hog"));
263 sleep_for (500us); // presumably gives the new thread time to start and take the token -- timing based, TODO confirm
264 ENSURE (groomingHog_);
265 }
266
268 void // stop the hog thread started by blockGroomingToken() and wait until it is gone
270 {
271 stopTheHog_ = true; // ends the hog thread's polling loop, which then drops the GroomingToken
272 while (groomingHog_) // the handle is reset by the thread's atExit hook
273 yield(); // busy-wait, giving other threads a chance to run
274 }
275
276
277
280 void
283
286
287 Time t1{10,0};
288 Time t2{20,0};
289 Time t3{30,0};
290 Time now{t2};
291
292 CHECK (not sched.findWork (queue, now)); // empty queue, no work found
293
294 Activity a1{1u,1u};
295 Activity a2{2u,2u};
296 Activity a3{3u,3u};
297
298 queue.instruct ({a3, t3}); // activity scheduled into the future
299 CHECK (not sched.findWork (queue, now)); // ... not found with time `now`
300 CHECK (t3 == queue.headTime());
301
302 queue.instruct ({a1, t1});
303 CHECK (isSameObject (a1, *sched.findWork(queue, now))); // but past activity is found
304 CHECK (not sched.findWork (queue, now)); // activity was retrieved
305
306 queue.instruct ({a2, t2});
307 CHECK (isSameObject (a2, *sched.findWork(queue, now))); // activity scheduled for `now` is found
308 CHECK (not sched.findWork (queue, now)); // nothing more found for `now`
309 CHECK (t3 == queue.headTime());
310 CHECK (not queue.empty()); // yet the future activity a3 is still queued...
311
312 CHECK (isSameObject (a3, *sched.findWork(queue, t3))); // ...and will be found when querying "later"
313 CHECK (not sched.findWork (queue, t3));
314 CHECK ( queue.empty()); // Everything retrieved and queue really empty
315
316 queue.instruct ({a2, t2});
317 queue.instruct ({a1, t1});
318 CHECK (isSameObject (a1, *sched.findWork(queue, now))); // the earlier activity is found first
319 CHECK (t2 == queue.headTime());
320 CHECK (isSameObject (a2, *sched.findWork(queue, now)));
321 CHECK (not sched.findWork (queue, now));
322 CHECK ( queue.empty());
323
324 queue.instruct ({a2, t2}); // prepare activity which /would/ be found...
325 blockGroomingToken(sched); // but prevent this thread from acquiring the GroomingToken
326 CHECK (not sched.findWork (queue, now)); // thus search aborts immediately
327 CHECK (not queue.empty());
328
329 unblockGroomingToken(); // yet when we're able to get the GroomingToken
330 CHECK (isSameObject (a2, *sched.findWork(queue, now))); // the task can be retrieved
331 CHECK (queue.empty());
332 }
333
334
335
345 void
348
351
352 Time t1{10,0}; Activity a1{1u,1u};
353 Time t2{20,0}; Activity a2{2u,2u};
354 Time t3{30,0}; Activity a3{3u,3u};
355 Time t4{40,0}; Activity a4{4u,4u};
356 // start,deadline, manif.ID, isCompulsory
357 queue.instruct ({a1, t1, t4, ManifestationID{5}});
358 queue.instruct ({a2, t2, t2});
359 queue.instruct ({a3, t3, t3, ManifestationID{23}, true});
360 queue.instruct ({a4, t4, t4});
361 queue.activate(ManifestationID{5});
362 queue.activate(ManifestationID{23});
363
364 queue.feedPrioritisation();
365 CHECK (t1 == queue.headTime());
366 CHECK (isSameObject (a1, *queue.peekHead()));
367 CHECK (not queue.isMissed(t1));
368 CHECK (not queue.isOutdated(t1));
369
370 queue.drop(ManifestationID{5});
371 CHECK (t1 == queue.headTime());
372 CHECK (not queue.isMissed(t1));
373 CHECK ( queue.isOutdated(t1));
374
375 CHECK (not sched.findWork(queue, t1));
376 CHECK (t2 == queue.headTime());
377 CHECK (isSameObject (a2, *queue.peekHead()));
378 CHECK (not queue.isMissed (t2));
379 CHECK (not queue.isOutdated(t2));
380 CHECK ( queue.isMissed (t3));
381 CHECK ( queue.isOutdated(t3));
382
383 CHECK (not sched.findWork(queue, t2+Time{5,0}));
384 CHECK (t3 == queue.headTime());
385 CHECK (isSameObject (a3, *queue.peekHead()));
386 CHECK (not queue.isMissed (t3));
387 CHECK (not queue.isOutdated (t3));
388 CHECK (not queue.isOutOfTime(t3));
389 CHECK ( queue.isMissed (t4));
390 CHECK ( queue.isOutdated (t4));
391 CHECK ( queue.isOutOfTime(t4));
392
393 CHECK (not sched.findWork(queue, t4));
394 CHECK (t3 == queue.headTime());
395 CHECK (not queue.isMissed (t3));
396 CHECK (not queue.isOutdated (t3));
397 CHECK (not queue.isOutOfTime(t3));
398 CHECK ( queue.isMissed (t4));
399 CHECK ( queue.isOutdated (t4));
400 CHECK ( queue.isOutOfTime(t4));
401
402 queue.drop(ManifestationID{23});
403 CHECK (t3 == queue.headTime());
404 CHECK (not queue.isMissed (t3));
405 CHECK ( queue.isOutdated (t3));
406 CHECK (not queue.isOutOfTime(t3));
407 CHECK ( queue.isMissed (t4));
408 CHECK ( queue.isOutdated (t4));
409 CHECK (not queue.isOutOfTime(t4));
410
411 CHECK (isSameObject (a3, *queue.peekHead()));
412 CHECK (isSameObject (a4, *sched.findWork(queue, t4)));
413 CHECK (queue.empty());
414 }
415
416
417
420 void
423
424 // rigged execution environment to detect activations--------------
425 ActivityDetector detector;
426 Activity& activity = detector.buildActivationProbe ("testActivity");
427 auto makeEvent = [&](Time start) { return ActivationEvent{activity, start, start+Time{0,1}}; };
428 // set a dummy deadline to pass the sanity check
431
432 Time now = detector.executionCtx.getSchedTime();
433 Time past {Time::ZERO};
434 Time future{now+now};
435
436 // no one holds the GroomingToken
438 auto myself = std::this_thread::get_id();
439 CHECK (sched.acquireGoomingToken());
440
441 // Activity with start time way into the past is enqueued, but then discarded
442 CHECK (activity::PASS == sched.postChain (makeEvent(past), queue));
443 CHECK (detector.ensureNoInvocation("testActivity")); // not invoked
444 CHECK (queue.peekHead()); // still in the queue...
445 CHECK (not sched.findWork (queue,now)); // but it is not retrieved due to deadline
446 CHECK (not queue.peekHead()); // and thus was dropped
447 CHECK (queue.empty());
448
449 // future Activity is enqueued by short-circuit directly into the PriorityQueue if possible
450 CHECK (activity::PASS == sched.postChain (makeEvent(future), queue));
451 CHECK ( sched.holdsGroomingToken (myself));
452 CHECK (not queue.empty());
453 CHECK (isSameObject (activity, *queue.peekHead())); // appears at Head, implying it's in Priority-Queue
454
455 queue.pullHead();
456 sched.dropGroomingToken();
457 CHECK (not sched.holdsGroomingToken (myself));
458 CHECK (queue.empty());
459
460 // ...but GroomingToken is not acquired explicitly; Activity is just placed into the Instruct-Queue
461 CHECK (activity::PASS == sched.postChain (makeEvent(future), queue));
462 CHECK (not sched.holdsGroomingToken (myself));
463 CHECK (not queue.peekHead()); // not appearing at Head this time,
464 CHECK (not queue.empty()); // rather waiting in the Instruct-Queue
465
466
467 blockGroomingToken(sched);
468 CHECK (activity::PASS == sched.postChain (makeEvent(now), queue));
469 CHECK (not sched.holdsGroomingToken (myself));
470 CHECK (not queue.peekHead()); // was enqueued, not executed
471
472 // Note: this test did not cause any direct invocation;
473 // all provided events were queued only
474 CHECK (detector.ensureNoInvocation("testActivity"));
475
476 // As sanity-check: the first event was enqueued and then picked up;
477 // two further cases were enqueued; we could retrieve them if
478 // re-acquiring the GroomingToken and using suitable query-time
480 queue.feedPrioritisation();
481 CHECK (now == queue.headTime());
482 CHECK (isSameObject (activity, *sched.findWork(queue, now)));
483 CHECK (sched.holdsGroomingToken (myself)); // findWork() acquired the token
484 CHECK (future == queue.headTime());
485 CHECK (not queue.isDue(now));
486 CHECK ( queue.isDue(future));
487 CHECK (sched.findWork(queue, future));
488 CHECK ( queue.empty());
489 }
490
491
492
500 void
503
504 // rigged execution environment to detect activations--------------
505 ActivityDetector detector;
506 Activity& activity = detector.buildActivationProbe ("testActivity");
507 // set a dummy deadline to pass the sanity check
510 LoadController lCtrl;
511
512 Time start{0,1};
513 Time dead{0,10};
514 // prepare the queue with one activity
515 CHECK (Time::NEVER == queue.headTime());
516 queue.instruct ({activity, start, dead});
517 queue.feedPrioritisation();
518 CHECK (start == queue.headTime());
519
520 // for the first testcase,
521 // set Grooming-Token to be blocked
522 blockGroomingToken(sched);
523 auto myself = std::this_thread::get_id();
524 CHECK (not sched.holdsGroomingToken (myself));
525
526 // invoking the dequeue and dispatch requires some wiring
527 // with functionality provided by other parts of the scheduler
528 auto getSchedTime = detector.executionCtx.getSchedTime;
529 auto executeActivity = [&](ActivationEvent event)
530 {
531 return ActivityLang::dispatchChain (event, detector.executionCtx);
532 };
533
534 // Invoke the pull-work functionality directly from this thread
535 // (in real usage, this function is invoked from a worker)
536 CHECK (activity::KICK == sched.dispatchCapacity (queue
537 ,lCtrl
538 ,executeActivity
539 ,getSchedTime));
540 CHECK (not queue.empty());
541 // the first invocation was kicked back,
542 // since the Grooming-token could not be acquired.
544
545 // ...now this thread can acquire, fetch from queue and dispatch....
546 CHECK (activity::PASS == sched.dispatchCapacity (queue
547 ,lCtrl
548 ,executeActivity
549 ,getSchedTime));
550
551 CHECK (queue.empty());
552 CHECK (not sched.holdsGroomingToken (myself));
553 CHECK (detector.verifyInvocation("testActivity"));
554 }
555
556
557
558
569 void
571 { // ·==================================================================== setup a rigged Job
573 Time nominal{7,7};
574 Time start{0,1};
575 Time dead{0,10};
576
577 ActivityDetector detector;
578 Job testJob{detector.buildMockJob("testJob", nominal, 12345)};
579
580 BlockFlowAlloc bFlow;
581 ActivityLang activityLang{bFlow};
582
583 // Build the Activity-Term for a simple calculation job...
584 Activity& anchor = activityLang.buildCalculationJob (testJob, start,dead)
585 .post(); // retrieve the entrance point to the chain
586
587 // insert instrumentation to trace activation
588 detector.watchGate (anchor.next, "theGate");
589
590
591 // ·=================================================================== setup test subject
594
595 // no one holds the GroomingToken
597 auto myself = std::this_thread::get_id();
598 CHECK (not sched.holdsGroomingToken (myself));
599
600 TimeVar now{Time::ZERO};
601
602 // rig the ExecutionCtx to allow manipulating "current scheduler time"
603 detector.executionCtx.getSchedTime = [&]{ return Time{now}; };
604 // rig the λ-work to verify GroomingToken and to drop it then
605 detector.executionCtx.work.implementedAs(
606 [&](Time, size_t)
607 {
608 CHECK (sched.holdsGroomingToken (myself));
609 sched.dropGroomingToken();
610 });
611
612
613 // ·=================================================================== actual test sequence
614 // Add the Activity-Term to be scheduled for planned start-Time
615 sched.postChain (ActivationEvent{anchor, start}, queue);
616 CHECK (detector.ensureNoInvocation("testJob"));
617 CHECK (not sched.holdsGroomingToken (myself));
618 CHECK (not queue.empty());
619
620 // later->"now"
621 now = Time{555,5};
622 detector.incrementSeq();
623
624 // Assuming a worker runs "later" and retrieves work...
625 ActivationEvent act = sched.findWork(queue,now);
626 CHECK (sched.holdsGroomingToken (myself)); // acquired the GroomingToken
627 CHECK (isSameObject(*act, anchor)); // "found" the rigged Activity as next piece of work
628
629 // dispatch the Activity-chain just retrieved from the queue
631
632 CHECK (queue.empty());
633 CHECK (not sched.holdsGroomingToken (myself)); // the λ-work was invoked and dropped the GroomingToken
634
635 CHECK (detector.verifySeqIncrement(1)
636 .beforeInvocation("theGate").arg("5.555 ⧐ Act(GATE")
637 .beforeInvocation("after-theGate").arg("⧐ Act(WORKSTART")
638 .beforeInvocation("CTX-work").arg("5.555","")
639 .beforeInvocation("testJob") .arg("7.007",12345)
640 .beforeInvocation("CTX-done").arg("5.555",""));
641
642// cout << detector.showLog()<<endl; // HINT: use this for investigation...
643 }
644 };
645
646
649
650
651
652}}} // namespace vault::gear::test
Diagnostic setup to instrument and observe Activity activations.
Extended variant of the standard case, allowing to install callbacks (hook functions) to be invoked d...
Definition thread.hpp:718
a mutable time value, behaving like a plain number, allowing copy and re-accessing
Lumiera's internal time value datatype.
static const Time NEVER
border condition marker value. NEVER >= any time value
static const Time ZERO
Abstract Base Class for all testcases.
Definition run.hpp:54
void seedRand()
draw a new random seed from a common nucleus, and re-seed the default-Gen.
Definition suite.cpp:211
Term builder and execution framework to perform chains of scheduler Activities.
static activity::Proc dispatchChain(Activity *chain, EXE &executionCtx)
Execution Framework: dispatch performance of a chain of Activities.
Record to describe an Activity, to happen within the Scheduler's control flow.
Definition activity.hpp:227
activity::Proc activate(Time now, EXE &executionCtx)
Core Operation: Activate and perform this Activity.
Definition activity.hpp:626
Activity * next
Activities are organised into chains to represent relations based on verbs.
Definition activity.hpp:249
Individual frame rendering task, forwarding to a closure.
Definition job.h:276
Controller to coordinate resource usage related to the Scheduler.
Marker for current (and obsolete) manifestations of a CalcStream processed by the Render-Engine.
Definition activity.hpp:85
Scheduler Layer-2 : execution of Scheduler Activities.
bool holdsGroomingToken(ThreadID id) noexcept
check if the indicated thread currently holds the right to conduct internal state transitions.
activity::Proc postChain(ActivationEvent event, SchedulerInvocation &layer1)
This is the primary entrance point to the Scheduler.
bool acquireGoomingToken() noexcept
acquire the right to perform internal state transitions.
void dropGroomingToken() noexcept
relinquish the right for internal state transitions.
activity::Proc dispatchCapacity(SchedulerInvocation &, LoadController &, DISPATCH, CLOCK)
Implementation of the worker-Functor:
ActivationEvent findWork(SchedulerInvocation &layer1, Time now)
Look into the queues and possibly retrieve work due by now.
Scheduler Layer-1 : time based dispatch.
void instruct(ActivationEvent actEvent)
Accept an ActivationEvent with an Activity for time-bound execution.
bool isDue(Time now) const
Determine if there is work to do right now.
void feedPrioritisation()
Pick up all new events from the entrance queue and enqueue them to be retrieved ordered by start time...
ActivationEvent pullHead()
Retrieve from the scheduling queue the entry with earliest start time.
Diagnostic context to record and evaluate activations within the Scheduler.
ActivityMatch verifySeqIncrement(uint seqNr)
ActivityMatch ensureNoInvocation(string fun)
Job buildMockJob(string id="", Time nominal=lib::test::randTime(), size_t extra=rani())
Activity & watchGate(Activity *&wiring, string id="")
ActivityMatch verifyInvocation(string fun)
uint incrementSeq()
increment the internal invocation sequence number
ActivityMatch & arg(ARGS const &...args)
qualifier: additionally match the function arguments
ActivityMatch & beforeInvocation(string match)
static void ___ensureGroomingTokenReleased(SchedulerCommutator &sched)
Automatically use custom string conversion in C++ stream output.
Functions to perform (multithreaded) timing measurement on a given functor.
auto threadBenchmark(FUN const &subject, const size_t repeatCnt=DEFAULT_RUNS)
perform a multithreaded microbenchmark.
boost::rational< int64_t > FSecs
rational representation of fractional seconds
Test runner and basic definitions for tests.
bool isSameObject(A const &a, B const &b)
compare plain object identity, based directly on the referee's memory identities.
Definition util.hpp:421
@ PASS
pass on the activation down the chain
Definition activity.hpp:140
@ KICK
back pressure; get out of the way but be back soon
Definition activity.hpp:143
Vault-Layer implementation namespace root.
Simplistic test class runner.
#define LAUNCHER(_TEST_CLASS_, _GROUPS_)
Definition run.hpp:116
Layer-2 of the Scheduler: coordination and interaction of activities.
void detach_thread_from_wrapper()
allow to detach explicitly — independent from thread-function's state.
Definition thread.hpp:231
#define MARK_TEST_FUN
Macro to mark the current test function in STDOUT.
Convenience front-end to simplify and codify basic thread handling.
a family of time value like entities and their relationships.
Tiny helper functions and shortcuts to be used everywhere. Consider this header to be effectively incl...