Work on cURL Multi

This commit is contained in:
Joshua Granick
2018-06-21 12:41:23 -07:00
parent 23caa4d36a
commit c67d3562c5
5 changed files with 286 additions and 149 deletions

View File

@@ -884,6 +884,7 @@ class NativeCFFI {
@:cffi private static function lime_curl_multi_info_read (multi_handle:CFFIPointer):Dynamic;
@:cffi private static function lime_curl_multi_perform (multi_handle:CFFIPointer):Int;
@:cffi private static function lime_curl_multi_remove_handle (multi_handle:CFFIPointer, curl_handle:CFFIPointer):Int;
@:cffi private static function lime_curl_multi_setopt (multi_handle:CFFIPointer, option:Int, parameter:Dynamic):Int;
@:cffi private static function lime_curl_multi_wait (multi_handle:CFFIPointer, timeout_ms:Int):Int;
#else
@:hlNative("lime", "lime_curl_getdate") private static function lime_curl_getdate (date:String, now:Float):Float { return 0; }
@@ -911,6 +912,7 @@ class NativeCFFI {
@:hlNative("lime", "lime_curl_multi_info_read") private static function lime_curl_multi_info_read (multi_handle:CFFIPointer, object:Dynamic):Dynamic { return null; }
@:hlNative("lime", "lime_curl_multi_perform") private static function lime_curl_multi_perform (multi_handle:CFFIPointer):Int { return 0; }
@:hlNative("lime", "lime_curl_multi_remove_handle") private static function lime_curl_multi_remove_handle (multi_handle:CFFIPointer, curl_handle:CFFIPointer):Int { return 0; }
@:hlNative("lime", "lime_curl_multi_setopt") private static function lime_curl_multi_setopt (multi_handle:CFFIPointer, option:Int, parameter:Dynamic):Int { return 0; }
@:hlNative("lime", "lime_curl_multi_wait") private static function lime_curl_multi_wait (multi_handle:CFFIPointer, timeout_ms:Int):Int { return 0; }
#end
#end

View File

@@ -1,118 +0,0 @@
package lime.net.curl;
import haxe.io.Bytes;
import lime._backend.native.NativeCFFI;
import lime.net.curl.CURL;
#if !lime_debug
@:fileXml('tags="haxe,release"')
@:noDebug
#end
@:access(lime._backend.native.NativeCFFI)
@:deprecated class CURLEasy {
public static function cleanup (handle:CURL):Void {
handle.cleanup ();
}
public static function duphandle (handle:CURL):CURL {
return handle.clone ();
}
public static function escape (handle:CURL, url:String, length:Int):String {
return handle.escape (url, length);
}
public static function getinfo (handle:CURL, info:CURLInfo):Dynamic {
return handle.getInfo (info);
}
public static function init ():CURL {
return new CURL ();
}
public static function pause (handle:CURL, bitMask:Int):CURLCode {
return handle.pause (bitMask);
}
public static function perform (handle:CURL):CURLCode {
return handle.perform ();
}
/*public static function recv (handle:Dynamic):CURLCode {
#if (lime_cffi && lime_curl && !macro)
return cast NativeCFFI.lime_curl_easy_perform (handle);
#else
return cast 0;
#end
}*/
public static function reset (handle:CURL):Void {
return handle.reset ();
}
/*public static function send (handle:Dynamic):CURLCode {
#if (lime_cffi && lime_curl && !macro)
return cast NativeCFFI.lime_curl_easy_perform (handle);
#else
return cast 0;
#end
}*/
public static function setopt (handle:CURL, option:CURLOption, parameter:Dynamic):CURLCode {
return handle.setOption (option, parameter);
}
public static function strerror (code:CURLCode):String {
return CURL.strerror (code);
}
public static function unescape (handle:CURL, url:String, inLength:Int, outLength:Int):String {
return handle.unescape (url, inLength, outLength);
}
}

View File

@@ -93,6 +93,17 @@ class CURLMulti {
}
public function setOption (option:CURLMultiOption, parameter:Dynamic):CURLMultiCode {
#if (lime_cffi && lime_curl && !macro)
return cast NativeCFFI.lime_curl_multi_setopt (handle, option, parameter);
#else
return cast 0;
#end
}
public function wait (timeoutMS:Int):CURLMultiCode {
#if (lime_cffi && lime_curl && !macro)

View File

@@ -0,0 +1,55 @@
package lime.net.curl;
@:enum abstract CURLMultiOption(Int) from Int to Int from UInt to UInt {
/* This is the socket callback function pointer */
var SOCKETFUNCTION = 200001;
/* This is the argument passed to the socket callback */
var SOCKETDATA = 100002;
/* set to 1 to enable pipelining for this multi handle */
var PIPELINING = 3;
/* This is the timer callback function pointer */
var TIMERFUNCTION = 20004;
/* This is the argument passed to the timer callback */
var TIMERDATA = 10005;
/* maximum number of entries in the connection cache */
var MAXCONNECTS = 6;
/* maximum number of (pipelining) connections to one host */
var MAX_HOST_CONNECTIONS = 7;
/* maximum number of requests in a pipeline */
var MAX_PIPELINE_LENGTH = 8;
/* a connection with a content-length longer than this
will not be considered for pipelining */
var CONTENT_LENGTH_PENALTY_SIZE = 30009;
/* a connection with a chunk length longer than this
will not be considered for pipelining */
var CHUNK_LENGTH_PENALTY_SIZE = 30010;
/* a list of site names(+port) that are blacklisted from
pipelining */
var PIPELINING_SITE_BL = 10011;
/* a list of server types that are blacklisted from
pipelining */
var PIPELINING_SERVER_BL = 10012;
/* maximum number of open connections in total */
var MAX_TOTAL_CONNECTIONS = 13;
/* This is the server push callback function pointer */
var PUSHFUNCTION = 20014;
/* This is the argument passed to the server push callback */
var PUSHDATA = 10015;
}

View File

@@ -32,7 +32,8 @@ namespace lime {
};
std::map<void*, CURLM*> curlMultiHandles;
std::map<void*, std::vector<void*>* > curlMultiHandles;
std::map<void*, void*> curlMultiReferences;
std::map<void*, int> curlMultiRunningHandles;
std::map<void*, bool> curlMultiValid;
std::map<CURL*, void*> curlObjects;
@@ -60,11 +61,11 @@ namespace lime {
curl_gc_mutex.Lock ();
if (curlMultiHandles.find (handle) != curlMultiHandles.end ()) {
if (curlMultiReferences.find (handle) != curlMultiReferences.end ()) {
CURLM* multi = curlMultiHandles[handle];
curl_multi_remove_handle (multi, handle);
curlMultiHandles.erase (handle);
value multi_handle = (value)curlMultiReferences[handle];
curl_multi_remove_handle ((CURLM*)val_data (multi_handle), handle);
curlMultiReferences.erase (handle);
}
@@ -164,11 +165,11 @@ namespace lime {
curl_gc_mutex.Lock ();
if (curlMultiHandles.find (handle) != curlMultiHandles.end ()) {
if (curlMultiReferences.find (handle) != curlMultiReferences.end ()) {
CURLM* multi = curlMultiHandles[handle];
curl_multi_remove_handle (multi, handle);
curlMultiHandles.erase (handle);
HL_CFFIPointer* multi_handle = (HL_CFFIPointer*)curlMultiReferences[handle];
curl_multi_remove_handle ((CURLM*)multi_handle->ptr, handle);
curlMultiReferences.erase (handle);
}
@@ -273,16 +274,19 @@ namespace lime {
}
for (std::map<void*, void*>::iterator it = curlMultiHandles.begin (); it != curlMultiHandles.end (); ++it) {
std::vector<void*>* handles = curlMultiHandles[handle];
for (std::vector<void*>::iterator it = handles->begin (); it != handles->end (); ++it) {
if (curlMultiHandles[it->first] == handle) {
gc_curl ((value)it->first);
}
curl_gc_mutex.Unlock ();
gc_curl ((value)*it);
curl_gc_mutex.Lock ();
}
delete curlMultiHandles[handle];
curlMultiHandles.erase (handle);
val_gc (handle, 0);
//handle = alloc_null ();
@@ -306,16 +310,19 @@ namespace lime {
}
for (std::map<void*, void*>::iterator it = curlMultiHandles.begin (); it != curlMultiHandles.end (); ++it) {
std::vector<void*>* handles = curlMultiHandles[handle];
for (std::vector<void*>::iterator it = handles->begin (); it != handles->end (); ++it) {
if (curlMultiHandles[it->first] == handle) {
hl_gc_curl ((HL_CFFIPointer*)it->first);
}
curl_gc_mutex.Unlock ();
hl_gc_curl ((HL_CFFIPointer*)*it);
curl_gc_mutex.Lock ();
}
delete curlMultiHandles[handle];
curlMultiHandles.erase (handle);
handle->finalizer = NULL;
//handle = alloc_null ();
@@ -497,7 +504,9 @@ namespace lime {
for (std::vector<char*>::iterator it = values->begin (); it != values->end (); ++it) {
curl_gc_mutex.Unlock ();
headerCallback->Call (alloc_string (*it));
curl_gc_mutex.Lock ();
}
@@ -524,7 +533,9 @@ namespace lime {
buffer->sputn ((char*)bytes->b, length);
buffer->str ("");
curl_gc_mutex.Unlock ();
length = val_int ((value)writeCallback->Call (bytes->Value ((value)bytesRoot->Get ())));
curl_gc_mutex.Lock ();
if (length == CURL_WRITEFUNC_PAUSE) {
@@ -543,7 +554,9 @@ namespace lime {
CURL_Progress* progress = progressValues[easy_handle];
ValuePointer* progressCallback = progressCallbacks[easy_handle];
curl_gc_mutex.Unlock ();
code = val_int ((value)progressCallback->Call (alloc_float (progress->dltotal), alloc_float (progress->dlnow), alloc_float (progress->ultotal), alloc_float (progress->ulnow)));
curl_gc_mutex.Lock ();
if (code != 0) { // CURLE_OK
@@ -558,7 +571,9 @@ namespace lime {
CURL_XferInfo* xferInfo = xferInfoValues[easy_handle];
ValuePointer* xferInfoCallback = xferInfoCallbacks[easy_handle];
curl_gc_mutex.Unlock ();
code = val_int ((value)xferInfoCallback->Call (alloc_int (xferInfo->dltotal), alloc_int (xferInfo->dlnow), alloc_int (xferInfo->ultotal), alloc_int (xferInfo->ulnow)));
curl_gc_mutex.Lock ();
if (code != 0) {
@@ -568,6 +583,8 @@ namespace lime {
}
curl_gc_mutex.Unlock ();
}
@@ -588,7 +605,9 @@ namespace lime {
vdynamic* bytes = hl_alloc_dynamic (&hlt_bytes);
bytes->v.bytes = (vbyte*)*it;
curl_gc_mutex.Unlock ();
headerCallback->Call (bytes);
curl_gc_mutex.Lock ();
}
@@ -614,7 +633,9 @@ namespace lime {
buffer->sputn ((char*)bytes->b, length);
buffer->str ("");
curl_gc_mutex.Unlock ();
length = (int)writeCallback->Call (bytes);
curl_gc_mutex.Lock ();
if (length == CURL_WRITEFUNC_PAUSE) {
@@ -643,7 +664,9 @@ namespace lime {
ultotal->v.d = progress->ultotal;
ulnow->v.d = progress->ulnow;
curl_gc_mutex.Unlock ();
code = (int)progressCallback->Call (dltotal, dlnow, ultotal, ulnow);
curl_gc_mutex.Lock ();
if (code != 0) { // CURLE_OK
@@ -668,7 +691,9 @@ namespace lime {
ultotal->v.i = xferInfo->ultotal;
ulnow->v.i = xferInfo->ulnow;
curl_gc_mutex.Unlock ();
code = (int)xferInfoCallback->Call (dltotal, dlnow, ultotal, ulnow);
curl_gc_mutex.Lock ();
if (code != 0) {
@@ -678,6 +703,8 @@ namespace lime {
}
curl_gc_mutex.Unlock ();
}
@@ -2119,12 +2146,12 @@ namespace lime {
int lime_curl_multi_cleanup (value multi_handle) {
curl_gc_mutex.Lock ();
// curl_gc_mutex.Lock ();
// CURLMcode result = curl_multi_cleanup ((CURLM*)val_data (multi_handle));
gc_curl_multi (multi_handle);
curl_gc_mutex.Unlock ();
// curl_gc_mutex.Unlock ();
return CURLM_OK;
@@ -2133,12 +2160,12 @@ namespace lime {
HL_PRIM int hl_lime_curl_multi_cleanup (HL_CFFIPointer* multi_handle) {
curl_gc_mutex.Lock ();
// curl_gc_mutex.Lock ();
// CURLMcode result = curl_multi_cleanup ((CURLM*)val_data (multi_handle));
hl_gc_curl_multi (multi_handle);
curl_gc_mutex.Unlock ();
// curl_gc_mutex.Unlock ();
return CURLM_OK;
@@ -2159,6 +2186,7 @@ namespace lime {
curlMultiValid[handle] = true;
curlMultiRunningHandles[handle] = 0;
curlMultiHandles[handle] = new std::vector<void*> ();
curl_gc_mutex.Unlock ();
@@ -2181,6 +2209,7 @@ namespace lime {
curlMultiValid[handle] = true;
curlMultiRunningHandles[handle] = 0;
curlMultiHandles[handle] = new std::vector<void*> ();
curl_gc_mutex.Unlock ();
@@ -2197,7 +2226,8 @@ namespace lime {
if (result == CURLM_OK) {
curlMultiHandles[curl_handle] = multi_handle;
curlMultiReferences[curl_handle] = multi_handle;
curlMultiHandles[multi_handle]->push_back (curl_handle);
}
@@ -2216,7 +2246,8 @@ namespace lime {
if (result == CURLM_OK) {
curlMultiHandles[curl_handle] = multi_handle;
curlMultiReferences[curl_handle] = multi_handle;
curlMultiHandles[multi_handle]->push_back (curl_handle);
}
@@ -2321,6 +2352,16 @@ namespace lime {
int runningHandles = 0;
CURLMcode result = curl_multi_perform ((CURLM*)val_data (multi_handle), &runningHandles);
std::vector<void*>* handles = curlMultiHandles[multi_handle];
for (std::vector<void*>::iterator it = handles->begin (); it != handles->end (); ++it) {
curl_gc_mutex.Unlock ();
lime_curl_easy_flush ((value)*it);
curl_gc_mutex.Lock ();
}
curlMultiRunningHandles[multi_handle] = runningHandles;
curl_gc_mutex.Unlock ();
@@ -2337,6 +2378,16 @@ namespace lime {
int runningHandles = 0;
CURLMcode result = curl_multi_perform ((CURLM*)multi_handle->ptr, &runningHandles);
std::vector<void*>* handles = curlMultiHandles[multi_handle];
for (std::vector<void*>::iterator it = handles->begin (); it != handles->end (); ++it) {
curl_gc_mutex.Unlock ();
hl_lime_curl_easy_flush ((HL_CFFIPointer*)*it);
curl_gc_mutex.Lock ();
}
curlMultiRunningHandles[multi_handle] = runningHandles;
curl_gc_mutex.Unlock ();
@@ -2352,9 +2403,26 @@ namespace lime {
CURLMcode result = curl_multi_remove_handle ((CURLM*)val_data (multi_handle), (CURL*)val_data (curl_handle));
if (/*result == CURLM_OK &&*/ curlMultiHandles.find (curl_handle) != curlMultiHandles.end ()) {
if (/*result == CURLM_OK &&*/ curlMultiReferences.find (curl_handle) != curlMultiReferences.end ()) {
curlMultiHandles.erase (curl_handle);
curlMultiReferences.erase (curl_handle);
}
std::vector<void*>* handles = curlMultiHandles[multi_handle];
if (handles->size () > 0) {
for (std::vector<void*>::iterator it = handles->begin (); it != handles->end (); ++it) {
if (*it == curl_handle) {
handles->erase (it);
break;
}
}
}
@@ -2371,9 +2439,26 @@ namespace lime {
CURLMcode result = curl_multi_remove_handle ((CURLM*)multi_handle->ptr, (CURL*)curl_handle->ptr);
if (/*result == CURLM_OK &&*/ curlMultiHandles.find (curl_handle) != curlMultiHandles.end ()) {
if (/*result == CURLM_OK &&*/ curlMultiReferences.find (curl_handle) != curlMultiReferences.end ()) {
curlMultiHandles.erase (curl_handle);
curlMultiReferences.erase (curl_handle);
}
std::vector<void*>* handles = curlMultiHandles[multi_handle];
if (handles->size () > 0) {
for (std::vector<void*>::iterator it = handles->begin (); it != handles->end (); ++it) {
if (*it == curl_handle) {
handles->erase (it);
break;
}
}
}
@@ -2384,6 +2469,106 @@ namespace lime {
}
int lime_curl_multi_setopt (value multi_handle, int option, value parameter) {
CURLMcode code = CURLM_OK;
CURLM* multi = (CURLM*)val_data (multi_handle);
CURLMoption type = (CURLMoption)option;
switch (type) {
case CURLMOPT_PIPELINING:
code = curl_multi_setopt (multi, type, val_bool (parameter));
break;
case CURLMOPT_MAXCONNECTS:
case CURLMOPT_MAX_HOST_CONNECTIONS:
case CURLMOPT_MAX_PIPELINE_LENGTH:
case CURLMOPT_MAX_TOTAL_CONNECTIONS:
case CURLMOPT_CONTENT_LENGTH_PENALTY_SIZE:
case CURLMOPT_CHUNK_LENGTH_PENALTY_SIZE:
code = curl_multi_setopt (multi, type, val_int (parameter));
break;
case CURLMOPT_SOCKETFUNCTION:
case CURLMOPT_SOCKETDATA:
case CURLMOPT_TIMERFUNCTION:
case CURLMOPT_TIMERDATA:
case CURLMOPT_PUSHFUNCTION:
case CURLMOPT_PUSHDATA:
// TODO?
break;
case CURLMOPT_PIPELINING_SITE_BL:
case CURLMOPT_PIPELINING_SERVER_BL:
// TODO, array to slist
break;
default:
break;
}
return code;
}
HL_PRIM int hl_lime_curl_multi_setopt (HL_CFFIPointer* multi_handle, int option, vdynamic* parameter) {
CURLMcode code = CURLM_OK;
CURLM* multi = (CURLM*)multi_handle->ptr;
CURLMoption type = (CURLMoption)option;
switch (type) {
case CURLMOPT_PIPELINING:
code = curl_multi_setopt (multi, type, parameter->v.b);
break;
case CURLMOPT_MAXCONNECTS:
case CURLMOPT_MAX_HOST_CONNECTIONS:
case CURLMOPT_MAX_PIPELINE_LENGTH:
case CURLMOPT_MAX_TOTAL_CONNECTIONS:
case CURLMOPT_CONTENT_LENGTH_PENALTY_SIZE:
case CURLMOPT_CHUNK_LENGTH_PENALTY_SIZE:
code = curl_multi_setopt (multi, type, parameter->v.i);
break;
case CURLMOPT_SOCKETFUNCTION:
case CURLMOPT_SOCKETDATA:
case CURLMOPT_TIMERFUNCTION:
case CURLMOPT_TIMERDATA:
case CURLMOPT_PUSHFUNCTION:
case CURLMOPT_PUSHDATA:
// TODO?
break;
case CURLMOPT_PIPELINING_SITE_BL:
case CURLMOPT_PIPELINING_SERVER_BL:
// TODO, array to slist
break;
default:
break;
}
return code;
}
int lime_curl_multi_wait (value multi_handle, int timeout_ms) {
System::GCEnterBlocking ();
@@ -2495,6 +2680,7 @@ namespace lime {
DEFINE_PRIME1 (lime_curl_multi_info_read);
DEFINE_PRIME1 (lime_curl_multi_perform);
DEFINE_PRIME2 (lime_curl_multi_remove_handle);
DEFINE_PRIME3 (lime_curl_multi_setopt);
DEFINE_PRIME2 (lime_curl_multi_wait);
DEFINE_PRIME0 (lime_curl_version);
DEFINE_PRIME1 (lime_curl_version_info);
@@ -2527,6 +2713,7 @@ namespace lime {
DEFINE_HL_PRIM (_DYN, lime_curl_multi_info_read, _TCFFIPOINTER _DYN);
DEFINE_HL_PRIM (_I32, lime_curl_multi_perform, _TCFFIPOINTER);
DEFINE_HL_PRIM (_I32, lime_curl_multi_remove_handle, _TCFFIPOINTER _TCFFIPOINTER);
DEFINE_HL_PRIM (_I32, lime_curl_multi_setopt, _TCFFIPOINTER _I32 _DYN);
DEFINE_HL_PRIM (_I32, lime_curl_multi_wait, _TCFFIPOINTER _I32);
DEFINE_HL_PRIM (_BYTES, lime_curl_version, _NO_ARG);
DEFINE_HL_PRIM (_DYN, lime_curl_version_info, _I32);