Improvements to Future/Promise

This commit is contained in:
Joshua Granick
2016-10-06 12:12:04 -07:00
parent afcce2f99e
commit 3dad6d81d0
2 changed files with 123 additions and 52 deletions

View File

@@ -1,24 +1,23 @@
package lime.app; package lime.app;
import lime.system.System;
import lime.system.ThreadPool; import lime.system.ThreadPool;
import lime.utils.Log;
@:allow(lime.app.Promise) @:allow(lime.app.Promise)
class Future<T> { /*@:generic*/ class Future<T> {
private static var __threadPool:ThreadPool; public var error:Dynamic;
public var isComplete (default, null):Bool;
public var isCompleted (get, null):Bool; public var isError (default, null):Bool;
public var value (default, null):T; public var value (default, null):T;
private var __completed:Bool;
private var __completeListeners:Array<T->Void>; private var __completeListeners:Array<T->Void>;
private var __errored:Bool;
private var __errorListeners:Array<Dynamic->Void>; private var __errorListeners:Array<Dynamic->Void>;
private var __errorMessage:Dynamic;
private var __progressListeners:Array<Float->Void>; private var __progressListeners:Array<Float->Void>;
@@ -26,19 +25,10 @@ class Future<T> {
if (work != null) { if (work != null) {
if (__threadPool == null) {
__threadPool = new ThreadPool ();
__threadPool.doWork.add (threadPool_doWork);
__threadPool.onComplete.add (threadPool_onComplete);
__threadPool.onError.add (threadPool_onError);
}
var promise = new Promise<T> (); var promise = new Promise<T> ();
promise.future = this; promise.future = this;
__threadPool.queue ({ promise: promise, work: work } ); FutureWork.queue ({ promise: promise, work: work });
} }
@@ -49,11 +39,11 @@ class Future<T> {
if (listener != null) { if (listener != null) {
if (__completed) { if (isComplete) {
listener (value); listener (value);
} else if (!__errored) { } else if (!isError) {
if (__completeListeners == null) { if (__completeListeners == null) {
@@ -76,11 +66,11 @@ class Future<T> {
if (listener != null) { if (listener != null) {
if (__errored) { if (isError) {
listener (__errorMessage); listener (error);
} else if (!__completed) { } else if (!isComplete) {
if (__errorListeners == null) { if (__errorListeners == null) {
@@ -118,16 +108,78 @@ class Future<T> {
} }
public function ready (waitTime:Int = -1):Future<T> {
#if js
if (isComplete || isError) {
return this;
} else {
Log.warn ("Cannot block thread in JavaScript");
return this;
}
#else
if (isComplete || isError) {
return this;
} else {
var time = System.getTimer ();
var end = (waitTime > -1) ? time + waitTime : time;
while (!isComplete && !isError && time <= end) {
#if sys
Sys.sleep (0.01);
#end
time = System.getTimer ();
}
return this;
}
#end
}
public function result (waitTime:Int = -1):Null<T> {
ready (waitTime);
if (isComplete) {
return value;
} else {
return null;
}
}
public function then<U> (next:T->Future<U>):Future<U> { public function then<U> (next:T->Future<U>):Future<U> {
if (__completed) { if (isComplete) {
return next (value); return next (value);
} else if (__errored) { } else if (isError) {
var future = new Future<U> (); var future = new Future<U> ();
future.onError (__errorMessage); future.onError (error);
return future; return future;
} else { } else {
@@ -152,6 +204,31 @@ class Future<T> {
} }
}
@:dox(hide) private class FutureWork {
private static var threadPool:ThreadPool;
public static function queue (state:Dynamic = null):Void {
if (threadPool == null) {
threadPool = new ThreadPool ();
threadPool.doWork.add (threadPool_doWork);
threadPool.onComplete.add (threadPool_onComplete);
threadPool.onError.add (threadPool_onError);
}
threadPool.queue (state);
}
// Event Handlers // Event Handlers
@@ -164,11 +241,11 @@ class Future<T> {
try { try {
var result = state.work (); var result = state.work ();
__threadPool.sendComplete ({ promise: state.promise, result: result } ); threadPool.sendComplete ({ promise: state.promise, result: result } );
} catch (e:Dynamic) { } catch (e:Dynamic) {
__threadPool.sendError ({ promise: state.promise, error: e } ); threadPool.sendError ({ promise: state.promise, error: e } );
} }
@@ -189,18 +266,4 @@ class Future<T> {
} }
// Get & Set Methods
private function get_isCompleted ():Bool {
return (__completed || __errored);
}
} }

View File

@@ -4,25 +4,26 @@ package lime.app;
@:allow(lime.app.Future) @:allow(lime.app.Future)
class Promise<T> { @:generic class Promise<T> {
public var future (default, null):Future<T>; public var future (default, null):Future<T>;
public var isCompleted (get, null):Bool; public var isComplete (get, null):Bool;
public var isError (get, null):Bool;
public function new () { public function new () {
future = new Future (); future = new Future<T> ();
} }
public function complete (data:T):Promise<T> { public function complete (data:T):Promise<T> {
if (!future.__errored) { if (!future.isError) {
future.__completed = true; future.isComplete = true;
future.value = data; future.value = data;
if (future.__completeListeners != null) { if (future.__completeListeners != null) {
@@ -58,10 +59,10 @@ class Promise<T> {
public function error (msg:Dynamic):Promise<T> { public function error (msg:Dynamic):Promise<T> {
if (!future.__completed) { if (!future.isComplete) {
future.__errored = true; future.isError = true;
future.__errorMessage = msg; future.error = msg;
if (future.__errorListeners != null) { if (future.__errorListeners != null) {
@@ -84,7 +85,7 @@ class Promise<T> {
public function progress (progress:Float):Promise<T> { public function progress (progress:Float):Promise<T> {
if (!future.__errored && !future.__completed) { if (!future.isError && !future.isComplete) {
if (future.__progressListeners != null) { if (future.__progressListeners != null) {
@@ -110,9 +111,16 @@ class Promise<T> {
private function get_isCompleted ():Bool { private function get_isComplete ():Bool {
return future.isCompleted; return future.isComplete;
}
private function get_isError ():Bool {
return future.isError;
} }