Building a Data Collection System with Kafka, Logstash, and Zookeeper
Posted by 砂之寞架构说
I. Environment Requirements
JDK: 1.8
Kafka: kafka_2.11-2.1.0
Logstash: logstash-6.5.4
Zookeeper: zookeeper-3.4.12
OS: CentOS Linux release 7.4.1708 (Core)
II. Installing Zookeeper
See my earlier post [zookeeper集群管理solrCloud], which contains detailed Zookeeper installation instructions:
https://mp.weixin.qq.com/s?__biz=MzA5MTI1OTg0Mg==&mid=2805440931&idx=1&sn=befc29126370522d9e34c8a0250bb904&chksm=b2bc170e85cb9e186d041dd6b52dec56a1a28e6212815a7c7343dbbb856f7f25157ce916e16b&token=878086380&lang=zh_CN#rd
III. Installing Kafka
1. Kafka Overview
From the official introduction: Apache Kafka® is a distributed streaming platform. What exactly does that mean? A streaming platform has three key capabilities: Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system. Store streams of records in a fault-tolerant durable way. Process streams of records as they occur. Kafka is generally used for two broad classes of applications: Building real-time streaming data pipelines that reliably get data between systems or applications Building real-time streaming applications that transform or react to the streams of data To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up. First a few concepts: Kafka is run as a cluster on one or more servers that can span multiple datacenters. The Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp.
In short: 1) Kafka is a distributed streaming platform; 2) it lets you publish and subscribe to streams of records; 3) it stores records in a fault-tolerant, durable way; 4) it processes streams of records in real time as they flow between systems or applications; 5) Kafka runs as a cluster.
2. Download
http://kafka.apache.org/quickstart
Or download from a mirror: http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
3. Single-Broker Setup
Note: the single-broker and multi-broker demos are deferred to the section that combines multiple brokers with Logstash.
3.1 Extract to a directory of your choice
tar -zxvf kafka_2.11-2.1.0.tgz -C /usr/local
cd /usr/local/kafka_2.11-2.1.0
3.2 Kafka ships with a bundled Zookeeper; start the Zookeeper service
bin/zookeeper-server-start.sh config/zookeeper.properties
3.3 Start the Kafka service
bin/kafka-server-start.sh config/server.properties
3.4 Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
3.4.1 Parameters
--create: create a new topic
--zookeeper: the Zookeeper connection string
--replication-factor: how many replicas to keep
--partitions: how many partitions to create
--topic: the topic name
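To verify that the topic was created, you can list all topics or describe the new one (a quick sanity check, assuming the default local Zookeeper from the steps above):
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test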
3.5 Produce messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
3.5.1 Parameters
--broker-list: the broker addresses to send to; separate multiple entries with commas
3.6 Start a consumer to receive messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
3.6.1 Parameters
--bootstrap-server: the broker to bootstrap from
--from-beginning: consume from the start of the topic; omit this flag to consume only new messages
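Since the console producer reads messages from stdin (one per line), you can run a quick non-interactive end-to-end check by piping a message in and reading it back (a sketch using the test topic created above):
echo "hello kafka" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning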
4. Multiple Brokers with a Zookeeper Ensemble
4.1 Start the Zookeeper ensemble
./start-zookeeper.sh
start-zookeeper.sh just chains the three zkServer.sh start commands:
/usr/local/solrcloud/zookeeper1/bin/zkServer.sh start && /usr/local/solrcloud/zookeeper2/bin/zkServer.sh start && /usr/local/solrcloud/zookeeper3/bin/zkServer.sh start
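To confirm that each node is up and a leader was elected, zkServer.sh can report its status; one node should print Mode: leader and the other two Mode: follower:
/usr/local/solrcloud/zookeeper1/bin/zkServer.sh status
/usr/local/solrcloud/zookeeper2/bin/zkServer.sh status
/usr/local/solrcloud/zookeeper3/bin/zkServer.sh status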
4.2 Configure multiple brokers
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
4.3 Edit the server-*.properties files
Set the three brokers' ids to 1, 2, and 3; set the ports to 9092, 9093, and 9094; set the log directories to /tmp/kafka-logs-1, /tmp/kafka-logs-2, and /tmp/kafka-logs-3 (note: /tmp may be wiped on reboot, so consider another directory); and point each broker at the Zookeeper ensemble.
Below are the key changes for broker 1; brokers 2 and 3 follow the same pattern:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:  listeners = listener_name://host_name:port
#   EXAMPLE: listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.1.104:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.1.104:9092
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
# Zookeeper connection string (the three-node ensemble)
zookeeper.connect=192.168.1.104:2181,192.168.1.104:2182,192.168.1.104:2183
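For reference, a sketch of the corresponding edits for broker 2 (broker 3 follows the same pattern with id 3, port 9094, and /tmp/kafka-logs-3; the IP is the example host used above):
# config/server-2.properties
broker.id=2
listeners=PLAINTEXT://192.168.1.104:9093
advertised.listeners=PLAINTEXT://192.168.1.104:9093
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=192.168.1.104:2181,192.168.1.104:2182,192.168.1.104:2183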
4.4 Start the Kafka brokers
Note that kafka-server-start.sh takes a single-dash -daemon flag (with a double dash, --daemon would be misread as the config file path), and in daemon mode no trailing & is needed:
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
bin/kafka-server-start.sh -daemon config/server-3.properties
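To confirm that all three brokers registered with Zookeeper, you can query the /brokers/ids znode with the shell bundled with Kafka (if the one-shot form below doesn't work with your Zookeeper version, start the shell without the trailing command and type ls /brokers/ids interactively):
bin/zookeeper-shell.sh 192.168.1.104:2181 ls /brokers/ids
# the output should end with: [1, 2, 3]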
4.5 Create a topic
bin/kafka-topics.sh --create --zookeeper 192.168.1.104:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
4.6 Check the cluster state
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: my-replicated-topic  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
PartitionCount:1 means one partition; ReplicationFactor:3 means three replicas; Leader: 1 means broker 1 is the leader for this partition; Replicas: 1,2,3 lists the brokers that hold copies; Isr: 1,2,3 is the in-sync replica set, i.e. the replicas currently caught up with the leader.
4.7 Produce messages
bin/kafka-console-producer.sh --broker-list yyd:9092,yyd:9093,yyd:9094 --topic my-replicated-topic
4.8 Consume messages
bin/kafka-console-consumer.sh --bootstrap-server yyd:9092 --from-beginning --topic my-replicated-topic
4.9 Fault-Tolerance Test
Use jps to list the Java processes:
1969 QuorumPeerMain
6978 Jps
2387 Kafka
2743 Kafka
3175 Kafka
4919 ConsoleConsumer
1934 QuorumPeerMain
1903 QuorumPeerMain
Kill one or two of the Kafka processes to test; Kafka keeps serving producers and consumers normally.
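For example (the PID below comes from the jps output above, so treat it as illustrative), kill one broker and describe the topic again; a new leader should be elected and the dead broker should drop out of the Isr list while reads and writes continue to work:
kill -9 2743
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic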
IV. Installing Elastic's Logstash
1. Logstash Overview
Centralize, Transform & Stash Your Data. Logstash is an open source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite "stash." (Ours is Elasticsearch, naturally.)
In short: Logstash is an open-source, server-side data processing pipeline that centralizes data from many sources at once, transforms it, and ships it to a "stash" of your choice, Elasticsearch being the natural one.
2. Download
https://www.elastic.co/products/logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.5.4.tar.gz
3. Extract
tar -zxvf logstash-6.5.4.tar.gz -C /usr/local
4. Usage
4.1 Logstash defines its inputs and outputs in a configuration file, much like Flume. Here is the sample that ships with the distribution:
/usr/local/logstash-6.5.4/config/logstash-sample.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  beats {
    port => 5044
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
}
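You can ask Logstash to validate a configuration and exit without starting the pipeline, which is a handy pre-flight check:
bin/logstash -f config/logstash-sample.conf --config.test_and_exit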
4.2 Logstash supports many kinds of inputs and outputs; see the official documentation:
https://www.elastic.co/guide/en/logstash/current/configuration.html
https://www.elastic.co/guide/en/logstash/current/input-plugins.html
https://www.elastic.co/guide/en/logstash/current/output-plugins.html
V. Integrating Kafka with Logstash
1. Start the Zookeeper ensemble
2. Start the Kafka cluster
3. Write the Logstash configuration (saved here as config/logstash-kafka.conf, which step 4.1 below loads):
input {
  file {
    path => "/usr/local/logstash-6.5.4/test/logstash.txt"
  }
}
output {
  kafka {
    codec => json
    topic_id => "logstash_topic"
    bootstrap_servers => "192.168.1.104:9092,192.168.1.104:9093,192.168.1.104:9094"
    batch_size => 1
  }
}
3.1 This pipeline watches a file for changes and publishes each new line to Kafka as JSON.
3.2 Create the logstash.txt file.
3.3 Reference:
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
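The reverse direction works too: Logstash can consume from Kafka via the kafka input plugin. A minimal sketch, assuming the same brokers and topic as above, that just prints each event to stdout:
input {
  kafka {
    bootstrap_servers => "192.168.1.104:9092,192.168.1.104:9093,192.168.1.104:9094"
    topics => ["logstash_topic"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}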
4. Test
4.1 Start Logstash
bin/logstash -f config/logstash-kafka.conf
4.2 Create the topic
./kafka-topics.sh --create --zookeeper yyd:2181 --replication-factor 3 --partitions 1 --topic logstash_topic
4.3 Start a Kafka consumer
./kafka-console-consumer.sh --bootstrap-server yyd:9093 --topic logstash_topic
4.4 Append a line to logstash.txt
echo "111111111111111111" >> test/logstash.txt
4.5 The Kafka consumer prints:
{"host":"yyd","@timestamp":"2019-01-09T21:31:49.768Z","@version":"1","message":"111111111111111111","path":"/usr/local/logstash-6.5.4/test/logstash.txt"}
4.6 Append another line
echo "222222222222222222" >> test/logstash.txt
4.7 The consumer now shows both messages:
{"host":"yyd","@timestamp":"2019-01-09T21:31:49.768Z","@version":"1","message":"111111111111111111","path":"/usr/local/logstash-6.5.4/test/logstash.txt"}
{"host":"yyd","@timestamp":"2019-01-09T21:34:22.238Z","@version":"1","message":"222222222222222222","path":"/usr/local/logstash-6.5.4/test/logstash.txt"}