Provide more ways to cancel jobs.

This commit is contained in:
Joseph Cloutier
2022-03-28 18:24:11 -04:00
parent d4588434b9
commit a2e67cdba8

View File

@@ -140,7 +140,7 @@ class ThreadPool extends WorkOutput
@:noCompletion @:dox(hide) public var doWork(get, never):{ add: (Dynamic->Void)->Void }; @:noCompletion @:dox(hide) public var doWork(get, never):{ add: (Dynamic->Void)->Void };
private var __doWork:WorkFunction<State->WorkOutput->Void>; private var __doWork:WorkFunction<State->WorkOutput->Void>;
private var __activeJobs:ActiveJobs = new ActiveJobs(); private var __activeJobs:JobList = new JobList();
#if lime_threads #if lime_threads
/** /**
@@ -155,7 +155,7 @@ class ThreadPool extends WorkOutput
private var __idleThreads:List<Thread> = new List(); private var __idleThreads:List<Thread> = new List();
#end #end
private var __jobQueue:List<JobData> = new List(); private var __jobQueue:JobList = new JobList();
private var __workPerFrame:Float; private var __workPerFrame:Float;
@@ -259,52 +259,46 @@ class ThreadPool extends WorkOutput
} }
/** /**
Cancels one active or queued job. Does not dispatch events. Cancels one active or queued job. Does not dispatch an error event.
@param job A `JobData` object, or a job's unique `id`, `state`, or
`doWork` function.
@return Whether a job was canceled.
**/ **/
// Be sure to keep this synchronized with `BackgroundWorker.cancelJob()`. public function cancelJob(job:JobIdentifier):Bool
public function cancelJob(state:State):Bool
{ {
for (job in __activeJobs) var data:JobData = __activeJobs.get(job);
{
if (job.state == state)
{
#if lime_threads
var thread:Thread = __activeThreads[job.id];
if (thread != null)
{
thread.sendMessage(new ThreadEvent(WORK, null, null));
__activeThreads.remove(job.id);
__idleThreads.push(thread);
}
#end
return __activeJobs.remove(job); if (data != null)
{
#if lime_threads
var thread:Thread = __activeThreads[data.id];
if (thread != null)
{
thread.sendMessage(new ThreadEvent(WORK, null, null));
__activeThreads.remove(data.id);
__idleThreads.push(thread);
} }
#end
return __activeJobs.remove(data);
} }
for (job in __jobQueue) return __jobQueue.remove(__jobQueue.get(job));
{
if (job.state == state)
{
return __jobQueue.remove(job);
}
}
return false;
} }
/** /**
Alias for `ThreadPool.run()`. Alias for `ThreadPool.run()`.
**/ **/
@:noCompletion public inline function queue(doWork:WorkFunction<State->WorkOutput->Void> = null, state:State = null):Void @:noCompletion public inline function queue(doWork:WorkFunction<State->WorkOutput->Void> = null, state:State = null):Int
{ {
run(doWork, state); return run(doWork, state);
} }
/** /**
Queues a new job, to be run once a thread becomes available. Queues a new job, to be run once a thread becomes available.
@return The job's unique ID.
**/ **/
public function run(doWork:WorkFunction<State->WorkOutput->Void> = null, state:State = null):Void public function run(doWork:WorkFunction<State->WorkOutput->Void> = null, state:State = null):Int
{ {
#if lime_threads #if lime_threads
if (Thread.current() != __mainThread) if (Thread.current() != __mainThread)
@@ -330,7 +324,8 @@ class ThreadPool extends WorkOutput
state = {}; state = {};
} }
__jobQueue.add(new JobData(doWork, state)); var job:JobData = new JobData(doWork, state);
__jobQueue.add(job);
completed = false; completed = false;
canceled = false; canceled = false;
@@ -338,6 +333,8 @@ class ThreadPool extends WorkOutput
{ {
Application.current.onUpdate.add(__update); Application.current.onUpdate.add(__update);
} }
return job.id;
} }
#if lime_threads #if lime_threads
@@ -629,7 +626,7 @@ class ThreadPool extends WorkOutput
} }
@:forward @:forward.new @:forward @:forward.new
abstract ActiveJobs(List<JobData>) abstract JobList(List<JobData>)
{ {
public inline function exists(job:JobData):Bool public inline function exists(job:JobData):Bool
{ {
@@ -638,18 +635,81 @@ abstract ActiveJobs(List<JobData>)
public inline function remove(job:JobData):Bool public inline function remove(job:JobData):Bool
{ {
return this.remove(getByID(job.id)); return this.remove(job) || removeByID(job.id);
}
public inline function removeByID(id:Int):Bool
{
return this.remove(getByID(id));
} }
public function getByID(id:Int):JobData public function getByID(id:Int):JobData
{ {
for (j in this) for (job in this)
{ {
if (j.id == id) if (job.id == id)
{ {
return j; return job;
} }
} }
return null; return null;
} }
public function get(jobIdentifier:JobIdentifier):JobData
{
switch (jobIdentifier)
{
case ID(id):
return getByID(id);
case FUNCTION(doWork):
for (job in this)
{
if (job.doWork == doWork)
{
return job;
}
}
case STATE(state):
for (job in this)
{
if (job.state == state)
{
return job;
}
}
}
return null;
}
}
/**
A piece of data that uniquely represents a job. This can be the integer ID
(and integers will be assumed to be such), the `doWork` function, or the
`JobData` object itself. Failing any of those, a value will be assumed to be
the job's `state`.
Caution: if the provided data isn't unique, such as a `doWork` function
that's in use by multiple jobs, the wrong job may be selected or canceled.
**/
@:forward
abstract JobIdentifier(JobIdentifierImpl) from JobIdentifierImpl {
@:from private static inline function fromJob(job:JobData):JobIdentifier {
return ID(job.id);
}
@:from private static inline function fromID(id:Int):JobIdentifier {
return ID(id);
}
@:from private static inline function fromFunction(doWork:WorkFunction<State->WorkOutput->Void>):JobIdentifier {
return FUNCTION(doWork);
}
@:from private static inline function fromState(state:State):JobIdentifier {
return STATE(state);
}
}
private enum JobIdentifierImpl
{
ID(id:Int);
FUNCTION(doWork:WorkFunction<State->WorkOutput->Void>);
STATE(state:State);
} }