Lumiera 0.pre.04
»edit your freedom«
Loading...
Searching...
No Matches
scheduler-service-test.cpp
Go to the documentation of this file.
1/*
2 SchedulerService(Test) - component integration test for the scheduler
3
4 Copyright (C)
5 2023, Hermann Vosseler <Ichthyostega@web.de>
6
7  **Lumiera** is free software; you can redistribute it and/or modify it
8  under the terms of the GNU General Public License as published by the
9  Free Software Foundation; either version 2 of the License, or (at your
10  option) any later version. See the file COPYING for further details.
11
12* *****************************************************************/
13
19#include "lib/test/run.hpp"
20#include "test-chain-load.hpp"
21#include "activity-detector.hpp"
24#include "lib/format-cout.hpp"
25#include "lib/format-string.hpp"
28#include "lib/util.hpp"
29
30#include <thread>
31
32using test::Test;
33
34
35namespace vault{
36namespace gear {
37namespace test {
38
39 using util::max;
40 using util::_Fmt;
41 using lib::time::Time;
42 using std::this_thread::sleep_for;
43
44 namespace {
// file-local time constants shared by the test cases below
// 1/10'000 sec, i.e. 100µs
45 Time t100us = Time{FSecs{1, 10'000}};
// NOTE(review): t200us and t500us are used further down, yet their definitions are not part of this listing
// one millisecond
48 Time t1ms = Time{1,0};
49
51 }
52
53
54
55
56 /*************************************************************************/
65 {
66
67 virtual void
78
79
// Scenario: schedule a single one-time job through the public builder API
// and verify that it has not run after 3ms, but has run after a further 20ms.
// NOTE(review): the line carrying this test function's name is missing from this listing.
82 void
84 {
85 BlockFlowAlloc bFlow;
87 Scheduler scheduler{bFlow, watch};
88 CHECK (scheduler.empty());
89
90 auto task = onetimeCrunch(4ms);
91 CHECK (1 == task.remainingInvocations());
92
93 Job job{ task
96 };
// the job is due 6ms after posting, so the check after 3ms must still see it pending
97 scheduler.defineSchedule(job)
98 .startOffset(6ms)
99 .lifeWindow(2ms)
100 .post();
101 CHECK (not scheduler.empty());
102
103 sleep_for (3ms); // not invoked yet
104 CHECK (1 == task.remainingInvocations());
105
106 sleep_for (20ms);
107 CHECK (0 == task.remainingInvocations());
108 } // task has been invoked
109
110
111
112
/**
 * Test helper: enqueue an Activity chain by directly accessing the Scheduler's
 * `layer1_` and `layer2_` members, instead of using the job builder API.
 * @param scheduler  the Scheduler instance to receive the new entry
 * @param chain      the Activity wrapped into the ActivationEvent
 * @param start      start time of the event; the deadline is always `start` + 50ms
 */
120 static void
121 postNewTask (Scheduler& scheduler, Activity& chain, Time start)
122 {
123 ActivationEvent actEvent{chain, start, start + Time{50,0}}; // add dummy deadline +50ms
124 scheduler.layer2_.postChain (actEvent, scheduler.layer1_);
125 }
126
127
128
// Scenario: start the Scheduler, post work, terminate processing and start again,
// checking `scheduler.empty()` after each step; the final shutdown is left to the destructor.
// NOTE(review): the function-name line and the declaration of `dummy` are missing from this listing.
131 void
133 {
134 BlockFlowAlloc bFlow;
136 Scheduler scheduler{bFlow, watch};
137 CHECK (scheduler.empty());
138
// each call posts `dummy` with a start time slightly ahead of the current real time
140 auto postIt = [&] { postNewTask (scheduler, dummy, RealClock::now()+t200us); };
141
142 scheduler.ignite();
143 CHECK (not scheduler.empty());// repeated »tick« task enlisted....
144
145 postIt();
146 CHECK (not scheduler.empty());
147
// terminating processing leaves the Scheduler empty, including the entry just posted
148 scheduler.terminateProcessing();
149 CHECK (scheduler.empty());
150
151 postIt();
152 postIt();
153 scheduler.ignite();
154 CHECK (not scheduler.empty());
155 //... and just walk away => scheduler unwinds cleanly from destructor
156 }// Note: BlockFlow and WorkForce unwinding is covered in dedicated tests
157
158
159
// Scenario: flood the queue with two batches of entries (placed 5ms and 15ms after `anchor`),
// then sample the load indicator every 50µs until the Scheduler falls empty,
// expecting to observe two distinct load peaks; finally wait for the WorkForce to shut down.
// NOTE(review): the function-name line and the declaration of `dummy` are missing from this listing.
175 void
177 {
179 BlockFlowAlloc bFlow;
181 Scheduler scheduler{bFlow, watch};
182 CHECK (scheduler.empty());
183
184 // use a single FEED as content
186
// all timings reported below are raw time differences relative to this anchor
187 auto anchor = RealClock::now();
188 auto offset = [&](Time when =RealClock::now()){ return _raw(when) - _raw(anchor); };
189
190 auto createLoad = [&](Offset start, uint cnt)
191 { // use internal API (this test is declared as friend)
192 for (uint i=0; i<cnt; ++i) // flood the queue
193 postNewTask (scheduler, dummy, anchor + start + TimeValue{i});
194 };
195
196
// number of entries per batch: intended peak duration divided by the typical time per schedule
// (TYPICAL_TIME_FOR_ONE_SCHEDULE_us is defined outside of this listing)
197 auto LOAD_PEAK_DURATION_us = 2000;
198 auto fatPackage = LOAD_PEAK_DURATION_us/TYPICAL_TIME_FOR_ONE_SCHEDULE_us;
199
200 createLoad (Offset{Time{ 5,0}}, fatPackage);
201 createLoad (Offset{Time{15,0}}, fatPackage);
202
203 scheduler.ignite();
204 cout << "Timing : start-up required.."<<offset()<<" µs"<<endl;
205
206 // now watch change of load and look out for two peaks....
207 uint peak1_s =0;
208 uint peak1_dur=0;
209 double peak1_max=0;
210 uint peak2_s =0;
211 uint peak2_dur=0;
212 double peak2_max=0;
213
// phase 0 / 2: wait for the load to rise above 1.0 (start of first / second peak)
// phase 1 / 3: track the maximum until the load drops below 1.0 again
// phase 4    : both peaks have been observed completely
214 uint phase=0;
215 _Fmt row{"%6d | Load: %5.3f Head:%5d Lag:%6d\n"};
216
217 while (not scheduler.empty()) // should fall empty at end
218 {
219 sleep_for(50us);
220 double load = scheduler.getLoadIndicator();
221
222 switch (phase) {
223 case 0:
224 if (load > 1.0)
225 {
226 ++phase;
227 peak1_s = offset();
228 }
229 break;
230 case 1:
231 peak1_max = max (load, peak1_max);
232 if (load < 1.0)
233 {
234 ++phase;
235 peak1_dur = offset() - peak1_s;
236 }
237 break;
238 case 2:
239 if (load > 1.0)
240 {
241 ++phase;
242 peak2_s = offset();
243 }
244 break;
245 case 3:
246 peak2_max = max (load, peak2_max);
247 if (load < 1.0)
248 {
249 ++phase;
250 peak2_dur = offset() - peak2_s;
251 }
252 break;
253 }
// print one row per sample: elapsed time, load, head time of the queue and average lag
254 cout << row % offset() % load
255 % offset(scheduler.layer1_.headTime())
256 % scheduler.loadControl_.averageLag();
257 }
258 uint done = offset();
259
260 //--------Summary-Table------------------------------
261 _Fmt peak{"\nPeak %d ....... %5d +%dµs %34tmax=%3.1f"};
262 cout << "-------+-------------+----------+----------"
263 << "\n\n"
264 << peak % 1 % peak1_s % peak1_dur % peak1_max
265 << peak % 2 % peak2_s % peak2_dur % peak2_max
266 << "\nTick ....... "<<done
267 <<endl;
268
// the thresholds below are given in the same unit as offset(), which is labelled µs in the output
269 CHECK (phase == 4);
270 CHECK (peak1_s > 5000); // first peak was scheduled at 5ms
271 CHECK (peak1_s < 10000);
272 CHECK (peak2_s > 15000); // second peak was scheduled at 15ms
273 CHECK (peak2_s < 20000);
274 CHECK (peak1_max > 2.0);
275 CHECK (peak2_max > 2.0);
276
277 CHECK (done > 50000); // »Tick« period is 50ms
278 // and this tick should determine end of timeline
279
// poll in 10ms steps until no worker is left in the WorkForce
280 cout << "\nwaiting for shutdown of WorkForce";
281 while (scheduler.workForce_.size() > 0)
282 {
283 sleep_for(10ms);
284 cout << "." << std::flush;
285 }
286 uint shutdown = offset();
287 cout << "\nShutdown after "<<shutdown / 1.0e6<<"sec"<<endl;
288 CHECK (shutdown > 2.0e6);
289 }
290
291
292
// Scenario: invoke the Scheduler's work-Function `doWork()` directly from the test thread
// and verify, for several queue situations, the returned instruction (PASS / WAIT),
// the time spent within the call (`delay_us`) and the distance between planned
// and actual invocation of the probe Activity (`slip_us`).
// NOTE(review): the function-name line is missing from this listing.
320 void
322 {
324 BlockFlowAlloc bFlow;
326 Scheduler scheduler{bFlow, watch};
327
328 ActivityDetector detector;
329 Activity& probe = detector.buildActivationProbe ("testProbe");
330
// shared state, written by the helper lambdas and evaluated by the CHECKs below
331 TimeVar start;
332 int64_t delay_us;
333 int64_t slip_us;
334 activity::Proc res;
335
336 auto post = [&](Time start)
337 { // this test class is declared friend to get a backdoor into Scheduler internals...
338 scheduler.layer2_.acquireGoomingToken();
339 postNewTask (scheduler, probe, start);
340 };
341
// run the work-Function once; record result, running time and slip relative to `start`
342 auto pullWork = [&] {
343 delay_us = lib::test::benchmarkTime([&]{ res = scheduler.doWork(); });
344 slip_us = _raw(detector.invokeTime(probe)) - _raw(start);
345 cout << "res:"<<res<<" delay="<<delay_us<<"µs slip="<<slip_us<<"µs"<<endl;
346 };
347
348
349 auto wasClose = [](TimeValue a, TimeValue b)
350 { // 500µs are considered "close"
351 return Duration{Offset{a,b}} < Duration{FSecs{1,2000}};
352 };
// the probe counts as invoked when its recorded invocation time is at or shortly after `start`
353 auto wasInvoked = [&](Time start)
354 {
355 Time invoked = detector.invokeTime (probe);
356 return invoked >= start
357 and wasClose (invoked, start);
358 };
359
360
361 cout << "pullWork() on empty queue..."<<endl;
362 pullWork(); // Call the work-Function on empty Scheduler queue
363 CHECK (activity::WAIT == res); // the result instructs this thread to go to sleep immediately
364
365
366 cout << "Due at pullWork()..."<<endl;
367 TimeVar now = RealClock::now();
368 start = now + t100us; // Set a schedule 100µs ahead of "now"
369 post (start);
370 CHECK (not scheduler.empty()); // was enqueued
371 CHECK (not wasInvoked(start)); // ...but not activated yet
372
373 sleep_for (100us); // wait beyond the planned start point (typically waits ~150µs or more)
374 pullWork();
375 CHECK (wasInvoked(start));
376 CHECK (slip_us < 300); // Note: typically there is a slip of 100..200µs, because sleep waits longer
377 CHECK (scheduler.empty()); // The scheduler is empty now and this thread will go to sleep,
378 CHECK (delay_us < 20200); // however the sleep-cycle is first re-shuffled by a wait between 0 ... 20ms
379 CHECK (activity::PASS == res); // this thread is instructed to check back once
380 pullWork();
381 CHECK (activity::WAIT == res); // ...yet since the queue is still empty, it is sent immediately to sleep
382 CHECK (delay_us < 40);
383
384
385 cout << "next some time ahead => up-front delay"<<endl;
386 now = RealClock::now();
387 start = now + t500us; // Set a schedule significantly into the future...
388 post (start);
389 CHECK (not scheduler.empty());
390
391 pullWork(); // ...and invoke the work-Function immediately "now"
392 CHECK (activity::PASS == res); // Result: this thread was kept in sleep in the work-Function
393 CHECK (not wasInvoked(start)); // but the next dispatch did not happen yet; we are instructed to re-invoke immediately
394 CHECK (delay_us > 500); // this proves that there was a delay to wait for the next schedule
395 CHECK (delay_us < 1000);
396 pullWork(); // if we now re-invoke the work-Function as instructed...
397 CHECK (wasInvoked(start)); // then the next schedule is already slightly overdue and immediately invoked
398 CHECK (scheduler.empty()); // the queue is empty and thus this thread will be sent to sleep
399 CHECK (delay_us < 20200); // but beforehand the sleep-cycle is re-shuffled by a wait between 0 ... 20ms
400 CHECK (slip_us < 300);
401 CHECK (activity::PASS == res); // instruction to check back once
402 pullWork();
403 CHECK (activity::WAIT == res); // but next call will send this thread to sleep right away
404 CHECK (delay_us < 40);
405
406
407 cout << "follow-up with some distance => follow-up delay"<<endl;
408 now = RealClock::now();
409 start = now + t100us;
410 post (start); // This time the schedule is set to be "soon"
411 post (start+t1ms); // But another schedule is placed 1ms behind
412 sleep_for (100us); // wait for "soon" to pass...
413 pullWork();
414 CHECK (wasInvoked(start)); // Result: the first invocation happened immediately
415 CHECK (slip_us < 300);
416 CHECK (delay_us > 900); // yet this thread was afterwards kept in sleep to await the next task;
417 CHECK (activity::PASS == res); // returns instruction to re-invoke immediately
418 CHECK (not scheduler.empty()); // since there is still work in the queue
419
420 start += t1ms; // (just re-adjust the reference point to calculate slip_us)
421 pullWork(); // re-invoke immediately as instructed
422 CHECK (wasInvoked(start)); // Result: also the next Activity has been dispatched
423 CHECK (slip_us < 400); // not much slip
424 CHECK (delay_us < 20200); // ...and the post-delay is used to re-shuffle the sleep cycle as usual
425 CHECK (activity::PASS == res); // since queue is empty, we will call back once...
426 CHECK (scheduler.empty());
427 pullWork();
428 CHECK (activity::WAIT == res); // and then go to sleep.
429
430
431 cout << "already tended-next => re-target capacity"<<endl;
432 now = RealClock::now();
433 start = now + t500us; // Set the next schedule with some distance...
434 post (start);
435
436 // Access scheduler internals (as friend)
437 CHECK (start == scheduler.layer1_.headTime()); // next schedule indeed appears as next-head
438 CHECK (not scheduler.loadControl_.tendedNext(start)); // but this next time was not yet marked as "tended"
439
440 scheduler.loadControl_.tendNext(start); // manipulate scheduler to mark next-head as "tended"
441 CHECK ( scheduler.loadControl_.tendedNext(start));
442
443 CHECK (start == scheduler.layer1_.headTime()); // other state still the same
444 CHECK (not scheduler.empty());
445
446 pullWork();
447 CHECK (not wasInvoked(start)); // since next-head was marked as "tended"...
448 CHECK (not scheduler.empty()); // ...this thread is not used to dispatch it
449 CHECK (delay_us < 6000); // rather it is re-focussed as free capacity within WORK_HORIZON
450 }
451
452
453
454
// Scenario: schedule a mock render job through the public builder API, inspect the
// ActivationEvent generated at the queue head, then invoke the work-Function by hand
// and verify through the ActivityDetector log that the job was invoked with the expected arguments.
// NOTE(review): the function-name line is missing from this listing.
466 void
468 {
469 BlockFlowAlloc bFlow;
471 Scheduler scheduler{bFlow, watch};
472
473 // prevent scale-up of the Scheduler's WorkForce
// NOTE(review): the statement belonging to the preceding comment is missing from this listing
475
// NOTE(review): `start` and `dead` are not referenced by any line visible in this listing
476 Time nominal{7,7};
477 Time start{0,1};
478 Time dead{0,10};
479
480 ActivityDetector detector;
481 Job testJob{detector.buildMockJob("testJob", nominal, 1337)};
482
483 CHECK (scheduler.empty());
484
485 // use the public Render-Job builder API
486 scheduler.defineSchedule(testJob)
487 .startOffset(400us)
488 .lifeWindow (2ms)
489 .post();
490
491 CHECK (not scheduler.empty());
492
493 // cause the new entry to migrate to the priority queue...
494 scheduler.layer2_.maybeFeed(scheduler.layer1_);
495
496 // investigate the generated ActivationEvent at queue head
497 auto entry = scheduler.layer1_.peekHead();
498 auto now = RealClock::now();
499
// expected chain of Activities: POST -> GATE -> WORKSTART -> INVOKE
500 CHECK (entry.activity->is(Activity::POST));
501 CHECK (entry.activity->next->is(Activity::GATE));
502 CHECK (entry.activity->next->next->is(Activity::WORKSTART));
503 CHECK (entry.activity->next->next->next->is(Activity::INVOKE));
// `now` was taken after posting, thus the remaining distances are below the configured
// start offset (400µs) and below start offset + life window (2400µs)
504 CHECK (entry.startTime() - now < _uTicks( 400us));
505 CHECK (entry.deathTime() - now < _uTicks(2400us));
506 CHECK (entry.manifestation == 0);
507 CHECK (entry.isCompulsory == false);
508
509
510 sleep_for(400us); // wait to be sure the new entry has reached maturity
511 detector.incrementSeq(); // mark this point in the detector-log...
512
513 // Explicitly invoke the work-Function (normally done by the workers)
514 CHECK (activity::PASS == scheduler.doWork());
515
// the invocation must be logged after the sequence mark, with the nominal time and the extra argument
516 CHECK (detector.verifySeqIncrement(1)
517 .beforeInvocation("testJob").arg("7.007", 1337));
518
519// cout << detector.showLog()<<endl; // HINT: use this for investigation...
520 }
521
522
523
524
525
// Scenario: build a synthetic graph of interdependent jobs with TestChainLoad, run it once
// synchronously as reference, then run it through the Scheduler; in both cases the resulting
// hash must match the hash computed while building the graph, and the scheduled run
// must take less than twice the reference time.
// NOTE(review): the function-name line is missing from this listing.
537 void
539 {
541 TestChainLoad<16> testLoad{64};
// NOTE(review): the line preceding `.buildTopology()` (presumably a topology configuration call) is missing from this listing
543 .buildTopology();
544
545 auto stats = testLoad.computeGraphStatistics();
546 cout << _Fmt{"Test-Load: Nodes: %d Levels: %d ∅Node/Level: %3.1f Forks: %d Joins: %d"}
547 % stats.nodes
548 % stats.levels
549 % stats.indicators[STAT_NODE].pL
550 % stats.indicators[STAT_FORK].cnt
551 % stats.indicators[STAT_JOIN].cnt
552 << endl;
553
554 // while building the calculation-plan graph
555 // node hashes were computed, observing dependencies
556 size_t expectedHash = testLoad.getHash();
557
558 // some jobs/nodes are marked with a weight-step
559 // these can be instructed to spend some CPU time
560 auto LOAD_BASE = 500us;
561 testLoad.performGraphSynchronously(LOAD_BASE);
562 CHECK (testLoad.getHash() == expectedHash);
563
// the reference time is printed after division by 1000 and labelled as ms
564 double referenceTime = testLoad.calcRuntimeReference(LOAD_BASE);
565 cout << "refTime(singleThr): "<<referenceTime/1000<<"ms"<<endl;
566
567
568 // Perform through Scheduler----------
569 BlockFlowAlloc bFlow;
571 Scheduler scheduler{bFlow, watch};
572
// launch_and_wait() yields the measured run time, which is compared to the reference below
573 double performanceTime =
574 testLoad.setupSchedule(scheduler)
575 .withLoadTimeBase(LOAD_BASE)
576 .withJobDeadline(30ms)
577 .launch_and_wait();
578
579 cout << "runTime(Scheduler): "<<performanceTime/1000<<"ms"<<endl;
580
581 // invocation through Scheduler has reproduced all node hashes
582 CHECK (testLoad.getHash() == expectedHash);
583
584 // due to the massive load burst at end, Scheduler falls behind plan
585 CHECK (performanceTime < 2*referenceTime);
586 // typical values: refTime ≡ ~35ms, runTime ≡ ~45ms
587 }
588 };
589
590
593
594
595
596}}} // namespace vault::gear::test
Diagnostic setup to instrument and observe Activity activations.
Duration is the internal Lumiera time metric.
Offset measures a distance in time.
basic constant internal time value.
a mutable time value, behaving like a plain number, allowing copy and re-accessing
Lumiera's internal time value datatype.
static const Time ANYTIME
border condition marker value. ANYTIME <= any time value
Abstract Base Class for all testcases.
Definition run.hpp:54
void seedRand()
draw a new random seed from a common nucleus, and re-seed the default-Gen.
Definition suite.cpp:211
A front-end for using printf-style formatting.
static Time now()
Record to describe an Activity, to happen within the Scheduler's control flow.
Definition activity.hpp:227
@ POST
post a message providing a chain of further time-bound Activities
Definition activity.hpp:237
@ FEED
supply additional payload data for a preceding Activity
Definition activity.hpp:238
@ GATE
probe window + count-down; activate next Activity, else re-schedule
Definition activity.hpp:236
@ INVOKE
dispatch a JobFunctor into a worker thread
Definition activity.hpp:232
@ WORKSTART
signal start of some processing and transition grooming mode ⟼ *work mode
Definition activity.hpp:233
Collector and aggregator for performance data.
Individual frame rendering task, forwarding to a closure.
Definition job.h:276
activity::Proc postChain(ActivationEvent event, SchedulerInvocation &layer1)
This is the primary entrance point to the Scheduler.
»Scheduler-Service« : coordinate render activities.
SchedulerInvocation layer1_
SchedulerCommutator layer2_
Diagnostic context to record and evaluate activations within the Scheduler.
ActivityMatch verifySeqIncrement(uint seqNr)
Activity & buildActivationProbe(string id)
build a rigged HOOK-Activity to record each invocation
Time invokeTime(Activity const *hook)
Job buildMockJob(string id="", Time nominal=lib::test::randTime(), size_t extra=rani())
uint incrementSeq()
increment the internal invocation sequence number
ActivityMatch & arg(ARGS const &...args)
qualifier: additionally match the function arguments
ActivityMatch & beforeInvocation(string match)
static void postNewTask(Scheduler &scheduler, Activity &chain, Time start)
A Generator for synthetic Render Jobs for Scheduler load testing.
TestChainLoad && configureShape_chain_loadBursts()
preconfigured topology: single graph with massive »load bursts«
TestChainLoad && buildTopology()
Use current configuration and seed to (re)build Node connectivity.
Statistic computeGraphStatistics()
Operator on TestChainLoad to evaluate current graph connectivity.
Automatically use custom string conversion in C++ stream output.
Front-end for printf-style string template interpolation.
unsigned int uint
Definition integral.hpp:29
opaque ID attached to each individual job invocation.
Definition job.h:105
Functions to perform (multithreaded) timing measurement on a given functor.
double benchmarkTime(FUN const &invokeTestCode, const size_t repeatCnt=1)
Helper to invoke a functor or λ to observe its running time.
Test runner and basic definitions for tests.
auto max(IT &&elms)
Proc
Result instruction from Activity activation.
Definition activity.hpp:140
@ PASS
pass on the activation down the chain
Definition activity.hpp:140
@ WAIT
nothing to do; wait and re-check for work later
Definition activity.hpp:142
const StatKey STAT_FORK
forking node
SpecialJobFun onetimeCrunch(milliseconds runTime)
a »throw-away« render-job
const StatKey STAT_NODE
all nodes
const StatKey STAT_JOIN
joining node
FlowDiagnostic< CONF > watch(BlockFlow< CONF > &theFlow)
Vault-Layer implementation namespace root.
Simplistic test class runner.
#define LAUNCHER(_TEST_CLASS_, _GROUPS_)
Definition run.hpp:116
Service for coordination and dispatch of render activities.
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Generate synthetic computation load for Scheduler performance tests.
#define MARK_TEST_FUN
Macro to mark the current test function in STDOUT.
a family of time value like entities and their relationships.
Test helper to perform temporary manipulations within a test scope.
#define TRANSIENTLY(_OO_)
Macro to simplify capturing assignments.
Tiny helper functions and shortcuts to be used everywhere. Consider this header to be effectively incl...