Lumiera  0.pre.03
»edit your freedom«
steam-dispatcher.cpp
Go to the documentation of this file.
1 /*
2  SteamDispatcher - Steam-Layer command dispatch and execution
3 
4  Copyright (C)
5  2008, Hermann Vosseler <Ichthyostega@web.de>
6 
7   **Lumiera** is free software; you can redistribute it and/or modify it
8   under the terms of the GNU General Public License as published by the
9   Free Software Foundation; either version 2 of the License, or (at your
10   option) any later version. See the file COPYING for further details.
11 
12 * *****************************************************************/
13 
14 
74 #include "lib/error.hpp"
75 #include "include/logging.h"
79 #include "steam/control/looper.hpp"
82 #include "lib/depend-inject.hpp"
83 #include "lib/sync-barrier.hpp"
84 #include "lib/thread.hpp"
85 #include "lib/util.hpp"
86 
87 #include <utility>
88 #include <memory>
89 
90 using lib::Sync;
91 using lib::SyncBarrier;
93 using lib::RecursiveLock_Waitable;
94 using std::make_unique;
95 using std::move;
96 
97 namespace steam {
98 namespace control {
99 
100  namespace error = lumiera::error;
101 
102 
103  /********************************************************************/
111  : public CommandDispatch
112  , public Sync<RecursiveLock_Waitable>
113  {
115  using Launch = lib::ThreadHookable::Launch;
116 
119 
120  SyncBarrier init_;
121  CommandQueue queue_;
122  string error_;
123  Looper looper_;
124  ThreadHookable thread_;
125 
126  public:
/**
 * Start the session loop thread.
 * The member initialisers leave the command service handle in NOT_YET_STARTED
 * state, give the looper a predicate reporting whether the queue holds commands,
 * and launch a thread with ID "Session" running runSessionThread() on this object.
 * The constructor body then passes the init_ barrier, logs the start and creates
 * the command service instance while holding the object lock.
 * @param atExit callable stored in the thread's exit hook, where it is invoked
 *        with a pointer to the error_ string of this object.
 * @note NOTE(review): atExit is a forwarding reference but is moved unconditionally;
 *       an lvalue argument would be left moved-from -- confirm callers pass temporaries.
 * @note NOTE(review): this listing skips lines within the exit hook (the numbering
 *       jumps 147 -> 149 -> 151); consult the real source for the complete hook.
 */
135  template<typename FUN>
136  DispatcherLoop (FUN&& atExit)
137  : commandService_{ServiceHandle::NOT_YET_STARTED}
138  , queue_{}
139  , error_{}
140  , looper_([&]() -> bool
141  {
142  return not queue_.empty();
143  })
144  , thread_{ //----the-Session-Thread---------------
145  Launch{&DispatcherLoop::runSessionThread, this}
146  .threadID("Session")
147  .atExit([this, signalEndOfThread = move(atExit)]
149  {
151  signalEndOfThread (&error_);
152  })}
153  {
154  init_.sync(); // done with setup; loop may run now....
155  INFO (session, "Steam-Dispatcher running...");
156  {
157  Lock sync{this}; // open public session interface:
158  commandService_.createInstance(*this);
159  }
160  }
161 
/**
 * Tear down the dispatcher loop object.
 * Repeats the shutdown of the command service, logs the stop and raises an ALERT
 * when thread_ still tests true at this point. The try block is closed by
 * ERROR_LOG_AND_IGNORE, so errors raised here are logged and not propagated.
 */
162  ~DispatcherLoop()
163  {
164  try {
165  commandService_.shutdown(); // redundant call, to ensure session interface is closed reliably
166  INFO (session, "Steam-Dispatcher stopped.");
167  if (thread_)
168  ALERT (session, "Dispatcher destroyed while Session thread is running. The rest is silence.");
169  }
170  ERROR_LOG_AND_IGNORE(session, "Stopping the Steam-Dispatcher");
171  }
172 
/**
 * Switch the looper to "processing enabled".
 * Done while holding the object lock; afterwards all threads waiting
 * on this object's monitor are notified.
 */
173  void
174  activateCommandProecssing()
175  {
176  Lock sync{this};
177  looper_.enableProcessing(true);
178  INFO (command, "Session command processing activated.");
179  sync.notify_all();
180  }
181 
/**
 * Switch the looper to "processing disabled".
 * Done while holding the object lock; afterwards all threads waiting
 * on this object's monitor are notified.
 */
182  void
183  deactivateCommandProecssing()
184  {
185  Lock sync{this};
186  looper_.enableProcessing(false);
187  INFO (command, "Session command interface closed.");
188  sync.notify_all();
189  }
190 
/**
 * Signal to the loop thread that it needs to terminate.
 * Under the object lock, the command service is shut down and the looper's
 * shutdown is triggered; then all waiting threads are notified.
 * @note NOTE(review): declared noexcept, so an exception escaping any call
 *       in the body would terminate the program.
 */
191  void
192  requestStop() noexcept
193  {
194  Lock sync{this};
195  commandService_.shutdown(); // closes Session interface
196  looper_.triggerShutdown();
197  sync.notify_all();
198  }
199 
/**
 * Synchronise with the loop's state processing.
 * Builds a temporary Lock on this object together with a predicate evaluating
 * isStateSynched(); presumably this Lock constructor waits on the monitor until
 * the predicate yields true -- confirm against the Sync/Lock implementation.
 * @note isStateSynched() throws error::Fatal when invoked from within
 *       the session thread itself.
 */
200  void
201  awaitStateProcessed() const
202  {
203  Lock{this, [&]{ return isStateSynched(); }};
204  // wake-up typically by updateState()
205  }
206 
/**
 * @return the value of queue_.size(), read while holding the object lock.
 */
207  size_t
208  size() const
209  {
210  Lock sync{this};
211  return queue_.size();
212  }
213 
214 
215  /* === CommandDispatch interface === */
216 
/**
 * CommandDispatch interface: hand a command over to the queue.
 * The command is moved into queue_.feed() under the object lock;
 * afterwards all threads waiting on this object's monitor are notified.
 * @param cmd command handle, consumed by this call.
 */
217  void
218  enqueue (Command&& cmd) override
219  {
220  Lock sync{this};
221  queue_.feed (move (cmd));
222  sync.notify_all();
223  }
224 
/**
 * CommandDispatch interface: clear the command queue.
 * Invokes queue_.clear() under the object lock, then notifies
 * all threads waiting on this object's monitor.
 */
225  void
226  clear() override
227  {
228  Lock sync{this};
229  queue_.clear();
230  sync.notify_all();
231  }
232 
233  private:
/**
 * Body of the session loop.
 * NOTE(review): the signature line (242) is missing from this listing; presumably
 * this is runSessionThread(), the function the constructor launches as thread.
 *
 * First passes the init_ barrier, which the constructor passes as well. Then, as long
 * as looper_.shallLoop() holds: wait for action, leave the loop when the looper
 * is dying, otherwise either start the builder or -- when working -- process
 * commands, and finally update the state.
 * Exceptions do not leave this function: the message of a std::exception is
 * stored into error_, for anything else the current lumiera_error() state.
 */
241  void
243  {
244  init_.sync();
245  try
246  {
247  while (looper_.shallLoop())
248  {
249  awaitAction();
250  if (looper_.isDying())
251  break;
252  if (looper_.runBuild())
253  startBuilder();
254  else
255  if (looper_.isWorking())
256  processCommands();
257  updateState();
258  }
259  }
260  catch (std::exception& problem)
261  { // could also be lumiera::Error
262  error_ = problem.what();
263  lumiera_error(); // clear error flag
264  }
265  catch (...)
266  {
267  error_ = string{lumiera_error()};
268  }
269  // Session thread terminates...
270  } // atExit-λ will invoke ~DispatcherLoop()
271 
/**
 * Timed wait at the start of each loop iteration.
 * Waits on a temporary Lock for this object, passing the timeout obtained from
 * looper_.getTimeout() and a predicate evaluating looper_.requireAction().
 */
272  void
273  awaitAction()
274  {
275  Lock{this}.wait_for (looper_.getTimeout()
276  ,[&]{ return looper_.requireAction(); });
277  }
278 
/**
 * Mark the end of one loop iteration.
 * Tells the looper that the state has been processed; waiting threads
 * are notified only when the looper reports the disabled state.
 */
279  void
280  updateState()
281  {
282  looper_.markStateProcessed();
283  if (looper_.isDisabled()) // otherwise wake-up would not be safe
284  getMonitor(this).notify_all();
285  }
286 
/**
 * Predicate used by awaitStateProcessed().
 * @return true when the looper reports no pending changes.
 * @throws error::Fatal (LUMIERA_ERROR_LIFECYCLE) when called from within
 *         the session thread, as detected by thread_.invokedWithinThread().
 */
287  bool
288  isStateSynched() const
289  {
290  if (thread_.invokedWithinThread())
291  throw error::Fatal("Possible Deadlock. "
292  "Attempt to synchronise to a command processing check point "
293  "from within the (single) session thread."
294  , error::LUMIERA_ERROR_LIFECYCLE);
295  return not looper_.hasPendingChanges();
296  }
297 
/**
 * Handle at most one command per invocation.
 * A command is popped from the queue under the object lock; the lock is
 * released again before the command is dealt with. A valid command is logged,
 * but only invoked when its ID starts with "test"; its result state is then
 * asked to maybeThrow().
 * @note NOTE(review): this listing skips source lines 311 and 318 around the
 *       "test" condition; consult the real source for the remarks placed there.
 */
298  void
299  processCommands()
300  {
301  Command cmd;
302  {
303  Lock sync{this};
304  if (not queue_.empty())
305  cmd = queue_.pop();
306  }
307  if (cmd)
308  {
309  INFO (command, "+++ dispatch %s", cStr(cmd));
310 
312  if (util::startsWith (string(cmd.getID()), "test"))
313  {
314  INFO (command, "+++ -------->>> bang!");
315  auto resultState = cmd();
316  resultState.maybeThrow();
317  }
319  }
320  }
321 
/**
 * Hook invoked by the loop when looper_.runBuild() holds.
 * As visible here, it only emits a log message.
 */
322  void
323  startBuilder()
324  {
325  INFO (builder, "+++ start the Steam-Builder...");
326  }
327  };
328 
329 
330 
331 
332 
333 
336 
337 
338  /* ======== SteamDispatcher implementation ======== */
339 
/* Constructor and destructor are defined out-of-line with empty bodies. */
340  SteamDispatcher::SteamDispatcher() { }
341  SteamDispatcher::~SteamDispatcher() { }
342 
/**
 * Starting the SteamDispatcher means to start the session subsystem.
 * Under the object lock, a new DispatcherLoop is created, unless one exists already.
 * The callback handed to the loop forwards its problem indicator to termNotification.
 * When active_ is set, command processing is activated right away.
 * @param termNotification invoked by the callback when the Session thread ends.
 * @return false when a loop is already running, true after creating a new one.
 * @note NOTE(review): this listing skips source line 361 within the callback;
 *       consult the real source for the complete callback body.
 */
351  bool
352  SteamDispatcher::start (Subsys::SigTerm termNotification)
353  {
354  Lock sync{this};
355  if (runningLoop_) return false;
356 
357  runningLoop_ =
358  make_unique<DispatcherLoop>(
359  [=](string* problemIndicator)
360  { // when the Session thread ends....
362  termNotification (problemIndicator);
363  });
364 
365  if (active_)
366  runningLoop_->activateCommandProecssing();
367  return true;
368  }
369 
/**
 * Discard the DispatcherLoop object.
 * NOTE(review): the signature line (380) is missing from this listing;
 * take the function name from the real source.
 *
 * Under the object lock, an existing runningLoop_ is reset, which deletes the
 * DispatcherLoop; when there is none, only a warning is logged.
 */
379  void
381  {
382  Lock sync{this};
383  if (runningLoop_)
384  runningLoop_.reset(); // delete DispatcherLoop object
385  else
386  WARN (command, "clean-up of DispatcherLoop invoked, "
387  "while SteamDispatcher is not marked as 'running'. "
388  "Likely an error in lifecycle logic, as the only one "
389  "intended to delete this object is the loop thread itself.");
390  }
391 
392 
/**
 * Whether the session subsystem is operational.
 * NOTE(review): the signature line (398) is missing from this listing;
 * presumably this is SteamDispatcher::isRunning().
 * @return true when a DispatcherLoop object exists (checked under the object lock).
 */
397  bool
399  {
400  Lock sync{this};
401  return bool(runningLoop_);
402  }
403 
404 
/**
 * Ask a running loop to terminate.
 * NOTE(review): the signature line (409) is missing from this listing;
 * presumably this is SteamDispatcher::requestStop().
 *
 * Under the object lock, forwards to requestStop() of the running loop, if any.
 * The try block is closed by ERROR_LOG_AND_IGNORE, so errors are logged and consumed.
 */
408  void
410  {
411  try {
412  Lock sync{this};
413  if (runningLoop_)
414  runningLoop_->requestStop();
415  }
416  ERROR_LOG_AND_IGNORE (command, "Request for Session Loop Thread to terminate");
417  }
418 
419 
420 
/**
 * Activate processing of enqueued session commands.
 * NOTE(review): the signature line (430) is missing from this listing;
 * presumably this is SteamDispatcher::activate().
 *
 * Sets the active_ flag under the object lock; a running loop is told
 * to activate command processing. start() consults the flag as well.
 */
429  void
431  {
432  Lock sync{this};
433  active_ = true;
434  if (runningLoop_)
435  runningLoop_->activateCommandProecssing();
436  }
437 
438 
/**
 * Halt further processing of session commands.
 * NOTE(review): the signature line (446) is missing from this listing;
 * presumably this is SteamDispatcher::deactivate().
 *
 * Clears the active_ flag under the object lock; a running loop is told
 * to deactivate command processing.
 */
445  void
447  {
448  Lock sync{this};
449  active_ = false;
450  if (runningLoop_)
451  runningLoop_->deactivateCommandProecssing();
452  }
453 
454 
/**
 * Block until the dispatcher has actually reached disabled state.
 * NOTE(review): the signature line (462) is missing from this listing;
 * presumably this is SteamDispatcher::awaitDeactivation().
 *
 * Delegates to awaitStateProcessed() of the running loop, if any.
 * NOTE(review): the lock on this object stays held during the whole wait;
 * verify that nothing required to reach the awaited state needs this lock.
 */
461  void
463  {
464  Lock sync{this};
465  if (runningLoop_)
466  runningLoop_->awaitStateProcessed();
467  }
468 
469 
/**
 * Discard any commands waiting in the dispatcher queue.
 * NOTE(review): the signature line (472) is missing from this listing;
 * presumably this is SteamDispatcher::clear().
 *
 * Acts only when empty() reports pending commands: logs a warning and clears
 * the queue of the running loop. Note empty() acquires the lock on this
 * object again while it is already held here.
 */
471  void
473  {
474  Lock sync{this};
475  if (not empty())
476  {
477  WARN (command, "DISCARDING pending Session commands.");
478  REQUIRE (runningLoop_);
479  runningLoop_->clear();
480  }
481  }
482 
483 
/**
 * @return true when no DispatcherLoop exists, or when the existing loop
 *         reports a queue size of zero; evaluated under the object lock.
 */
484  bool
485  SteamDispatcher::empty() const
486  {
487  Lock sync{this};
488  return not runningLoop_
489  or 0 == runningLoop_->size();
490  }
491 
492 
493 
494 
495 }} // namespace steam::control
Facility for monitor object based locking.
Definition: sync.hpp:209
DispatcherLoop(FUN &&atExit)
start the session loop thread
static lib::Depend< SteamDispatcher > instance
storage for Singleton access
void awaitDeactivation()
block until the dispatcher has actually reached disabled state.
void runSessionThread()
any operation running in the Session thread is started from here.
bool invokedWithinThread() const
detect if the currently executing code runs within this thread
Definition: thread.cpp:65
CStr cStr(std::string const &rendered)
convenience shortcut: forced conversion to c-String via string.
Definition: symbol.hpp:59
#define ERROR_LOG_AND_IGNORE(_FLAG_, _OP_DESCR_)
convenience shortcut for a sequence of catch blocks just logging and consuming an error...
Definition: error.hpp:266
Configuration handle to expose a service implementation through the Depend<SRV> front-end.
bool hasPendingChanges() const
"check point"
Definition: looper.hpp:173
Interface to abstract the SteamDispatcher's ability to handle command messages.
Per type specific configuration of instances created as service dependencies.
Dispatch and execute mutation operations on the High-level model.
void clear()
discard any commands waiting in the dispatcher queue
A public service offered by the Session, implementing the SessionCommand facade interface.
This header is for including and configuring NoBug.
Steam-Layer implementation namespace root.
Implementation building block of SteamDispatcher to organise commands.
Namespace of Session and user visible high-level objects.
Definition: sequence.hpp:65
Access point to singletons and other kinds of dependencies designated by type.
Definition: depend.hpp:280
ServiceHandle commandService_
manage the primary public Session interface
bool shallLoop() const
state fusion to control looping
Definition: looper.hpp:197
bool requireAction()
state fusion to control (timed) wait
Definition: looper.hpp:180
Implementation building block of SteamDispatcher to control waiting and timing.
bool start(Subsys::SigTerm)
starting the SteamDispatcher means to start the session subsystem.
Derived specific exceptions within Lumiera's exception hierarchy.
Definition: error.hpp:190
Interface of a service to perform Commands on the session.
Tiny helper functions and shortcuts to be used everywhere Consider this header to be effectively incl...
void markStateProcessed()
invoking this function signals that all consequences of past state changes have been processed and ar...
Definition: looper.hpp:165
lumiera_err lumiera_error(void)
Get and clear current error state.
Definition: error-state.c:115
void requestStop() noexcept
signal to the loop thread that it needs to terminate.
Convenience front-end to simplify and codify basic thread handling.
Lumiera error handling (C++ interface).
Implementation of the Session's command queue.
void deactivate()
halt further processing of session commands
A one time N-fold mutual synchronisation barrier.
Handle object representing a single Command instance to be used by client code.
Definition: command.hpp:115
Extended variant of the standard case, allowing to install callbacks (hook functions) to be invoked d...
Definition: thread.hpp:716
PImpl within SteamDispatcher to implement the Session Loop Thread. During the lifetime of this object...
Primary Interface to the current Session.
Encapsulated control logic for the session thread loop.
Definition: looper.hpp:105
void activate()
activate processing of enqueued session commands.
bool isRunning()
whether the »session subsystem« is operational.
void detach_thread_from_wrapper()
allow to detach explicitly — independent from thread-function's state.
Definition: thread.hpp:231
A N-fold synchronisation latch using yield-wait until fulfilment.