提高CTP的jswig_JAVA接口回调线程处理效率

Posted 灌木大叔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了提高CTP的jswig_JAVA接口回调线程处理效率相关的知识,希望对你有一定的参考价值。

ctp回调线程要快速返回,每次从ctp进入java时必然要产生一个新的线程对象匹配,效率实在太低。

在java中产生一个线程,该线程调用native方法进入本地代码形成工作线程,该线程负责读取缓冲区数据并调用java接口处理。

在swig的Spi代码中生成一个缓冲区,每次有回调发生时,把所有数据复制到缓冲区形成一个数据包,然后唤醒工作线程。

在thostmduserapi_se_wrap.h头文件增加:

 1 #include <Windows.h>
 2 struct DataHead {
 3     short packLength;//报文长度,
 4     short dataLength;//报文内有效数据长度,
 5     int  eventID;//事件ID
 6 };
 7 class SwigDirector_CThostFtdcMdSpi : public CThostFtdcMdSpi, public Swig::Director {
 8 
 9 public:
10     CRITICAL_SECTION  g_mutex;//临界区变量
11     HANDLE  g_cond;//唤醒信号
12     //缓冲区
13     /*有数据到达时,调用lockBuf(int length)请求一段缓冲区,然后填入消息报数据。
14     消息报由头部和数据构成。DataHead。
15     */
16     char *dataBuf;
17     int buf_begin, buf_end,buf_length;
18     char* lockBuf(int length);//请求加锁的缓冲区
19     void notifyBuf(char* pPtr, int plength);//设置数据长度并唤醒处理线程
20     ~SwigDirector_CThostFtdcMdSpi();

在thostmduserapi_se_wrap.h头文件增加native工作线程代码:

 1 #ifdef __cplusplus
 2 extern "C" {
 3 #endif
 4     SWIGEXPORT jlong JNICALL Java_............._handleEvent(JNIEnv *jenv, jobject  swigjobj, jlong cPtr)
 5     {
 6         /*检查消息缓存,无消息则进入等待
 7         //jlong cPtr指向内存中SwigDirector_CThostFtdcMdSpi对象
 8         //
 9         */
10         DataHead *pPtr = NULL;
11         SwigDirector_CThostFtdcMdSpi *clsPtr = (SwigDirector_CThostFtdcMdSpi *)cPtr;
12         while (true){
13             EnterCriticalSection(&(clsPtr->g_mutex));
14             int asize = 0;
15             while (true){
16                 pPtr = NULL;
17                 asize = clsPtr->buf_end - clsPtr->buf_begin;
18                 if (asize < 0){
19                     asize += clsPtr->buf_length;
20                 }
21                 if (asize <= 0){
22                     break;
23                 }
24                 pPtr = (DataHead*)(clsPtr->dataBuf + clsPtr->buf_begin);
25                 if (pPtr->dataLength == 0){
26                     //判断是否空包,跳过空包
27                     clsPtr->buf_begin += pPtr->packLength;
28                     if (clsPtr->buf_begin >= clsPtr->buf_length){
29                         clsPtr->buf_begin = 0;//有空间
30                     }
31                 }
32                 else{
33                     break;
34                 }
35             }
36             LeaveCriticalSection(&(clsPtr->g_mutex));
37             if (asize <= 0 || pPtr->dataLength <= 0){
38                 //进入休眠
39                 WaitForSingleObject(clsPtr->g_cond, INFINITE);
40                 continue;
41             }
42             //开始调用回调方法pPtr
43             int eID = pPtr->eventID;
44             char *dPtr = (char*)pPtr;
45             dPtr += sizeof(DataHead);
46             if (eID == 0){
47                 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[0], swigjobj);
48             }
49             else if (eID == 1){
50                 jint jnReason = (jint)(*(int*)(dPtr));
51                 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[1], swigjobj, jnReason);
52             }
53             else if (eID == 3){
54                 jlong jpRspUserLogin = 0;
55                 jlong jpRspInfo = 0;
56                 jint jnRequestID;
57                 jboolean jbIsLast;
58                 *((CThostFtdcRspUserLoginField **)&jpRspUserLogin) = (CThostFtdcRspUserLoginField *)dPtr;
59                 dPtr += sizeof(CThostFtdcRspUserLoginField);
60                 *((CThostFtdcRspInfoField **)&jpRspInfo) = (CThostFtdcRspInfoField *)dPtr;
61                 dPtr += sizeof(CThostFtdcRspInfoField);
62                 jnRequestID = (jint)(*(int*)(dPtr));
63                 dPtr += sizeof(int);
64                 jbIsLast = (jboolean)(*(bool*)(dPtr));
65                 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[3], swigjobj, jpRspUserLogin, jpRspInfo, jnRequestID, jbIsLast);
66             }
67             else if (eID == 11){
68                 jlong jpDepthMarketData = 0;
69                 *((CThostFtdcDepthMarketDataField **)&jpDepthMarketData) = (CThostFtdcDepthMarketDataField *)dPtr;
70                 jenv->CallStaticVoidMethod(Swig::jclass_thostmduserapiJNI, Swig::director_method_ids[11], swigjobj, jpDepthMarketData);
71             }
72             pPtr->dataLength = 0;//变成空包
73             jthrowable swigerror = jenv->ExceptionOccurred();
74             if (swigerror) {
75                 Swig::DirectorException::raise(jenv, swigerror);
76             }
77             continue;
78         }
79         return 0;
80     }

在thostmduserapi_se_wrap.h头文件修改添加申请缓冲区、唤醒工作线程代码。并修改回调代码:

  1 //初始化数据
  2 
  3 SwigDirector_CThostFtdcMdSpi::SwigDirector_CThostFtdcMdSpi(JNIEnv *jenv) : CThostFtdcMdSpi(), Swig::Director(jenv) {
  4     this->buf_length = 4096 * 1024;
  5     this->dataBuf = new char[this->buf_length];
  6     this->buf_begin = 0;
  7     this->buf_end = 0;
  8     InitializeCriticalSection(&(this->g_mutex));  //初始化临界区
  9     //以non-signaled创建auto-reset模式的事件对象  
 10     this->g_cond = CreateEvent(NULL, FALSE, FALSE, NULL);
 11 }
 12 //清场
 13 SwigDirector_CThostFtdcMdSpi::~SwigDirector_CThostFtdcMdSpi(){
 14     delete this->dataBuf;
 15     DeleteCriticalSection(&this->g_mutex);  //释放临界区
 16     CloseHandle(this->g_cond);  //销毁事件对象
 17 }
 18 
 19 //获取数据包缓冲区,并设置相关指针
 20 char* SwigDirector_CThostFtdcMdSpi::lockBuf(int plength){
 21     int asize;
 22     DataHead *pPtr = NULL;
 23     EnterCriticalSection(&g_mutex);
 24     if (this->buf_end >= this->buf_begin){
 25         //end指针大于begin指针
 26         asize = this->buf_length - this->buf_end;
 27         if (asize > plength + sizeof(DataHead) + sizeof(DataHead)){
 28             //可以容纳
 29             pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 30             pPtr->packLength = plength + sizeof(DataHead);
 31             pPtr->dataLength = -1;
 32             this->buf_end += pPtr->packLength;
 33         }
 34         else{
 35             if (this->buf_begin > 0){
 36                 //begin前有空间容纳end指针,可以分配或者填充空包
 37                 if (asize >= plength + sizeof(DataHead)){
 38                     //可以分配
 39                     pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 40                     pPtr->packLength = asize;
 41                     pPtr->dataLength = -1;
 42                     this->buf_end = 0;
 43                 }
 44                 else{
 45                     //可以填充空包
 46                     pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 47                     pPtr->packLength = asize;
 48                     pPtr->dataLength = 0;
 49                     this->buf_end = 0;
 50                     pPtr = NULL;
 51                 }
 52             }
 53             else{
 54                 pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 55                 pPtr->packLength = 0;
 56             }
 57         }
 58     }
 59     asize = this->buf_begin - this->buf_end - 1;//end不能大于等于begin
 60     if (pPtr == NULL){//不用else是因为上面可能没有分配空间
 61         if (asize >= plength + sizeof(DataHead)){
 62             //可以容纳
 63             pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 64             pPtr->packLength = plength + sizeof(DataHead);
 65             pPtr->dataLength = -1;
 66             this->buf_end += pPtr->packLength;
 67         }
 68         else{
 69             pPtr = (DataHead*)(this->dataBuf + this->buf_end);
 70             pPtr->packLength = 0;
 71         }
 72     }
 73     if (pPtr->packLength == 0){
 74         pPtr = NULL;
 75     }
 76     LeaveCriticalSection(&g_mutex);
 77     return (char*)pPtr;
 78 }
 79 //唤醒线程
 80 void SwigDirector_CThostFtdcMdSpi::notifyBuf(char* pPtr, int plength){
 81     EnterCriticalSection(&g_mutex);
 82     ((DataHead*)pPtr)->dataLength = plength;
 83     SetEvent(g_cond);//唤醒线程
 84     LeaveCriticalSection(&g_mutex);
 85 }
 86 
 87 //修改后的回调代码例子
 88 void SwigDirector_CThostFtdcMdSpi::OnRspUserLogin(CThostFtdcRspUserLoginField *pRspUserLogin, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast) {
 89     int len = sizeof(CThostFtdcRspUserLoginField) + sizeof(CThostFtdcRspInfoField) + sizeof(int) + sizeof(bool);
 90     char *pPtr = this->lockBuf(len);
 91     if (pPtr != NULL){
 92         char  *dPtr = pPtr;
 93         ((DataHead*)pPtr)->eventID = 3;
 94         dPtr += sizeof(DataHead);
 95         memcpy(dPtr, (char*)pRspUserLogin, sizeof(CThostFtdcRspUserLoginField));
 96         dPtr += sizeof(CThostFtdcRspUserLoginField);
 97         memcpy(dPtr, (char*)pRspInfo, sizeof(CThostFtdcRspInfoField));
 98         dPtr += sizeof(CThostFtdcRspInfoField);
 99         *(int*)(dPtr) = nRequestID;
100         dPtr += sizeof(int);
101         *(bool*)(dPtr) = bIsLast;
102         this->notifyBuf(pPtr,len);
103     }
104     return;
105 }

 

以上是关于提高CTP的jswig_JAVA接口回调线程处理效率的主要内容,如果未能解决你的问题,请参考以下文章

CTP期货期权交易开发

线程也疯狂-----异步编程

Android中的AsyncTask和接口回调使用详解

如何最大限度地提高实时处理性能(Portaudio)

Java 回调机制的理解

基于接口回调详解JUC中Callable和FutureTask实现原理