Avoid sending JobData back to the main thread.

The main thread can easily look these up by ID, and in HTML5, sending the full `JobData` can cause errors.
This commit is contained in:
Joseph Cloutier
2024-08-09 15:58:48 -04:00
parent dae33c0c1a
commit bf4711a01d
3 changed files with 35 additions and 61 deletions

View File

@@ -416,17 +416,11 @@ import lime.utils.Log;
var result = bundle.work.dispatch(bundle.state); var result = bundle.work.dispatch(bundle.state);
if (result != null || bundle.legacyCode) if (result != null || bundle.legacyCode)
{ {
#if (lime_threads && html5)
bundle.work.makePortable();
#end
output.sendComplete(result); output.sendComplete(result);
} }
} }
catch (e:Dynamic) catch (e:Dynamic)
{ {
#if (lime_threads && html5)
bundle.work.makePortable();
#end
output.sendError(e); output.sendError(e);
} }
} }

View File

@@ -245,12 +245,12 @@ class ThreadPool extends WorkOutput
var thread:Thread = __activeThreads[job.id]; var thread:Thread = __activeThreads[job.id];
if (idleThreads < minThreads) if (idleThreads < minThreads)
{ {
thread.sendMessage(new ThreadEvent(WORK, null, null)); thread.sendMessage({event: CANCEL});
__idleThreads.push(thread); __idleThreads.push(thread);
} }
else else
{ {
thread.sendMessage(new ThreadEvent(EXIT, null, null)); thread.sendMessage({event: EXIT});
} }
} }
#end #end
@@ -270,10 +270,10 @@ class ThreadPool extends WorkOutput
__activeJobs.clear(); __activeJobs.clear();
#if lime_threads #if lime_threads
// Cancel idle threads if there are more than the minimum. // Exit idle threads if there are more than the minimum.
while (idleThreads > minThreads) while (idleThreads > minThreads)
{ {
__idleThreads.pop().sendMessage(new ThreadEvent(EXIT, null, null)); __idleThreads.pop().sendMessage({event: EXIT});
} }
#end #end
@@ -310,7 +310,7 @@ class ThreadPool extends WorkOutput
var thread:Thread = __activeThreads[data.id]; var thread:Thread = __activeThreads[data.id];
if (thread != null) if (thread != null)
{ {
thread.sendMessage(new ThreadEvent(WORK, null, null)); thread.sendMessage({event: CANCEL});
__activeThreads.remove(data.id); __activeThreads.remove(data.id);
__idleThreads.push(thread); __idleThreads.push(thread);
} }
@@ -395,7 +395,7 @@ class ThreadPool extends WorkOutput
{ {
event = Thread.readMessage(true); event = Thread.readMessage(true);
} }
while (!#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (event, ThreadEvent)); while (event == null || !Reflect.hasField(event, "event"));
output.resetJobProgress(); output.resetJobProgress();
} }
@@ -409,7 +409,7 @@ class ThreadPool extends WorkOutput
return; return;
} }
if (event.event != WORK || event.job == null) if (event.event != WORK || !#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (event.job, JobData))
{ {
// Go idle. // Go idle.
event = null; event = null;
@@ -440,7 +440,7 @@ class ThreadPool extends WorkOutput
// Work is done; wait for more. // Work is done; wait for more.
event = interruption; event = interruption;
} }
else if(#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (interruption, ThreadEvent)) else if(Reflect.hasField(interruption, "event"))
{ {
// Work on the new job. // Work on the new job.
event = interruption; event = interruption;
@@ -494,7 +494,7 @@ class ThreadPool extends WorkOutput
var thread:Thread = __idleThreads.isEmpty() ? createThread(__executeThread) : __idleThreads.pop(); var thread:Thread = __idleThreads.isEmpty() ? createThread(__executeThread) : __idleThreads.pop();
__activeThreads[job.id] = thread; __activeThreads[job.id] = thread;
thread.sendMessage(new ThreadEvent(WORK, null, job)); thread.sendMessage({event: WORK, job: job});
} }
#end #end
} }
@@ -539,15 +539,19 @@ class ThreadPool extends WorkOutput
var threadEvent:ThreadEvent; var threadEvent:ThreadEvent;
while ((threadEvent = __jobOutput.pop(false)) != null) while ((threadEvent = __jobOutput.pop(false)) != null)
{ {
if (!__activeJobs.exists(threadEvent.job)) if (threadEvent.jobID != null)
{ {
// Ignore events from canceled jobs. activeJob = __activeJobs.getByID(threadEvent.jobID);
continue; }
else
{
activeJob = threadEvent.job;
} }
// Get by ID because in HTML5, the object will have been cloned, if (activeJob == null || !__activeJobs.exists(activeJob))
// which will interfere with attempts to test equality. {
activeJob = __activeJobs.getByID(threadEvent.job.id); continue;
}
if (mode == MULTI_THREADED) if (mode == MULTI_THREADED)
{ {
@@ -582,7 +586,7 @@ class ThreadPool extends WorkOutput
if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads) if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads)
{ {
thread.sendMessage(new ThreadEvent(EXIT, null, null)); thread.sendMessage({event: EXIT});
} }
else else
{ {

View File

@@ -105,12 +105,11 @@ class WorkOutput
#if (lime_threads && html5) #if (lime_threads && html5)
if (mode == MULTI_THREADED) if (mode == MULTI_THREADED)
{ {
activeJob.doWork.makePortable(); Thread.returnMessage({event: COMPLETE, message: message, jobID: activeJob.id}, transferList);
Thread.returnMessage(new ThreadEvent(COMPLETE, message, activeJob), transferList);
} }
else else
#end #end
__jobOutput.add(new ThreadEvent(COMPLETE, message, activeJob)); __jobOutput.add({event: COMPLETE, message: message, jobID: activeJob.id});
} }
} }
@@ -130,12 +129,11 @@ class WorkOutput
#if (lime_threads && html5) #if (lime_threads && html5)
if (mode == MULTI_THREADED) if (mode == MULTI_THREADED)
{ {
activeJob.doWork.makePortable(); Thread.returnMessage({event: ERROR, message: message, jobID: activeJob.id}, transferList);
Thread.returnMessage(new ThreadEvent(ERROR, message, activeJob), transferList);
} }
else else
#end #end
__jobOutput.add(new ThreadEvent(ERROR, message, activeJob)); __jobOutput.add({event: ERROR, message: message, jobID: activeJob.id});
} }
} }
@@ -153,12 +151,11 @@ class WorkOutput
#if (lime_threads && html5) #if (lime_threads && html5)
if (mode == MULTI_THREADED) if (mode == MULTI_THREADED)
{ {
activeJob.doWork.makePortable(); Thread.returnMessage({event: PROGRESS, message: message, jobID: activeJob.id}, transferList);
Thread.returnMessage(new ThreadEvent(PROGRESS, message, activeJob), transferList);
} }
else else
#end #end
__jobOutput.add(new ThreadEvent(PROGRESS, message, activeJob)); __jobOutput.add({event: PROGRESS, message: message, jobID: activeJob.id});
} }
} }
@@ -343,43 +340,22 @@ class JobData
#if haxe4 enum #else @:enum #end abstract ThreadEventType(String) #if haxe4 enum #else @:enum #end abstract ThreadEventType(String)
{ {
/** // Events sent from a worker thread to the main thread
Sent by the background thread, indicating completion.
**/
var COMPLETE = "COMPLETE"; var COMPLETE = "COMPLETE";
/**
Sent by the background thread, indicating failure.
**/
var ERROR = "ERROR"; var ERROR = "ERROR";
/**
Sent by the background thread.
**/
var PROGRESS = "PROGRESS"; var PROGRESS = "PROGRESS";
/**
Sent by the main thread, indicating that the provided job should begin // Commands sent from the main thread to a worker thread
in place of any ongoing job. If `state == null`, the existing job will
stop and the thread will go idle. (To run a job with no argument, set
`state = {}` instead.)
**/
var WORK = "WORK"; var WORK = "WORK";
/** var CANCEL = "CANCEL";
Sent by the main thread to shut down a thread.
**/
var EXIT = "EXIT"; var EXIT = "EXIT";
} }
class ThreadEvent typedef ThreadEvent = {
{ var event:ThreadEventType;
public var event(default, null):ThreadEventType; @:optional var message:Dynamic;
public var message(default, null):State; @:optional var job:JobData;
public var job(default, null):JobData; @:optional var jobID:Int;
public inline function new(event:ThreadEventType, message:State, job:JobData)
{
this.event = event;
this.message = message;
this.job = job;
}
} }
class JSAsync class JSAsync