/++
  A module containing the parallel test runner

  Copyright: © 2017 Szabo Bogdan
  License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
  Authors: Szabo Bogdan
+/
module trial.executor.parallel;

public import trial.interfaces;

import std.datetime;
import std.exception;
import std.algorithm;
import std.array;
import core.thread;

version(unittest) {
  version(Have_fluent_asserts) {
    import fluent.asserts;
  }
}

/// The Lifecycle listener used to send data from the tests threads to
/// the main thread.
///
/// Only the step level `begin`/`end` events are forwarded (through
/// `ThreadProxy`); every other listener method asserts, because it must only
/// be called on the main thread.
class ThreadLifeCycleListener : LifeCycleListeners {
  /// The key of the test running on the current thread. It is a plain
  /// `static`, so each thread has its own copy.
  static string currentTest;

  override {
    /// Queue a "step started" event for the test that runs on this thread
    void begin(string suite, string test, ref StepResult step) {
      ThreadProxy.instance.beginStep(currentTest, step.name, step.begin);
    }

    /// Queue a "step ended" event for the test that runs on this thread
    void end(string suite, string test, ref StepResult step) {
      ThreadProxy.instance.endStep(currentTest, step.name, step.end);
    }

    /// Not available on a test thread
    void end(string, ref TestResult test) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    /// Not available on a test thread
    void begin(string, ref TestResult test) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    /// Not available on a test thread
    void add(T)(T listener) {
      assert(false, "You can not call `add` outside of the main thread");
    }

    /// Not available on a test thread
    void begin(ulong) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    /// Not available on a test thread
    void end(SuiteResult[] result) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    /// Not available on a test thread
    void begin(ref SuiteResult suite) {
      assert(false, "You can not call `begin` outside of the main thread");
    }

    /// Not available on a test thread
    void end(ref SuiteResult suite) {
      assert(false, "You can not call `end` outside of the main thread");
    }

    /// Not available on a test thread
    SuiteResult[] execute(ref const(TestCase)) {
      assert(false, "You can not call `execute` outside of the main thread");
    }

    /// Not available on a test thread
    SuiteResult[] beginExecution(ref const(TestCase)[]) {
      assert(false, "You can not call `beginExecution` outside of the main thread");
    }

    /// Not available on a test thread
    SuiteResult[] endExecution() {
      assert(false, "You can not call `endExecution` outside of the main thread");
    }
  }
}

/// Thread-local module destructor: when a thread that has been set up with
/// `testThreadSetup` terminates, report its test as ended. Threads that never
/// ran a test have an empty `currentTest` and report nothing.
static ~this() {
  if(ThreadLifeCycleListener.currentTest != "") {
    ThreadProxy.instance.end(ThreadLifeCycleListener.currentTest);
  }
}

private {
  import core.atomic;

  /// A step event recorded on a test thread and replayed on the main thread
  struct StepAction {
    /// Whether the step has started or ended
    enum Type {
      begin,
      end
    }

    /// The key of the test that owns the step
    string test;

    /// The step name
    string name;

    /// The moment the event was recorded for
    SysTime time;

    /// The kind of event
    Type type;
  }

  /// The shared mailbox between the test threads and the main thread.
  /// The test threads push events into it and the main thread drains them
  /// with `getStatus`. The class is `synchronized`, so its member functions
  /// are serialized on the instance.
  synchronized class ThreadProxy {
    private shared static ThreadProxy _instance = new shared ThreadProxy;

    shared {
      private {
        /// Keys of the tests that started since the last `getStatus` call
        string[] beginTests;

        /// Keys of the tests that ended since the last `getStatus` call
        string[] endTests;

        /// Step events recorded since the last `getStatus` call
        StepAction[] steps;

        /// The throwables of the failed tests, by test key. It is cleared only by `reset`
        Throwable[string] failures;

        /// How many tests have ended since the last `reset`
        ulong testCount;
      }

      static {
        /// The proxy instance used by all the threads
        shared(ThreadProxy) instance() {
          return _instance;
        }
      }

      /// Drop all the recorded events, failures and counters
      void reset() {
        beginTests = [];
        endTests = [];
        steps = [];

        failures = typeof(failures).init;

        testCount = 0;
      }

      /// Record that the test with the key `name` has started
      void begin(string name) {
        beginTests ~= name;
      }

      /// Record that the test with the key `name` has ended and count it as finished
      void end(string name) {
        core.atomic.atomicOp!"+="(this.testCount, 1);
        endTests ~= name;
      }

      /// Returns: the number of tests that ended since the last `reset`
      auto getTestCount() {
        return testCount;
      }

      /// Record a "step started" event for the test with the key `testName`
      void beginStep(shared(string) testName, string stepName, SysTime begin) {
        steps ~= StepAction(testName, stepName, begin, StepAction.Type.begin);
      }

      /// Record a "step ended" event for the test with the key `testName`
      void endStep(shared(string) testName, string stepName, SysTime end) {
        steps ~= StepAction(testName, stepName, end, StepAction.Type.end);
      }

      /// Store the throwable raised by the test with the key `key`
      void setFailure(string key, shared(Throwable) t) {
        failures[key] = t;
      }

      /// Take a snapshot of the pending events and clear the event queues.
      /// The failures and the finished test count are reported as they are
      /// and they are not cleared.
      auto getStatus() {
        struct Status {
          string[] begin;
          StepAction[] steps;
          string[] end;
          Throwable[string] failures;
          ulong testCount;
        }

        auto status = shared Status(beginTests.dup, steps.dup, endTests.dup, failures, testCount);

        beginTests = [];
        steps = [];
        endTests = [];

        return status;
      }
    }
  }
}

/// Prepare the current thread for running the test with the key `testName`:
/// remember the key, replace the thread's `LifeCycleListeners.instance` with a
/// `ThreadLifeCycleListener` and announce the test start to the proxy.
private void testThreadSetup(string testName) {
  ThreadLifeCycleListener.currentTest = testName;
  LifeCycleListeners.instance = new ThreadLifeCycleListener;
  ThreadProxy.instance.begin(testName);
}

/// The parallel executor runs every test in a separate thread
class ParallelExecutor : ITestExecutor {
  /// The result of a suite together with its progress
  struct SuiteStats {
    /// The suite result that is passed to the listeners
    SuiteResult result;

    /// How many tests of the suite have ended
    ulong testsFinished;

    /// True once the suite end was sent to the listeners
    bool isDone;
  }

  /// Params:
  ///   maxTestCount = how many tests can run at the same time. When it is 0,
  ///                  the value of `core.cpuid.threadsPerCPU` is used.
  this(uint maxTestCount = 0) {
    this.maxTestCount = maxTestCount;

    if(this.maxTestCount == 0) {
      import core.cpuid : threadsPerCPU;
      this.maxTestCount = threadsPerCPU;
    }
  }

  private {
    /// The number of tests started by `execute`
    ulong testCount;

    /// The maximum number of tests that run at the same time
    uint maxTestCount;

    /// The suite of the last test that was reported as started
    string currentSuite = "";

    /// The suite results by suite name
    SuiteStats[string] suiteStats;

    /// The executed test cases by key (`suiteName|testName`)
    TestCase[string] testCases;

    /// The open steps of each running test. The first element is the test
    /// result and the last one is the innermost open step.
    StepResult[][string] stepStack;

    /// Set the suite times and notify the listeners that the suite has started
    void addSuiteResult(string name) {
      suiteStats[name].result.begin = Clock.currTime;
      suiteStats[name].result.end = Clock.currTime;

      LifeCycleListeners.instance.begin(suiteStats[name].result);
    }

    /// Mark the suite as done and notify the listeners that it has ended
    void endSuiteResult(string name) {
      suiteStats[name].result.end = Clock.currTime;
      suiteStats[name].isDone = true;

      LifeCycleListeners.instance.end(suiteStats[name].result);
    }

    /// Handle a "test started" event: announce the suite when it differs from
    /// the last announced one, mark the test as started, notify the listeners
    /// and open the step stack of the test.
    void addTestResult(string key) {
      auto testCase = testCases[key];

      // NOTE(review): only the last announced suite is remembered, so a suite is
      // announced again when the begin events of two suites interleave - confirm
      // that the event order makes this impossible
      if(currentSuite != testCase.suiteName) {
        addSuiteResult(testCase.suiteName);
        currentSuite = testCase.suiteName;
      }

      auto testResult = suiteStats[testCase.suiteName]
        .result
        .tests
        .filter!(a => a.name == testCase.name)
        .front;

      testResult.begin = Clock.currTime;
      testResult.end = Clock.currTime;
      testResult.status = TestResult.Status.started;

      LifeCycleListeners.instance.begin(testCase.suiteName, testResult);
      stepStack[key] = [ testResult ];
    }

    /// Handle a "test ended" event: store the outcome (`t` is null on
    /// success), count the test as finished for its suite, notify the
    /// listeners and drop the step stack of the test.
    void endTestResult(string key, Throwable t) {
      auto testCase = testCases[key];

      auto testResult = suiteStats[testCase.suiteName]
        .result
        .tests
        .filter!(a => a.name == testCase.name)
        .front;

      testResult.end = Clock.currTime;
      testResult.status = t is null ? TestResult.Status.success : TestResult.Status.failure;
      testResult.throwable = t;

      suiteStats[testCases[key].suiteName].testsFinished++;

      LifeCycleListeners.instance.end(testCases[key].suiteName, testResult);
      stepStack.remove(key);
    }

    /// Handle a "step started" event: attach a new step to the innermost open
    /// step of the test, push it on the stack and notify the listeners.
    void addStep(string key, string name, SysTime time) {
      auto step = new StepResult;
      step.name = name;
      step.begin = time;
      step.end = time;

      stepStack[key][stepStack[key].length - 1].steps ~= step;
      stepStack[key] ~= step;

      LifeCycleListeners.instance.begin(testCases[key].suiteName, testCases[key].name, step);
    }

    /// Handle a "step ended" event: close the innermost open step of the
    /// test, pop it from the stack and notify the listeners.
    ///
    /// Throws: an exception when `name` does not match the innermost open step
    void endStep(string key, string name, SysTime time) {
      auto step = stepStack[key][stepStack[key].length - 1];

      enforce(step.name == name, "unexpected step name");
      step.end = time;

      // pop the closed step; appending the slice here would keep the right
      // element on top but would grow the stack with every closed step
      stepStack[key] = stepStack[key][0..$-1];

      LifeCycleListeners.instance.end(testCases[key].suiteName, testCases[key].name, step);
    }

    /// Replay on the main thread the events recorded by the test threads, in
    /// this order: test begins, steps, test ends. At the end, close the
    /// suites that have all their tests finished.
    ///
    /// Returns: the number of tests that have ended since the proxy reset
    auto processEvents() {
      LifeCycleListeners.instance.update;

      auto status = ThreadProxy.instance.getStatus;

      foreach(beginKey; status.begin) {
        addTestResult(beginKey);
      }

      foreach(step; status.steps) {
        if(step.type == StepAction.Type.begin) {
          addStep(step.test, step.name, step.time);
        }

        if(step.type == StepAction.Type.end) {
          endStep(step.test, step.name, step.time);
        }
      }

      foreach(endKey; status.end) {
        Throwable failure = null;

        if(endKey in status.failures) {
          failure = cast() status.failures[endKey];
        }

        endTestResult(endKey, failure);
      }

      // `values` is a snapshot; `endSuiteResult` updates the stored stats by name
      foreach(stat; suiteStats.values) {
        if(!stat.isDone && stat.result.tests.length == stat.testsFinished) {
          endSuiteResult(stat.result.name);
        }
      }

      return status.testCount;
    }

    /// Process the events until all the started tests have ended
    void wait() {
      ulong executedTestCount;

      do {
        LifeCycleListeners.instance.update();
        executedTestCount = processEvents;
        Thread.sleep(1.msecs);
      } while(executedTestCount < testCount);
    }
  }

  /// Start the test in a new thread. When `maxTestCount` tests are already
  /// running, the call blocks, processing events, until one of them ends.
  ///
  /// Returns: always an empty list; the results are returned by `endExecution`
  SuiteResult[] execute(ref const(TestCase) testCase) {
    import std.parallelism;

    SuiteResult[] result;

    auto key = testCase.suiteName ~ "|" ~ testCase.name;
    testCases[key] = TestCase(testCase);

    testCount++;

    // Copy the callable before starting the thread: the task can run after
    // this function returns and `testCase` is a reference owned by the caller,
    // which may not be valid anymore at that point
    auto testFunc = testCase.func;

    task({
      testThreadSetup(key);

      try {
        testFunc();
      } catch(Throwable t) {
        ThreadProxy.instance.setFailure(key, cast(shared)t);
      }
    }).executeInNewThread();

    auto runningTests = testCount - ThreadProxy.instance.getTestCount;

    while(maxTestCount <= runningTests && runningTests > 0) {
      processEvents;

      // give the test threads a chance to take the proxy lock, like `wait` does
      Thread.sleep(1.msecs);

      runningTests = testCount - ThreadProxy.instance.getTestCount;
    }

    return result;
  }

  /// Create a result entry for every test, grouped by suite, and reset the
  /// thread proxy.
  ///
  /// Returns: always an empty list
  SuiteResult[] beginExecution(ref const(TestCase)[] tests) {
    foreach(test; tests) {
      auto const suite = test.suiteName;
      if(suite !in suiteStats) {
        suiteStats[suite] = SuiteStats(SuiteResult(suite));
      }

      suiteStats[suite].result.tests ~= new TestResult(test.name);
    }

    ThreadProxy.instance.reset();
    return [];
  }

  /// Wait for all the running tests, close the suites that are still open
  /// and collect the results.
  ///
  /// Returns: the results of all the suites
  SuiteResult[] endExecution() {
    wait;

    foreach(stat; suiteStats.values) {
      if(!stat.isDone) {
        endSuiteResult(stat.result.name);
      }
    }

    SuiteResult[] results;

    foreach(stat; suiteStats) {
      results ~= stat.result;
    }

    return results;
  }
}