This commit is contained in:
Joshua Granick
2015-09-04 11:56:36 -07:00
parent 8152f1f2db
commit f7ae00f635
3 changed files with 38 additions and 73 deletions

View File

@@ -1,7 +1,8 @@
package lime.app;
import lime.system.ThreadPool;
import lime.system.ThreadPool;
@:allow(lime.app.Promise)
@@ -9,7 +10,6 @@ class Future<T> {
private static var __threadPool:ThreadPool;
private static var __workID:Int;
public var isCompleted (get, null):Bool;
public var value (default, null):T;
@@ -32,14 +32,13 @@ class Future<T> {
__threadPool.doWork.add (threadPool_doWork);
__threadPool.onComplete.add (threadPool_onComplete);
__threadPool.onError.add (threadPool_onError);
__workID = 1;
}
var promise = new Promise<T> ();
promise.future = this;
__threadPool.queue (Std.string (__workID++), { promise: promise, work: work } );
__threadPool.queue ({ promise: promise, work: work } );
}
@@ -160,32 +159,32 @@ class Future<T> {
private static function threadPool_doWork (id:String, data:Dynamic):Void {
private static function threadPool_doWork (state:Dynamic):Void {
try {
var result = data.work ();
__threadPool.sendComplete (id, { promise: data.promise, result: result } );
var result = state.work ();
__threadPool.sendComplete ({ promise: state.promise, result: result } );
} catch (e:Dynamic) {
__threadPool.sendError (id, { promise: data.promise, error: e } );
__threadPool.sendError ({ promise: state.promise, error: e } );
}
}
private static function threadPool_onComplete (id:String, data:Dynamic):Void {
private static function threadPool_onComplete (state:Dynamic):Void {
data.promise.complete (data.result);
state.promise.complete (state.result);
}
private static function threadPool_onError (id:String, data:Dynamic):Void {
private static function threadPool_onError (state:Dynamic):Void {
data.promise.error (data.error);
state.promise.error (state.error);
}

View File

@@ -17,12 +17,12 @@ class ThreadPool {
public var currentThreads (default, null):Int;
public var doWork = new Event<String->Dynamic->Void> ();
public var doWork = new Event<Dynamic->Void> ();
public var maxThreads:Int;
public var minThreads:Int;
public var onComplete = new Event<String->Dynamic->Void> ();
public var onError = new Event<String->Dynamic->Void> ();
public var onProgress = new Event<String->Dynamic->Void> ();
public var onComplete = new Event<Dynamic->Void> ();
public var onError = new Event<Dynamic->Void> ();
public var onProgress = new Event<Dynamic->Void> ();
#if (cpp || neko)
private var __workCompleted:Int;
@@ -61,11 +61,11 @@ class ThreadPool {
//}
public function queue (id:String, message:Dynamic = null):Void {
public function queue (state:Dynamic = null):Void {
#if (cpp || neko)
__workIncoming.add (new ThreadPoolMessage (WORK, id, message));
__workIncoming.add (new ThreadPoolMessage (WORK, state));
__workQueued++;
if (currentThreads < maxThreads && currentThreads < (__workQueued - __workCompleted)) {
@@ -83,41 +83,41 @@ class ThreadPool {
#else
doWork.dispatch (id, message);
doWork.dispatch (state);
#end
}
public function sendComplete (id:String, message:Dynamic = null):Void {
public function sendComplete (state:Dynamic = null):Void {
#if (cpp || neko)
__workResult.add (new ThreadPoolMessage (COMPLETE, id, message));
__workResult.add (new ThreadPoolMessage (COMPLETE, state));
#else
onComplete.dispatch (id, message);
onComplete.dispatch (state);
#end
}
public function sendError (id:String, message:Dynamic = null):Void {
public function sendError (state:Dynamic = null):Void {
#if (cpp || neko)
__workResult.add (new ThreadPoolMessage (ERROR, id, message));
__workResult.add (new ThreadPoolMessage (ERROR, state));
#else
onError.dispatch (id, message);
onError.dispatch (state);
#end
}
public function sendProgress (id:String, message:Dynamic = null):Void {
public function sendProgress (state:Dynamic = null):Void {
#if (cpp || neko)
__workResult.add (new ThreadPoolMessage (PROGRESS, id, message));
__workResult.add (new ThreadPoolMessage (PROGRESS, state));
#else
onProgress.dispatch (id, message);
onProgress.dispatch (state);
#end
}
@@ -133,7 +133,7 @@ class ThreadPool {
if (message.type == WORK) {
doWork.dispatch (message.id, message.message);
doWork.dispatch (message.state);
} else if (message.type == EXIT) {
@@ -158,7 +158,7 @@ class ThreadPool {
case PROGRESS:
onProgress.dispatch (message.id, message.message);
onProgress.dispatch (message.state);
case COMPLETE, ERROR:
@@ -167,17 +167,17 @@ class ThreadPool {
if (currentThreads > (__workQueued - __workCompleted) || currentThreads > maxThreads) {
currentThreads--;
__workIncoming.add (new ThreadPoolMessage (EXIT, null, null));
__workIncoming.add (new ThreadPoolMessage (EXIT, null));
}
if (message.type == COMPLETE) {
onComplete.dispatch (message.id, message.message);
onComplete.dispatch (message.state);
} else {
onError.dispatch (message.id, message.message);
onError.dispatch (message.state);
}
@@ -223,16 +223,14 @@ private enum ThreadPoolMessageType {
private class ThreadPoolMessage {
public var id:String;
public var message:Dynamic;
public var state:Dynamic;
public var type:ThreadPoolMessageType;
public function new (type:ThreadPoolMessageType, id:String, message:Dynamic) {
public function new (type:ThreadPoolMessageType, state:Dynamic) {
this.type = type;
this.id = id;
this.message = message;
this.state = state;
}

View File

@@ -10,7 +10,6 @@ import lime.audio.AudioSource;
import lime.audio.openal.AL;
import lime.audio.AudioBuffer;
import lime.graphics.Image;
import lime.system.ThreadPool;
import lime.text.Font;
import lime.utils.ByteArray;
import lime.utils.UInt8Array;
@@ -44,7 +43,6 @@ class DefaultAssetLibrary extends AssetLibrary {
public var type (default, null) = new Map <String, AssetType> ();
private var lastModified:Float;
private var threadPool:ThreadPool;
private var timer:Timer;
@@ -136,24 +134,6 @@ class DefaultAssetLibrary extends AssetLibrary {
}
private function createThreadPool ():Void {
threadPool = new ThreadPool (0, 2);
threadPool.doWork.add (function (id, data) {
data.result = data.getMethod (id);
threadPool.sendComplete (data.handler, data);
});
threadPool.onComplete.add (function (id, data) {
data.handler (data.result);
});
}
public override function exists (id:String, type:String):Bool {
var requestedType = type != null ? cast (type, AssetType) : null;
@@ -532,7 +512,7 @@ class DefaultAssetLibrary extends AssetLibrary {
#else
promise.complete (getAudioBuffer (id));
promise.completeWith (new Future<AudioBuffer> (function () return getAudioBuffer (id)));
#end
@@ -620,13 +600,7 @@ class DefaultAssetLibrary extends AssetLibrary {
#else
if (threadPool == null) {
createThreadPool ();
}
threadPool.queue (id, { promise: promise, getMethod: getBytes });
promise.completeWith (new Future<ByteArray> (function () return getBytes (id)));
#end
@@ -693,13 +667,7 @@ class DefaultAssetLibrary extends AssetLibrary {
#else
if (threadPool == null) {
createThreadPool ();
}
threadPool.queue (id, { promise: promise, getMethod: getImage });
promise.completeWith (new Future<Image> (function () return getImage (id)));
#end