Java: tail log files in real time and write them to Kafka


Purpose

Watch a directory of log files in real time: when a new file appears (e.g. after rotation), switch to it and keep writing new lines to Kafka. The reader also records its position (file name and line number) so that after an abnormal exit it can resume from where it left off. For efficiency the position is written once every 100 lines rather than on every line; the interval is adjustable.
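The position file described above holds a single line of the form `path,lineNumber`. A minimal sketch of writing and reading such a checkpoint (class and file names here are illustrative, not from the original code):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class Checkpoint {
    // Persist "path,lineNumber" so a restarted process can resume.
    static void write(Path positionFile, String logPath, int line) throws IOException {
        Files.write(positionFile, (logPath + "," + line).getBytes(StandardCharsets.UTF_8));
    }

    // Returns {path, lineNumber}; falls back to {fallbackPath, 0} when
    // no valid checkpoint exists yet.
    static Object[] read(Path positionFile, String fallbackPath) throws IOException {
        if (!Files.exists(positionFile)) {
            return new Object[] { fallbackPath, 0 };
        }
        String line = new String(Files.readAllBytes(positionFile), StandardCharsets.UTF_8).trim();
        if (!line.contains(",")) {
            return new Object[] { fallbackPath, 0 };
        }
        int comma = line.lastIndexOf(',');
        return new Object[] { line.substring(0, comma),
                              Integer.parseInt(line.substring(comma + 1)) };
    }
}
```

Splitting on the last comma, rather than the first, keeps the scheme safe even if the log path itself contains a comma.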
 

Source code:

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/*
 * A producer that runs on the source server and pushes data into Kafka.
 * Note: "producer.properties" must sit in the same directory as the jar on Linux.
 * Watches the files in a directory and writes new lines to Kafka.
 * nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position >/home/sre/portalhandler/handler.log 2>&1 &
 */
public class PortalLogTail_Line {

    private Producer<String, String> inner;
    private final Random ran = new Random();

    public PortalLogTail_Line() throws FileNotFoundException, IOException {
        Properties properties = new Properties();
        properties.load(new FileInputStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        // Use a random key so messages hash across the partitions
        KeyedMessage<String, String> km =
                new KeyedMessage<String, String>(topicName, String.valueOf(ran.nextInt(9)), message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null || messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            kms.add(new KeyedMessage<String, String>(topicName, entry));
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    // Return the most recently modified file whose name contains "access"
    public String getNewFile(File file) {
        File[] fs = file.listFiles();
        long maxtime = 0;
        String newfilename = "";
        for (int i = 0; i < fs.length; i++) {
            if (fs[i].lastModified() > maxtime && fs[i].getName().contains("access")) {
                maxtime = fs[i].lastModified();
                newfilename = fs[i].getAbsolutePath();
            }
        }
        return newfilename;
    }

    // Persist the current file name and line number
    public void writePosition(String path, int rn, String positionpath) {
        try {
            BufferedWriter out = new BufferedWriter(new FileWriter(positionpath));
            out.write(path + "," + rn);
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    LineNumberReader randomFile = null;
    String thisfile = null;
    String prefile = null;
    int ln = 0;
    int beginln = 0;

    public void realtimeShowLog(final File file, final String topicname, final String positionpath) throws IOException {
        // Start a thread that polls for newly appended log lines
        new Thread(new Runnable() {
            public void run() {
                thisfile = getNewFile(file);
                prefile = thisfile;
                // If the position file records a path and line number, resume there;
                // otherwise start from the newest file
                try {
                    BufferedReader br = new BufferedReader(new FileReader(positionpath));
                    String line = br.readLine();
                    if (line != null && line.contains(",")) {
                        thisfile = line.split(",")[0];
                        prefile = thisfile;
                        beginln = Integer.parseInt(line.split(",")[1]);
                    }
                    br.close();
                } catch (FileNotFoundException e2) {
                    e2.printStackTrace();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }

                try {
                    randomFile = new LineNumberReader(new FileReader(thisfile));
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
                while (true) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    try {
                        // Read the newly appended lines
                        String tmp;
                        while ((tmp = randomFile.readLine()) != null) {
                            int currln = randomFile.getLineNumber();
                            // beginln defaults to 0; skip lines sent before the restart
                            if (currln > beginln)
                                send(topicname, new String(tmp.getBytes("utf8")));

                            ln++;
                            // Writing the position after every line would hurt throughput,
                            // so record it once every 100 lines
                            if (ln > 100) {
                                writePosition(thisfile, currln, positionpath);
                                ln = 0;
                            }
                        }
                        // Check whether a newer log file has appeared (rotation)
                        thisfile = getNewFile(file);
                        if (!thisfile.equals(prefile)) {
                            randomFile.close();
                            randomFile = new LineNumberReader(new FileReader(thisfile));
                            prefile = thisfile;
                            beginln = 0;
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }).start();
    }

    /**
     * @param args topicname pathname positionpath
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.out.println("usage: topicname pathname positionpath");
            System.exit(1);
        }
        PortalLogTail_Line producer = new PortalLogTail_Line();
        String topicname = args[0];
        String pathname = args[1];
        String positionpath = args[2];
        final File tmpLogFile = new File(pathname);
        producer.realtimeShowLog(tmpLogFile, topicname, positionpath);
    }
}
```
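The rotation handling above hinges on `getNewFile`, which rescans the directory on every loop pass and picks the most recently modified file whose name contains "access". The same rule can be sketched on its own with a null guard for the `listFiles()` edge case (class name and the "access" marker here are illustrative):

```java
import java.io.File;

public class NewestAccessFile {
    // Return the most recently modified regular file whose name contains
    // the marker, or null when the directory holds no match.
    static File newest(File dir, String marker) {
        File[] files = dir.listFiles();
        if (files == null) {
            return null; // not a directory, or an I/O error occurred
        }
        File best = null;
        for (File f : files) {
            if (f.isFile() && f.getName().contains(marker)
                    && (best == null || f.lastModified() > best.lastModified())) {
                best = f;
            }
        }
        return best;
    }
}
```

Note that `lastModified()` typically has one-second granularity on many filesystems, so two files rotated within the same second may tie; comparing names as a tie-breaker would make the choice deterministic.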
 
Put the producer.properties file in the same directory as the jar:

```properties
metadata.broker.list=xxx:10909,xxx:10909

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
#producer.type=async

# specify the compression codec for all data generated: none, gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
compression.codec=none
#compression.codec=gzip

# message encoder
serializer.class=kafka.serializer.StringEncoder
```
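These keys target the old Scala producer (`kafka.javaapi.producer.Producer`), which has since been removed from Kafka. If you port the code to the newer Java client (`org.apache.kafka.clients.producer.KafkaProducer`), a roughly equivalent configuration would look like the sketch below; the broker addresses are placeholders just as in the original:

```properties
bootstrap.servers=xxx:10909,xxx:10909
compression.type=none
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

`producer.type=sync` has no direct equivalent in the new client: sends are always asynchronous, and synchronous behavior is obtained by calling `.get()` on the `Future` returned by `send()`.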


 

Testing

Finally, run:

```shell
nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position >/home/sre/portalhandler/handler.log 2>&1 &
```

Source: http://blog.csdn.net/u011750989/article/details/21237251
