1 /++
2   A module containing the parallel test runner
3 
4   Copyright: © 2017 Szabo Bogdan
5   License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6   Authors: Szabo Bogdan
7 +/
8 module trial.executor.parallel;
9 
10 public import trial.interfaces;
11 
12 import std.datetime;
13 import std.exception;
14 import std.algorithm;
15 import std.array;
16 import core.thread;
17 
18 version(unittest) {
19   version(Have_fluent_asserts) {
20     import fluent.asserts;
21   }
22 }
23 /// The Lifecycle listener used to send data from the tests threads to
24 /// the main thread
25 class ThreadLifeCycleListener : LifeCycleListeners {
26   static string currentTest;
27 
28   override {
29     void begin(string suite, string test, ref StepResult step) {
30       ThreadProxy.instance.beginStep(currentTest, step.name, step.begin);
31     }
32 
33     void end(string suite, string test, ref StepResult step) {
34       ThreadProxy.instance.endStep(currentTest, step.name, step.end);
35     }
36 
37     void end(string, ref TestResult test) {
38       assert(false, "You can not call `end` outside of the main thread");
39     }
40 
41     void begin(string, ref TestResult test) {
42       assert(false, "You can not call `begin` outside of the main thread");
43     }
44 
45     void add(T)(T listener) {
46       assert(false, "You can not call `add` outside of the main thread");
47     }
48 
49     void begin(ulong) {
50       assert(false, "You can not call `begin` outside of the main thread");
51     }
52 
53     void end(SuiteResult[] result) {
54       assert(false, "You can not call `end` outside of the main thread");
55     }
56 
57     void begin(ref SuiteResult suite) {
58       assert(false, "You can not call `begin` outside of the main thread");
59     }
60 
61     void end(ref SuiteResult suite) {
62       assert(false, "You can not call `end` outside of the main thread");
63     }
64 
65     SuiteResult[] execute(ref const(TestCase)) {
66       assert(false, "You can not call `execute` outside of the main thread");
67     }
68 
69     SuiteResult[] beginExecution(ref const(TestCase)[]) {
70       assert(false, "You can not call `beginExecution` outside of the main thread");
71     }
72 
73     SuiteResult[] endExecution() {
74       assert(false, "You can not call `endExecution` outside of the main thread");
75     }
76   }
77 }
78 
79 static ~this() {
80   if(ThreadLifeCycleListener.currentTest != "") {
81     ThreadProxy.instance.end(ThreadLifeCycleListener.currentTest);
82   }
83 }
84 
85 private {
86   import core.atomic;
87 
88   struct StepAction {
89     enum Type {
90       begin,
91       end
92     }
93 
94     string test;
95     string name;
96     SysTime time;
97     Type type;
98   }
99 
100   synchronized class ThreadProxy {
101     private shared static ThreadProxy _instance = new shared ThreadProxy;
102 
103     shared {
104       private {
105         string[] beginTests;
106         string[] endTests;
107         StepAction[] steps;
108         Throwable[string] failures;
109         ulong testCount;
110       }
111 
112       static {
113         shared(ThreadProxy) instance() {
114           return _instance;
115         }
116       }
117 
118       void reset() {
119         beginTests = [];
120         endTests = [];
121         steps = [];
122 
123         failures = typeof(failures).init;
124 
125         testCount = 0;
126       }
127 
128       void begin(string name) {
129         beginTests ~= name;
130       }
131 
132       void end(string name) {
133         core.atomic.atomicOp!"+="(this.testCount, 1);
134         endTests ~= name;
135       }
136 
137       auto getTestCount() {
138         return testCount;
139       }
140 
141       void beginStep(shared(string) testName, string stepName, SysTime begin) {
142         steps ~= StepAction(testName, stepName, begin, StepAction.Type.begin);
143       }
144 
145       void endStep(shared(string) testName, string stepName, SysTime end) {
146         steps ~= StepAction(testName, stepName, end, StepAction.Type.end);
147       }
148 
149       void setFailure(string key, shared(Throwable) t) {
150         failures[key] = t;
151       }
152 
153       auto getStatus() {
154         struct Status {
155           string[] begin;
156           StepAction[] steps;
157           string[] end;
158           Throwable[string] failures;
159           ulong testCount;
160         }
161 
162         auto status = shared Status(beginTests.dup, steps.dup, endTests.dup, failures, testCount);
163 
164         beginTests = [];
165         steps = [];
166         endTests = [];
167 
168         return status;
169       }
170     }
171   }
172 }
173 
174 private void testThreadSetup(string testName) {
175   ThreadLifeCycleListener.currentTest = testName;
176   LifeCycleListeners.instance = new ThreadLifeCycleListener;
177   ThreadProxy.instance.begin(testName);
178 }
179 
180 /// The parallel executors runs tests in a sepparate thread
181 class ParallelExecutor : ITestExecutor {
182   struct SuiteStats {
183     SuiteResult result;
184 
185     ulong testsFinished;
186     bool isDone;
187   }
188 
189   this(uint maxTestCount = 0) {
190     this.maxTestCount = maxTestCount;
191 
192     if(this.maxTestCount <= 0) {
193       import core.cpuid : threadsPerCPU;
194       this.maxTestCount = threadsPerCPU;
195     }
196   }
197 
198   private {
199     ulong testCount;
200     uint maxTestCount;
201     string currentSuite = "";
202 
203     SuiteStats[string] suiteStats;
204     TestCase[string] testCases;
205 
206     StepResult[][string] stepStack;
207 
208     void addSuiteResult(string name) {
209       suiteStats[name].result.begin = Clock.currTime;
210       suiteStats[name].result.end = Clock.currTime;
211 
212       LifeCycleListeners.instance.begin(suiteStats[name].result);
213     }
214 
215     void endSuiteResult(string name) {
216       suiteStats[name].result.end = Clock.currTime;
217       suiteStats[name].isDone = true;
218 
219       LifeCycleListeners.instance.end(suiteStats[name].result);
220     }
221 
222     void addTestResult(string key) {
223       auto testCase = testCases[key];
224 
225       if(currentSuite != testCase.suiteName) {
226         addSuiteResult(testCase.suiteName);
227         currentSuite = testCase.suiteName;
228       }
229 
230       auto testResult = suiteStats[testCase.suiteName]
231         .result
232         .tests
233         .filter!(a => a.name == testCase.name)
234           .front;
235 
236       testResult.begin = Clock.currTime;
237       testResult.end = Clock.currTime;
238       testResult.status = TestResult.Status.started;
239 
240       LifeCycleListeners.instance.begin(testCase.suiteName, testResult);
241       stepStack[key] = [ testResult ];
242     }
243 
244     void endTestResult(string key, Throwable t) {
245       auto testCase = testCases[key];
246 
247       auto testResult = suiteStats[testCase.suiteName]
248         .result
249         .tests
250         .filter!(a => a.name == testCase.name)
251           .front;
252 
253       testResult.end = Clock.currTime;
254       testResult.status = t is null ? TestResult.Status.success : TestResult.Status.failure;
255       testResult.throwable = t;
256 
257       suiteStats[testCases[key].suiteName].testsFinished++;
258 
259       LifeCycleListeners.instance.end(testCases[key].suiteName, testResult);
260       stepStack.remove(key);
261     }
262 
263     void addStep(string key, string name, SysTime time) {
264       auto step = new StepResult;
265       step.name = name;
266       step.begin = time;
267       step.end = time;
268 
269       stepStack[key][stepStack[key].length - 1].steps ~= step;
270       stepStack[key] ~= step;
271 
272       LifeCycleListeners.instance.begin(testCases[key].suiteName, testCases[key].name, step);
273     }
274 
275     void endStep(string key, string name, SysTime time) {
276       auto step = stepStack[key][stepStack[key].length - 1];
277 
278       enforce(step.name == name, "unexpected step name");
279       step.end = time;
280       stepStack[key] ~= stepStack[key][0..$-1];
281 
282       LifeCycleListeners.instance.end(testCases[key].suiteName, testCases[key].name, step);
283     }
284 
285     auto processEvents() {
286       LifeCycleListeners.instance.update;
287 
288       auto status = ThreadProxy.instance.getStatus;
289 
290       foreach(beginKey; status.begin) {
291         addTestResult(beginKey);
292       }
293 
294       foreach(step; status.steps) {
295         if(step.type == StepAction.Type.begin) {
296           addStep(step.test, step.name, step.time);
297         }
298 
299         if(step.type == StepAction.Type.end) {
300           endStep(step.test, step.name, step.time);
301         }
302       }
303 
304       foreach(endKey; status.end) {
305         Throwable failure = null;
306 
307         if(endKey in status.failures) {
308           failure = cast() status.failures[endKey];
309         }
310 
311         endTestResult(endKey, failure);
312       }
313 
314       foreach(ref index, ref stat; suiteStats.values) {
315         if(!stat.isDone && stat.result.tests.length == stat.testsFinished) {
316           endSuiteResult(stat.result.name);
317         }
318       }
319 
320       return status.testCount;
321     }
322 
323     void wait() {
324       ulong executedTestCount;
325 
326       do {
327         LifeCycleListeners.instance.update();
328         executedTestCount = processEvents;
329         Thread.sleep(1.msecs);
330       } while(executedTestCount < testCount);
331     }
332   }
333 
334   SuiteResult[] execute(ref const(TestCase) testCase) {
335     import std.parallelism;
336 
337     SuiteResult[] result;
338 
339     auto key = testCase.suiteName ~ "|" ~ testCase.name;
340     testCases[key] = TestCase(testCase);
341 
342     testCount++;
343 
344     task({
345       testThreadSetup(key);
346 
347       try {
348         testCase.func();
349       } catch(Throwable t) {
350         ThreadProxy.instance.setFailure(key, cast(shared)t);
351       }
352     }).executeInNewThread();
353 
354     auto runningTests = testCount - ThreadProxy.instance.getTestCount;
355 
356     while(maxTestCount <= runningTests && runningTests > 0) {
357       processEvents;
358       runningTests = testCount - ThreadProxy.instance.getTestCount;
359     }
360 
361     return result;
362   }
363 
364   SuiteResult[] beginExecution(ref const(TestCase)[] tests) {
365     foreach(test; tests) {
366       auto const suite = test.suiteName;
367       if(suite !in suiteStats) {
368         suiteStats[suite] = SuiteStats(SuiteResult(suite));
369       }
370 
371       suiteStats[suite].result.tests ~= new TestResult(test.name);
372     }
373 
374     ThreadProxy.instance.reset();
375     return [];
376   }
377 
378   SuiteResult[] endExecution() {
379     wait;
380 
381     foreach(stat; suiteStats.values) {
382       if(!stat.isDone) {
383         endSuiteResult(stat.result.name);
384       }
385     }
386 
387     SuiteResult[] results;
388 
389     foreach(stat; suiteStats) {
390       results ~= stat.result;
391     }
392 
393     return results;
394   }
395 }