Lumiera  0.pre.03
»edit your freedom«
work-force.hpp
Go to the documentation of this file.
1 /*
2  WORK-FORCE.hpp - actively coordinated pool of workers for rendering
3 
4  Copyright (C)
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7   **Lumiera** is free software; you can redistribute it and/or modify it
8   under the terms of the GNU General Public License as published by the
9   Free Software Foundation; either version 2 of the License, or (at your
10   option) any later version. See the file COPYING for further details.
11 
12 */
13 
14 
43 #ifndef SRC_VAULT_GEAR_WORK_FORCE_H_
44 #define SRC_VAULT_GEAR_WORK_FORCE_H_
45 
46 
47 #include "vault/common.hpp"
48 #include "vault/gear/activity.hpp"
49 #include "lib/meta/function.hpp"
50 #include "lib/thread.hpp"
51 #include "lib/nocopy.hpp"
52 #include "lib/util.hpp"
53 
54 #include <utility>
55 #include <chrono>
56 #include <atomic>
57 #include <list>
58 
59 
60 namespace vault{
61 namespace gear {
62 
63  using std::move;
64  using std::atomic;
65  using util::unConst;
66  using std::chrono::milliseconds;
67  using std::chrono::microseconds;
68  using std::chrono_literals::operator ""ms;
69  using std::chrono_literals::operator ""us;
70  using std::this_thread::sleep_for;
71 
72 
namespace { // tuning parameters and helpers for the anti-contention scheme

  /** safety guard to prevent catastrophic over-provisioning */
  constexpr double MAX_OVERPROVISIONING{3.0};

  /** zone for soft anti-contention measures, counting continued contention events */
  constexpr size_t CONTEND_SOFT_LIMIT{3};
  /** zone for stark measures, performing a sleep with exponential stepping */
  constexpr size_t CONTEND_STARK_LIMIT{CONTEND_SOFT_LIMIT + 5};
  /** upper limit for the contention event count */
  constexpr size_t CONTEND_SATURATION{CONTEND_STARK_LIMIT + 4};
  /** base counter for a spinning wait loop */
  constexpr size_t CONTEND_SOFT_FACTOR{100};
  /** stepping for randomisation of anti-contention measures */
  constexpr size_t CONTEND_RANDOM_STEP{11};
  /** base time unit for the exponentially stepped-up sleep delay in case of contention */
  constexpr microseconds CONTEND_WAIT{100};

  /**
   * @return hash value derived from the ID of the calling thread,
   *         as produced by the standard hasher for `std::thread::id`
   */
  inline size_t
  thisThreadHash()
    {
      std::hash<std::thread::id> hasher;
      return hasher (std::this_thread::get_id());
    }
}
89 
90  namespace work {
91 
92  using SIG_WorkFun = activity::Proc(void);
93  using SIG_FinalHook = void(bool);
94 
/**
 * Base for configuration of the worker pool.
 * The static capacity setting and its default calculation
 * are only declared here; their definitions are not part of this header.
 */
struct Config
  {
    /** Nominal »full size« of a pool of concurrent workers */
    static size_t COMPUTATION_CAPACITY;

    /** wait period when a worker falls idle */
    const milliseconds IDLE_WAIT{20};
    /** number of idle cycles after which the worker terminates */
    const size_t DISMISS_CYCLES{100};

    /** default value for full computing capacity is to use all (virtual) cores */
    static size_t getDefaultComputationCapacity();
  };
113 
114 
115 
116  void performRandomisedSpin (size_t,size_t);
117  microseconds steppedRandDelay(size_t,size_t);
118 
119 
120  using Launch = lib::Thread::Launch;
121 
122  /*************************************/
126  template<class CONF>
127  class Worker
128  : CONF
130  {
131  public:
132  Worker (CONF config)
133  : CONF{move (config)}
134  , thread_{Launch{&Worker::pullWork, this}
135  .threadID("Worker")
136  .decorateCounter()}
137  { }
138 
140  std::atomic<bool> emergency{false};
141 
143  bool isDead() const { return not thread_; }
144 
145 
146  private:
147  lib::Thread thread_;
148 
149  void
150  pullWork()
151  {
152  ASSERT_MEMBER_FUNCTOR (&CONF::doWork, SIG_WorkFun);
153  ASSERT_MEMBER_FUNCTOR (&CONF::finalHook, SIG_FinalHook);
154 
155  bool regularExit{false};
156  try /* ================ pull work ===================== */
157  {
158  while (true)
159  {
160  activity::Proc res = CONF::doWork();
161  if (emergency.load (std::memory_order_relaxed))
162  break;
163  if (res == activity::KICK)
164  res = contentionWait();
165  else
166  if (kickLevel_)
167  kickLevel_ /= 2;
168  if (res == activity::WAIT)
169  res = idleWait();
170  else
171  idleCycles_ = 0;
172  if (res != activity::PASS)
173  break;
174  }
175  regularExit = true;
176  }
177  ERROR_LOG_AND_IGNORE (threadpool, "defunct worker thread")
178 
179  try /* ================ thread-exit hook ============== */
180  {
181  CONF::finalHook (not regularExit);
182  }
183  ERROR_LOG_AND_IGNORE (threadpool, "failure in thread-exit hook")
184  }// Thread will terminate....
185 
187  idleWait()
188  {
189  ++idleCycles_;
190  if (idleCycles_ < CONF::DISMISS_CYCLES)
191  {
192  sleep_for (CONF::IDLE_WAIT);
193  return activity::PASS;
194  }
195  else // idle beyond threshold => terminate worker
196  return activity::HALT;
197  }
198  size_t idleCycles_{0};
199 
200 
202  contentionWait()
203  {
204  if (not randFact_)
205  randFact_ = thisThreadHash() % CONTEND_RANDOM_STEP;
206 
207  if (kickLevel_ <= CONTEND_SOFT_LIMIT)
208  for (uint i=0; i<kickLevel_; ++i)
209  {
210  performRandomisedSpin (kickLevel_,randFact_);
211  std::this_thread::yield();
212  }
213  else
214  {
215  auto stepping = util::min (kickLevel_, CONTEND_STARK_LIMIT) - CONTEND_SOFT_LIMIT;
216  std::this_thread::sleep_for (steppedRandDelay(stepping,randFact_));
217  }
218 
219  if (kickLevel_ < CONTEND_SATURATION)
220  ++kickLevel_;
221  return activity::PASS;
222  }
223  size_t kickLevel_{0};
224  size_t randFact_{0};
225  };
226  }//(End)namespace work
227 
228 
229 
230 
231  /***********************************************************/
239  template<class CONF>
240  class WorkForce
242  {
243  using Pool = std::list<work::Worker<CONF>>;
244 
245  CONF setup_;
246  Pool workers_;
247 
248 
249  public:
250  WorkForce (CONF config)
251  : setup_{move (config)}
252  , workers_{}
253  { }
254 
255  ~WorkForce()
256  {
257  try {
258  awaitShutdown();
259  }
260  ERROR_LOG_AND_IGNORE (threadpool, "defunct worker thread")
261  }
262 
263 
270  void
271  activate (double degree =1.0)
272  {
273  size_t scale{setup_.COMPUTATION_CAPACITY};
274  scale = size_t(util::limited (0.0, degree*scale, scale*MAX_OVERPROVISIONING));
275  for (uint i = workers_.size(); i < scale; ++i)
276  workers_.emplace_back (setup_);
277  }
278 
279  void
280  incScale(uint step =+1)
281  {
282  uint i = workers_.size();
283  uint target = util::min (i+step, setup_.COMPUTATION_CAPACITY);
284  for ( ; i < target; ++i)
285  workers_.emplace_back (setup_);
286  }
287 
288  void
289  awaitShutdown()
290  {
291  for (auto& w : workers_)
292  w.emergency.store (true, std::memory_order_relaxed);
293  while (0 < size())
294  sleep_for (setup_.IDLE_WAIT);
295  }
296 
297  size_t
298  size() const
299  {
300  unConst(workers_).remove_if([](auto& w){ return w.isDead(); });
301  return workers_.size();
302  }
303  };
304 
305 
306 
307 }} // namespace vault::gear
308 #endif /*SRC_VAULT_GEAR_WORK_FORCE_H_*/
const size_t CONTEND_SOFT_LIMIT
zone for soft anti-contention measures, counting continued contention events
Definition: work-force.hpp:76
Individual worker thread: repeatedly pulls the doWork functor.
Definition: work-force.hpp:127
#define ERROR_LOG_AND_IGNORE(_FLAG_, _OP_DESCR_)
convenience shortcut for a sequence of catch blocks just logging and consuming an error...
Definition: error.hpp:266
#define ASSERT_MEMBER_FUNCTOR(_EXPR_, _SIG_)
Macro for a compile-time check to verify some member is present and comprises something invokable with the given signature.
Definition: function.hpp:273
const microseconds CONTEND_WAIT
base time unit for the exponentially stepped-up sleep delay in case of contention ...
Definition: work-force.hpp:81
void(bool) SIG_FinalHook
config should define callable invoked at exit (argument: isFailure)
Definition: work-force.hpp:93
const size_t DISMISS_CYCLES
number of wait cycles before an idle worker terminates completely
Definition: scheduler.hpp:129
Any copy and copy construction prohibited.
Definition: nocopy.hpp:37
Primary class template for std::hash.
microseconds steppedRandDelay(size_t, size_t)
Calculate the delay time for a stronger anti-contention wait.
Definition: work-force.cpp:82
activity::Proc(void) SIG_WorkFun
config should define a callable with this signature to perform work
Definition: work-force.hpp:92
void performRandomisedSpin(size_t, size_t)
This is part of the weak level of anti-contention measures.
Definition: work-force.cpp:66
Mix-Ins to allow or prohibit various degrees of copying and cloning.
Metaprogramming tools for transforming functor types.
const auto IDLE_WAIT
sleep-recheck cycle for workers deemed idle
Definition: scheduler.hpp:128
Tiny helper functions and shortcuts to be used everywhere. Consider this header to be effectively incl...
bool isDead() const
this Worker starts out active, but may terminate
Definition: work-force.hpp:143
Convenience front-end to simplify and codify basic thread handling.
const size_t CONTEND_STARK_LIMIT
zone for stark measures, performing a sleep with exponential stepping
Definition: work-force.hpp:77
const size_t CONTEND_SATURATION
upper limit for the contention event count
Definition: work-force.hpp:78
const size_t CONTEND_SOFT_FACTOR
base counter for a spinning wait loop
Definition: work-force.hpp:79
const milliseconds IDLE_WAIT
wait period when a worker falls idle
Definition: work-force.hpp:108
const size_t DISMISS_CYCLES
number of idle cycles after which the worker terminates
Definition: work-force.hpp:109
void activate(double degree=1.0)
Activate or scale up the worker pool.
Definition: work-force.hpp:271
Basic set of definitions and includes commonly used together (Vault).
A thin convenience wrapper to simplify thread-handling.
Definition: thread.hpp:648
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Definition: work-force.hpp:106
Proc
Result instruction from Activity activation.
Definition: activity.hpp:140
Pool of worker threads for rendering.
Definition: work-force.hpp:240
const size_t CONTEND_RANDOM_STEP
stepping for randomisation of anti-contention measures
Definition: work-force.hpp:80
Base for configuration of the worker pool.
Definition: work-force.hpp:104
const double MAX_OVERPROVISIONING
safety guard to prevent catastrophic over-provisioning
Definition: work-force.hpp:74
Vault-Layer implementation namespace root.
static size_t getDefaultComputationCapacity()
default value for full computing capacity is to use all (virtual) cores.
Definition: work-force.cpp:51
Descriptor for a piece of operational logic performed by the scheduler.