专为实时而构建:使用Apache Kafka进行大数据消息传递,第1部分
Posted 银河系1号
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了专为实时而构建:使用Apache Kafka进行大数据消息传递,第1部分相关的知识,希望对你有一定的参考价值。
当大数据运动开始时,它主要集中在批处理上。分布式数据存储和查询工具(如MapReduce,Hive和Pig)都旨在分批处理数据而不是连续处理数据。企业每晚都会运行多个作业,从数据库中提取数据,然后分析,转换并最终存储数据。最近,企业发现了分析和处理数据和事件的能力,而不是每隔几个小时就会发生一次。然而,大多数传统的消息传递系统不能扩展以实时处理大数据。所以LinkedIn的工程师构建并开源Apache Kafka:一种分布式消息传递框架,通过扩展商用硬件来满足大数据的需求。
在过去几年中,Apache Kafka已经出现,以解决各种情况。在最简单的情况下,它可以是用于存储应用程序日志的简单缓冲区。结合Spark Streaming等技术,它可用于跟踪数据更改并对数据执行操作,然后将其保存到最终目标。Kafka的预测模式使其成为检测欺诈的有力工具,例如在信用卡交易发生时检查信用卡交易的有效性,而不是等待数小时后的批处理。
这个由两部分组成的教程介绍了Kafka,从如何在开发环境中安装和运行它开始。您将了解Kafka的架构,然后介绍如何开发开箱即用的Apache Kafka消息传递系统。最后,您将构建一个自定义生产者/消费者应用程序,通过Kafka服务器发送和使用消息。在本教程的后半部分,您将学习如何对消息进行分区和分组,以及如何控制Kafka消费者将使用哪些消息。
什么是Apache Kafka?
Apache Kafka是为大数据扩展而构建的消息传递系统。与Apache ActiveMQ或RabbitMq类似,Kafka使构建在不同平台上的应用程序能够通过异步消息传递进行通信。但Kafka与这些更传统的消息传递系统的关键方式不同:
它旨在通过添加更多服务器来横向扩展。
它为生产者和消费者流程提供了更高的吞吐量。
它可用于支持批处理和实时用例。
它不支持Java的面向消息的中间件API JMS。
Apache Kafka的架构
在我们探索Kafka的架构之前,您应该了解它的基本术语:
producer是将消息发布到主题的一个过程。
consumer是订阅一个或多个主题并且消费发布到主题的消息的过程。
topic是消息发布的主题的名称。
broker是在一台机器上运行的进程。
cluster是一起工作的一组broker。
Apache Kafka的架构非常简单,可以在某些系统中实现更好的性能和吞吐量。Kafka中的每个topic都像一个简单的日志文件。当生产者发布消息时,Kafka服务器会将其附加到其给定topic的日志文件的末尾。服务器还分配一个偏移量,该偏移量是用于永久识别每条消息的数字。随着消息数量的增加,每个偏移量的值增加; 例如,如果生产者发布三条消息,第一条消息可能获得偏移量1,第二条消息偏移量为2,第三条偏移量为3。
当Kafka消费者首次启动时,它将向服务器发送拉取请求,要求检索偏移值大于0的特定topic的任何消息。服务器将检查该topic的日志文件并返回三个新消息。消费者将处理消息,然后发送偏移量大于3的消息请求,依此类推。
在Kafka中,客户端负责记住偏移计数和检索消息.Kafka服务器不跟踪或管理消息消耗。默认情况下,Kafka服务器将保留七天的消息。服务器中的后台线程检查并删除七天或更早的消息。只要消息在服务器上,消费者就可以访问消息。它可以多次读取消息,甚至可以按收到的相反顺序读取消息。但是,如果消费者在七天之前未能检索到消息,那么它将错过该消息。
Kafka基准
LinkedIn和其他企业的生产使用表明,通过适当的配置,Apache Kafka每天能够处理数百GB的数据。2011年,三位LinkedIn工程师使用基准测试来证明Kafka可以实现比ActiveMQ和RabbitMQ更高的吞吐量。
Apache Kafka快速设置和演示
我们将在本教程中构建一个自定义应用程序,但让我们首先安装和测试一个开箱即用的生产者和消费者的Kafka实例。
访问Kafka下载页面以安装最新版本(撰写本文时为0.9)。
将二进制文件解压缩到一个
software/kafka
文件夹中。对于当前版本,它是software/kafka_2.11-0.9.0.0
。将当前目录更改为指向新文件夹。
通过执行以下命令启动Zookeeper服务器:
bin/zookeeper-server-start.sh config/zookeeper.properties
。执行以下命令启动Kafka服务器:
bin/kafka-server-start.sh config/server.properties
。创建一个可用于测试的测试topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld
。启动一个简单的控制台使用者,它可以使用发布到给定topic的消息,例如
javaworld
:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning
。启动一个简单的生产者控制台,可以将消息发布到测试topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld
。尝试在生产者控制台中输入一条或两条消息。您的消息应显示在使用者控制台中。
Apache Kafka的示例应用程序
您已经了解了Apache Kafka如何开箱即用。接下来,让我们开发一个自定义生产者/消费者应用程序。生产者将从控制台检索用户输入,并将每个新行作为消息发送到Kafka服务器。消费者将检索给定topic的消息并将其打印到控制台。在这种情况下,生产者和消费者组件是您自己的kafka-console-producer.sh
和kafka-console-consumer.sh
。
让我们从创建一个Producer.java
类开始。此客户端类包含从控制台读取用户输入并将该输入作为消息发送到Kafka服务器的逻辑。
我们通过从java.util.Properties
类创建对象并设置其属性来配置生产者。该ProducerConfig类定义了所有不同的属性可用,但Kafka的默认值足以满足大多数用途。对于默认配置,我们只需要设置三个必需属性:
BOOTSTRAP_SERVERS_CONFIG
KEY_SERIALIZER_CLASS_CONFIG
VALUE_SERIALIZER_CLASS_CONFIG
BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
设置主机:端口对的列表,用于以host1:port1,host2:port2,...
格式建立与Kakfa集群的初始连接。即使我们的Kafka集群中有多个代理,我们也只需要指定第一个代理的值host:port
。Kafka客户端将使用此值在代理上进行发现调用,该代理将返回集群中所有代理的列表。最好在BOOTSTRAP_SERVERS_CONFIG
中指定多个代理,这样如果第一个代理停止运行,客户端将能够尝试其他代理。
Kafka服务器需要byte[] key, byte[] value
格式化的消息。Kafka的客户端库不是转换每个键和值,而是允许我们使用更友好的类型String
和int
发送消息。库将这些转换为适当的类型。例如,示例应用程序没有特定于消息的key,因此我们将使用null作为key。对于值,我们将使用 String
,即用户在控制台上输入的数据。
要配置消息key,我们用org.apache.kafka.common.serialization.ByteArraySerializer
设定KEY_SERIALIZER_CLASS_CONFIG
的值。这是有效的,因为null不需要转换为byte[]
。对于消息值,我们为VALUE_SERIALIZER_CLASS_CONFIG
设置了org.apache.kafka.common.serialization.StringSerializer
,因为该类知道如何将String
转换为 byte[]
。
自定义键/值对象类似于StringSerializer
,Kafka为其他原语提供了序列化程序,例如int
和long
。为了使用自定义对象作为键或值,我们需要创建一个实现类org.apache.kafka.common.serialization.Serializer
。然后我们可以添加逻辑来将类序列化为byte[]
。我们还必须在我们的消费者代码中使用相应的反序列化器。
Kafka 生产者
在Properties
使用必要的配置属性填充类之后,我们可以使用它来创建对象KafkaProducer
。每当我们要发送的消息后,该Kafka服务器,我们将创建一个对象ProducerRecord
,并调用KafkaProducer
的send()
方法发送消息。ProducerRecord
有两个参数:应该发布消息的topic的名称,以及实际的消息。使用生产者时,不要忘记调用该方法:Producer.close()
清单1. KafkaProducer
public class Producer {
private static Scanner in;
public static void main(String[] argv)throws Exception {
if (argv.length != 1) {
System.err.println("Please specify 1 parameters ");
System.exit(-1);
}
String topicName = argv[0];
in = new Scanner(System.in);
System.out.println("Enter message(type exit to quit)");
//Configure the Producer
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer producer = new KafkaProducer<String, String>(configProperties);
String line = in.nextLine();
while(!line.equals("exit")) {
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line);
producer.send(rec);
line = in.nextLine();
}
in.close();
producer.close();
}
}
配置消息使用者
接下来,我们将创建一个订阅topic的简单消费者。每当向topic发布新消息时,它将读取该消息并将其打印到控制台。消费者代码与生产者代码非常相似。我们首先创建一个对象java.util.Properties
,设置其特定于消费者的属性,然后使用它来创建一个新对象KafkaConsumer
。ConsumerConfig类定义了我们可以设置的所有属性。只有四个强制属性:
BOOTSTRAP_SERVERS_CONFIG(bootstrap.servers)KEY_DESERIALIZER_CLASS_CONFIG(key.deserializer)VALUE_DESERIALIZER_CLASS_CONFIG(value.deserializer)GROUP_ID_CONFIG(bootstrap.servers)
正如我们为生产者类所做的那样,我们将使用BOOTSTRAP_SERVERS_CONFIG
为消费者类配置主机/端口对。此配置允许我们以host1:port1,host2:port2,...
格式建立与Kakfa集群的初始连接。正如我之前提到的,Kafka服务器需要byte[]
键和byte[]
值格式的消息,并且有自己的实现来序列化不同的类型byte[]
。正如我们对生产者所做的那样,在消费者方面,我们将不得不使用自定义反序列化器转换byte[]
回适当的类型。在示例应用程序的情况下,我们知道生产者正在使用`ByteArraySerializer`
key和StringSerializer
值。因此,在客户端,我们需要使用org.apache.kafka.common.serialization.ByteArrayDeserializer
序列化key和org.apache.kafka.common.serialization.StringDeserializer
序列化值。将这些类为赋值KEY_DESERIALIZER_CLASS_CONFIG
和VALUE_DESERIALIZER_CLASS_CONFIG
将使消费者反序列化由生产者发送的byte[]类型的数据。最后,我们需要设置值GROUP_ID_CONFIG
。这应该是字符串格式的组名。我会在一分钟内详细解释这个配置。现在,只需查看具有四个强制属性集的Kafka消费者:
清单2. KafkaConsumer
public class Consumer {
private static Scanner in;
private static boolean stop = false;
public static void main(String[] argv)throws Exception{
if (argv.length != 2) {
System.err.printf("Usage: %s <topicName> <groupId>\n",
Consumer.class.getSimpleName());
System.exit(-1);
}
in = new Scanner(System.in);
String topicName = argv[0];
String groupId = argv[1];
ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);
consumerRunnable.start();
String line = "";
while (!line.equals("exit")) {
line = in.next();
}
consumerRunnable.getKafkaConsumer().wakeup();
System.out.println("Stopping consumer .....");
consumerRunnable.join();
}
private static class ConsumerThread extends Thread{
private String topicName;
private String groupId;
private KafkaConsumer<String,String> kafkaConsumer;
public ConsumerThread(String topicName, String groupId){
this.topicName = topicName;
this.groupId = groupId;
}
public void run() {
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");
//Figure out where to start processing messages from
kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName));
//Start processing messages
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.value());
}
}catch(WakeupException ex){
System.out.println("Exception caught " + ex.getMessage());
}finally{
kafkaConsumer.close();
System.out.println("After closing KafkaConsumer");
}
}
public KafkaConsumer<String,String> getKafkaConsumer(){
return this.kafkaConsumer;
}
}
}
消费者和消费者线程
将清单2中的消费者代码分为两部分来确保Consumer
在退出之前关闭对象。我将依次描述每个类。首先,ConsumerThread
是一个内部类,它将topic名称和组名称作为其参数。在该类的run()
方法中,它创建一个具有适当属性的KafkaConsumer
对象。它通过调用kafkaConsumer.subscribe()
方法订阅topic,然后每100毫秒轮询Kafka服务器以检查topic中是否有任何新消息。它将遍历任何新消息的列表并将其打印到控制台。
在Consumer
类中,我们创建一个新对象,并在另一个ConsumerThread
线程中启动它。在ConsumerThead
开始一个无限循环,并保持轮询新消息的topic。同时在Consumer
类中,主线程等待用户进入exit
控制台。一旦用户进入退出,它就会调用该KafkaConsumer.wakeup()
方法,导致KafkaConsumer
停止轮询新消息并抛出一个WakeupException
。然后,我们可以通过调用kafkaConsumer
的close()
方法关闭KafkaConsumer
。
运行该应用程序
要测试此应用程序,您可以从IDE运行清单1和清单2中的代码,也可以按照以下步骤操作:
通过执行以下命令下载示例代码KafkaAPIClient :
git clone https://github.com/sdpatil/KafkaAPIClient.git
.编译代码并使用以下命令创建胖JAR :
mvn clean compile assembly:single
.启动消费者:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer test group1
。启动生产者:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Producer test
。在生产者控制台中输入消息,然后检查该消息是否出现在使用者中。试试几条消息。
键入
exit
消费者和生产者控制台以关闭它们。
第1部分的结论
在本教程的前半部分,您已经了解了使用Apache Kafka进行大数据消息传递的基础知识,包括Kafka的概念性概述,设置说明以及如何使用Kafka配置生产者/消费者消息传递系统。
正如您所见,Kafka的架构既简单又高效,专为性能和吞吐量而设计。在第2部分中,我将介绍一些使用Kafka进行分布式消息传递的更高级技术,从使用分区细分主题开始。我还将演示如何管理消息偏移以支持不同的用例。
英文原文:www.javaworld.com/article/306…
以上是关于专为实时而构建:使用Apache Kafka进行大数据消息传递,第1部分的主要内容,如果未能解决你的问题,请参考以下文章
实时构建:Apache Kafka的大数据消息传递,Part 1
实时构建:Apache Kafka的大数据消息传递,Part 2
Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PVUV+展示