Lumiera 0.pre.04~rc.1
»edit your freedom«
Loading...
Searching...
No Matches
scheduler.hpp
Go to the documentation of this file.
1/*
2 SCHEDULER.hpp - coordination of render activities under timing and dependency constraints
3
4 Copyright (C)
5 2023, Hermann Vosseler <Ichthyostega@web.de>
6
7  **Lumiera** is free software; you can redistribute it and/or modify it
8  under the terms of the GNU General Public License as published by the
9  Free Software Foundation; either version 2 of the License, or (at your
10  option) any later version. See the file COPYING for further details.
11
12*/
13
14
95#ifndef SRC_VAULT_GEAR_SCHEDULER_H_
96#define SRC_VAULT_GEAR_SCHEDULER_H_
97
98
99#include "lib/error.hpp"
107#include "vault/real-clock.hpp"
108#include "lib/nocopy.hpp"
109
110#include <optional>
111#include <utility>
112
113
114namespace vault{
115namespace gear {
116
117 using std::move;
118 using lib::time::Time;
119 using lib::time::FSecs;
120 using lib::time::Offset;
121
122 namespace test { // declared friend for test access
123 class SchedulerService_test;
124 }
125
126 namespace { // Scheduler default config
127
128 const auto IDLE_WAIT = 20ms;
129 const size_t DISMISS_CYCLES = 100;
133 }
134
135
136 class Scheduler;
137
140 {
145 bool isCompulsory_{false};
146
148 std::optional<activity::Term> term_;
149
150 public:
152 : job_{job}
153 , theScheduler_{&sched}
154 , term_{std::nullopt}
155 { }
156
158 startOffset (microseconds afterNow)
159 {
160 start_ = RealClock::now() + _uTicks(afterNow);
161 return move(*this);
162 }
163
165 startTime (Time fixedTime)
166 {
167 start_ = fixedTime;
168 return move(*this);
169 }
170
172 lifeWindow (microseconds afterStart)
173 {
174 death_ = start_ + _uTicks(afterStart);
175 return move(*this);
176 }
177
180 {
181 manID_ = manID;
182 return move(*this);
183 }
184
186 compulsory (bool indeed =true)
187 {
188 isCompulsory_ = indeed;
189 return move(*this);
190 }
191
192
195
196 ScheduleSpec linkToSuccessor (ScheduleSpec&, bool unlimitedTime =false);
197 ScheduleSpec linkToPredecessor(ScheduleSpec&, bool unlimitedTime =false);
198 private:
199 void maybeBuildTerm();
200 };
201
202
203
204 /******************************************************/
215 {
223
224
228
232
233
234 public:
235 Scheduler (BlockFlowAlloc& activityAllocator
236 ,EngineObserver& engineObserver)
237 : layer1_{}
238 , layer2_{}
239 , workForce_{Setup{IDLE_WAIT, DISMISS_CYCLES, *this}}
240 , activityLang_{activityAllocator}
242 , engineObserver_{engineObserver}
243 { }
244
245
/** @return the result of `layer1_.empty()`; this function only forwards that query. */
246 bool
247 empty() const
248 {
249 return layer1_.empty();
250 }
251
258 void
260 {
261 TRACE (engine, "Ignite Scheduler Dispatch.");
262 bool force_continued_run{true};
263 handleDutyCycle (RealClock::now(), force_continued_run);
264 if (not empty())
265 workForce_.activate();
266 }
267
268
277 void
279 {
280 TRACE (engine, "Forcibly terminate Scheduler Dispatch.");
281 workForce_.awaitShutdown();
283 }
284
285
291 double
293 {
295 }
296
297
317 void
318 seedCalcStream (Job planningJob
320 ,FrameRate expectedAdditionalLoad = FrameRate(25))
321 {
322 auto guard = layer2_.requireGroomingTokenHere(); // allow mutation
323 layer1_.activate (manID);
324 activityLang_.announceLoad (expectedAdditionalLoad);
325 continueMetaJob (RealClock::now(), planningJob, manID);
326 }
327
328
332 void
334 ,Job planningJob
336 {
337 bool isCompulsory = true;
338 Time deadline = nextStart + DUTY_CYCLE_TOLERANCE;
339 // place the meta-Job into the timeline...
340 postChain ({activityLang_.buildMetaJob(planningJob, nextStart, deadline)
341 .post()
342 , nextStart
343 , deadline
344 , manID
345 , isCompulsory});
346 }
347
348
358 {
359 return ScheduleSpec{*this, job};
360 }
361
362
363
369
370
371 private:
373 void sanityCheck (ActivationEvent const&);
374 void handleDutyCycle (Time now, bool =false);
375 void handleWorkerTermination (bool isFailure);
376 void maybeScaleWorkForce (Time startHorizon);
377
378 void triggerEmergency();
379
380
384 {
386 setup.maxCapacity = []{ return work::Config::COMPUTATION_CAPACITY; };
387 setup.currWorkForceSize = [this]{ return workForce_.size(); };
388 setup.stepUpWorkForce = [this](uint steps){ workForce_.incScale(steps); };
389 return setup;
390 }
391
393 Time
395 {
396 return RealClock::now();
397 }
398
400 class ExecutionCtx;
401 friend class ExecutionCtx;
402
404 friend class ScheduleSpec;
405
408 };
409
410
411
412
413
414
417 : public EngineEvent
418 {
421
424
425 public:
/** Factory: build a WorkTiming event tagged `WORKSTART`, with `now` wrapped into the Payload. */
426 static WorkTiming start (Time now) { return WorkTiming{WORKSTART, Payload{now}}; }
/** Factory: build a WorkTiming event tagged `WORKSTOP`, with `now` wrapped into the Payload. */
427 static WorkTiming stop (Time now) { return WorkTiming{WORKSTOP, Payload{now}}; }
428 };
429
440 {
442 public:
443
445
447 : scheduler_{self}
448 , rootEvent{toDispatch}
449 { }
450
451
452 /* ==== Implementation of the Concept ExecutionCtx ==== */
453
463 post (Time when, Time dead, Activity* chain, ExecutionCtx& ctx)
464 {
465 REQUIRE (chain);
466 ActivationEvent chainEvent = ctx.rootEvent;
467 chainEvent.refineTo (chain, when, dead);
468 scheduler_.sanityCheck (chainEvent);
469 return scheduler_.layer2_.postChain (chainEvent, scheduler_.layer1_);
470 }
471
478 void
479 work (Time now, size_t qualifier)
480 {
483 }
484
486 void
487 done (Time now, size_t qualifier)
488 {
490 }
491
494 tick (Time now)
495 {
497 return activity::PASS;
498 }
499
501 Time
503 {
504 return scheduler_.getSchedTime();
505 }
506 };
507
508
509
510
511 /***********************************************************************/
519 inline activity::Proc
521 {
524 ,[this](ActivationEvent toDispatch)
525 {
526 ExecutionCtx ctx{*this, toDispatch};
527 return ActivityLang::dispatchChain (toDispatch, ctx);
528 }
529 ,[this] { return getSchedTime(); }
530 );
531 }
532
533
534
535
536
546 inline ScheduleSpec
548 { // execute term-builder on-demand...
550
551 // set up new schedule by retrieving the Activity-chain...
553 , death_
554 , manID_
555 , isCompulsory_});
556 return move(*this);
557 }
558
563 inline void
571
/**
 * Link this schedule to a successor: a notification is appended from
 * this spec's term to the term of `succSpec`.
 * @param succSpec      spec whose term is passed as notification target
 * @param unlimitedTime forwarded unchanged to `appendNotificationTo`
 * @return this spec, handed on by value through `move(*this)`
 */
572 inline ScheduleSpec
573 ScheduleSpec::linkToSuccessor (ScheduleSpec& succSpec, bool unlimitedTime)
574 {
// `term_` is a std::optional and is dereferenced below on both specs;
// presumably maybeBuildTerm() engages it when still empty -- TODO confirm
575 this->maybeBuildTerm();
576 succSpec.maybeBuildTerm();
577 term_->appendNotificationTo (*succSpec.term_, unlimitedTime);
578 return move(*this);
579 }
580
/**
 * Link this schedule to a predecessor: a notification is appended from
 * the term of `predSpec` to this spec's term (the reverse direction of
 * linkToSuccessor).
 * @param predSpec      spec whose term emits the notification
 * @param unlimitedTime forwarded unchanged to `appendNotificationTo`
 * @return this spec, handed on by value through `move(*this)`
 */
581 inline ScheduleSpec
582 ScheduleSpec::linkToPredecessor (ScheduleSpec& predSpec, bool unlimitedTime)
583 {
// `term_` is a std::optional and is dereferenced below on both specs;
// presumably maybeBuildTerm() engages it when still empty -- TODO confirm
584 predSpec.maybeBuildTerm();
585 this->maybeBuildTerm();
586 predSpec.term_->appendNotificationTo (*term_, unlimitedTime);
587 return move(*this);
588 }
589
590
591
592 inline void
594 {
595 if (not event)
596 throw error::Logic ("Empty event passed into Scheduler entrance");
597 if (event.startTime() == Time::ANYTIME)
598 throw error::Fatal ("Attempt to schedule an Activity without valid start time");
599 if (event.deathTime() == Time::NEVER)
600 throw error::Fatal ("Attempt to schedule an Activity without valid deadline");
601 Time now{getSchedTime()};
602 Offset toDeadline{now, event.deathTime()};
603 if (toDeadline > FUTURE_PLANNING_LIMIT)
604 throw error::Fatal (util::_Fmt{"Attempt to schedule Activity %s "
605 "with a deadline by %s into the future"}
606 % *event.activity
607 % toDeadline);
608 }
609
610
611
612
613 /*****************************************************************/
619 inline void
621 {
622 sanityCheck (actEvent);
623 maybeScaleWorkForce (actEvent.startTime());
624 layer2_.postChain (actEvent, layer1_);
625 }
626
627
628
629
/**
 * Duty-cycle maintenance step.
 * Holds the guard returned by `layer2_.requireGroomingTokenHere()` for the
 * duration of the call, lets Layer-2 maintain the queue head and, unless
 * that maintenance reports failure, posts a follow-up tick Activity when
 * the schedule is not empty or continuation is forced.
 * @param now               time point passed to the queue maintenance and
 *                          used as base for the next tick
 * @param forceContinuation when `true`, a next tick is posted even for an
 *                          empty schedule, and it is placed `WORK_HORIZON`
 *                          (instead of `DUTY_CYCLE_PERIOD`) after `now`
 */
646 inline void
647 Scheduler::handleDutyCycle (Time now, bool forceContinuation)
648 {
649 auto guard = layer2_.requireGroomingTokenHere();
650
651 // consolidate queue and discard outdated tasks
652 if (not layer2_.maintainQueueHead(layer1_,now))
653 { // missed deadline of compulsory task
655 return;// leave everything as-is
656 }
657
658 // clean-up of obsolete Activities
660
662
663 if (not empty() or forceContinuation)
664 {// prepare next duty cycle »tick«
665 Time nextTick = now + (forceContinuation? WORK_HORIZON : DUTY_CYCLE_PERIOD);
// the tick's deadline allows DUTY_CYCLE_TOLERANCE of slip after its start
666 Time deadline = nextTick + DUTY_CYCLE_TOLERANCE;
667 Activity& tickActivity = activityLang_.createTick (deadline);
// NOTE(review): the trailing `true` presumably marks the tick as compulsory -- confirm against ActivationEvent
668 ActivationEvent tickEvent{tickActivity, nextTick, deadline, ManifestationID(), true};
669 layer2_.postChain (tickEvent, layer1_);
670 } // *deliberately* use low-level entrance
671 } // to avoid ignite() cycles and derailed load-regulation
672
677 inline void
679 {
680 if (isFailure)
682 else
684 }
685
694 inline void
696 {
697 if (empty())
698 ignite();
699 else
700 loadControl_.ensureCapacity (startHorizon);
701 }
702
712 inline void
714 {
715 UNIMPLEMENTED ("scheduler overrun -- trigger Emergency");
716 }
717
718
719
720}} // namespace vault::gear
721#endif /*SRC_VAULT_GEAR_SCHEDULER_H_*/
A language framework to define and interconnect scheduler activity verbs.
Memory management scheme for activities and parameter data passed through the Scheduler within the Lu...
Token or Atom with distinct identity.
Definition symbol.hpp:120
Framerate specified as frames per second.
Offset measures a distance in time.
a mutable time value, behaving like a plain number, allowing copy and re-accessing
Lumiera's internal time value datatype.
static const Time NEVER
border condition marker value. NEVER >= any time value
static const Time ANYTIME
border condition marker value. ANYTIME <= any time value
Types marked with this mix-in may be moved and move-assigned.
Definition nocopy.hpp:64
Any copy and copy construction prohibited.
Definition nocopy.hpp:38
A front-end for using printf-style formatting.
static Time now()
Term builder and execution framework to perform chains of scheduler Activities.
activity::Term buildCalculationJob(Job job, Time start, Time deadline)
Builder-API: initiate definition of render activities for a media calculation job.
void announceLoad(FrameRate fps)
void discardBefore(Time deadline)
static activity::Proc dispatchChain(Activity *chain, EXE &executionCtx)
Execution Framework: dispatch performance of a chain of Activities.
activity::Term buildMetaJob(Job job, Time start, Time deadline)
Builder-API: initiate definition of internal/planning job.
Activity & createTick(Time deadline)
Record to describe an Activity, to happen within the Scheduler's control flow.
Definition activity.hpp:227
Low-level Render Engine event — abstracted storage base.
Collector and aggregator for performance data.
void dispatchEvent(size_t, EngineEvent)
Individual frame rendering task, forwarding to a closure.
Definition job.h:276
Controller to coordinate resource usage related to the Scheduler.
void markWorkerExit()
statistics update on scaling down the WorkForce
void updateState(Time)
periodic call to build integrated state indicators
void ensureCapacity(Time startHorizon)
Hook to check and possibly scale up WorkForce to handle one additional job.
Marker for current (and obsolete) manifestations of a CalcStream processed by the Render-Engine.
Definition activity.hpp:85
ScheduleSpec linkToSuccessor(ScheduleSpec &, bool unlimitedTime=false)
ScheduleSpec lifeWindow(microseconds afterStart)
ScheduleSpec startOffset(microseconds afterNow)
ScheduleSpec(Scheduler &sched, Job job)
ScheduleSpec linkToPredecessor(ScheduleSpec &, bool unlimitedTime=false)
ScheduleSpec compulsory(bool indeed=true)
ScheduleSpec startTime(Time fixedTime)
std::optional< activity::Term > term_
ScheduleSpec manifestation(ManifestationID manID)
ScheduleSpec post()
build Activity chain and hand-over to the Scheduler.
Scheduler Layer-2 : execution of Scheduler Activities.
activity::Proc postChain(ActivationEvent event, SchedulerInvocation &layer1)
This is the primary entrance point to the Scheduler.
ScopedGroomingGuard requireGroomingTokenHere()
a scope guard to force acquisition of the GroomingToken
bool maintainQueueHead(SchedulerInvocation &layer1, Time now)
update queue head to discard obsolete content.
void dropGroomingToken() noexcept
relinquish the right for internal state transitions.
activity::Proc dispatchCapacity(SchedulerInvocation &, LoadController &, DISPATCH, CLOCK)
Implementation of the worker-Functor:
Scheduler Layer-1 : time based dispatch.
void activate(ManifestationID manID)
Enable entries marked with a specific ManifestationID to be processed.
void discardSchedule()
forcibly clear out the schedule
activity::Proc tick(Time now)
λ-tick : scheduler management duty cycle
void work(Time now, size_t qualifier)
λ-work : transition Management-Mode -> Work-Mode
activity::Proc post(Time when, Time dead, Activity *chain, ExecutionCtx &ctx)
λ-post: enqueue for time-bound execution, within given ExecutionCtx.
ExecutionCtx(Scheduler &self, ActivationEvent toDispatch)
void done(Time now, size_t qualifier)
λ-done : signal end time of actual processing
Time getSchedTime()
access high-resolution-clock, rounded to µ-Ticks
»Scheduler-Service« : coordinate render activities.
void handleWorkerTermination(bool isFailure)
Callback invoked whenever a worker-thread is about to exit.
void continueMetaJob(Time nextStart, Job planningJob, ManifestationID manID=ManifestationID())
Place a follow-up job-planning job into the timeline.
void sanityCheck(ActivationEvent const &)
LoadController loadControl_
Scheduler(BlockFlowAlloc &activityAllocator, EngineObserver &engineObserver)
ActivityLang activityLang_
void seedCalcStream(Job planningJob, ManifestationID manID=ManifestationID(), FrameRate expectedAdditionalLoad=FrameRate(25))
Set the Scheduler to work on a new CalcStream.
Time getSchedTime()
access high-resolution-clock, rounded to µ-Ticks
LoadController::Wiring connectMonitoring()
void ignite()
Spark the engine self-regulation cycle and power up WorkForce.
SchedulerInvocation layer1_
void terminateProcessing()
Bring down processing destructively as fast as possible.
activity::Proc doWork()
The worker-Functor: called by the active Workers from the WorkForce to pull / perform the actual rend...
void postChain(ActivationEvent)
Enqueue for time-bound execution, possibly dispatch immediately.
void maybeScaleWorkForce(Time startHorizon)
Hook invoked whenever a new task is passed in.
void triggerEmergency()
Trip the emergency brake and unwind processing while retaining all state.
ScheduleSpec defineSchedule(Job job)
Render Job builder: start definition of a schedule to invoke the given Job.
void handleDutyCycle(Time now, bool=false)
»Tick-hook« : code to maintain a sane running status.
EngineObserver & engineObserver_
WorkForce< Setup > workForce_
SchedulerCommutator layer2_
Pool of worker threads for rendering.
work-timing event for performance observation
static WorkTiming start(Time now)
static WorkTiming stop(Time now)
static Symbol WORKSTART
Render Engine performance data collection service.
Lumiera error handling (C++ interface).
#define _(String)
Definition gtk-base.hpp:68
unsigned int uint
Definition integral.hpp:29
Scheduler resource usage coordination.
Definition Setup.py:1
boost::rational< int64_t > FSecs
rational representation of fractional seconds
LumieraError< LERR_(FATAL), Logic > Fatal
Definition error.hpp:208
LumieraError< LERR_(LOGIC)> Logic
Definition error.hpp:207
STL namespace.
Test runner and basic definitions for tests.
Proc
Result instruction from Activity activation.
Definition activity.hpp:140
@ PASS
pass on the activation down the chain
Definition activity.hpp:140
Offset DUTY_CYCLE_PERIOD
period of the regular scheduler »tick« for state maintenance.
Offset FUTURE_PLANNING_LIMIT
limit timespan of deadline into the future (~360 MiB max)
const auto IDLE_WAIT
sleep-recheck cycle for workers deemed idle
Offset DUTY_CYCLE_TOLERANCE
maximum slip tolerated on duty-cycle start before triggering Scheduler-emergency
const size_t DISMISS_CYCLES
number of wait cycles before an idle worker terminates completely
Vault-Layer implementation namespace root.
Mix-Ins to allow or prohibit various degrees of copying and cloning.
Front-end for simplified access to the current wall clock time.
Layer-2 of the Scheduler: coordination and interaction of activities.
Layer-1 of the Scheduler: queueing and prioritisation of activities.
void refineTo(Activity *chain, Time when, Time dead)
Base for configuration of the worker pool.
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
A pool of workers for multithreaded rendering.