实时构建:Apache Kafka的大数据消息传递,Part 1
Posted 猿问
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时构建:Apache Kafka的大数据消息传递,Part 1相关的知识,希望对你有一定的参考价值。
当大数据运动开始时,它主要集中于批处理。分布式数据存储和查询工具(如MapReduce、Hive和Pig)都被设计成批量处理数据,而不是连续处理数据。企业将每天晚上运行多个作业从数据库中提取数据,然后分析、转换并最终存储数据。最近,企业发现了在数据和事件发生时分析和处理它们的能力,而不仅仅是每隔几个小时进行一次。然而,大多数传统的消息传递系统都无法实时处理大数据。因此,LinkedIn的工程师们构建了开源的Apache Kafka:一个分布式的消息传递框架,通过在普通硬件上扩展来满足大数据的需求。
在过去的几年中,Apache Kafka解决了各种各样的用例。在最简单的情况下,它可以是存储应用程序日志的简单缓冲区。结合Spark流等技术,它可以用于跟踪数据更改,并在将数据保存到最终目的地之前对数据采取行动。Kafka的预测模式使其成为检测欺诈的强大工具,例如在信用卡交易发生时检查其有效性,而不是在数小时后等待批处理。
本教程分为两部分,首先介绍如何在开发环境中安装和运行Kafka。您将了解Kafka的体系结构,然后介绍如何开发一个开箱即用的Apache Kafka消息传递系统。最后,您将构建一个定制的生产者/消费者应用程序,该应用程序通过Kafka服务器发送和消费消息。在本教程的第二部分中,您将了解如何对消息进行分区和分组,以及如何控制Kafka使用者将使用哪些消息。
Apache Kafka是什么?
Apache Kafka是为大数据构建的消息传递系统。类似于Apache ActiveMQ或RabbitMq, Kafka允许构建在不同平台上的应用程序通过异步消息传递进行通信。但Kafka与这些更传统的消息传递系统在关键方面有所不同:
它被设计成水平伸缩,通过添加更多的商品服务器。
它为生产者和使用者流程提供了更高的吞吐量。
它可以用于支持批处理用例和实时用例。
它不支持JMS, Java的面向消息的中间件API。
Apache Kafka的架构
在我们探索卡夫卡的建筑之前,你应该知道它的基本术语:
生产者是可以向主题发布消息的流程。
消费者是可以订阅一个或多个主题并使用发布到主题的消息的流程。
主题类别是发布消息的提要的名称。
一个broker是在单台机器上运行的进程。
一个cluster是一组一起工作的brokers。
Apache Kafka的体系结构非常简单,可以在某些系统中获得更好的性能和吞吐量。Kafka中的每个主题都像一个简单的日志文件。当生产者发布消息时,Kafka服务器将其附加到给定主题的日志文件的末尾。服务器还分配一个偏移量,这是一个用于永久标识每个消息的数字。随着消息数量的增加,每个偏移量的值也随之增加;例如,如果生产者发布三个消息,第一个消息的偏移量可能是1,第二个消息的偏移量是2,第三个消息的偏移量是3。
当Kafka使用者第一次启动时,它将向服务器发送一个pull请求,请求检索偏移量大于0的特定主题的任何消息。服务器将检查该主题的日志文件,并返回三条新消息。使用者将处理消息,然后发送一个偏移量大于3的消息请求,以此类推。
在Kafka中,客户机负责记住偏移量计数并检索消息。Kafka服务器不跟踪或管理消息消费。默认情况下,Kafka服务器将保存消息七天。服务器中的后台线程检查和删除七天以上的消息。只要消息在服务器上,使用者就可以访问它们。它可以多次读取消息,甚至可以以相反的接收顺序读取消息。但是,如果使用者在七天结束前没有检索到该消息,那么它将错过该消息。
Apache Kafka快速设置和演示
在本教程中,我们将构建一个定制的应用程序,但是首先让我们使用开箱即用的生产者和消费者来安装和测试Kafka实例。
访问Kafka下载页面安装最新的版本(撰写本文时为0.9)。
将二进制文件解压缩到软件/kafka文件夹中。对于当前版本,它是software/kafka_2.11-0.9.0.0。
将当前目录更改为指向新文件夹。
通过执行以下命令启动Zookeeper服务器:bin/ Zookeeper -server- Start.sh config/ zookeeper.properties。
通过执行来启动Kafka服务器:
bin/kafka-server-start.sh config/server.properties
.创建一个可用于测试的测试主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld
.启动一个简单的控制台使用者,该使用者可以使用发布到给定主题的消息, 例如
javaworld
:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning
.启动一个可以向测试主题发布消息的简单生成器控制台:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld
.尝试在生成器控制台输入一到两条消息。您的消息应该显示在使用者控制台中。
Apache Kafka的示例应用程序
您已经看到了Apache Kafka是如何开箱即用的。接下来,让我们开发一个定制的生产者/消费者应用程序。生成器将从控制台检索用户输入,并将每个新行作为消息发送到Kafka服务器。使用者将检索给定主题的消息并将其打印到控制台。在这种情况下,生产者和消费者组件是您自己的kafka-console-producer实现。sh和kafka-console-consumer.sh。
让我们从创建一个Producer.java类。这个客户机类包含从控制台读取用户输入并将该输入作为消息发送到Kafka服务器的逻辑。
我们通过从java.util.Properties中创建一个对象来配置生成器。属性类并设置其属性。ProducerConfigclass定义了所有可用的不同属性,但是Kafka的默认值对于大多数使用来说已经足够了。对于默认配置,我们只需要设置三个强制属性:
BOOTSTRAP_SERVERS_CONFIG
KEY_SERIALIZER_CLASS_CONFIG
VALUE_SERIALIZER_CLASS_CONFIG
BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)设置一个主机:端口对列表,用于在host1:port1、host2:port2、…格式。即使Kafka集群中有多个代理,我们也只需要指定第一个代理的主机:port的值。Kafka客户机将使用这个值对代理进行discover调用,该调用将返回集群中所有代理的列表。在BOOTSTRAP_SERVERS_CONFIG中指定多个代理是一个好主意,这样如果第一个代理宕机,客户机就可以尝试其他代理。
Kafka服务器需要byte[] key, byte[] value格式的消息。Kafka的客户端库允许我们使用更友好的类型,如String
和int来发送消息,而不是转换每个键和值。库将把这些转换为适当的类型。例如,示例应用程序没有特定于消息的键,因此我们将对该键使用null。对于这个值,我们将使用一个String,它是用户在控制台上输入的数据
To configure the message key, we set a value of KEY_SERIALIZER_CLASS_CONFIG
on the org.apache.kafka.common.serialization.ByteArraySerializer
. This works because null doesn't need to be converted into byte[]
. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG
on the org.apache.kafka.common.serialization.StringSerializer
, because that class knows how to convert a String
into a byte[]
.
为了配置消息键,我们在org.apache.kafka.common. serialize . bytearrayserializer中设置KEY_SERIALIZER_CLASS_CONFIG的值。这是因为null不需要转换为byte[]。对于消息值,我们在org.apache.kafka.common. serialize . stringserializer上设置VALUE_SERIALIZER_CLASS_CONFIG,因为该类知道如何将String
转换为byte[]。
Kafka生产者
在用必要的配置属性填充Properties类之后,我们可以使用它来创建KafkaProducer的对象。在此之后,每当我们想向Kafka服务器发送消息时,我们将创建一个ProducerRecord对象,并使用该记录调用KafkaProducer的send()方法来发送消息。ProducerRecord接受两个参数:应该向其发布消息的主题的名称和实际消息。在使用生成器时,不要忘记调用Producer.close()方法:
清单1. KafkaProducer
public class Producer {
private static Scanner in;
public static void main(String[] argv)throws Exception {
if (argv.length != 1) {
System.err.println("Please specify 1 parameters ");
System.exit(-1);
}
String topicName = argv[0];
in = new Scanner(System.in);
System.out.println("Enter message(type exit to quit)");
//Configure the Producer
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer producer = new KafkaProducer<String, String>(configProperties);
String line = in.nextLine();
while(!line.equals("exit")) {
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line);
producer.send(rec);
line = in.nextLine();
}
in.close();
producer.close();
}
}
配置消息使用者
接下来,我们将创建订阅主题的简单使用者。每当向主题发布新消息时,它将读取该消息并将其打印到控制台。消费者代码与生产者代码非常相似。我们首先创建一个java.util.Properties对象。属性,设置其特定于使用者的属性,然后使用它创建KafkaConsumer的新对象。ConsumerConfig类定义了我们可以设置的所有属性。
BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
GROUP_ID_CONFIG (bootstrap.servers)
Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG
to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,...
format.
就像我们对producer类所做的那样,我们将使用BOOTSTRAP_SERVERS_CONFIG来为consumer类配置主机/端口对。这个配置允许我们用host1:port1,host2:port2,…格式建立对kafka集群的初始化的链接。
如前所述,Kafka服务器需要byte[]键和byte[]值格式的消息,并有自己的实现来将不同的类型序列化为byte[]。就像我们对生成器所做的那样,在消费者端,我们必须使用自定义反序列化器将byte[]转换回适当的类型。
在示例应用程序中,我们知道生成器对键使用ByteArraySerializer,对值使用StringSerializer。因此,在客户端,我们需要为键使用org.apache.kafka.common. serialize . ByteArrayDeserializer,为值使用org.apache.kafka.common. serialize . StringDeserializer。将这些类设置为KEY_DESERIALIZER_CLASS_CONFIG和VALUE_DESERIALIZER_CLASS_CONFIG的值将使使用者能够反序列化生产者发送的byte[]编码类型。
最后,我们需要设置GROUP_ID_CONFIG的值。这应该是字符串格式的组名。稍后我将进一步解释这个配置。现在,只需查看Kafka消费者设置的4个强制属性:
清单 2. KafkaConsumer
public class Consumer {
private static Scanner in;
private static boolean stop = false;
public static void main(String[] argv)throws Exception{
if (argv.length != 2) {
System.err.printf("Usage: %s <topicName> <groupId>\n",
Consumer.class.getSimpleName());
System.exit(-1);
}
in = new Scanner(System.in);
String topicName = argv[0];
String groupId = argv[1];
ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);
consumerRunnable.start();
String line = "";
while (!line.equals("exit")) {
line = in.next();
}
consumerRunnable.getKafkaConsumer().wakeup();
System.out.println("Stopping consumer .....");
consumerRunnable.join();
}
private static class ConsumerThread extends Thread{
private String topicName;
private String groupId;
private KafkaConsumer<String,String> kafkaConsumer;
public ConsumerThread(String topicName, String groupId){
this.topicName = topicName;
this.groupId = groupId;
}
public void run() {
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");
//Figure out where to start processing messages from
kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName));
//Start processing messages
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.value());
}
}catch(WakeupException ex){
System.out.println("Exception caught " + ex.getMessage());
}finally{
kafkaConsumer.close();
System.out.println("After closing KafkaConsumer");
}
}
public KafkaConsumer<String,String> getKafkaConsumer(){
return this.kafkaConsumer;
}
}
}
Consumer and ConsumerThread
Writing the consumer code in Listing 2 in two parts ensures that we close the Consumer
object before exiting. I'll describe each class in turn. First, ConsumerThread
is an inner class that takes a topic name and group name as its arguments. In the run()
method it creates a KafkaConsumer
object, with appropriate properties. It subscribes to the topic that was passed as an argument in the constructor, by calling the kafkaConsumer.subscribe()
method, then polls the Kafka server every 100 milliseconds to check if there are any new messages in the topic. It will iterate through the list of any new messages and print them to the console.
将清单2中的使用者代码分为两部分编写,可以确保在退出之前关闭Consumer对象。我将依次描述每个类。首先,ConsumerThread是一个内部类,它以主题名和组名作为参数。在run()方法中,它创建一个具有适当属性的KafkaConsumer对象。它通过调用kafkaConsumer.subscribe()方法订阅主题(在构造器中主题会被作为参数传递),然后每100毫秒轮询Kafka服务器,检查主题中是否有任何新消息。它将遍历新消息的列表并且打印到控制台。
在Consumer类中,我们创建了一个新的ConsumerThread对象,并在另一个线程中启动它。ConsumerThead启动一个无限循环,并不断轮询主题以获取新消息。同时,在Consumer类中,主线程等待用户在控制台输入exit。一旦用户进入exit,它将调用KafkaConsumer.wakeup()方法,导致KafkaConsumer停止轮询新消息并抛出WakeupException。然后,我们可以通过调用KafkaConsumer的close()方法优雅地关闭KafkaConsumer。
测试运行
要测试这个应用程序,可以在IDE中运行清单1和清单2中的代码,或者遵循以下步骤:
通过执行以下命令下载示例代码KafkaAPIClient:
git clone https://github.com/sdpatil/KafkaAPIClient.git
.编译代码并使用该命令创建一个fat JAR:
mvn clean compile assembly:single
.运行consumer:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer test group1
.运行producer:
java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Producer test
.在生产者控制台中输入一条消息,并检查该消息是否出现在使用者中。试着发几条信息。
在consumer和producer控制台中键入exit关闭它们。
在本教程的前半部分,您已经学习了使用Apache Kafka进行大数据消息传递的基础知识,包括Kafka的概念概述、设置说明以及如何使用Kafka配置生产者/消费者消息传递系统。
正如您所看到的,Kafka的体系结构既简单又高效,是为性能和吞吐量而设计的。在第2部分中,我将介绍使用Kafka进行分布式消息传递的一些更高级的技术,首先使用分区来细分主题。我还将演示如何管理消息偏移量,以支持不同的用例。
以上是关于实时构建:Apache Kafka的大数据消息传递,Part 1的主要内容,如果未能解决你的问题,请参考以下文章
专为实时而构建:使用Apache Kafka进行大数据消息传递 第2部分