diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 1999920eb..7fd6561b9 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -62,6 +62,35 @@ class ThreadPool extends WorkOutput #end #end + /** + A rough estimate of how much of the app's time should be spent on + single-threaded `ThreadPool`s. For instance, the default value of 1/2 + means they will aim to take up about half the app's available time every + frame. See `workIterations` for instructions to improve the accuracy of + this estimate. + **/ + public static var workLoad:Float = 1/2; + + /** + __Access this only from the main thread.__ + + The sum of all active single-threaded pools' `workPriority` values. + **/ + @:allow(lime.system.JobList) + private static var __totalWorkPriority:Float = 0; + + /** + Returns whether the caller called this function from the main thread. + **/ + public static inline function isMainThread():Bool + { + #if (haxe4 && lime_threads) + return Thread.current() == __mainThread; + #else + return true; + #end + } + /** Indicates that no further events will be dispatched. **/ @@ -139,11 +168,21 @@ class ThreadPool extends WorkOutput **/ public var onRun(default, null) = new EventVoid>(); + /** + (Single-threaded mode only.) How important this pool's jobs are relative + to other single-threaded pools. + + For instance, if all pools use the default priority of 1, they will all + run for an approximately equal amount of time each frame. If one has a + value of 2, it will run approximately twice as long as the others. + **/ + public var workPriority(default, set):Float = 1; + @:deprecated("Instead pass the callback to ThreadPool.run().") @:noCompletion @:dox(hide) public var doWork(get, never):PseudoEvent; private var __doWork:WorkFunctionWorkOutput->Void>; - private var __activeJobs:JobList = new JobList(); + private var __activeJobs:JobList; #if lime_threads /** @@ -160,8 +199,6 @@ class ThreadPool extends WorkOutput private var __jobQueue:JobList = new JobList(); - private var __workPerFrame:Float; - /** __Call this only from the main thread.__ @@ -173,24 +210,12 @@ class ThreadPool extends WorkOutput @param mode Defaults to `MULTI_THREADED` on most targets, but `SINGLE_THREADED` in HTML5. In HTML5, `MULTI_THREADED` mode uses web workers, which impose additional restrictions. - @param workLoad (Single-threaded mode only) A rough estimate of how much - of the app's time should be spent on this `ThreadPool`. For instance, - the default value of 1/2 means this worker will take up about half the - app's available time every frame. See `workIterations` for instructions - to improve the accuracy of this estimate. **/ - public function new(minThreads:Int = 0, maxThreads:Int = 1, mode:ThreadMode = null, workLoad:Float = 1/2) + public function new(minThreads:Int = 0, maxThreads:Int = 1, mode:ThreadMode = null) { super(mode); - if (Application.current != null && Application.current.window != null) - { - __workPerFrame = workLoad / Application.current.window.frameRate; - } - else - { - __workPerFrame = workLoad / 60; - } + __activeJobs = new JobList(this); this.minThreads = minThreads; this.maxThreads = maxThreads; @@ -204,12 +229,10 @@ class ThreadPool extends WorkOutput **/ public function cancel(error:Dynamic = null):Void { - #if (haxe4 && lime_threads) - if (Thread.current() != __mainThread) + if (!isMainThread()) { throw "Call cancel() only from the main thread."; } - #end Application.current.onUpdate.remove(__update); @@ -313,12 +336,10 @@ class ThreadPool extends WorkOutput **/ public function run(doWork:WorkFunctionWorkOutput->Void> = null, state:State = null):Int { - #if (haxe4 && lime_threads) - if (Thread.current() != __mainThread) + if (!isMainThread()) { throw "Call run() only from the main thread."; } - #end if (doWork == null) { @@ -338,11 +359,11 @@ class ThreadPool extends WorkOutput } var job:JobData = new JobData(doWork, state); - __jobQueue.add(job); + __jobQueue.push(job); completed = false; canceled = false; - if (Application.current != null && !Application.current.onUpdate.has(__update)) + if (!Application.current.onUpdate.has(__update)) { Application.current.onUpdate.add(__update); } @@ -451,15 +472,13 @@ class ThreadPool extends WorkOutput **/ private function __update(deltaTime:Int):Void { - #if (haxe4 && lime_threads) - if (Thread.current() != __mainThread) + if (!isMainThread()) { return; } - #end // Process the queue. - while (!__jobQueue.isEmpty() && activeJobs < maxThreads) + while (__jobQueue.length > 0 && activeJobs < maxThreads) { var job:JobData = __jobQueue.pop(); @@ -480,15 +499,21 @@ class ThreadPool extends WorkOutput #end } - // Run the next single-threaded job. - if (mode == SINGLE_THREADED && activeJobs > 0) + // Run the next single-threaded job, if any. + if (mode == SINGLE_THREADED && __activeJobs.hasNext()) { - activeJob = __activeJobs.pop(); + activeJob = __activeJobs.next(); var state:State = activeJob.state; __jobComplete.value = false; workIterations.value = 0; + // `workLoad / frameRate` is the total time that pools may use per + // frame. `workPriority / __totalWorkPriority` is this pool's + // fraction of that total. + var maxTimeElapsed:Float = workPriority * workLoad + / (__totalWorkPriority * Application.current.window.frameRate); + var startTime:Float = timestamp(); var timeElapsed:Float = 0; try @@ -499,7 +524,7 @@ class ThreadPool extends WorkOutput activeJob.doWork.dispatch(state, this); timeElapsed = timestamp() - startTime; } - while (!__jobComplete.value && timeElapsed < __workPerFrame); + while (!__jobComplete.value && timeElapsed < maxTimeElapsed); } catch (e:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end) { @@ -508,9 +533,6 @@ class ThreadPool extends WorkOutput activeJob.duration += timeElapsed; - // Add this job to the end of the list, to cycle through. - __activeJobs.add(activeJob); - activeJob = null; } @@ -558,7 +580,7 @@ class ThreadPool extends WorkOutput var thread:Thread = __activeThreads[activeJob.id]; __activeThreads.remove(activeJob.id); - if (currentThreads > maxThreads || __jobQueue.isEmpty() && currentThreads > minThreads) + if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads) { thread.sendMessage(new ThreadEvent(EXIT, null, null)); } @@ -569,7 +591,7 @@ class ThreadPool extends WorkOutput } #end - completed = threadEvent.event == COMPLETE && activeJobs == 0 && __jobQueue.isEmpty(); + completed = threadEvent.event == COMPLETE && activeJobs == 0 && __jobQueue.length == 0; default: } @@ -577,7 +599,7 @@ class ThreadPool extends WorkOutput activeJob = null; } - if (completed && Application.current != null) + if (completed) { Application.current.onUpdate.remove(__update); } @@ -618,6 +640,15 @@ class ThreadPool extends WorkOutput { return this; } + + private function set_workPriority(value:Float):Float + { + if (mode == SINGLE_THREADED && activeJobs > 0) + { + __totalWorkPriority += value - workPriority; + } + return workPriority = value; + } } @:access(lime.system.ThreadPool) @:forward(canceled) @@ -650,12 +681,38 @@ private abstract PseudoEvent(ThreadPool) from ThreadPool { public inline function removeAll():Void { this.__doWork = null; } } -@:forward -abstract JobList(List) +class JobList { - public inline function new() + /** + * Whether `pool.workPriority` is being added to + * `ThreadPool.__totalWorkPriority`. Set this to true when `length > 0` and + * false when `length == 0`. The setter will ensure it is only added once. + */ + @:allow(lime.system.ThreadPool) + private var __addingWorkPriority(default, set):Bool; + + private var __index:Int = 0; + + private var __jobs:Array = []; + + public var length(get, never):Int; + + public var pool(default, null):ThreadPool; + + public inline function new(?pool:ThreadPool) { - this = new List(); + this.pool = pool; + @:bypassAccessor __addingWorkPriority = false; + } + + public inline function clear():Void + { + #if haxe4 + __jobs.resize(0); + #else + __jobs = []; + #end + __addingWorkPriority = false; } public inline function exists(job:JobData):Bool @@ -663,19 +720,65 @@ abstract JobList(List) return getByID(job.id) != null; } - public inline function remove(job:JobData):Bool + public inline function hasNext():Bool { - return this.remove(job) || removeByID(job.id); + return __jobs.length > 0; + } + + /** + Iterates in an endless loop, starting over upon reaching the end. + **/ + public inline function next():JobData + { + __index++; + if (__index >= length) + { + __index = 0; + } + + return __jobs[__index]; + } + + public inline function pop():JobData + { + var job:JobData = __jobs.pop(); + __addingWorkPriority = length > 0; + return job; + } + + public function remove(job:JobData):Bool + { + if (__jobs.remove(job)) + { + __addingWorkPriority = length > 0; + return true; + } + else if (removeByID(job.id)) + { + return true; + } + else + { + return false; + } } public inline function removeByID(id:Int):Bool { - return this.remove(getByID(id)); + if (__jobs.remove(getByID(id))) + { + __addingWorkPriority = length > 0; + return true; + } + else + { + return false; + } } public function getByID(id:Int):JobData { - for (job in this) + for (job in __jobs) { if (job.id == id) { @@ -692,7 +795,7 @@ abstract JobList(List) case ID(id): return getByID(id); case FUNCTION(doWork): - for (job in this) + for (job in __jobs) { if (job.doWork == doWork) { @@ -700,7 +803,7 @@ abstract JobList(List) } } case STATE(state): - for (job in this) + for (job in __jobs) { if (job.state == state) { @@ -710,6 +813,38 @@ abstract JobList(List) } return null; } + + public inline function push(job:JobData):Void + { + __jobs.push(job); + __addingWorkPriority = true; + } + + // Getters & Setters + + private inline function set___addingWorkPriority(value:Bool):Bool { + if (pool != null && __addingWorkPriority != value && ThreadPool.isMainThread()) + { + if (value) + { + ThreadPool.__totalWorkPriority += pool.workPriority; + } + else + { + ThreadPool.__totalWorkPriority -= pool.workPriority; + } + return __addingWorkPriority = value; + } + else + { + return __addingWorkPriority; + } + } + + private inline function get_length():Int + { + return __jobs.length; + } } /**