大数据之Flume+Kafka

Posted by A Scut Coder



1. What is Kafka

Apache Kafka is a distributed publish-subscribe messaging system.

As Kafka has grown in popularity, it is used more and more as the message queue for transporting messages (especially logs), and many other open-source tools now integrate with it.
From the Flume articles we know that Flume is commonly used for log collection, while Kafka is commonly used for log transport. The two integrate flexibly: Flume focuses on collecting data, Kafka on distributing it. In Flume, Kafka can be configured either as a source or as a sink; a sketch of the source case follows, and the sink case is shown in section 5.
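A minimal sketch of using Kafka as a Flume source (the agent/channel names and the topic test are assumptions for illustration, and the property names assume Flume 1.7 or later):

a1.sources = r1
a1.channels = c1

a1.channels.c1.type = memory

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = c1
a1.sources.r1.kafka.bootstrap.servers = localhost:9092
a1.sources.r1.kafka.topics = test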


2. Kafka Architecture

(1)Broker

A message-middleware processing node. One Kafka node is one broker, and one or more brokers form a Kafka cluster.

(2)Topic

Kafka categorizes messages by topic; every message published to a Kafka cluster must specify a topic.

(3)Producer

Message producer: a client that sends messages to brokers.

(4)Consumer

Message consumer: a client that reads messages from brokers.

(5)ConsumerGroup

Every consumer belongs to a specific consumer group. A message can be delivered to multiple different consumer groups, but within a single consumer group only one consumer will consume it (see the command-line sketch after this item).
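For example (a hedged sketch, not from the original post; it assumes the topic test created in section 3 already exists): run the console consumer in two terminals with the same group.id. Each message sent to the topic is then consumed by only one of the two consumers; give them different group.ids and both receive every message.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=group-1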

(6)Partition

A physical concept: a topic can be split into multiple partitions, and each partition is internally ordered.


3. Kafka Command-Line Usage

(1) Kafka uses ZooKeeper, so you may need to start a ZooKeeper server first. You can use the convenience script packaged with Kafka, or an already-configured ZooKeeper.

bin/zookeeper-server-start.sh config/zookeeper.properties

(2) Start Kafka

bin/kafka-server-start.sh config/server.properties

(3) Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

(4) View topics (list or describe)

bin/kafka-topics.sh --zookeeper localhost:2181 --describe

(5) Send messages to a topic

Kafka ships with a command-line tool that reads messages from an input file or from the command line and sends them to the Kafka cluster. Each line is one message.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
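The same tool can read from a file instead of the keyboard by redirecting standard input (messages.txt below is a hypothetical file with one message per line):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < messages.txt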

(6) Consume messages

--from-beginning fetches everything in the topic from the earliest offset, i.e. all data the producer has sent so far.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

(7) Set up multiple brokers

Make a few copies of the configuration file:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

Modify the following settings:

config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2

Start each of them:

bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties

(8) Now a topic can be created with multiple replicas

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic multest

(9) Verify

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic multest

Topic:multest  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: multest  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0

"leader" is node 1: the node that handles all reads and writes for this partition.
"replicas" lists the nodes that hold a copy of this partition (1, 2, 0), whether or not they are alive.
"isr" is the set of in-sync replicas, i.e. the replicas that are currently alive and caught up with the leader.

Test fault tolerance: kill the leader, i.e. broker 1.
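One way to do this (a sketch, assuming broker 1 was started from config/server-1.properties as in step (7)): look up its process id and kill it.

ps aux | grep server-1.properties    # note the PID of the broker started with server-1.properties
kill -9 <pid>                        # replace <pid> with the PID found above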

The leader switches to one of the followers, and node 1 no longer appears in the ISR because it is down:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic multest

Topic:multest  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: multest  Partition: 0  Leader: 2  Replicas: 1,2,0  Isr: 2,0

4. Kafka API

(1) Producer

package com.randy;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

    public static void main(String[] args) {
        // Producer configuration; the important properties are explained below
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            // Send 100 text messages to the topic "HelloWorld"
            for (int i = 0; i < 100; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}

A Producer is created as an instance of the KafkaProducer class, whose constructor takes a set of properties. The important properties used above are analyzed below:

bootstrap.servers

properties.put("bootstrap.servers", "192.168.1.110:9092");

The list of host:port broker addresses the producer uses for its initial connection to the Kafka cluster; it does not need to contain every broker.

key.serializer & value.serializer

       properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

The serialization types. Kafka messages are sent to the cluster as key-value pairs, where the key is optional and the value can be of any type, but before a message is sent the producer must serialize it into bytes. This example sends text messages, so StringSerializer is used for both.
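Beyond the fire-and-forget send() used above, send() also accepts a callback that reports whether the broker acknowledged the record, which pairs naturally with acks=all. A hedged sketch, reusing the producer and the HelloWorld topic from the example above (the key "key-1" is made up for illustration):

ProducerRecord<String, String> record =
        new ProducerRecord<>("HelloWorld", "key-1", "hello with a key");
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // delivery failed even after the configured retries
        exception.printStackTrace();
    } else {
        System.out.println("Stored at partition " + metadata.partition()
                + ", offset " + metadata.offset());
    }
});
producer.flush();   // block until outstanding sends (and their callbacks) complete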

(2) Consumer

package com.randy;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Author  : RandySun
 * Date    : 2017-08-13 17:06
 */
public class ConsumerDemo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.110:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }
    }
}
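The example above relies on enable.auto.commit=true, so offsets are committed on a timer regardless of whether processing succeeded. A hedged alternative sketch (not from the original post): turn auto-commit off and commit manually after the records have been handled, keeping the other ConsumerDemo properties unchanged.

properties.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("HelloWorld"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
    }
    if (!records.isEmpty()) {
        consumer.commitSync();   // commit offsets only after the records were processed
    }
}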

5. Flume + Kafka

(1) Configure Flume first

a1.channels = c1
a1.sources = r1
a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
# For using a thrift source set the following instead of the above line.
# a1.sources.r1.type = thrift
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 41414

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumeTest
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20


(2) Start the Flume agent

flume-ng agent -c conf -f $FLUME_HOME/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

(3) Run the Flume client

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // clean up and recreate the client
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}

public class FlumeAvroClient {

    public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("127.0.0.1", 41414);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
            client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
    }
}

(4) Start a Kafka console consumer

linjiaqin@linjiaqin-computer:~$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flumeTest --from-beginning
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
^CProcessed a total of 10 messages

(5) Or run a Kafka consumer program

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {

    public static void main(String[] args) {
        // String topicName = "testmul";
        String topicName = "flumeTest";
        Properties props = new Properties();

        // props.put("zookeeper", "localhost:2181");   // does not work: the new consumer API connects to brokers, not ZooKeeper
        // props.put("bootstrap.servers", "localhost:9094");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "ljq");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // Kafka consumer subscribes to a list of topics here
        consumer.subscribe(Arrays.asList(topicName));

        // print the topic name
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // print the offset, key and value for the consumer records
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

The output:

offset = 20, key = null, value = Hello Flume!
offset = 21, key = null, value = Hello Flume!
offset = 22, key = null, value = Hello Flume!
offset = 23, key = null, value = Hello Flume!
offset = 24, key = null, value = Hello Flume!
offset = 25, key = null, value = Hello Flume!
offset = 26, key = null, value = Hello Flume!
offset = 27, key = null, value = Hello Flume!
offset = 28, key = null, value = Hello Flume!
offset = 29, key = null, value = Hello Flume!

Addendum: a Kafka producer can also send records directly to a Kafka consumer, without going through Flume. The keyed records below were produced that way (a sketch of such a producer follows the output).

offset = 30, key = my, value = 0
offset = 31, key = name, value = 1
offset = 32, key = is, value = 2
offset = 33, key = lin, value = 3
offset = 34, key = jia, value = 4
offset = 35, key = qin, value = 5
offset = 36, key = ha, value = 6
offset = 37, key = ha, value = 7
offset = 38, key = lin, value = 8
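A hedged sketch of the kind of standalone producer that could have written the keyed records above; the keys and values come from that output, while everything else (topic name, broker address) is an assumption, not the original author's code:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
String[] keys = {"my", "name", "is", "lin", "jia", "qin", "ha", "ha", "lin"};
for (int i = 0; i < keys.length; i++) {
    // key = word, value = its index, matching the offsets 30-38 shown above
    producer.send(new ProducerRecord<>("flumeTest", keys[i], String.valueOf(i)));
}
producer.close();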



6. Summary

(1) The Flume client sends Avro events to the Flume agent's source:

        client.init("127.0.0.1", 41414);

(2) The Flume sink writes to Kafka: topic = flumeTest, brokerList = localhost:9092.

(3) The Kafka consumer reads from the same topic and broker:

        String topicName = "flumeTest";

        props.put("bootstrap.servers", "localhost:9092");

Data flow diagram:

flume client ---> flume source ---> channel ---> flume sink ---> kafka topic ---> kafka consumer







