Limit total time spent on green threads. (#1774)

Every single-threaded `ThreadPool` takes up a certain fraction of the app's time per frame. Without any coordination, they could take up more than 100% of the allotted time, causing the app to slow down. By using static variables, we can make them work together to limit the total time spent per frame.
This commit is contained in:
player-03
2024-05-29 21:15:20 -04:00
committed by GitHub
parent ae941e7442
commit 57da678f67

View File

@@ -62,6 +62,35 @@ class ThreadPool extends WorkOutput
#end
#end
/**
A rough estimate of how much of the app's time should be spent on
single-threaded `ThreadPool`s. For instance, the default value of 1/2
means they will aim to take up about half the app's available time every
frame. See `workIterations` for instructions to improve the accuracy of
this estimate.
**/
public static var workLoad:Float = 1/2;
/**
__Access this only from the main thread.__
The sum of all active single-threaded pools' `workPriority` values.
**/
@:allow(lime.system.JobList)
private static var __totalWorkPriority:Float = 0;
/**
Returns whether the caller called this function from the main thread.
**/
public static inline function isMainThread():Bool
{
#if (haxe4 && lime_threads)
return Thread.current() == __mainThread;
#else
return true;
#end
}
/**
Indicates that no further events will be dispatched.
**/
@@ -139,11 +168,21 @@ class ThreadPool extends WorkOutput
**/
public var onRun(default, null) = new Event<State->Void>();
/**
(Single-threaded mode only.) How important this pool's jobs are relative
to other single-threaded pools.
For instance, if all pools use the default priority of 1, they will all
run for an approximately equal amount of time each frame. If one has a
value of 2, it will run approximately twice as long as the others.
**/
public var workPriority(default, set):Float = 1;
@:deprecated("Instead pass the callback to ThreadPool.run().")
@:noCompletion @:dox(hide) public var doWork(get, never):PseudoEvent;
private var __doWork:WorkFunction<State->WorkOutput->Void>;
private var __activeJobs:JobList = new JobList();
private var __activeJobs:JobList;
#if lime_threads
/**
@@ -160,8 +199,6 @@ class ThreadPool extends WorkOutput
private var __jobQueue:JobList = new JobList();
private var __workPerFrame:Float;
/**
__Call this only from the main thread.__
@@ -173,24 +210,12 @@ class ThreadPool extends WorkOutput
@param mode Defaults to `MULTI_THREADED` on most targets, but
`SINGLE_THREADED` in HTML5. In HTML5, `MULTI_THREADED` mode uses web
workers, which impose additional restrictions.
@param workLoad (Single-threaded mode only) A rough estimate of how much
of the app's time should be spent on this `ThreadPool`. For instance,
the default value of 1/2 means this worker will take up about half the
app's available time every frame. See `workIterations` for instructions
to improve the accuracy of this estimate.
**/
public function new(minThreads:Int = 0, maxThreads:Int = 1, mode:ThreadMode = null, workLoad:Float = 1/2)
public function new(minThreads:Int = 0, maxThreads:Int = 1, mode:ThreadMode = null)
{
super(mode);
if (Application.current != null && Application.current.window != null)
{
__workPerFrame = workLoad / Application.current.window.frameRate;
}
else
{
__workPerFrame = workLoad / 60;
}
__activeJobs = new JobList(this);
this.minThreads = minThreads;
this.maxThreads = maxThreads;
@@ -204,12 +229,10 @@ class ThreadPool extends WorkOutput
**/
public function cancel(error:Dynamic = null):Void
{
#if (haxe4 && lime_threads)
if (Thread.current() != __mainThread)
if (!isMainThread())
{
throw "Call cancel() only from the main thread.";
}
#end
Application.current.onUpdate.remove(__update);
@@ -313,12 +336,10 @@ class ThreadPool extends WorkOutput
**/
public function run(doWork:WorkFunction<State->WorkOutput->Void> = null, state:State = null):Int
{
#if (haxe4 && lime_threads)
if (Thread.current() != __mainThread)
if (!isMainThread())
{
throw "Call run() only from the main thread.";
}
#end
if (doWork == null)
{
@@ -338,11 +359,11 @@ class ThreadPool extends WorkOutput
}
var job:JobData = new JobData(doWork, state);
__jobQueue.add(job);
__jobQueue.push(job);
completed = false;
canceled = false;
if (Application.current != null && !Application.current.onUpdate.has(__update))
if (!Application.current.onUpdate.has(__update))
{
Application.current.onUpdate.add(__update);
}
@@ -451,15 +472,13 @@ class ThreadPool extends WorkOutput
**/
private function __update(deltaTime:Int):Void
{
#if (haxe4 && lime_threads)
if (Thread.current() != __mainThread)
if (!isMainThread())
{
return;
}
#end
// Process the queue.
while (!__jobQueue.isEmpty() && activeJobs < maxThreads)
while (__jobQueue.length > 0 && activeJobs < maxThreads)
{
var job:JobData = __jobQueue.pop();
@@ -480,15 +499,21 @@ class ThreadPool extends WorkOutput
#end
}
// Run the next single-threaded job.
if (mode == SINGLE_THREADED && activeJobs > 0)
// Run the next single-threaded job, if any.
if (mode == SINGLE_THREADED && __activeJobs.hasNext())
{
activeJob = __activeJobs.pop();
activeJob = __activeJobs.next();
var state:State = activeJob.state;
__jobComplete.value = false;
workIterations.value = 0;
// `workLoad / frameRate` is the total time that pools may use per
// frame. `workPriority / __totalWorkPriority` is this pool's
// fraction of that total.
var maxTimeElapsed:Float = workPriority * workLoad
/ (__totalWorkPriority * Application.current.window.frameRate);
var startTime:Float = timestamp();
var timeElapsed:Float = 0;
try
@@ -499,7 +524,7 @@ class ThreadPool extends WorkOutput
activeJob.doWork.dispatch(state, this);
timeElapsed = timestamp() - startTime;
}
while (!__jobComplete.value && timeElapsed < __workPerFrame);
while (!__jobComplete.value && timeElapsed < maxTimeElapsed);
}
catch (e:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end)
{
@@ -508,9 +533,6 @@ class ThreadPool extends WorkOutput
activeJob.duration += timeElapsed;
// Add this job to the end of the list, to cycle through.
__activeJobs.add(activeJob);
activeJob = null;
}
@@ -558,7 +580,7 @@ class ThreadPool extends WorkOutput
var thread:Thread = __activeThreads[activeJob.id];
__activeThreads.remove(activeJob.id);
if (currentThreads > maxThreads || __jobQueue.isEmpty() && currentThreads > minThreads)
if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads)
{
thread.sendMessage(new ThreadEvent(EXIT, null, null));
}
@@ -569,7 +591,7 @@ class ThreadPool extends WorkOutput
}
#end
completed = threadEvent.event == COMPLETE && activeJobs == 0 && __jobQueue.isEmpty();
completed = threadEvent.event == COMPLETE && activeJobs == 0 && __jobQueue.length == 0;
default:
}
@@ -577,7 +599,7 @@ class ThreadPool extends WorkOutput
activeJob = null;
}
if (completed && Application.current != null)
if (completed)
{
Application.current.onUpdate.remove(__update);
}
@@ -618,6 +640,15 @@ class ThreadPool extends WorkOutput
{
return this;
}
private function set_workPriority(value:Float):Float
{
if (mode == SINGLE_THREADED && activeJobs > 0)
{
__totalWorkPriority += value - workPriority;
}
return workPriority = value;
}
}
@:access(lime.system.ThreadPool) @:forward(canceled)
@@ -650,12 +681,38 @@ private abstract PseudoEvent(ThreadPool) from ThreadPool {
public inline function removeAll():Void { this.__doWork = null; }
}
@:forward
abstract JobList(List<JobData>)
class JobList
{
public inline function new()
/**
* Whether `pool.workPriority` is being added to
* `ThreadPool.__totalWorkPriority`. Set this to true when `length > 0` and
* false when `length == 0`. The setter will ensure it is only added once.
*/
@:allow(lime.system.ThreadPool)
private var __addingWorkPriority(default, set):Bool;
private var __index:Int = 0;
private var __jobs:Array<JobData> = [];
public var length(get, never):Int;
public var pool(default, null):ThreadPool;
public inline function new(?pool:ThreadPool)
{
this = new List<JobData>();
this.pool = pool;
@:bypassAccessor __addingWorkPriority = false;
}
public inline function clear():Void
{
#if haxe4
__jobs.resize(0);
#else
__jobs = [];
#end
__addingWorkPriority = false;
}
public inline function exists(job:JobData):Bool
@@ -663,19 +720,65 @@ abstract JobList(List<JobData>)
return getByID(job.id) != null;
}
public inline function remove(job:JobData):Bool
public inline function hasNext():Bool
{
return this.remove(job) || removeByID(job.id);
return __jobs.length > 0;
}
/**
Iterates in an endless loop, starting over upon reaching the end.
**/
public inline function next():JobData
{
__index++;
if (__index >= length)
{
__index = 0;
}
return __jobs[__index];
}
public inline function pop():JobData
{
var job:JobData = __jobs.pop();
__addingWorkPriority = length > 0;
return job;
}
public function remove(job:JobData):Bool
{
if (__jobs.remove(job))
{
__addingWorkPriority = length > 0;
return true;
}
else if (removeByID(job.id))
{
return true;
}
else
{
return false;
}
}
public inline function removeByID(id:Int):Bool
{
return this.remove(getByID(id));
if (__jobs.remove(getByID(id)))
{
__addingWorkPriority = length > 0;
return true;
}
else
{
return false;
}
}
public function getByID(id:Int):JobData
{
for (job in this)
for (job in __jobs)
{
if (job.id == id)
{
@@ -692,7 +795,7 @@ abstract JobList(List<JobData>)
case ID(id):
return getByID(id);
case FUNCTION(doWork):
for (job in this)
for (job in __jobs)
{
if (job.doWork == doWork)
{
@@ -700,7 +803,7 @@ abstract JobList(List<JobData>)
}
}
case STATE(state):
for (job in this)
for (job in __jobs)
{
if (job.state == state)
{
@@ -710,6 +813,38 @@ abstract JobList(List<JobData>)
}
return null;
}
public inline function push(job:JobData):Void
{
__jobs.push(job);
__addingWorkPriority = true;
}
// Getters & Setters
private inline function set___addingWorkPriority(value:Bool):Bool {
if (pool != null && __addingWorkPriority != value && ThreadPool.isMainThread())
{
if (value)
{
ThreadPool.__totalWorkPriority += pool.workPriority;
}
else
{
ThreadPool.__totalWorkPriority -= pool.workPriority;
}
return __addingWorkPriority = value;
}
else
{
return __addingWorkPriority;
}
}
private inline function get_length():Int
{
return __jobs.length;
}
}
/**