Implement ThreadPool.onUncaughtError.

This restores the behavior of `onError` to what it was in 8.1.0.
This commit is contained in:
Joseph Cloutier
2025-07-13 20:10:33 -04:00
parent 7b5a5e1d0a
commit e8b031a8a8
2 changed files with 61 additions and 8 deletions

View File

@@ -21,6 +21,9 @@ import lime._internal.backend.html5.HTML5Thread.Transferable;
#error "lime_threads_deque is not yet supported in HTML5"
#end
#end
#if (haxe_ver >= 4.1)
import haxe.Exception;
#end
/**
A thread pool executes one or more functions asynchronously.
@@ -177,6 +180,16 @@ class ThreadPool extends WorkOutput
**/
public var onRun(default, null) = new Event<State->Void>();
#if (haxe_ver >= 4.1)
/**
Dispatched on the main thread when `doWork` throws an error. Dispatched
at most once per job.
If no listeners have been added, instead the error will be rethrown.
**/
public var onUncaughtError(default, null) = new Event<Exception->Void>();
#end
/**
How important this pool's single-threaded jobs are, relative to other
pools. Pools will be allocated a share of the time per frame (see
@@ -455,7 +468,7 @@ class ThreadPool extends WorkOutput
activeJob.duration = timestamp() - activeJob.startTime;
}
if (event.event == COMPLETE || event.event == ERROR)
if (event.event == COMPLETE || event.event == ERROR || event.event == UNCAUGHT_ERROR)
{
__multiThreadedJobs.removeJob(activeJob.id);
}
@@ -488,6 +501,34 @@ class ThreadPool extends WorkOutput
case ERROR:
onError.dispatch(event.message);
case UNCAUGHT_ERROR:
var message:String;
#if (haxe_ver >= 4.1)
if (Std.isOfType(event.message, Exception))
{
if (onUncaughtError.__listeners.length > 0)
{
onUncaughtError.dispatch(event.message);
message = null;
}
else
{
message = (event.message:Exception).details();
}
}
else
#end
{
message = Std.string(event.message);
}
if (message != null)
{
activeJob = null;
Log.error(message);
}
case WORK:
activeJob.startTime = timestamp();
onRun.dispatch(activeJob.state);
@@ -670,9 +711,9 @@ class ThreadPool extends WorkOutput
event.doWork.dispatch(event.state, output);
}
}
catch (e:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end)
catch (e:#if (haxe_ver >= 4.1) Exception #else Dynamic #end)
{
output.sendError(e);
output.sendUncaughtError(e);
}
output.activeJob = null;
@@ -755,9 +796,10 @@ class ThreadPool extends WorkOutput
}
while (!__jobComplete.value && timestamp() < endTime);
}
catch (e:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end)
catch (e:#if (haxe_ver >= 4.1) Exception #else Dynamic #end)
{
sendError(e);
__jobComplete.value = true;
__dispatchJobOutput({event: UNCAUGHT_ERROR, message: e, jobID: activeJob.id});
}
var jobEndTime:Float = timestamp();

View File

@@ -61,14 +61,14 @@ class WorkOutput
/**
Thread-local storage. Tracks whether `sendError()` or `sendComplete()`
was called by this job.
was called by this job, or if the job threw an error.
**/
private var __jobComplete:Tls<Bool> = new Tls();
/**
The job that is currently running on this thread, or the job that
triggered the ongoing `onComplete`, `onError`, or `onProgress` event.
Will be null in all other cases.
triggered the ongoing event (`onComplete`, `onProgress`, etc.). Will be
null in all other cases.
**/
public var activeJob(get, set):Null<JobData>;
@@ -133,6 +133,16 @@ class WorkOutput
}
}
private function sendUncaughtError(message:#if (haxe_ver >= 4.1) haxe.Exception #else Dynamic #end):Void
{
if (!__jobComplete.value)
{
__jobComplete.value = true;
sendThreadEvent({event: UNCAUGHT_ERROR, message: message, jobID: activeJob.id});
}
}
private inline function sendThreadEvent(event:ThreadEvent, transferList:Array<Transferable> = null):Void
{
#if (lime_threads && html5)
@@ -328,6 +338,7 @@ class JobData
var COMPLETE = "COMPLETE";
var ERROR = "ERROR";
var PROGRESS = "PROGRESS";
var UNCAUGHT_ERROR = "UNCAUGHT_ERROR";
// Commands sent from the main thread to a worker thread, and returned by
// the worker to confirm the change of state, in multi-threaded mode only