Use thread to speed up cURL Multi in native HTTPRequest (resolve #1276)

This commit is contained in:
Joshua Granick
2019-01-18 10:52:55 -08:00
parent 0c73e47b28
commit e582f8f6b4

View File

@@ -8,6 +8,8 @@ import lime.app.Promise;
import lime.net.curl.CURL; import lime.net.curl.CURL;
import lime.net.curl.CURLCode; import lime.net.curl.CURLCode;
import lime.net.curl.CURLMulti; import lime.net.curl.CURLMulti;
import lime.net.curl.CURLMultiCode;
import lime.net.curl.CURLMultiMessage;
import lime.net.HTTPRequest; import lime.net.HTTPRequest;
import lime.net.HTTPRequestHeader; import lime.net.HTTPRequestHeader;
import lime.net.HTTPRequestMethod; import lime.net.HTTPRequestMethod;
@@ -17,6 +19,12 @@ import lime.system.ThreadPool;
import sys.FileSystem; import sys.FileSystem;
#end #end
#if cpp
import cpp.vm.Deque;
#elseif neko
import neko.vm.Deque;
#end
#if !lime_debug #if !lime_debug
@:fileXml('tags="haxe,release"') @:fileXml('tags="haxe,release"')
@:noDebug @:noDebug
@@ -26,10 +34,17 @@ import sys.FileSystem;
class NativeHTTPRequest { class NativeHTTPRequest {
private static var activeInstances:Array<NativeHTTPRequest>;
private static var localThreadPool:ThreadPool;
private static var multi:CURLMulti; private static var multi:CURLMulti;
private static var multiInstances:Map<CURL, NativeHTTPRequest>; private static var multiInstances:Map<CURL, NativeHTTPRequest>;
private static var multiTimer:Timer; private static var multiProgressTimer:Timer;
private static var threadPool:ThreadPool; private static var multiThreadPool:ThreadPool;
private static var multiThreadPoolRunning:Bool;
#if (cpp || neko)
private static var multiAddHandle:Deque<CURL>;
#end
private var bytes:Bytes; private var bytes:Bytes;
private var bytesLoaded:Int; private var bytesLoaded:Int;
@@ -38,6 +53,8 @@ class NativeHTTPRequest {
private var curl:CURL; private var curl:CURL;
private var parent:_IHTTPRequest; private var parent:_IHTTPRequest;
private var promise:Promise<Bytes>; private var promise:Promise<Bytes>;
private var writeBytesLoaded:Int;
private var writeBytesTotal:Int;
private var writePosition:Int; private var writePosition:Int;
private var timeout:Timer; private var timeout:Timer;
@@ -88,6 +105,8 @@ class NativeHTTPRequest {
bytesLoaded = 0; bytesLoaded = 0;
bytesTotal = 0; bytesTotal = 0;
writeBytesLoaded = 0;
writeBytesTotal = 0;
writePosition = 0; writePosition = 0;
if (curl == null) { if (curl == null) {
@@ -291,17 +310,17 @@ class NativeHTTPRequest {
if (uri.indexOf ("http://") == -1 && uri.indexOf ("https://") == -1) { if (uri.indexOf ("http://") == -1 && uri.indexOf ("https://") == -1) {
if (threadPool == null) { if (localThreadPool == null) {
threadPool = new ThreadPool (0, 1); localThreadPool = new ThreadPool (0, 1);
threadPool.doWork.add (threadPool_doWork); localThreadPool.doWork.add (localThreadPool_doWork);
threadPool.onProgress.add (threadPool_onProgress); localThreadPool.onProgress.add (localThreadPool_onProgress);
threadPool.onComplete.add (threadPool_onComplete); localThreadPool.onComplete.add (localThreadPool_onComplete);
threadPool.onError.add (threadPool_onError); localThreadPool.onError.add (localThreadPool_onError);
} }
threadPool.queue ({ instance: this, uri: uri }); localThreadPool.queue ({ instance: this, uri: uri });
} else { } else {
@@ -310,6 +329,7 @@ class NativeHTTPRequest {
CURL.globalInit (CURL.GLOBAL_ALL); CURL.globalInit (CURL.GLOBAL_ALL);
multi = new CURLMulti (); multi = new CURLMulti ();
activeInstances = new Array ();
multiInstances = new Map (); multiInstances = new Map ();
} }
@@ -318,16 +338,35 @@ class NativeHTTPRequest {
if (curl != null) { if (curl != null) {
activeInstances.push (this);
multiInstances.set (curl, this); multiInstances.set (curl, this);
multi.addHandle (curl);
if (multiTimer == null) { #if (cpp || neko)
if (multiAddHandle == null) multiAddHandle = new Deque<CURL> ();
multiAddHandle.push (curl);
#end
// TODO: Reduce sleep when network is busy? if (multiThreadPool == null) {
multiTimer = new Timer (8); multiThreadPool = new ThreadPool (0, 1);
multiTimer.run = multiTimer_onRun; multiThreadPool.doWork.add (multiThreadPool_doWork);
multiTimer_onRun (); multiThreadPool.onProgress.add (multiThreadPool_onProgress);
multiThreadPool.onComplete.add (multiThreadPool_onComplete);
}
if (!multiThreadPoolRunning) {
multiThreadPoolRunning = true;
multiThreadPool.queue ();
}
if (multiProgressTimer == null) {
multiProgressTimer = new Timer (8);
multiProgressTimer.run = multiProgressTimer_onRun;
multiProgressTimer_onRun ();
} }
@@ -402,14 +441,15 @@ class NativeHTTPRequest {
private function curl_onProgress (curl:CURL, dltotal:Int, dlnow:Int, uptotal:Int, upnow:Int):Int { private function curl_onProgress (curl:CURL, dltotal:Int, dlnow:Int, uptotal:Int, upnow:Int):Int {
if (upnow > bytesLoaded || dlnow > bytesLoaded || uptotal > bytesTotal || dltotal > bytesTotal) { if (upnow > writeBytesLoaded || dlnow > writeBytesLoaded || uptotal > writeBytesTotal || dltotal > writeBytesTotal) {
if (upnow > bytesLoaded) bytesLoaded = upnow; if (upnow > writeBytesLoaded) writeBytesLoaded = upnow;
if (dlnow > bytesLoaded) bytesLoaded = dlnow; if (dlnow > writeBytesLoaded) writeBytesLoaded = dlnow;
if (uptotal > bytesTotal) bytesTotal = uptotal; if (uptotal > writeBytesTotal) writeBytesTotal = uptotal;
if (dltotal > bytesTotal) bytesTotal = dltotal; if (dltotal > writeBytesTotal) writeBytesTotal = dltotal;
promise.progress (bytesLoaded, bytesTotal); // Wrong thread
// promise.progress (bytesLoaded, bytesTotal);
} }
@@ -430,81 +470,7 @@ class NativeHTTPRequest {
} }
private static function multiTimer_onRun ():Void { private static function localThreadPool_doWork (state:Dynamic):Void {
multi.perform ();
var message = multi.infoRead ();
var curl, instance, status;
if (message == null && multi.runningHandles == 0) {
multiTimer.stop ();
multiTimer = null;
}
while (message != null) {
curl = message.curl;
if (multiInstances.exists (curl)) {
instance = multiInstances.get (curl);
multiInstances.remove (curl);
status = curl.getInfo (RESPONSE_CODE);
instance.parent.responseStatus = status;
curl.cleanup ();
curl = null;
if (message.result == CURLCode.OK) {
if ((status >= 200 && status < 400) || status == 0) {
if (!instance.promise.isError) {
instance.promise.complete (instance.bytes);
}
} else if (instance.bytes != null) {
instance.promise.error (instance.bytes.getString (0, instance.bytes.length));
} else {
instance.promise.error ('Status ${status}');
}
} else {
instance.promise.error (CURL.strerror (message.result));
}
if (instance.timeout != null) {
instance.timeout.stop ();
instance.timeout = null;
}
instance.bytes = null;
instance.promise = null;
}
message = multi.infoRead ();
}
}
private static function threadPool_doWork (state:Dynamic):Void {
var instance:NativeHTTPRequest = state.instance; var instance:NativeHTTPRequest = state.instance;
var path:String = state.uri; var path:String = state.uri;
@@ -527,7 +493,7 @@ class NativeHTTPRequest {
if (path == null #if (sys && !android) || !FileSystem.exists (path) #end) { if (path == null #if (sys && !android) || !FileSystem.exists (path) #end) {
threadPool.sendError ({ instance: instance, promise: instance.promise, error: "Cannot load file: " + path }); localThreadPool.sendError ({ instance: instance, promise: instance.promise, error: "Cannot load file: " + path });
} else { } else {
@@ -535,12 +501,12 @@ class NativeHTTPRequest {
if (instance.bytes != null) { if (instance.bytes != null) {
threadPool.sendProgress ({ instance: instance, promise: instance.promise, bytesLoaded: instance.bytes.length, bytesTotal: instance.bytes.length }); localThreadPool.sendProgress ({ instance: instance, promise: instance.promise, bytesLoaded: instance.bytes.length, bytesTotal: instance.bytes.length });
threadPool.sendComplete ({ instance: instance, promise: instance.promise, result: instance.bytes }); localThreadPool.sendComplete ({ instance: instance, promise: instance.promise, result: instance.bytes });
} else { } else {
threadPool.sendError ({ instance: instance, promise: instance.promise, error: "Cannot load file: " + path }); localThreadPool.sendError ({ instance: instance, promise: instance.promise, error: "Cannot load file: " + path });
} }
@@ -549,7 +515,7 @@ class NativeHTTPRequest {
} }
private static function threadPool_onComplete (state:Dynamic):Void { private static function localThreadPool_onComplete (state:{ instance:NativeHTTPRequest, promise:Promise<Bytes>, result:Bytes }):Void {
var promise:Promise<Bytes> = state.promise; var promise:Promise<Bytes> = state.promise;
if (promise.isError) return; if (promise.isError) return;
@@ -570,7 +536,7 @@ class NativeHTTPRequest {
} }
private static function threadPool_onError (state:Dynamic):Void { private static function localThreadPool_onError (state:{ instance:NativeHTTPRequest, promise:Promise<Bytes>, error:String }):Void {
var promise:Promise<Bytes> = state.promise; var promise:Promise<Bytes> = state.promise;
promise.error (state.error); promise.error (state.error);
@@ -590,7 +556,7 @@ class NativeHTTPRequest {
} }
private static function threadPool_onProgress (state:Dynamic):Void { private static function localThreadPool_onProgress (state:{ instance:NativeHTTPRequest, promise:Promise<Bytes>, bytesLoaded:Int, bytesTotal:Int }):Void {
var promise:Promise<Bytes> = state.promise; var promise:Promise<Bytes> = state.promise;
if (promise.isComplete || promise.isError) return; if (promise.isComplete || promise.isError) return;
@@ -599,4 +565,131 @@ class NativeHTTPRequest {
} }
private static function multiThreadPool_doWork (_):Void {
while (true) {
#if (cpp || neko)
var curl = multiAddHandle.pop (false);
if (curl != null) multi.addHandle (curl);
#end
var code = multi.wait (1000);
if (code == CURLMultiCode.OK) {
multi.perform ();
var message = multi.infoRead ();
if (message == null && multi.runningHandles == 0) {
multiThreadPool.sendComplete ();
break;
}
while (message != null) {
var curl = message.curl;
var status = curl.getInfo (RESPONSE_CODE);
multi.removeHandle (curl);
curl.cleanup ();
multiThreadPool.sendProgress ({ curl: curl, result: message.result, status: status });
message = multi.infoRead ();
}
}
}
}
private static function multiThreadPool_onComplete (_):Void {
if (multiProgressTimer != null) {
multiProgressTimer.stop ();
multiProgressTimer = null;
}
multiThreadPoolRunning = false;
}
private static function multiThreadPool_onProgress (state:{ curl:CURL, result:Int, status:Int }):Void {
if (multiInstances.exists (state.curl)) {
var instance = multiInstances.get (state.curl);
activeInstances.remove (instance);
multiInstances.remove (state.curl);
instance.parent.responseStatus = state.status;
if (state.result == CURLCode.OK) {
if ((state.status >= 200 && state.status < 400) || state.status == 0) {
if (!instance.promise.isError) {
instance.promise.complete (instance.bytes);
}
} else if (instance.bytes != null) {
instance.promise.error (instance.bytes.getString (0, instance.bytes.length));
} else {
instance.promise.error ('Status ${state.status}');
}
} else {
instance.promise.error (CURL.strerror (state.result));
}
if (instance.timeout != null) {
instance.timeout.stop ();
instance.timeout = null;
}
instance.bytes = null;
instance.promise = null;
}
state.curl = null;
}
private static function multiProgressTimer_onRun ():Void {
for (instance in activeInstances) {
if (instance.bytesLoaded != instance.writeBytesLoaded || instance.bytesTotal != instance.writeBytesTotal) {
instance.bytesLoaded = instance.writeBytesLoaded;
instance.bytesTotal = instance.writeBytesTotal;
instance.promise.progress (instance.bytesLoaded, instance.bytesTotal);
}
}
}
} }