Lumiera  0.pre.03
»edit your freedom«
incidence-count.hpp
/*
  INCIDENCE-COUNT.hpp - instrumentation helper to watch possibly multithreaded task activations

  Copyright (C)
    2024,            Hermann Vosseler <Ichthyostega@web.de>

  **Lumiera** is free software; you can redistribute it and/or modify it
  under the terms of the GNU General Public License as published by the
  Free Software Foundation; either version 2 of the License, or (at your
  option) any later version. See the file COPYING for further details.

*/

#ifndef LIB_INCIDENCE_COUNT_H
#define LIB_INCIDENCE_COUNT_H


#include "lib/nocopy.hpp"
#include "lib/iter-explorer.hpp"

#include <cstdint>
#include <atomic>
#include <vector>
#include <chrono>
#include <limits>
#include <algorithm>


namespace lib {

  using std::vector;

  /**
   * A recorder for concurrent incidences.
   * Each participating thread appends its enter/leave events to a separate
   * sequence; evaluation later merges all sequences into one timeline.
   */
  class IncidenceCount
    : util::NonCopyable
    {
      using TIMING_SCALE = std::micro;    // Results are in µ-sec
      using Clock = std::chrono::steady_clock;

      using Instance = decltype(Clock::now());
      using Dur = std::chrono::duration<double, TIMING_SCALE>;

      struct Inc
        {
          Instance when;
          uint8_t thread :8;
          uint8_t caseID :8;
          bool isLeave :1;
        };

      using Sequence = vector<Inc>;
      using Recording = vector<Sequence>;

      Recording rec_;

      std::atomic_uint8_t slotID_{0};

      /** threadsafe allocation of thread/slotID */
      uint8_t
      allocateNextSlot()
        { // Note : returning previous value before increment
          return slotID_.fetch_add(+1, std::memory_order_relaxed);
        }

      uint8_t
      getMySlot()
        {
          thread_local uint8_t threadID{allocateNextSlot()};
          ASSERT (threadID < std::numeric_limits<uint8_t>::max(), "WOW -- so many threads?");
          return threadID;
        }

      Sequence&
      getMySequence(uint8_t threadID)
        {
          if (threadID >= rec_.size())
            {
              rec_.reserve (threadID+1);
              for (size_t i = rec_.size(); i < threadID+1u; ++i)
                rec_.emplace_back();
            }
          return rec_[threadID];
        }

      void
      addEntry(uint8_t caseID, bool isLeave)
        {
          uint8_t threadID{getMySlot()};
          Sequence& seq = getMySequence(threadID);
          Inc& incidence = seq.emplace_back();
          incidence.when = Clock::now();
          incidence.thread = threadID;
          incidence.caseID = caseID;
          incidence.isLeave = isLeave;
        }


    public:
      IncidenceCount() = default;


      IncidenceCount&
      expectThreads(uint8_t cnt)
        {
          REQUIRE (cnt);
          rec_.resize (cnt);
          return *this;
        }

      IncidenceCount&
      expectIncidents(size_t cnt)
        {
          REQUIRE (cnt);
          for (Sequence& s : rec_)
            s.reserve (2*cnt);
          return *this;
        }



      /* ===== Measurement API ===== */

      void markEnter(uint8_t caseID =0) { addEntry(caseID, false); }
      void markLeave(uint8_t caseID =0) { addEntry(caseID, true); }


      /* ===== Evaluations ===== */

      struct Statistic
        {
          size_t eventCnt{0};
          size_t activationCnt{0};
          double cumulatedTime{0};    ///< aggregated time over all cases
          double activeTime{0};       ///< compounded time of thread activity
          double coveredTime{0};      ///< overall timespan of observation
          double avgConcurrency{0};   ///< amortised concurrency in timespan

          vector<size_t> caseCntr{};  ///< counting activations per case
          vector<size_t> thrdCntr{};  ///< counting activations per thread
          vector<double> caseTime{};  ///< aggregated time per case
          vector<double> thrdTime{};  ///< time of activity per thread
          vector<double> concTime{};  ///< time spent at each degree of concurrency

          template<typename VAL>
          static VAL
          access (vector<VAL> const& data, size_t idx)
            {
              return idx < data.size()? data[idx]
                                      : VAL{};
            }
          size_t cntCase (size_t id)   const { return access (caseCntr, id); }
          size_t cntThread(size_t id)  const { return access (thrdCntr, id); }
          double timeCase (size_t id)  const { return access (caseTime, id); }
          double timeThread(size_t id) const { return access (thrdTime, id); }
          double timeAtConc(size_t id) const { return access (concTime, id); }
        };

      Statistic evaluate();

      double
      calcCumulatedTime()
        {
          return evaluate().cumulatedTime;
        }

    };


  /**
   * Visit all data captured thus far, construct a unified timeline
   * and then compute the statistics evaluation.
   */
  inline IncidenceCount::Statistic
  IncidenceCount::evaluate()
  {
    Statistic stat;
    size_t numThreads = rec_.size();
    if (numThreads == 0) return stat;

    size_t numEvents = explore(rec_)
                          .transform([](Sequence& seq){ return seq.size(); })
                          .resultSum();
    if (numEvents == 0) return stat;
    Sequence timeline;
    timeline.reserve(numEvents);
    for (Sequence& seq : rec_)
      for (Inc& event : seq)
        timeline.emplace_back(event);
    std::stable_sort (timeline.begin(), timeline.end()
                     ,[](Inc const& l, Inc const& r) { return l.when < r.when; }
                     );

    int active{0};
    vector<int> active_case;
    vector<int> active_thrd(numThreads);
    stat.thrdCntr.resize (numThreads);
    stat.thrdTime.resize (numThreads);
    stat.concTime.resize (numThreads+1);   // also accounting for idle times in range

    // Integrate over the timeline...
    // - book the preceding interval length into each affected partial sum
    // - adjust current active count in accordance to the current event
    Instance prev = timeline.front().when;
    for (Inc& event : timeline)
      {
        if (event.caseID >= stat.caseCntr.size())
          {
            active_case  .resize (event.caseID+1);
            stat.caseCntr.resize (event.caseID+1);
            stat.caseTime.resize (event.caseID+1);
          }
        Dur timeSlice = event.when - prev;
        stat.cumulatedTime += active * timeSlice.count();
        for (uint i=0; i < stat.caseCntr.size(); ++i)
          stat.caseTime[i] += active_case[i] * timeSlice.count();
        for (uint i=0; i < numThreads; ++i)
          if (active_thrd[i])   // note: counting activity per thread, without overlapping cases
            stat.thrdTime[i] += timeSlice.count();
        size_t concurr = explore(active_thrd).filter([](int a){ return 0 < a; }).count();
        ENSURE (concurr <= numThreads);
        stat.avgConcurrency += concurr * timeSlice.count();  // contribution for weighted average
        stat.concTime[concurr] += timeSlice.count();
        if (event.isLeave)
          {
            ASSERT (0 < active);
            ASSERT (0 < active_case[event.caseID]);
            ASSERT (0 < active_thrd[event.thread]);
            --active;
            --active_case[event.caseID];
            --active_thrd[event.thread];
          }
        else
          {
            ++active;
            ++active_case[event.caseID];
            ++active_thrd[event.thread];
            ++stat.caseCntr[event.caseID];
            ++stat.thrdCntr[event.thread];
            ++stat.activationCnt;
          }
        prev = event.when;
      }
    Dur covered = timeline.back().when - timeline.front().when;
    stat.coveredTime = covered.count();
    stat.eventCnt = timeline.size();
    ENSURE (0 < stat.activationCnt);
    ENSURE (stat.eventCnt % 2 == 0);
    stat.avgConcurrency /= stat.coveredTime;   // time used as weight sum
    stat.activeTime = explore(stat.thrdTime).resultSum();
    return stat;
  }


} // namespace lib
#endif /*LIB_INCIDENCE_COUNT_H*/
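
A minimal usage sketch (not part of the header; the demo function name and the workload are made up for illustration): two worker threads record matched markEnter/markLeave events under distinct case IDs, and the observing thread afterwards calls evaluate() on the merged timeline.

    #include "lib/incidence-count.hpp"
    #include <thread>

    void
    demo_watchTwoWorkers()          // hypothetical name, for illustration only
    {
      lib::IncidenceCount watch;
      watch.expectThreads (2);      // pre-size the per-thread recording sequences
      watch.expectIncidents (50);   // reserve room for 2*50 events per thread

      auto work = [&](uint8_t caseID)
                    {
                      for (unsigned i=0; i<50; ++i)
                        {
                          watch.markEnter (caseID);   // begin one observed activation
                          /* ...workload under observation... */
                          watch.markLeave (caseID);   // matching end event
                        }
                    };
      std::thread t1{work, uint8_t(1)};
      std::thread t2{work, uint8_t(2)};
      t1.join();
      t2.join();

      auto stat = watch.evaluate();
      // stat.activationCnt == 100  : 50 activations on each of the two threads
      // stat.cumulatedTime         : µs of activation time, summed over all activations
      // stat.coveredTime           : µs from the first to the last recorded event
      // stat.avgConcurrency        : time-weighted average of simultaneously active threads
      // stat.timeCase(1)           : µs attributed to activations tagged with caseID 1
    }

Calling expectThreads(2) before starting the workers matters: it resizes the per-thread Recording up front, so the concurrent markEnter/markLeave calls only append to their own pre-existing sequences and never trigger a reallocation of the shared container.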