Merge pull request #1830 from player-03/old_thread_classes

Restore old `Future` and `BackgroundWorker` behavior.
This commit is contained in:
player-03
2024-08-16 17:14:01 -04:00
committed by GitHub
5 changed files with 236 additions and 171 deletions

View File

@@ -67,24 +67,36 @@ import lime.utils.Log;
@:noCompletion private var __progressListeners:Array<Int->Int->Void>;
/**
@param work Deprecated; use `Future.withEventualValue()` instead.
@param useThreads Deprecated; use `Future.withEventualValue()` instead.
@param work Optional: a function to compute this future's value.
@param useThreads Whether to run `work` on a background thread, where supported.
If false or if this isn't a system target, it will run immediately on the main thread.
**/
public function new(work:WorkFunction<Void->T> = null, useThreads:Bool = false)
public function new(work:Void->T = null, useThreads:Bool = false)
{
if (work != null)
{
var promise = new Promise<T>();
promise.future = this;
#if (lime_threads && html5)
#if (lime_threads && !html5)
if (useThreads)
{
work.makePortable();
}
#end
var promise = new Promise<T>();
promise.future = this;
FutureWork.run(dispatchWorkFunction, work, promise, useThreads ? MULTI_THREADED : SINGLE_THREADED, true);
FutureWork.run(work, promise);
}
else
#end
{
try
{
value = work();
isComplete = true;
}
catch (e:Dynamic)
{
error = e;
isError = true;
}
}
}
}
@@ -189,6 +201,7 @@ import lime.utils.Log;
**/
public function ready(waitTime:Int = -1):Future<T>
{
#if (lime_threads && !html5)
if (isComplete || isError)
{
return this;
@@ -196,34 +209,22 @@ import lime.utils.Log;
else
{
var time = System.getTimer();
var prevTime = time;
var end = time + waitTime;
while (!isComplete && !isError && time <= end)
while (!isComplete && !isError && time <= end && FutureWork.activeJobs > 0)
{
if (FutureWork.activeJobs < 1)
{
Log.error('Cannot block for a Future without a "work" function.');
return this;
}
#if sys
Sys.sleep(0.01);
#end
if (FutureWork.singleThreadPool != null && FutureWork.singleThreadPool.activeJobs > 0)
{
@:privateAccess FutureWork.singleThreadPool.__update(time - prevTime);
}
else
{
#if sys
Sys.sleep(0.01);
#end
}
prevTime = time;
time = System.getTimer();
}
return this;
}
#else
return this;
#end
}
/**
@@ -305,41 +306,9 @@ import lime.utils.Log;
future.value = value;
return future;
}
/**
Creates a `Future` instance which will asynchronously compute a value.
Once `work()` returns a non-null value, the `Future` will finish with that value.
If `work()` throws an error, the `Future` will finish with that error instead.
@param work A function that computes a value of type `T`.
@param state An argument to pass to `work()`. As this may be used on another thread, the
main thread must not access or modify `state` until the `Future` finishes.
@param mode Whether to use real threads as opposed to green threads. Green threads rely
on cooperative multitasking, meaning `work()` must return periodically to allow other code
enough time to run. In these cases, `work()` should return null to signal that it isn't finished.
@return A new `Future` instance.
@see https://en.wikipedia.org/wiki/Cooperative_multitasking
**/
public static function withEventualValue<T>(work:WorkFunction<State -> Null<T>>, state:State, mode:ThreadMode = #if html5 SINGLE_THREADED #else MULTI_THREADED #end):Future<T>
{
var future = new Future<T>();
var promise = new Promise<T>();
promise.future = future;
FutureWork.run(work, state, promise, mode);
return future;
}
/**
(For backwards compatibility.) Dispatches the given zero-argument function.
**/
@:noCompletion private static function dispatchWorkFunction<T>(work:WorkFunction<Void -> T>):Null<T>
{
return work.dispatch();
}
}
#if (lime_threads && !html5)
/**
The class that handles asynchronous `work` functions passed to `new Future()`.
**/
@@ -349,75 +318,34 @@ import lime.utils.Log;
#end
@:dox(hide) class FutureWork
{
@:allow(lime.app.Future)
private static var singleThreadPool:ThreadPool;
#if lime_threads
private static var multiThreadPool:ThreadPool;
// It isn't safe to pass a promise object to a web worker, but since it's
// `@:generic` we can't store it as `Promise<Dynamic>`. Instead, we'll store
// the two methods we need.
private static var promises:Map<Int, {complete:Dynamic -> Dynamic, error:Dynamic -> Dynamic}> = new Map();
#end
private static var threadPool:ThreadPool;
private static var promises:Map<Int, {complete:Dynamic -> Dynamic, error:Dynamic -> Dynamic}>;
public static var minThreads(default, set):Int = 0;
public static var maxThreads(default, set):Int = 1;
public static var activeJobs(get, never):Int;
private static function getPool(mode:ThreadMode):ThreadPool
{
#if lime_threads
if (mode == MULTI_THREADED) {
if(multiThreadPool == null) {
multiThreadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED);
multiThreadPool.onComplete.add(multiThreadPool_onComplete);
multiThreadPool.onError.add(multiThreadPool_onError);
}
return multiThreadPool;
}
#end
if(singleThreadPool == null) {
singleThreadPool = new ThreadPool(minThreads, maxThreads, SINGLE_THREADED);
singleThreadPool.onComplete.add(singleThreadPool_onComplete);
singleThreadPool.onError.add(singleThreadPool_onError);
}
return singleThreadPool;
}
@:allow(lime.app.Future)
private static function run<T>(work:WorkFunction<State->Null<T>>, state:State, promise:Promise<T>, mode:ThreadMode = MULTI_THREADED, legacyCode:Bool = false):Void
private static function run<T>(work:Void->T, promise:Promise<T>):Void
{
var bundle = {work: work, state: state, promise: promise, legacyCode: legacyCode};
if(threadPool == null) {
threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED);
threadPool.onComplete.add(threadPool_onComplete);
threadPool.onError.add(threadPool_onError);
#if lime_threads
if (mode == MULTI_THREADED)
{
#if html5
work.makePortable();
#end
bundle.promise = null;
promises = new Map();
}
#end
var jobID:Int = getPool(mode).run(threadPool_doWork, bundle);
#if lime_threads
if (mode == MULTI_THREADED)
{
promises[jobID] = {complete: promise.complete, error: promise.error};
}
#end
var jobID:Int = threadPool.run(threadPool_doWork, work);
promises[jobID] = {complete: promise.complete, error: promise.error};
}
// Event Handlers
private static function threadPool_doWork(bundle:{work:WorkFunction<State->Dynamic>, state:State, legacyCode:Bool}, output:WorkOutput):Void
private static function threadPool_doWork(work:Void->Dynamic, output:WorkOutput):Void
{
try
{
var result = bundle.work.dispatch(bundle.state);
if (result != null || bundle.legacyCode)
{
output.sendComplete(result);
}
output.sendComplete(work());
}
catch (e:Dynamic)
{
@@ -425,76 +353,42 @@ import lime.utils.Log;
}
}
private static function singleThreadPool_onComplete(result:Dynamic):Void
private static function threadPool_onComplete(result:Dynamic):Void
{
singleThreadPool.activeJob.state.promise.complete(result);
}
private static function singleThreadPool_onError(error:Dynamic):Void
{
singleThreadPool.activeJob.state.promise.error(error);
}
#if lime_threads
private static function multiThreadPool_onComplete(result:Dynamic):Void
{
var promise = promises[multiThreadPool.activeJob.id];
promises.remove(multiThreadPool.activeJob.id);
var promise = promises[threadPool.activeJob.id];
promises.remove(threadPool.activeJob.id);
promise.complete(result);
}
private static function multiThreadPool_onError(error:Dynamic):Void
private static function threadPool_onError(error:Dynamic):Void
{
var promise = promises[multiThreadPool.activeJob.id];
promises.remove(multiThreadPool.activeJob.id);
var promise = promises[threadPool.activeJob.id];
promises.remove(threadPool.activeJob.id);
promise.error(error);
}
#end
// Getters & Setters
@:noCompletion private static inline function set_minThreads(value:Int):Int
{
if (singleThreadPool != null)
if (threadPool != null)
{
singleThreadPool.minThreads = value;
threadPool.minThreads = value;
}
#if lime_threads
if (multiThreadPool != null)
{
multiThreadPool.minThreads = value;
}
#end
return minThreads = value;
}
@:noCompletion private static inline function set_maxThreads(value:Int):Int
{
if (singleThreadPool != null)
if (threadPool != null)
{
singleThreadPool.maxThreads = value;
threadPool.maxThreads = value;
}
#if lime_threads
if (multiThreadPool != null)
{
multiThreadPool.maxThreads = value;
}
#end
return maxThreads = value;
}
@:noCompletion private static function get_activeJobs():Int
@:noCompletion private static inline function get_activeJobs():Int
{
var sum:Int = 0;
if (singleThreadPool != null)
{
sum += singleThreadPool.activeJobs;
}
#if lime_threads
if (multiThreadPool != null)
{
sum += multiThreadPool.activeJobs;
}
#end
return sum;
return threadPool != null ? threadPool.activeJobs : 0;
}
}
#end

View File

@@ -1002,7 +1002,7 @@ class Image
return promise.future;
#else
return Future.withEventualValue(fromBytes, bytes, MULTI_THREADED);
return new Future(fromBytes.bind(bytes), true);
#end
}

View File

@@ -335,7 +335,7 @@ class AudioBuffer
return promise.future;
#else
return Future.withEventualValue(fromFiles, paths, MULTI_THREADED);
return new Future(fromFiles.bind(paths), true);
#end
}

View File

@@ -1,4 +1,177 @@
package lime.system;
@:deprecated("Replace references to lime.system.BackgroundWorker with lime.system.ThreadPool. As the API is identical, no other changes are necessary.")
typedef BackgroundWorker = ThreadPool;
import lime.app.Application;
import lime.app.Event;
#if sys
#if haxe4
import sys.thread.Deque;
import sys.thread.Thread;
#elseif cpp
import cpp.vm.Deque;
import cpp.vm.Thread;
#elseif neko
import neko.vm.Deque;
import neko.vm.Thread;
#end
#end
#if !lime_debug
@:fileXml('tags="haxe,release"')
@:noDebug
#end
/**
A background worker executes a single function on a background thread,
allowing it to avoid blocking the main thread. However, only system targets
have thread support, meaning the function will block on any other target.
@see `ThreadPool` for improved thread safety, HTML5 threads, and more.
**/
class BackgroundWorker
{
private static var MESSAGE_COMPLETE = "__COMPLETE__";
private static var MESSAGE_ERROR = "__ERROR__";
public var canceled(default, null):Bool;
public var completed(default, null):Bool;
public var doWork = new Event<Dynamic->Void>();
public var onComplete = new Event<Dynamic->Void>();
public var onError = new Event<Dynamic->Void>();
public var onProgress = new Event<Dynamic->Void>();
@:noCompletion private var __runMessage:Dynamic;
#if (cpp || neko)
@:noCompletion private var __messageQueue:Deque<Dynamic>;
@:noCompletion private var __workerThread:Thread;
#end
public function new() {}
public function cancel():Void
{
canceled = true;
#if (cpp || neko)
__workerThread = null;
#end
}
public function run(message:Dynamic = null):Void
{
canceled = false;
completed = false;
__runMessage = message;
#if (cpp || neko)
__messageQueue = new Deque<Dynamic>();
__workerThread = Thread.create(__doWork);
// TODO: Better way to do this
if (Application.current != null)
{
Application.current.onUpdate.add(__update);
}
#else
__doWork();
#end
}
public function sendComplete(message:Dynamic = null):Void
{
completed = true;
#if (cpp || neko)
__messageQueue.add(MESSAGE_COMPLETE);
__messageQueue.add(message);
#else
if (!canceled)
{
canceled = true;
onComplete.dispatch(message);
}
#end
}
public function sendError(message:Dynamic = null):Void
{
#if (cpp || neko)
__messageQueue.add(MESSAGE_ERROR);
__messageQueue.add(message);
#else
if (!canceled)
{
canceled = true;
onError.dispatch(message);
}
#end
}
public function sendProgress(message:Dynamic = null):Void
{
#if (cpp || neko)
__messageQueue.add(message);
#else
if (!canceled)
{
onProgress.dispatch(message);
}
#end
}
@:noCompletion private function __doWork():Void
{
doWork.dispatch(__runMessage);
// #if (cpp || neko)
//
// __messageQueue.add (MESSAGE_COMPLETE);
//
// #else
//
// if (!canceled) {
//
// canceled = true;
// onComplete.dispatch (null);
//
// }
//
// #end
}
@:noCompletion private function __update(deltaTime:Int):Void
{
#if (cpp || neko)
var message = __messageQueue.pop(false);
if (message != null)
{
if (message == MESSAGE_ERROR)
{
Application.current.onUpdate.remove(__update);
if (!canceled)
{
canceled = true;
onError.dispatch(__messageQueue.pop(false));
}
}
else if (message == MESSAGE_COMPLETE)
{
Application.current.onUpdate.remove(__update);
if (!canceled)
{
canceled = true;
onComplete.dispatch(__messageQueue.pop(false));
}
}
else
{
if (!canceled)
{
onProgress.dispatch(message);
}
}
}
#end
}
}

View File

@@ -37,9 +37,7 @@ import lime._internal.backend.html5.HTML5Thread as Thread;
`WorkOutput` object it receives. Calling `output.sendComplete()` will
trigger an `onComplete` event on the main thread.
@see `lime.system.WorkOutput.WorkFunction` for important information about
`doWork`.
@see https://player03.com/openfl/threads-guide/ for a tutorial.
@see `lime.system.WorkOutput.WorkFunction` for important information about `doWork`.
**/
#if !lime_debug
@:fileXml('tags="haxe,release"')