Flume整合Kafka(基于kerberos认证)——完成实时数据采集

Posted linjiqin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume整合Kafka(基于kerberos认证)——完成实时数据采集相关的知识,希望对你有一定的参考价值。

如果现在要想将flume中的sink设置为kafka,因为在实际的开发中,可能会有若干个子系统或者若干个客户端进行flume日志采集,那么能够承受这种采集任务量的只有kafka来完成,可是需要注意一个问题,现在的kafka是采用了Kerberos认证,所以要想在flume之中去使用kafka操作,就需要考虑到开发包以及jaas配置问题。

1、将kafka的客户端的程序jar文件拷贝到flume的lib目录之中:

mv kafka-clients-0.10.2.1.jar D:devapache-flume-1.7.0-binlib

 

2、在"D:"目录下建立jass配置文件
vim D:kafka_client_jaas.conf

KafkaClient {  
        org.apache.kafka.common.security.plain.PlainLoginModule required  
        username="alice"  
        password="alice-pwd";  
};

 

3、修改flume.cnf文件追加kafka
vim D:devapache-flume-1.7.0-binconfflume.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#a1.sources.r1.type = netcat
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.106
a1.sources.r1.port = 44444

# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink


# Use a channel which buffers events in memory
# a1.channels.c1.type = memory
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 203.195.205.63:9092
a1.channels.c1.kafka.topic = mldn-topic
a1.channels.c1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.c1.kafka.producer.sasl.mechanism = PLAIN


a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4、window启动flume

cd D:devapache-flume-1.7.0-binind:
flume-ng.cmd agent --conf D:/dev/apache-flume-1.7.0-bin/conf --conf-file D:/dev/apache-flume-1.7.0-bin/conf/flume.conf --name a1 -property "flume.root.logger=INFO,console;java.security.auth.login.config=D:/kafka_client_jaas.conf"  

 

5、启动kafka消费端——FlumeReceiveMessageConsumer.java

package cn.mldn.mykafka.consumer;

import java.util.Arrays;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Flume整合Kafka -- kafka消费端
 * 
 * @author hp
 *
 */
public class FlumeReceiveMessageConsumer {
    public static final String SERVERS = "203.195.205.63:9092";
    public static final String TOPIC = "mldn-topic";
    static {
        System.setProperty("java.security.auth.login.config",
                "d:/kafka_client_jaas.conf");    // 表示系统环境属性
    }
    
    public static void main(String[] args) {

        Properties props = new Properties();
        
        props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        
        // 定义消息消费者的连接服务器地址
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        // 消息消费者一定要设置反序列化的程序类,与消息生产者完全对应
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        // 定义消费者处理对象
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(
                props);
        consumer.subscribe(Arrays.asList(TOPIC)); // 设置消费者读取的主题名称
        boolean flag = true; // 消费者需要一直进行数据的读取处理操作
        while (flag) { // 一直读取消息
            ConsumerRecords<String, String> allRecorders = consumer.poll(200);
            for (ConsumerRecord<String, String> record : allRecorders) {
                System.out.println(
                        "flume.key = " + record.key() + ",flume.value = " + record.value());
            }
        }
        consumer.close();
    }
}

 

6、启动业务程序,模拟打印消息——TestFlumeDemo.java

package cn.mldn.myflume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestFlumeDemo {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(TestFlumeDemo.class);
    public static void main(String[] args) {
        
        for (int x = 0 ; x < 10 ; x ++) {
            LOGGER.info("lynch.cn" + x);
        } 
    }
}

 

7、FlumeReceiveMessageConsumer.java消费端会接收到flume采集的日志数据

flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705707577<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
20000Fflume.client.log4j.message.encodingUTF8

flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705716934<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
20000Fflume.client.log4j.message.encodingUTF8

flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705717194<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
20000Fflume.client.log4j.message.encodingUTF8

 

 

以上是关于Flume整合Kafka(基于kerberos认证)——完成实时数据采集的主要内容,如果未能解决你的问题,请参考以下文章

如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS

非Kerberos环境下Kafka数据到Flume进Hive表

非kerberos环境下,flume采集日志到kafka

Kafka 认证三:Kerberos 认证中心部署

Kafka 认证三:Kerberos 认证中心部署

Kafka Kerberos 安全认证