基于Flume做FTP文件实时同步到本地磁盘的windows服务。

Posted braska

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Flume做FTP文件实时同步到本地磁盘的windows服务。相关的知识,希望对你有一定的参考价值。

需求:做一个windows服务,实现从ftp服务器实时下载或者更新文件到本地磁盘。

功能挺简单的。直接写个ftp工具类用定时器跑就能搞定,那我为什么不用呢?

别问,问就是我无聊啊,然后研究一下Flume打发时间。哈哈~

一、Flume部分

Source组件和Sink组件用的都是第三方。

source组件:https://github.com/keedio/flume-ftp-source

Sink组件用的谁的目前已经找不到了,网上搜到了一个升级版的。

sink组件:https://github.com/huyanping/flume-sinks-safe-roll-file-sink

因为一些个性化的需求,所以我对他们源代码做了些变动。

具体代码参考:https://gitee.com/syher/spring-boot-project/tree/master/spring-boot-flume

Ftp-Source组件的关键技术是Apache FtpClient,而TailDir-sink则用的RandomAccessFile。

Junit测试类我已经写好了,如果不想安装服务又有兴趣了解的朋友,可以自己改下配置跑一下看看。

技术分享图片
package com.syher.flume;

import com.google.common.collect.Lists;
import com.urey.flume.sink.taildir.SafeRollingFileSink;
import org.apache.flume.*;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.DefaultSinkProcessor;
import org.apache.flume.sink.RollingFileSink;
import org.apache.flume.source.PollableSourceRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.keedio.flume.source.ftp.source.Source;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

//@RunWith(SpringRunner.class)
//@SpringBootTest
public class SpringBootFlumeApplicationTests {
    Context defaultContext = new Context();

    @Before
    public void init() throws Exception {
        Map<String, String> prop = new HashMap<>();
        prop.put("channel.capacity", "1000");
        prop.put("channel.transactionCapacity", "1000");

        prop.put("source.client.source", "ftp");
        prop.put("source.name.server", "192.168.1.150");
        prop.put("source.port", "21");
        prop.put("source.user", "username");
        prop.put("source.password", "secret");
        prop.put("source.working.directory", "/ftp/source");
        prop.put("source.filter.pattern", ".+\\.pdf");
       // prop.put("source.folder", "/ftp");
        prop.put("source.flushlines", "false");

        prop.put("sink.sink.directory", "G:/ftp/target/rolling");
        prop.put("sink.sink.moveFile", "false");
        prop.put("sink.sink.targetDirectory", "G:/ftp/target/PDFfiles");
        prop.put("sink.sink.useCopy", "true");
        prop.put("sink.sink.copyDirectory", "G:/ftp/target/copy");
        prop.put("sink.sink.useFileSuffix", "false");
        prop.put("sink.sink.fileSuffix", ".log");
        defaultContext.putAll(prop);
    }

    public MemoryChannel getChannel() {
        MemoryChannel channel = new MemoryChannel();
        channel.setName("channel");

        configure(channel, "channel.");
        return channel;
    }

    public Source getSource(Channel channel) {
        Source source = new Source();
        source.setName("source");

        ChannelSelector selector = new ReplicatingChannelSelector();
        selector.setChannels(Lists.newArrayList(channel));

        ChannelProcessor processor = new ChannelProcessor(selector);
        source.setChannelProcessor(processor);

        configure(source, "source.");
        return source;
    }

    public Sink getSink(Channel channel) {
        SafeRollingFileSink sink = new SafeRollingFileSink();
        sink.setName("sink");

        sink.setChannel(channel);

        configure(sink, "sink.");
        return sink;
    }

    public void configure(Object target, String prefixProperty) {
        Context context = new Context();
        context.putAll(defaultContext.getSubProperties(prefixProperty));
        Configurables.configure(target, context);
    }

    @Test
    public void contextLoads() throws Exception {
        MemoryChannel channel = getChannel();
        Source source = getSource(channel);
        Sink sink = getSink(channel);

        PollableSourceRunner sourceRunner = new PollableSourceRunner();
        sourceRunner.setSource(source);
        channel.start();
        sourceRunner.start();

        SinkProcessor sinkProcessor = new DefaultSinkProcessor();
        sinkProcessor.setSinks(Arrays.<Sink>asList(sink));
        SinkRunner sinkRunner = new SinkRunner(sinkProcessor);

        channel.start();
        sourceRunner.start();
        sinkRunner.start();

        while (!Thread.interrupted()) {
            Thread.sleep(200);
        }
    }

}
View Code

 

二、JSW服务部分

用的java service wrapper把java程序做成了windows服务。

工具包已经上传在我上面提到的gitee码云项目上。flume-wrapper.zip。

解压后在conf目录可以看到两个配置文件。一个是flume的,一个是jsw的。

bin目录里面是一些装卸启停的批命令。

lib目录里面有项目运行依赖的jar包。

lib.d目录没啥用,是我备份了从flume拷出来的一些无用的jar包。可删。

具体的配置和用法可以看压缩包里的使用说明文档。

注意,jsw的logfile的日志级别最好指定ERROR级别的,不然听说、可能会造成内存不足。

技术分享图片

 

三、采集结果

技术分享图片

 可以看到,采集效率还是很稳的。一分钟不到就搞定了。

以上是关于基于Flume做FTP文件实时同步到本地磁盘的windows服务。的主要内容,如果未能解决你的问题,请参考以下文章

使用阿里云服务器,能不能直接映射文件夹到OSS

实时同步

Flume概述

flume与hdfs

中国移动实时数据分析-基于spark+kafka+flume

rsync+inotify实现实时同步