Lumiera  0.pre.03
»edit your freedom«
load-controller.hpp
Go to the documentation of this file.
1 /*
2  LOAD-CONTROLLER.hpp - coordinator for scheduler resource usage
3 
4  Copyright (C)
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7   **Lumiera** is free software; you can redistribute it and/or modify it
8   under the terms of the GNU General Public License as published by the
9   Free Software Foundation; either version 2 of the License, or (at your
10   option) any later version. See the file COPYING for further details.
11 
12 */
13 
14 
76 #ifndef SRC_VAULT_GEAR_LOAD_CONTROLLER_H_
77 #define SRC_VAULT_GEAR_LOAD_CONTROLLER_H_
78 
79 
80 #include "lib/error.hpp"
81 //#include "vault/gear/block-flow.hpp"
82 //#include "vault/gear/activity-lang.hpp"
83 //#include "lib/symbol.hpp"
84 #include "lib/time/timevalue.hpp"
85 #include "lib/nocopy.hpp"
86 #include "lib/util.hpp"
87 #include "lib/format-cout.hpp"
88 
89 //#include <string>
90 #include <cmath>
91 #include <atomic>
92 #include <chrono>
93 #include <utility>
94 #include <functional>
95 
96 
97 namespace vault{
98 namespace gear {
99 
100 // using util::isnil;
101 // using std::string;
102 
103  using util::max;
104  using util::limited;
105  using lib::time::Time;
106  using lib::time::FSecs;
107  using lib::time::TimeVar;
108  using lib::time::TimeValue;
109  using lib::time::Duration;
110  using lib::time::Offset;
111  using std::chrono_literals::operator ""ms;
112  using std::chrono_literals::operator ""us;
113  using std::function;
114  using std::atomic_int64_t;
115  using std::memory_order_relaxed;
116 
117  namespace { // Scheduler default config
118 
119  inline TimeValue
120  _uTicks (std::chrono::microseconds us)
121  {
122  return TimeValue{us.count()};
123  }
124 
125 
126  Duration SLEEP_HORIZON{_uTicks (20ms)};
127  Duration WORK_HORIZON {_uTicks ( 5ms)};
128  Duration NEAR_HORIZON {_uTicks (50us)};
129  Duration STANDARD_LAG {_uTicks(200us)};
130 
131  const double LAG_SAMPLE_DAMPING = 2;
132  }
133 
134 
135 
146  {
147  public:
148  struct Wiring
149  {
150  function<size_t()> maxCapacity {[]{ return 1; }};
151  function<size_t()> currWorkForceSize{[]{ return 0; }};
152  function<void(uint)> stepUpWorkForce{[](uint){/*NOP*/}};
154  };
155 
156  explicit
157  LoadController (Wiring&& wiring)
158  : wiring_{std::move (wiring)}
159  { }
160 
163  { }
164 
165  private:
166  const Wiring wiring_;
167 
168  TimeVar tendedHead_{Time::ANYTIME};
169 
170 
171  atomic_int64_t sampledLag_{0};
172 
181  void
182  markLagSample (Time head, Time now)
183  { // negative when free capacity
184  double lag = _raw(std::clamp<TimeVar> (now - (head.isRegular()? head:now)
185  , -SLEEP_HORIZON
186  , WORK_HORIZON));
187  const double alpha = LAG_SAMPLE_DAMPING / (1 + wiring_.maxCapacity());
188  int64_t average = sampledLag_.load (memory_order_relaxed);
189  int64_t newAverage;
190  do newAverage = std::floor (lag*alpha + (1-alpha)*average);
191  while (not sampledLag_.compare_exchange_weak (average, newAverage, memory_order_relaxed));
192  }
193 
194  public:
203  int64_t
204  averageLag() const
205  {
206  return sampledLag_.load (memory_order_relaxed);
207  }
208 
215  int64_t
216  setCurrentAverageLag (int64_t lag)
217  {
218  return sampledLag_.exchange(lag, memory_order_relaxed);
219  }
220 
228  double
230  {
231  double lag = averageLag();
232  lag -= _raw(STANDARD_LAG);
233  lag /= _raw(WORK_HORIZON);
234  lag *= 10;
235  double lagFactor = lag<0? 1/(1-lag): 1+lag;
236  double loadFactor = wiring_.currWorkForceSize() / double(wiring_.maxCapacity());
237  return loadFactor * lagFactor;
238  }
239 
241  void
243  {
245  //
246  auto lag = averageLag();
247  if (lag > _raw(WORK_HORIZON))
248  wiring_.stepUpWorkForce(+4);
249  else
250  if (averageLag() > 2*_raw(STANDARD_LAG))
251  wiring_.stepUpWorkForce(+1);
252  }
253 
255  void
257  {
259  }
260 
264  void
265  ensureCapacity (Time startHorizon)
266  {
267  if (startHorizon > 2* SLEEP_HORIZON)
268  return;
269  if (averageLag() > 2*_raw(STANDARD_LAG))
270  wiring_.stepUpWorkForce(+1);
271  }
272 
277  bool
278  tendedNext (Time nextHead) const
279  {
280  return not nextHead.isRegular() // note: empty queue reports Time::NEVER
281  or nextHead == tendedHead_;
282  }
283 
291  void
292  tendNext (Time nextHead)
293  {
294  tendedHead_ = nextHead;
295  }
296 
297 
298 
306  };
307 
309  static Capacity
311  {
312  if (off > SLEEP_HORIZON) return IDLEWAIT;
313  if (off > WORK_HORIZON) return WORKTIME;
314  if (off > NEAR_HORIZON) return NEARTIME;
315  if (off > Time::ZERO) return SPINTIME;
316  else return DISPATCH;
317  }
318 
319 
322  Capacity
324  {
325  auto horizon = classifyTimeHorizon (Offset{head - now});
326  return horizon > SPINTIME
327  and not tendedNext(head)? TENDNEXT
328  : horizon==IDLEWAIT ? WORKTIME // re-randomise sleeper cycles
329  : horizon;
330  }
331 
334  Capacity
336  {
337  markLagSample (head,now);
338  return classifyTimeHorizon (Offset{head - now})
339  > NEARTIME ? IDLEWAIT
340  : markOutgoingCapacity(head,now);
341  }
342 
343 
344 
360  Offset
362  {
363  auto scatter = [&](Duration horizon)
364  {
365  gavl_time_t wrap = hash_value(now) % _raw(horizon);
366  ENSURE (0 <= wrap and wrap < _raw(horizon));
367  return TimeValue{wrap};
368  };
369 
370  TimeVar headDistance = max (tendedHead_-now, Time::ZERO);
371 
372  switch (capacity) {
373  case DISPATCH:
374  return Offset::ZERO;
375  case SPINTIME:
376  return Offset::ZERO;
377  case TENDNEXT:
378  return Offset{headDistance};
379  case NEARTIME:
380  return Offset{headDistance + scatter(Offset{limited (NEAR_HORIZON,headDistance,WORK_HORIZON)})};
381  case WORKTIME:
382  case IDLEWAIT:
383  return Offset{headDistance + scatter(SLEEP_HORIZON)};
384  default:
385  NOTREACHED ("uncovered work capacity classification.");
386  }
387  }
388  };
389 
390 
391 
392 }}// namespace vault::gear
393 #endif /*SRC_VAULT_GEAR_LOAD_CONTROLLER_H_*/
static const Time ANYTIME
border condition marker value. ANYTIME <= any time value
Definition: timevalue.hpp:313
bool tendedNext(Time nextHead) const
did we already tend for the indicated next relevant head time?
int64_t setCurrentAverageLag(int64_t lag)
a mutable time value, behaving like a plain number, allowing copy and re-accessing ...
Definition: timevalue.hpp:232
Duration NEAR_HORIZON
what counts as "imminent" (e.g. for spin-waiting)
void markLagSample(Time head, Time now)
Automatically use custom string conversion in C++ stream output.
const double LAG_SAMPLE_DAMPING
smoothing factor for exponential moving average of lag;
Any copy and copy construction prohibited.
Definition: nocopy.hpp:37
Capacity markIncomingCapacity(Time head, Time now)
decide how this thread&#39;s capacity shall be used when returning from idle wait and asking for work ...
Lumiera&#39;s internal time value datatype.
Definition: timevalue.hpp:299
Controller to coordinate resource usage related to the Scheduler.
void ensureCapacity(Time startHorizon)
Hook to check and possibly scale up WorkForce to handle one additional job.
Mix-Ins to allow or prohibit various degrees of copying and cloning.
capacity for active processing required
bool isRegular() const
Definition: timevalue.hpp:771
Offset scatteredDelayTime(Time now, Capacity capacity)
Generate a time offset to relocate currently unused capacity to a time range where it&#39;s likely to be ...
Tiny helper functions and shortcuts to be used everywhere Consider this header to be effectively incl...
Duration STANDARD_LAG
Experience shows that on average scheduling happens with 200µs delay.
boost::rational< int64_t > FSecs
rational representation of fractional seconds
Definition: timevalue.hpp:220
void tendNext(Time nextHead)
Mark the indicated next head time as tended.
Lumiera error handling (C++ interface).
void markWorkerExit()
statistics update on scaling down the WorkForce
Duration SLEEP_HORIZON
schedules beyond that horizon justify going idle
Offset measures a distance in time.
Definition: timevalue.hpp:358
awaiting imminent activities
Duration is the internal Lumiera time metric.
Definition: timevalue.hpp:468
NUM constexpr limited(NB lowerBound, NUM val, NB upperBound)
force a numeric to be within bounds, inclusively
Definition: util.hpp:91
static Capacity classifyTimeHorizon(Offset off)
classification of time horizon for scheduling
Capacity markOutgoingCapacity(Time head, Time now)
decide how this thread&#39;s capacity shall be used after it returned from being actively employed ...
a family of time value like entities and their relationships.
Duration WORK_HORIZON
the scope of activity currently in the works
basic constant internal time value.
Definition: timevalue.hpp:133
Capacity
Allocation of capacity to time horizon of expected work.
Vault-Layer implementation namespace root.
void updateState(Time)
periodic call to build integrated state indicators
typical stable work task rhythm expected