提高CTP的jswig_JAVA接口回调线程处理效率
Posted 灌木大叔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了提高CTP的jswig_JAVA接口回调线程处理效率相关的知识,希望对你有一定的参考价值。
ctp回调线程要快速返回,每次从ctp进入java时必然要产生一个新的线程对象匹配,效率实在太低。
在java中产生一个线程,该线程调用native方法进入本地代码形成工作线程,该线程负责读取缓冲区数据并调用java接口处理。
在swig的Spi代码中生成一个缓冲区,每次有回调发生时,把所有数据复制到缓冲区形成一个数据包,然后唤醒工作线程。
在thostmduserapi_se_wrap.h头文件增加:
1 #include <Windows.h> 2 struct DataHead { 3 short packLength;//报文长度, 4 short dataLength;//报文内有效数据长度, 5 int eventID;//事件ID 6 }; 7 class SwigDirector_CThostFtdcMdSpi : public CThostFtdcMdSpi, public Swig::Director { 8 9 public: 10 CRITICAL_SECTION g_mutex;//临界区变量 11 HANDLE g_cond;//唤醒信号 12 //缓冲区 13 /*有数据到达时,调用lockBuf(int length)请求一段缓冲区,然后填入消息报数据。 14 消息报由头部和数据构成。DataHead。 15 */ 16 char *dataBuf; 17 int buf_begin, buf_end,buf_length; 18 char* lockBuf(int length);//请求加锁的缓冲区 19 void notifyBuf(char* pPtr, int plength);//设置数据长度并唤醒处理线程 20 ~SwigDirector_CThostFtdcMdSpi();
在thostmduserapi_se_wrap.h头文件增加native工作线程代码:
1 #ifdef __cplusplus 2 extern "C" { 3 #endif 4 SWIGEXPORT jlong JNICALL Java_............._handleEvent(JNIEnv *jenv, jobject swigjobj, jlong cPtr) 5 { 6 /*检查消息缓存,无消息则进入等待 7 //jlong cPtr指向内存中SwigDirector_CThostFtdcMdSpi对象 8 // 9 */ 10 DataHead *pPtr = NULL; 11 SwigDirector_CThostFtdcMdSpi *clsPtr = (SwigDirector_CThostFtdcMdSpi *)cPtr; 12 while (true){ 13 EnterCriticalSection(&(clsPtr->g_mutex)); 14 int asize = 0; 15 while (true){ 16 pPtr = NULL; 17 asize = clsPtr->buf_end - clsPtr->buf_begin; 18 if (asize < 0){ 19 asize += clsPtr->buf_length; 20 } 21 if (asize <= 0){ 22 break; 23 } 24 pPtr = (DataHead*)(clsPtr->dataBuf + clsPtr->buf_begin); 25 if (pPtr->dataLength == 0){ 26 //判断是否空包,跳过空包 27 clsPtr->buf_begin += pPtr->packLength; 28 if (clsPtr->buf_begin >= clsPtr->buf_length){ 29 clsPtr->buf_begin = 0;//有空间 30 } 31 } 32 else{ 33 break; 34 } 35 } 36 LeaveCriticalSection(&(clsPtr->g_mutex)); 37 if (asize <= 0 || pPtr->dataLength <= 0){ 38 //进入休眠 39 WaitForSingleObject(clsPtr->g_cond, INFINITE); 40 continue; 41 } 42 //开始调用回调方法pPtr 43 int eID = pPtr->eventID; 44 char *dPtr = (char*)pPtr; 45 dPtr += sizeof(DataHead); 46 if (eID == 0){ 47 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[0], swigjobj); 48 } 49 else if (eID == 1){ 50 jint jnReason = (jint)(*(int*)(dPtr)); 51 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[1], swigjobj, jnReason); 52 } 53 else if (eID == 3){ 54 jlong jpRspUserLogin = 0; 55 jlong jpRspInfo = 0; 56 jint jnRequestID; 57 jboolean jbIsLast; 58 *((CThostFtdcRspUserLoginField **)&jpRspUserLogin) = (CThostFtdcRspUserLoginField *)dPtr; 59 dPtr += sizeof(CThostFtdcRspUserLoginField); 60 *((CThostFtdcRspInfoField **)&jpRspInfo) = (CThostFtdcRspInfoField *)dPtr; 61 dPtr += sizeof(CThostFtdcRspInfoField); 62 jnRequestID = (jint)(*(int*)(dPtr)); 63 dPtr += sizeof(int); 64 jbIsLast = (jboolean)(*(bool*)(dPtr)); 65 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[3], swigjobj, jpRspUserLogin, jpRspInfo, jnRequestID, jbIsLast); 66 } 67 else if (eID == 11){ 68 jlong jpDepthMarketData = 0; 69 *((CThostFtdcDepthMarketDataField **)&jpDepthMarketData) = (CThostFtdcDepthMarketDataField *)dPtr; 70 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[11], swigjobj, jpDepthMarketData); 71 } 72 pPtr->dataLength = 0;//变成空包 73 jthrowable swigerror = jenv->ExceptionOccurred(); 74 if (swigerror) { 75 Swig::DirectorException::raise(jenv, swigerror); 76 } 77 continue; 78 } 79 return 0; 80 }
在thostmduserapi_se_wrap.h头文件修改添加申请缓冲区、唤醒工作线程代码。并修改回调代码:
1 //初始化数据 2 3 SwigDirector_CThostFtdcMdSpi::SwigDirector_CThostFtdcMdSpi(JNIEnv *jenv) : CThostFtdcMdSpi(), Swig::Director(jenv) { 4 this->buf_length = 4096 * 1024; 5 this->dataBuf = new char[this->buf_length]; 6 this->buf_begin = 0; 7 this->buf_end = 0; 8 InitializeCriticalSection(&(this->g_mutex)); //初始化临界区 9 //以non-signaled创建auto-reset模式的事件对象 10 this->g_cond = CreateEvent(NULL, FALSE, FALSE, NULL); 11 } 12 //清场 13 SwigDirector_CThostFtdcMdSpi::~SwigDirector_CThostFtdcMdSpi(){ 14 delete this->dataBuf; 15 DeleteCriticalSection(&this->g_mutex); //释放临界区 16 CloseHandle(this->g_cond); //销毁事件对象 17 } 18 19 //获取数据包缓冲区,并设置相关指针 20 char* SwigDirector_CThostFtdcMdSpi::lockBuf(int plength){ 21 int asize; 22 DataHead *pPtr = NULL; 23 EnterCriticalSection(&g_mutex); 24 if (this->buf_end >= this->buf_begin){ 25 //end指针大于begin指针 26 asize = this->buf_length - this->buf_end; 27 if (asize > plength + sizeof(DataHead) + sizeof(DataHead)){ 28 //可以容纳 29 pPtr = (DataHead*)(this->dataBuf + this->buf_end); 30 pPtr->packLength = plength + sizeof(DataHead); 31 pPtr->dataLength = -1; 32 this->buf_end += pPtr->packLength; 33 } 34 else{ 35 if (this->buf_begin > 0){ 36 //begin前有空间容纳end指针,可以分配或者填充空包 37 if (asize >= plength + sizeof(DataHead)){ 38 //可以分配 39 pPtr = (DataHead*)(this->dataBuf + this->buf_end); 40 pPtr->packLength = asize; 41 pPtr->dataLength = -1; 42 this->buf_end = 0; 43 } 44 else{ 45 //可以填充空包 46 pPtr = (DataHead*)(this->dataBuf + this->buf_end); 47 pPtr->packLength = asize; 48 pPtr->dataLength = 0; 49 this->buf_end = 0; 50 pPtr = NULL; 51 } 52 } 53 else{ 54 pPtr = (DataHead*)(this->dataBuf + this->buf_end); 55 pPtr->packLength = 0; 56 } 57 } 58 } 59 asize = this->buf_begin - this->buf_end - 1;//end不能大于等于begin 60 if (pPtr == NULL){//不用else是因为上面可能没有分配空间 61 if (asize >= plength + sizeof(DataHead)){ 62 //可以容纳 63 pPtr = (DataHead*)(this->dataBuf + this->buf_end); 64 pPtr->packLength = plength + sizeof(DataHead); 65 pPtr->dataLength = -1; 66 this->buf_end += pPtr->packLength; 67 } 68 else{ 69 pPtr = (DataHead*)(this->dataBuf + this->buf_end); 70 pPtr->packLength = 0; 71 } 72 } 73 if (pPtr->packLength == 0){ 74 pPtr = NULL; 75 } 76 LeaveCriticalSection(&g_mutex); 77 return (char*)pPtr; 78 } 79 //唤醒线程 80 void SwigDirector_CThostFtdcMdSpi::notifyBuf(char* pPtr, int plength){ 81 EnterCriticalSection(&g_mutex); 82 ((DataHead*)pPtr)->dataLength = plength; 83 SetEvent(g_cond);//唤醒线程 84 LeaveCriticalSection(&g_mutex); 85 } 86 87 //修改后的回调代码例子 88 void SwigDirector_CThostFtdcMdSpi::OnRspUserLogin(CThostFtdcRspUserLoginField *pRspUserLogin, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) { 89 int len = sizeof(CThostFtdcRspUserLoginField) + sizeof(CThostFtdcRspInfoField) + sizeof(int) + sizeof(bool); 90 char *pPtr = this->lockBuf(len); 91 if (pPtr != NULL){ 92 char *dPtr = pPtr; 93 ((DataHead*)pPtr)->eventID = 3; 94 dPtr += sizeof(DataHead); 95 memcpy(dPtr, (char*)pRspUserLogin, sizeof(CThostFtdcRspUserLoginField)); 96 dPtr += sizeof(CThostFtdcRspUserLoginField); 97 memcpy(dPtr, (char*)pRspInfo, sizeof(CThostFtdcRspInfoField)); 98 dPtr += sizeof(CThostFtdcRspInfoField); 99 *(int*)(dPtr) = nRequestID; 100 dPtr += sizeof(int); 101 *(bool*)(dPtr) = bIsLast; 102 this->notifyBuf(pPtr,len); 103 } 104 return; 105 }
以上是关于提高CTP的jswig_JAVA接口回调线程处理效率的主要内容,如果未能解决你的问题,请参考以下文章