Flume简介与使用——Thrift Source采集数据

Posted WOTGL

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume简介与使用——Thrift Source采集数据相关的知识,希望对你有一定的参考价值。

Flume简介与使用(二)——Thrift Source采集数据

  继上一篇安装Flume后,本篇将介绍如何使用Thrift Source采集数据。

  Thrift最初由Facebook开发(现为Apache顶级项目),是一个用于跨语言RPC通信的框架。它拥有功能强大的软件堆栈和代码生成引擎,允许通过一个简单的IDL文件生成不同语言的代码,服务器端和客户端基于同一个IDL文件生成各自的代码来完成通信。

  Flume的Thrift Source是其实现的众多Source中的一个,Flume已经实现了服务器端,因此我们可以用任意自己熟悉的语言编写自己的Thrift Source客户端来采集数据,然后发送给Thrift Source服务器端。

  [一]、生成C++代码

  下载源码版的Flume,在apache-flume-1.6.0-src\flume-ng-sdk\src\main\thrift目录下有Flume定义好的flume.thrift文件,现在只要用这个文件来生成我们需要的代码就行了。

  flume.thrift文件内容如下:

 1 namespace java org.apache.flume.thrift  // only a Java namespace is declared in this file
 2 
 3 struct ThriftFlumeEvent {  // one event: a string-to-string header map plus a binary payload
 4   1: required map <string, string> headers,
 5   2: required binary body,
 6 }
 7 
 8 enum Status {  // result code returned by both service methods
 9   OK,
10   FAILED,
11   ERROR,
12   UNKNOWN
13 }
14 
15 service ThriftSourceProtocol {
16   Status append(1: ThriftFlumeEvent event),  // sends a single event
17   Status appendBatch(1: list<ThriftFlumeEvent> events),  // sends a list of events in one call
18 }

  1、定义了一个ThriftFlumeEvent结构体,用来封装发送的数据;

  2、定义了一个service类ThriftSourceProtocol,服务器端具体实现ThriftSourceProtocol里面的两个方法,再由客户端调用这些方法把数据传给Thrift Source服务器端。

  3、运行命令 thrift --gen cpp flume.thrift,会在当前目录下生成gen-cpp目录,里面是Thrift自动生成的C++头文件和源文件。(运行之前需要先安装Thrift)

  [二]、编写自己的客户端代码。我这里的客户端通过UDP接收远程传过来的数据,解析后按批发送给Flume的Thrift Source服务器。

  1 #include <arpa/inet.h>
  2 #include <sys/types.h>
  3 #include <sys/socket.h>
  4 #include <pthread.h>
  5 #include <unistd.h>
  6 #include <stdlib.h>
  7 #include "include/MESA_prof_load.h"
  8 #include "include/MESA_handle_logger.h"
  9 
 10 #include <string>
 11 #include <iostream>
 12 #include "gen-cpp/flume_constants.h"
 13 #include "gen-cpp/flume_types.h"
 14 #include "gen-cpp/ThriftSourceProtocol.h"
 15 #include <thrift/protocol/TBinaryProtocol.h>
 16 #include <thrift/protocol/TCompactProtocol.h>
 17 #include <thrift/transport/TSocket.h>
 18 #include <thrift/transport/TTransportUtils.h>
 19 using namespace std;
 20 using namespace apache::thrift;
 21 using namespace apache::thrift::protocol;
 22 using namespace apache::thrift::transport;
 23 
 24 #define LOG_PATH "/home/zjf/DFcode/trafficlog/traffic_source.log"  // log file path that main() hands to the MESA logger
 25 #define DATA_BUFFER 2048    // size in bytes of the buffer holding the text sent to Flume
 26 #define BUFLEN   2048       // size in bytes of the UDP receive buffer
 27 #define BATCH_SIZE 1000     // number of events collected before one appendBatch call
 28 
 29 //defined my C++ object
 30 class ThriftClient{
 31     public:
 32         // Thrift protocol needings...
 33         boost::shared_ptr<TTransport> socket;
 34         boost::shared_ptr<TTransport> transport;
 35         boost::shared_ptr<TProtocol> protocol;
 36         ThriftSourceProtocolClient* pClient;
 37 
 38     public:
 39         ThriftClient();
 40 };
 41 //cconstruction function, init the thrift source server ip and port
 42 ThriftClient::ThriftClient():
 43     socket(new TSocket("10.208.129.12",5497)),
 44     transport(new TFramedTransport(socket)),
 45     protocol(new TCompactProtocol(transport))
 46 {
 47     pClient = new ThriftSourceProtocolClient(protocol);
 48 }
 49 
 50 // Runtime-logging settings shared by the whole program.
 51 struct log_info_t{
 52     char *path;       // log file path; main() points it at LOG_PATH
 53     int log_level;    // level passed to MESA_create_runtime_log_handle()
 54     void * handle;    // handle returned by MESA_create_runtime_log_handle()
 55 };
 56 struct log_info_t log_info;    // the single logger configuration instance
 57 const char *module = "zjf_traffic_data_collector";    // module name passed to MESA_handle_runtime_log()
 58 
 59 // Process-wide state used by RecvAndSendUDP() and main().
 60 ThriftClient *client = new ThriftClient();    // Thrift connection objects; allocated once here and never deleted
 61 std::map<std::string, std::string>  headers;    // headers copied into every event; nothing in this listing fills it
 62 std::vector<ThriftFlumeEvent> eventbatch;    // events waiting to be sent; flushed once BATCH_SIZE is reached
 63 unsigned long long pkt_num_tgl = 0;    // number of events built so far
 64 
 65 int RecvAndSendUDP(){
 66     MESA_handle_runtime_log(log_info.handle, RLOG_LV_INFO, module, "RecvUDP be called");
 67     int listen_socket;          //socket id
 68     struct sockaddr_in    local;    //client IP, where to recevied data
 69     struct sockaddr_in    from;      //server IP(local host)
 70     char server_addr[16] = "10.208.129.12";    //received traffic IP
 71     int server_port = 6789;                    //received traffic port
 72     char send_buf[DATA_BUFFER] = {0};          //data send to flume
 73     char Buf[BUFLEN] = {0};
 74     int fromlen;
 75     int len;
 76 
 77     //init socket
 78 reconnect:
 79     memset(&local, 0, sizeof(local));
 80     local.sin_family = AF_INET;
 81     local.sin_addr.s_addr = inet_addr(server_addr);
 82     local.sin_port = htons(server_port);
 83     listen_socket = socket(AF_INET, SOCK_DGRAM, 0); // UDP socket
 84     if(listen_socket < 0) {
 85         printf("error udp socket\\n");
 86     }else{
 87         printf("listen_socket create OK\\n");
 88     }
 89     if(bind(listen_socket, (struct sockaddr *)&local, sizeof(local)) < 0) {
 90         printf("error udp bind\\n");
 91         return -1;
 92     }else{
 93         printf("socket bind OK\\n");
 94     }
 95 
 96     while(1){
 97         char sip[16] = {0};
 98         char dip[16] = {0};
 99         char srcport[6] = {0};
100         char destport[6] = {0};
101         char url[BUFLEN] = {0};
102         memset(Buf,0,BUFLEN);
103         fromlen = sizeof(from);
104         len = recvfrom(listen_socket, (void *)Buf, (size_t)BUFLEN, 0, (struct sockaddr *)&from,(socklen_t *)&fromlen);
105         if(len == -1) {
106             printf("error udp recvfrom\\n");
107             close(listen_socket);
108             goto reconnect;
109         }
110         //parse received buf, transform to key-value
111         int i;
112         int sip_loc = 0;
113         int sport_loc = 0;
114         int dip_loc = 0;
115         int dport_loc = 0;
116         int dotcount = 0;
117         for(i=0;Buf[i] != \'\\0\';i++){
118             if(Buf[i] == \'.\'){
119                 dotcount++;
120                 if(dotcount == 4){
121                     sip_loc = i;
122                     memcpy(sip,Buf,i);
123                 }
124                 else if(dotcount == 8){
125                     dip_loc = i;
126                     memcpy(dip,Buf+sport_loc+1,dip_loc-sport_loc-1);
127                 }
128                 else if(dotcount == 9){
129                     dport_loc = i;
130                     memcpy(destport,Buf+dip_loc+1,dport_loc-dip_loc-1);
131                     break;
132                 }
133                 else{}
134             }
135             if(Buf[i] == \'>\'){
136                 sport_loc = i;
137                 memcpy(srcport,Buf+sip_loc+1,sport_loc-sip_loc-1);
138             }
139         }
140         memcpy(url,Buf+dport_loc+1,strlen(Buf)-dport_loc);
141         unsigned long src_ip = inet_addr(sip);
142         unsigned long dst_ip = inet_addr(dip);
143         sprintf(send_buf,"SrcIP=%u SrcPort=%s DestIP=%u DestPort=%s",ntohl(src_ip),srcport,ntohl(dst_ip),destport);
144         //construct an event and append to send
145         if(0 != strlen(send_buf) ){
146             pkt_num_tgl++;
147             string sBody(send_buf);
148             ThriftFlumeEvent tfEvent;
149             tfEvent.__set_headers(headers);
150             tfEvent.__set_body(sBody);
151             eventbatch.push_back(tfEvent);
152             if(eventbatch.size() >= BATCH_SIZE){
153                 if(!client->transport->isOpen())
154                     client->transport->open();
155                 Status::type res = client->pClient->appendBatch(eventbatch);
156                 if(res != Status::OK){
157                     MESA_handle_runtime_log(log_info.handle, RLOG_LV_FATAL, module, "WARNING: send event via thrift failed, return code:%d",res);
158                 }else{
159                     //printf("sended %lld event data to flume successful\\n", pkt_num_tgl);
160                 }
161                 eventbatch.clear();
162             }
163         }
164         bzero(send_buf,DATA_BUFFER);
165     }
166 }
167 
168 
169 int main()
170 {
171     //create――logger
172     log_info.path = (char *)LOG_PATH;
173     log_info.log_level = 10;
174     log_info.handle = MESA_create_runtime_log_handle(log_info.path, log_info.log_level);
175     //open thrift connection
176     if(!client->transport->isOpen()){
177         client->transport->open();
178     }
179     eventbatch.clear();
180     RecvAndSendUDP();
181     return 0;
182 }

 [三]、编译并运行

  g++ -g -DHAVE_NETINET_IN_H -I. -I/usr/local/include/thrift -L/usr/local/lib rec_send_traffic_thrift.cpp gen-cpp/flume_constants.cpp gen-cpp/flume_types.cpp gen-cpp/ThriftSourceProtocol.cpp  -o  rec_send_traffic_thrift  -lthrift   -lpcap -L/usr/lib64 -lMESA_htable -lpthread -lMESA_handle_logger

  用守护脚本启动程序(程序退出后等待10秒自动重启):

 1 #!/bin/sh
 2 
 3 while [ 1 ]; do
 4     ulimit -c unlimited
 5     #./jz
 6     #cgexec -g cpu,memory:/MESA/jz ./jz >> jz.log
 7     ./rec_send_traffic_thrift
 8     #./jz
 9     echo program crashed, restart at `date +"%w %Y/%m/%d, %H:%M:%S"` >> RESTART.log
10     sleep 10
11 done

 


推荐博文:【1】http://www.micmiu.com/soa/rpc/thrift-sample/

     【2】http://www.mamicode.com/info-detail-869223.html

     【3】http://blog.csdn.net/yuzx2008/article/details/50179033

     【4】http://shiyanjun.cn/archives/456.html

     【5】http://flume.apache.org/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift

转载请注明原文出处,谢谢

以上是关于Flume简介与使用——Thrift Source采集数据的主要内容,如果未能解决你的问题,请参考以下文章

FlumeFlume基础之安装与使用

Flume技术

flume与kafka集成配置

Flume高级组件之Source Interceptors及项目实践

Scala + Thrift+ Zookeeper+Flume+Kafka配置笔记

Apache Flume简介及安装部署