java实时监听日志写入kafka(多目录)

Posted 世界那么大,我想去看看

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java实时监听日志写入kafka(多目录)相关的知识,希望对你有一定的参考价值。

目的

实时监听多个目录下的日志文件:当某个目录中出现新文件时,自动切换到新文件继续读取,并把读到的每一行同步写入 Kafka。同时,程序会把当前日志文件的路径和已处理的行号记录到位置文件中,这样进程异常退出后重新启动时,可以从上次记录的位置继续读取,而不必从头开始(出于效率考虑,这里大约每发送100条记录一次位置,该间隔可以调整)。

源码

[java] view plain copy
 
  1. import java.io.BufferedReader;  
  2. import java.io.BufferedWriter;  
  3. import java.io.File;       
  4. import java.io.FileInputStream;  
  5. import java.io.FileNotFoundException;  
  6. import java.io.FileReader;  
  7. import java.io.FileWriter;  
  8. import java.io.IOException;       
  9. import java.io.LineNumberReader;  
  10.      
  11. import java.util.ArrayList;  
  12. import java.util.Collection;  
  13. import java.util.HashMap;  
  14. import java.util.List;  
  15. import java.util.Properties;  
  16. import java.util.Random;  
  17.   
  18. import kafka.javaapi.producer.Producer;  
  19. import kafka.producer.KeyedMessage;  
  20. import kafka.producer.ProducerConfig;  
  21. ;       
  22.       
  23. public class XTail_Line {       
  24.        
  25.    
  26.   
  27.       
  28.     public static class TailFileThread extends Thread  
  29.     {  
  30.         File file;  
  31.         LineNumberReader randomFile=null;  
  32.          String newfile=null;  
  33.          String thisfile=null;  
  34.          String prefile=null;  
  35.          private long lastTimeFileSize = 0;  
  36.          private String drname=null;  
  37.          int ln=0;  
  38.          int beginln=0;  
  39.          private Producer<String,String> inner;    
  40.             java.util.Random ran = new Random();  
  41.             String topicname=null;  
  42.       
  43.         public TailFileThread(String path,String drname,String topicname) throws FileNotFoundException, IOException  
  44.         {  
  45.             file=new File(path);  
  46.             this.drname=drname;  
  47.             this.topicname=topicname;  
  48.       
  49.                Properties properties = new Properties();    
  50.                //   properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));    
  51.                   
  52.                   properties.load(new FileInputStream("producer.properties"));    
  53.                    
  54.                   ProducerConfig config = new ProducerConfig(properties);   
  55.                   
  56.                   inner = new Producer<String, String>(config);    
  57.         }  
  58.           
  59.          public void send(String topicName,String message) {    
  60.                 if(topicName == null || message == null){    
  61.                     return;    
  62.                 }    
  63.              //   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);    
  64.                 //随机作为key,hash分散到各个分区  
  65.               KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,String.valueOf(ran.nextInt(9)),message);    
  66.              //   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message,message);  
  67.                 inner.send(km);    
  68.                   
  69.             }    
  70.                 
  71.             public void send(String topicName,Collection<String> messages) {    
  72.                 if(topicName == null || messages == null){    
  73.                     return;    
  74.                 }    
  75.                 if(messages.isEmpty()){    
  76.                     return;    
  77.                 }    
  78.                 List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();    
  79.                 for(String entry : messages){    
  80.                     KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);    
  81.                     kms.add(km);    
  82.                 }    
  83.                 inner.send(kms);    
  84.             }    
  85.                 
  86.             public void close(){    
  87.                 inner.close();    
  88.             }    
  89.               
  90.         public String getNewFile(File file)  
  91.         {  
  92.             File[] fs=file.listFiles();  
  93.             long maxtime=0;  
  94.             String newfilename="";  
  95.             for (int i=0;i<fs.length;i++)  
  96.             {  
  97.                 if (fs[i].isFile()&&fs[i].lastModified()>maxtime)  
  98.                 {  
  99.                     maxtime=fs[i].lastModified();  
  100.                     newfilename=fs[i].getAbsolutePath();  
  101.                       
  102.                 }  
  103.             }  
  104.             return newfilename;  
  105.         }  
  106.         //写入文件名及行号  
  107.         public void writePosition(String path,int rn)  
  108.         {  
  109.             try {  
  110.                    BufferedWriter out = new BufferedWriter(new FileWriter(drname+".position"));  
  111.                    out.write(path+","+rn);  
  112.                    out.close();  
  113.             } catch (IOException e) {  
  114.             }  
  115.         }  
  116.           
  117.         public void run()  
  118.         {  
  119.               
  120.             thisfile=getNewFile(file);  
  121.            prefile=thisfile;  
  122.             //访问position文件,如果记录了文件路径,及行号,则定位,否则使用最新的文件  
  123.             try {  
  124.                 BufferedReader br=new BufferedReader(new FileReader(drname+".position"));  
  125.                 String line=br.readLine();  
  126.                 if (line!=null &&line.contains(","))  
  127.                 {  
  128.                     thisfile=line.split(",")[0];  
  129.                      prefile=thisfile;  
  130.                      beginln=Integer.parseInt(line.split(",")[1]);  
  131.                 }  
  132.                   
  133.                   
  134.             } catch (FileNotFoundException e2) {  
  135.                 // TODO Auto-generated catch block  
  136.                 e2.printStackTrace();  
  137.             }  
  138.              catch (IOException e2) {  
  139.                     // TODO Auto-generated catch block  
  140.                     e2.printStackTrace();  
  141.                 }  
  142.               
  143.             //指定文件可读可写       
  144.                 try {  
  145.                     randomFile = new LineNumberReader(new FileReader(thisfile));  
  146.                 } catch (FileNotFoundException e) {  
  147.                     // TODO Auto-generated catch block  
  148.                     e.printStackTrace();  
  149.                 }       
  150.          while (true)  
  151.          {  
  152.              try {  
  153.                 Thread.sleep(100);  
  154.                 //调用interrupt方法后  
  155.                   if(isInterrupted())    
  156.                     {    
  157.                         System.out.println("Interrupted...");    
  158.                         break;    
  159.                     }   
  160.             } catch (InterruptedException e1) {  
  161.                 // TODO Auto-generated catch block  
  162.                 e1.printStackTrace();  
  163.             }  
  164.              try {       
  165.                  //获得变化部分的       
  166.                //  randomFile.seek(lastTimeFileSize);       
  167.                  String tmp = "";       
  168.                  while( (tmp = randomFile.readLine())!= null) {    
  169.                      int currln=randomFile.getLineNumber();  
  170.                      //beginln默认为0  
  171.                      if (currln>beginln)  
  172.                          send(topicname,new String(tmp.getBytes("utf8")));  
  173.                        
  174.                      ln++;  
  175.                      //每发生一条写一次影响效率  
  176.                      if (ln>100)  
  177.                          {  
  178.                          writePosition(thisfile,currln);  
  179.                          ln=0;  
  180.                          }  
  181.                   
  182.                  }     
  183.                 thisfile=getNewFile(file);  
  184.                 if(!thisfile.equals(prefile))  
  185.                   
  186.                 {  
  187.                     randomFile.close();  
  188.                    randomFile = new LineNumberReader(new FileReader(thisfile));  
  189.                   prefile=thisfile;  
  190.                  beginln=0;  
  191.                 }  
  192.                    
  193.                   
  194.              } catch (IOException e) {       
  195.                  throw new RuntimeException(e);       
  196.              }       
  197.          }  
  198.         }  
  199.     }  
  200.            
  201.     public static void main(String[] args) throws Exception {       
  202.         /* 
  203.         LogView view = new LogView();      
  204.    
  205.         final File tmpLogFile = new File("D:\\test.txt");      
  206.         view.realtimeShowLog(tmpLogFile); 
  207.         */  
  208.         if (args.length!=2)  
  209.         {  
  210.             System.out.println("usage:topicname pathname");  
  211.             System.exit(1);  
  212.         }  
  213.         String topicname=args[0];  
  214.         String pathname=args[1];  
  215.       
  216.           
  217.         HashMap<String,TailFileThread> hm=new HashMap<String,TailFileThread>();  
  218.         File tmpLogFile = new File(pathname);  
  219.         File[] fs=tmpLogFile.listFiles();  
  220.         while (true)  
  221.         {  
  222.              fs=tmpLogFile.listFiles();  
  223.         for (int i=0;i<fs.length;i++)  
  224.         {  
  225.             if(fs[i].isDirectory())  
  226.             {     
  227.                 String path=fs[i].getAbsolutePath();  
  228.                 //以drname作为position文件名  
  229.                 String drname=fs[i].getName();  
  230.                 //如果该目录对应的处理线程已经存在,判断是否存活  
  231.                   
  232.             if (drname.contains("xx") || drname.contains("yy") || drname.contains("zz") || drname.contains("aa")  
  233.                     )     
  234.             {  
  235.                 if (hm.containsKey(path))  
  236.             {  
  237.                     if (!hm.get(path).isAlive())  
  238.                     {  
  239.                         hm.get(path).interrupt();  
  240.                         TailFileThread tt=new TailFileThread(path,drname,topicname);  
  241.                          tt.start();  
  242.                          hm.put(path, tt);  
  243.                     }  
  244.                   
  245.             }  
  246.                 //如果不存在,新建  
  247.                 else  
  248.                 {  
  249.                     TailFileThread tt=new TailFileThread(path,drname,topicname);  
  250.                  tt.start();  
  251.                  hm.put(path, tt);  
  252.                       
  253.               
  254.                 }  
  255.                   
  256.             }     
  257.                   
  258.             }           //System.out.println(fs[i].getAbsolutePath());  
  259.         }  
  260.         Thread.sleep(100);  
  261.         }  
  262.        
  263.     }       
  264.       
  265. }  

转:http://blog.csdn.net/u011750989/article/details/21957741

以上是关于“java实时监听日志写入kafka(多目录)”的主要内容。如果未能解决你的问题,请参考以下文章:

java实时监听日志写入kafka(转)

sparkstreaming+flume+kafka实时流式处理完整流程

使用Log4j将程序日志实时写入Kafka

原创-Hbase WAL日志数据实时推送到kafka

数据湖(十九):SQL API 读取Kafka数据实时写入Iceberg表

storm实时计算实例(socket实时接入)