diff --git a/src/lime/_internal/backend/native/NativeHTTPRequest.hx b/src/lime/_internal/backend/native/NativeHTTPRequest.hx index 705456bd7..239fa213f 100644 --- a/src/lime/_internal/backend/native/NativeHTTPRequest.hx +++ b/src/lime/_internal/backend/native/NativeHTTPRequest.hx @@ -8,6 +8,8 @@ import lime.app.Promise; import lime.net.curl.CURL; import lime.net.curl.CURLCode; import lime.net.curl.CURLMulti; +import lime.net.curl.CURLMultiCode; +import lime.net.curl.CURLMultiMessage; import lime.net.HTTPRequest; import lime.net.HTTPRequestHeader; import lime.net.HTTPRequestMethod; @@ -17,6 +19,12 @@ import lime.system.ThreadPool; import sys.FileSystem; #end +#if cpp +import cpp.vm.Deque; +#elseif neko +import neko.vm.Deque; +#end + #if !lime_debug @:fileXml('tags="haxe,release"') @:noDebug @@ -26,10 +34,17 @@ import sys.FileSystem; class NativeHTTPRequest { + private static var activeInstances:Array; + private static var localThreadPool:ThreadPool; private static var multi:CURLMulti; private static var multiInstances:Map; - private static var multiTimer:Timer; - private static var threadPool:ThreadPool; + private static var multiProgressTimer:Timer; + private static var multiThreadPool:ThreadPool; + private static var multiThreadPoolRunning:Bool; + + #if (cpp || neko) + private static var multiAddHandle:Deque; + #end private var bytes:Bytes; private var bytesLoaded:Int; @@ -38,6 +53,8 @@ class NativeHTTPRequest { private var curl:CURL; private var parent:_IHTTPRequest; private var promise:Promise; + private var writeBytesLoaded:Int; + private var writeBytesTotal:Int; private var writePosition:Int; private var timeout:Timer; @@ -88,6 +105,8 @@ class NativeHTTPRequest { bytesLoaded = 0; bytesTotal = 0; + writeBytesLoaded = 0; + writeBytesTotal = 0; writePosition = 0; if (curl == null) { @@ -291,17 +310,17 @@ class NativeHTTPRequest { if (uri.indexOf ("http://") == -1 && uri.indexOf ("https://") == -1) { - if (threadPool == null) { + if (localThreadPool == null) { - threadPool = new ThreadPool (0, 1); - threadPool.doWork.add (threadPool_doWork); - threadPool.onProgress.add (threadPool_onProgress); - threadPool.onComplete.add (threadPool_onComplete); - threadPool.onError.add (threadPool_onError); + localThreadPool = new ThreadPool (0, 1); + localThreadPool.doWork.add (localThreadPool_doWork); + localThreadPool.onProgress.add (localThreadPool_onProgress); + localThreadPool.onComplete.add (localThreadPool_onComplete); + localThreadPool.onError.add (localThreadPool_onError); } - threadPool.queue ({ instance: this, uri: uri }); + localThreadPool.queue ({ instance: this, uri: uri }); } else { @@ -310,6 +329,7 @@ class NativeHTTPRequest { CURL.globalInit (CURL.GLOBAL_ALL); multi = new CURLMulti (); + activeInstances = new Array (); multiInstances = new Map (); } @@ -318,16 +338,35 @@ class NativeHTTPRequest { if (curl != null) { + activeInstances.push (this); multiInstances.set (curl, this); - multi.addHandle (curl); - if (multiTimer == null) { + #if (cpp || neko) + if (multiAddHandle == null) multiAddHandle = new Deque (); + multiAddHandle.push (curl); + #end - // TODO: Reduce sleep when network is busy? + if (multiThreadPool == null) { - multiTimer = new Timer (8); - multiTimer.run = multiTimer_onRun; - multiTimer_onRun (); + multiThreadPool = new ThreadPool (0, 1); + multiThreadPool.doWork.add (multiThreadPool_doWork); + multiThreadPool.onProgress.add (multiThreadPool_onProgress); + multiThreadPool.onComplete.add (multiThreadPool_onComplete); + + } + + if (!multiThreadPoolRunning) { + + multiThreadPoolRunning = true; + multiThreadPool.queue (); + + } + + if (multiProgressTimer == null) { + + multiProgressTimer = new Timer (8); + multiProgressTimer.run = multiProgressTimer_onRun; + multiProgressTimer_onRun (); } @@ -402,14 +441,15 @@ class NativeHTTPRequest { private function curl_onProgress (curl:CURL, dltotal:Int, dlnow:Int, uptotal:Int, upnow:Int):Int { - if (upnow > bytesLoaded || dlnow > bytesLoaded || uptotal > bytesTotal || dltotal > bytesTotal) { + if (upnow > writeBytesLoaded || dlnow > writeBytesLoaded || uptotal > writeBytesTotal || dltotal > writeBytesTotal) { - if (upnow > bytesLoaded) bytesLoaded = upnow; - if (dlnow > bytesLoaded) bytesLoaded = dlnow; - if (uptotal > bytesTotal) bytesTotal = uptotal; - if (dltotal > bytesTotal) bytesTotal = dltotal; + if (upnow > writeBytesLoaded) writeBytesLoaded = upnow; + if (dlnow > writeBytesLoaded) writeBytesLoaded = dlnow; + if (uptotal > writeBytesTotal) writeBytesTotal = uptotal; + if (dltotal > writeBytesTotal) writeBytesTotal = dltotal; - promise.progress (bytesLoaded, bytesTotal); + // Wrong thread + // promise.progress (bytesLoaded, bytesTotal); } @@ -430,81 +470,7 @@ class NativeHTTPRequest { } - private static function multiTimer_onRun ():Void { - - multi.perform (); - - var message = multi.infoRead (); - var curl, instance, status; - - if (message == null && multi.runningHandles == 0) { - - multiTimer.stop (); - multiTimer = null; - - } - - while (message != null) { - - curl = message.curl; - - if (multiInstances.exists (curl)) { - - instance = multiInstances.get (curl); - multiInstances.remove (curl); - - status = curl.getInfo (RESPONSE_CODE); - instance.parent.responseStatus = status; - - curl.cleanup (); - curl = null; - - if (message.result == CURLCode.OK) { - - if ((status >= 200 && status < 400) || status == 0) { - - if (!instance.promise.isError) { - - instance.promise.complete (instance.bytes); - - } - - } else if (instance.bytes != null) { - - instance.promise.error (instance.bytes.getString (0, instance.bytes.length)); - - } else { - - instance.promise.error ('Status ${status}'); - - } - - } else { - - instance.promise.error (CURL.strerror (message.result)); - - } - - if (instance.timeout != null) { - - instance.timeout.stop (); - instance.timeout = null; - - } - - instance.bytes = null; - instance.promise = null; - - } - - message = multi.infoRead (); - - } - - } - - - private static function threadPool_doWork (state:Dynamic):Void { + private static function localThreadPool_doWork (state:Dynamic):Void { var instance:NativeHTTPRequest = state.instance; var path:String = state.uri; @@ -527,7 +493,7 @@ class NativeHTTPRequest { if (path == null #if (sys && !android) || !FileSystem.exists (path) #end) { - threadPool.sendError ({ instance: instance, promise: instance.promise, error: "Cannot load file: " + path }); + localThreadPool.sendError ({ instance: instance, promise: instance.promise, error: "Cannot load file: " + path }); } else { @@ -535,12 +501,12 @@ class NativeHTTPRequest { if (instance.bytes != null) { - threadPool.sendProgress ({ instance: instance, promise: instance.promise, bytesLoaded: instance.bytes.length, bytesTotal: instance.bytes.length }); - threadPool.sendComplete ({ instance: instance, promise: instance.promise, result: instance.bytes }); + localThreadPool.sendProgress ({ instance: instance, promise: instance.promise, bytesLoaded: instance.bytes.length, bytesTotal: instance.bytes.length }); + localThreadPool.sendComplete ({ instance: instance, promise: instance.promise, result: instance.bytes }); } else { - threadPool.sendError ({ instance: instance, promise: instance.promise, error: "Cannot load file: " + path }); + localThreadPool.sendError ({ instance: instance, promise: instance.promise, error: "Cannot load file: " + path }); } @@ -549,7 +515,7 @@ class NativeHTTPRequest { } - private static function threadPool_onComplete (state:Dynamic):Void { + private static function localThreadPool_onComplete (state:{ instance:NativeHTTPRequest, promise:Promise, result:Bytes }):Void { var promise:Promise = state.promise; if (promise.isError) return; @@ -570,7 +536,7 @@ class NativeHTTPRequest { } - private static function threadPool_onError (state:Dynamic):Void { + private static function localThreadPool_onError (state:{ instance:NativeHTTPRequest, promise:Promise, error:String }):Void { var promise:Promise = state.promise; promise.error (state.error); @@ -590,7 +556,7 @@ class NativeHTTPRequest { } - private static function threadPool_onProgress (state:Dynamic):Void { + private static function localThreadPool_onProgress (state:{ instance:NativeHTTPRequest, promise:Promise, bytesLoaded:Int, bytesTotal:Int }):Void { var promise:Promise = state.promise; if (promise.isComplete || promise.isError) return; @@ -599,4 +565,131 @@ class NativeHTTPRequest { } + private static function multiThreadPool_doWork (_):Void { + + while (true) { + + #if (cpp || neko) + var curl = multiAddHandle.pop (false); + if (curl != null) multi.addHandle (curl); + #end + + var code = multi.wait (1000); + + if (code == CURLMultiCode.OK) { + + multi.perform (); + var message = multi.infoRead (); + + if (message == null && multi.runningHandles == 0) { + + multiThreadPool.sendComplete (); + break; + + } + + while (message != null) { + + var curl = message.curl; + var status = curl.getInfo (RESPONSE_CODE); + + multi.removeHandle (curl); + curl.cleanup (); + + multiThreadPool.sendProgress ({ curl: curl, result: message.result, status: status }); + message = multi.infoRead (); + + } + + } + + } + + } + + + private static function multiThreadPool_onComplete (_):Void { + + if (multiProgressTimer != null) { + + multiProgressTimer.stop (); + multiProgressTimer = null; + + } + + multiThreadPoolRunning = false; + + } + + + private static function multiThreadPool_onProgress (state:{ curl:CURL, result:Int, status:Int }):Void { + + if (multiInstances.exists (state.curl)) { + + var instance = multiInstances.get (state.curl); + activeInstances.remove (instance); + multiInstances.remove (state.curl); + + instance.parent.responseStatus = state.status; + + if (state.result == CURLCode.OK) { + + if ((state.status >= 200 && state.status < 400) || state.status == 0) { + + if (!instance.promise.isError) { + + instance.promise.complete (instance.bytes); + + } + + } else if (instance.bytes != null) { + + instance.promise.error (instance.bytes.getString (0, instance.bytes.length)); + + } else { + + instance.promise.error ('Status ${state.status}'); + + } + + } else { + + instance.promise.error (CURL.strerror (state.result)); + + } + + if (instance.timeout != null) { + + instance.timeout.stop (); + instance.timeout = null; + + } + + instance.bytes = null; + instance.promise = null; + + } + + state.curl = null; + + } + + + private static function multiProgressTimer_onRun ():Void { + + for (instance in activeInstances) { + + if (instance.bytesLoaded != instance.writeBytesLoaded || instance.bytesTotal != instance.writeBytesTotal) { + + instance.bytesLoaded = instance.writeBytesLoaded; + instance.bytesTotal = instance.writeBytesTotal; + instance.promise.progress (instance.bytesLoaded, instance.bytesTotal); + + } + + } + + } + + }