Big Data: Flume + Kafka
Posted by A Scut Coder
1. What is Kafka?
Apache Kafka is a distributed publish-subscribe messaging system.
As Kafka has grown in popularity, more and more systems use it as the message queue for transporting messages (logs in particular), and a wide range of other open-source tools now integrate with it.
From the Flume material we know that Flume is commonly used for log collection, while Kafka is commonly used for log transport. The two integrate flexibly: Flume focuses on collecting data, Kafka on distributing it. In Flume, Kafka can be configured either as a source or as a sink.
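Section 5 below configures Kafka as a Flume sink. For the other direction, a Flume agent can read from Kafka through a Kafka source; a minimal sketch, assuming Flume 1.7 or later and an existing topic named test (the agent and component names are illustrative):
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = localhost:9092
a1.sources.r1.kafka.topics = test
a1.sources.r1.channels = c1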
2. Kafka Architecture
(1) Broker
A message-broker node. Each Kafka server is one broker; one or more brokers form a Kafka cluster.
(2) Topic
Kafka categorizes messages by topic; every message published to a Kafka cluster must specify a topic.
(3) Producer
A client that publishes messages to brokers.
(4) Consumer
A client that reads messages from brokers.
(5) Consumer Group
Each consumer belongs to a specific consumer group. A message can be delivered to multiple consumer groups, but within a single group only one consumer will actually consume it (see the sketch after this list).
(6) Partition
A physical concept: a topic can be split into multiple partitions, and messages within each partition are ordered.
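A quick way to observe the consumer-group behavior described in (5), assuming the test topic and console tools introduced in the next section (the group names are made up; on older Kafka versions use --consumer-property group.id=... instead of --group): within one group each partition is assigned to at most one member, so on a single-partition topic a second member of the same group stays idle, while a consumer in a different group independently receives every message.
# Two consumers sharing a group: only one of them gets the single partition of "test"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group demo-group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group demo-group
# A consumer in a different group receives all messages again
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group other-group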
3. Kafka Command-Line Usage
(1) Kafka depends on ZooKeeper, so start a ZooKeeper server first. You can use the script bundled with Kafka or point to an existing ZooKeeper installation.
bin/zookeeper-server-start.sh config/zookeeper.properties
(2) Start Kafka
bin/kafka-server-start.sh config/server.properties
(3) Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
(4) Inspect topics (list or describe)
bin/kafka-topics.sh --zookeeper localhost:2181 --describe
(5) Send messages to a topic
Kafka ships with a command-line producer that reads messages from a file or from standard input and sends them to the cluster. Each line is sent as a separate message.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
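Once the producer starts, each line typed at its > prompt is published as one message, for example:
>hello kafka
>this is a test message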
(6) Consume messages
The --from-beginning flag makes the consumer read everything produced so far, not only messages published after it starts.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic test \
    --from-beginning
(7) Run multiple brokers
Make additional copies of the configuration file:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
Then edit the following properties in each copy:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
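Note: port follows the older quickstart this walkthrough is based on; on newer Kafka releases the listening address is set through listeners instead, for example:
listeners=PLAINTEXT://:9093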
Start each broker with its own configuration:
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
(8) Now a topic with multiple replicas can be created:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic multest
(9) Verify
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic multest
Topic:multest PartitionCount:1 ReplicationFactor:3 Configs:
Topic: multest Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
"leader": broker 1 is the leader for this partition.
"replicas": the brokers (1, 2, 0) that hold a replica of this partition, listed regardless of whether they are alive.
"isr": the in-sync replicas, i.e. the subset of replicas that are currently alive and caught up with the leader.
Now test fault tolerance by killing the leader, broker 1:
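Assuming the broker was started from a shell as above, one way to kill it is to find its process and terminate it (the pid is whatever ps reports):
ps aux | grep server-1.properties
kill -9 <pid>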
Leadership fails over to one of the followers, and node 1 no longer appears in the ISR because it is down:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic multest
Topic:multest PartitionCount:1 ReplicationFactor:3 Configs:
Topic: multest Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
4. Kafka API
(1) Producer
package com.randy;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 100; i++) {
                String msg = "Message " + i;
                // Send each message to the "HelloWorld" topic
                producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {   // guard against the constructor having failed
                producer.close();
            }
        }
    }
}
A producer is created from an instance of KafkaProducer, whose constructor takes a set of properties. The most important properties used here are explained below:
bootstrap.servers
The list of host:port broker addresses used to establish the initial connection to the Kafka cluster.
properties.put("bootstrap.servers", "192.168.1.110:9092");
key.serializer & value.serializer
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Serializer classes. Kafka messages are sent to the cluster as key-value pairs, where the key is optional and the value can be of any type. Before a message is sent, the producer must serialize it into bytes; this example sends text messages, so StringSerializer is used for both key and value.
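send() is asynchronous, so the loop above does not actually confirm delivery. A minimal sketch of attaching a callback that reports the result of each send (this is an addition, not part of the original example; it needs imports for Callback and RecordMetadata from org.apache.kafka.clients.producer):
// Inside the loop, instead of the fire-and-forget send:
producer.send(new ProducerRecord<String, String>("HelloWorld", msg), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();   // the record was not written
        } else {
            System.out.println("Stored in partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
});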
(2) Consumer
package com.randy;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* Author : RandySun
* Date : 2017-08-13 17:06
* Comment :
*/
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.110:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic written by ProducerDemo
        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
        while (true) {
            // Poll the broker, waiting up to 100 ms for new records
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
        }
    }
}
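The consumer above relies on automatic offset commits (enable.auto.commit=true). If reprocessing or losing messages on a crash matters, a common variation, sketched here as an assumption rather than part of the original code, is to disable auto-commit and commit only after the records have been processed:
// With properties.put("enable.auto.commit", "false"); the poll loop becomes:
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
    }
    // Commit the offsets returned by this poll only after processing them
    kafkaConsumer.commitSync();
}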
5. Flume + Kafka
(1) Configure Flume first
a1.channels = c1
a1.sources = r1
a1.sinks = k1
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
# For using a thrift source, set the following instead of the above line:
# a1.sources.r1.type = thrift
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 41414
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumeTest
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
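The memory channel above uses Flume's default sizing. If the Kafka sink cannot keep up, the channel capacity can be set explicitly; a sketch using the standard memory-channel properties (the numbers are arbitrary):
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100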
(2) Start the Flume agent
flume-ng agent -c conf -f $FLUME_HOME/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
(3) Run the Flume RPC client
import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // clean up and recreate the client
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}

public class FlumeAvroClient {
    public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("127.0.0.1", 41414);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
            client.sendDataToFlume(sampleData);
        }

        client.cleanUp();
    }
}
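To compile and run this client, the Flume client SDK must be on the classpath. With Maven that is roughly the following dependency (the version shown is an assumption; match it to your Flume installation):
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>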
(4) Start a Kafka console consumer
~$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flumeTest --from-beginning
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
Hello Flume!
a total of 10 messages
(5) Or run a Kafka consumer program
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
    public static void main(String[] args) {
        //String topicName = "testmul";
        String topicName = "flumeTest";

        Properties props = new Properties();
        // The new consumer API connects through the brokers (bootstrap.servers),
        // not through ZooKeeper, so a "zookeeper" property is not supported here.
        //props.put("bootstrap.servers", "localhost:9094");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "ljq");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic written by the Flume Kafka sink
        consumer.subscribe(Arrays.asList(topicName));
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Print the offset, key and value of each consumed record
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
The output:
offset = 20, key = null, value = Hello Flume!
offset = 21, key = null, value = Hello Flume!
offset = 22, key = null, value = Hello Flume!
offset = 23, key = null, value = Hello Flume!
offset = 24, key = null, value = Hello Flume!
offset = 25, key = null, value = Hello Flume!
offset = 26, key = null, value = Hello Flume!
offset = 27, key = null, value = Hello Flume!
offset = 28, key = null, value = Hello Flume!
offset = 29, key = null, value = Hello Flume!
Addendum: a Kafka producer can also send messages directly to a Kafka consumer, without going through Flume. The keyed records below were produced this way:
offset = 30, key = my, value = 0
offset = 31, key = name, value = 1
offset = 32, key = is, value = 2
offset = 33, key = lin, value = 3
offset = 34, key = jia, value = 4
offset = 35, key = qin, value = 5
offset = 36, key = ha, value = 6
offset = 37, key = ha, value = 7
offset = 38, key = lin, value = 8
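For reference, a keyed stream like the one above can be produced from the console producer by enabling key parsing; a minimal sketch (the topic and separator here are assumptions):
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flumeTest \
    --property parse.key=true --property key.separator=:
>my:0
>name:1
>is:2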
6. Summary
The key settings that tie the pipeline together:
(1) The Flume client sends events to the Avro source listening on 127.0.0.1:41414:
client.init("127.0.0.1", 41414);
(2) The Flume Kafka sink publishes to the flumeTest topic on the broker at localhost:9092:
a1.sinks.k1.topic = flumeTest
a1.sinks.k1.brokerList = localhost:9092
(3) The Kafka consumer reads the same topic from the same broker:
String topicName = "flumeTest";
props.put("bootstrap.servers", "localhost:9092");
Data flow:
flume client ---> flume source ---> channel ---> flume sink ---> kafka topic ---> kafka consumer