Implementing a Data Collection System with Kafka, Logstash, and Zookeeper

Posted by 砂之寞架构说


I. Environment Requirements

JDK: 1.8

Kafka: kafka_2.11-2.1.0

Logstash: logstash-6.5.4

Zookeeper: zookeeper-3.4.12

OS: CentOS Linux release 7.4.1708 (Core)

 

II. Installing Zookeeper

See my earlier post [Managing SolrCloud with a Zookeeper Cluster], which has detailed Zookeeper installation instructions:

https://mp.weixin.qq.com/s?__biz=MzA5MTI1OTg0Mg==&mid=2805440931&idx=1&sn=befc29126370522d9e34c8a0250bb904&chksm=b2bc170e85cb9e186d041dd6b52dec56a1a28e6212815a7c7343dbbb856f7f25157ce916e16b&token=878086380&lang=zh_CN#rd

 

III. Installing Kafka

1. Introduction to Kafka


Apache Kafka® is a distributed streaming platform. What exactly does that mean?

A streaming platform has three key capabilities:

Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

Store streams of records in a fault-tolerant durable way.

Process streams of records as they occur.

Kafka is generally used for two broad classes of applications:

Building real-time streaming data pipelines that reliably get data between systems or applications

Building real-time streaming applications that transform or react to the streams of data

To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.

First a few concepts:

Kafka is run as a cluster on one or more servers that can span multiple datacenters.

The Kafka cluster stores streams of records in categories called topics.

Each record consists of a key, a value, and a timestamp.

In short:

1) Kafka is a distributed streaming platform;

2) it publishes and subscribes to streams of records;

3) it is highly fault-tolerant;

4) it processes streams of records in real time between systems or applications;

5) Kafka runs as a cluster.

 

2. Download

http://kafka.apache.org/quickstart

 

Or from a mirror:

http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

 

3. Single-Broker Installation

Note: demo output for the single-broker and multi-broker setups is not shown here; it is demonstrated later when combining multiple brokers with Logstash.

3.1 Extract to a directory of your choice

tar -zxvf kafka_2.11-2.1.0.tgz -C /usr/local

cd /usr/local/kafka_2.11-2.1.0

3.2 Kafka ships with a bundled Zookeeper; start the Zookeeper service

bin/zookeeper-server-start.sh config/zookeeper.properties

3.3 Start the Kafka service

bin/kafka-server-start.sh config/server.properties

3.4 Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

3.4.1 Parameter notes

--create — create a topic

--zookeeper — the Zookeeper connection string

--replication-factor — how many replicas to keep

--partitions — how many partitions to use

--topic — the topic name

3.5 Send messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

3.5.1 Parameter notes

--broker-list — the broker nodes to connect to; separate multiple entries with commas

3.6 Start a consumer to receive messages

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

3.6.1 Parameter notes

--bootstrap-server — which broker to bootstrap from

--from-beginning — consume from the beginning of the topic; omit this flag to consume only new messages

 

4. Multiple Brokers with a Zookeeper Cluster

4.1 Start the Zookeeper cluster

./start-zookeeper.sh

/usr/local/solrcloud/zookeeper1/bin/zkServer.sh start &&

/usr/local/solrcloud/zookeeper2/bin/zkServer.sh start &&

/usr/local/solrcloud/zookeeper3/bin/zkServer.sh start

 

4.2 Configure multiple brokers

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

cp config/server.properties config/server-3.properties

 

4.3 Edit the server-*.properties files

Set the three broker.id values to 1, 2, and 3.

Set the ports to 9092, 9093, and 9094.

Set the log directories to /tmp/kafka-logs-1, /tmp/kafka-logs-2, and /tmp/kafka-logs-3.

Note: data under /tmp can be lost (e.g. on reboot), so you may want to use a different directory.

Configure the connection to the Zookeeper cluster.

 

Below are the key changes for broker-1; broker-2 and broker-3 follow the same pattern:

# The id of the broker. This must be set to a unique integer for each broker.

# (i.e., each broker must be given a unique integer id)

broker.id=1

 

# The address the socket server listens on. It will get the value returned from

# java.net.InetAddress.getCanonicalHostName() if not configured.

#   FORMAT:

#     listeners = listener_name://host_name:port

#   EXAMPLE:

#     listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://192.168.1.104:9092

 

# Hostname and port the broker will advertise to producers and consumers. If not set,

# it uses the value for "listeners" if configured.  Otherwise, it will use the value

# returned from java.net.InetAddress.getCanonicalHostName().

advertised.listeners=PLAINTEXT://192.168.1.104:9092

 

# A comma separated list of directories under which to store log files

log.dirs=/tmp/kafka-logs-1

 

zookeeper.connect=192.168.1.104:2181,192.168.1.104:2182,192.168.1.104:2183
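The per-broker changes above are mechanical, so they can be scripted. A minimal sketch that generates the three override files under /tmp/kafka-multi-broker (a scratch path chosen for illustration; in a real setup you would merge these values into the copies of config/server.properties made in step 4.2):

```shell
#!/bin/sh
# Generate one properties fragment per broker: unique id, port 9092/9093/9094,
# a separate log directory, and the shared Zookeeper connection string.
outdir=/tmp/kafka-multi-broker
mkdir -p "$outdir"
for i in 1 2 3; do
  port=$((9091 + i))   # 9092, 9093, 9094
  cat > "$outdir/server-$i.properties" <<EOF
broker.id=$i
listeners=PLAINTEXT://192.168.1.104:$port
advertised.listeners=PLAINTEXT://192.168.1.104:$port
log.dirs=/tmp/kafka-logs-$i
zookeeper.connect=192.168.1.104:2181,192.168.1.104:2182,192.168.1.104:2183
EOF
done
```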

 

4.4 Start the Kafka brokers

bin/kafka-server-start.sh -daemon config/server-1.properties

bin/kafka-server-start.sh -daemon config/server-2.properties

bin/kafka-server-start.sh -daemon config/server-3.properties

 

4.5 Create a topic

bin/kafka-topics.sh --create --zookeeper 192.168.1.104:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

 

4.6 Check the Kafka cluster status

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:

    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3

PartitionCount:1 — one partition

ReplicationFactor:3 — three replicas

Leader: 1 — broker 1 is the partition leader

Replicas: 1,2,3 — the three brokers holding replicas

Isr: 1,2,3 — the in-sync (live) replicas
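Since Replicas and Isr are both comma-separated broker lists, a quick health check is to compare the two. A sketch that parses the sample --describe line shown above (in practice you would feed it the live command output):

```shell
#!/bin/sh
# One partition line from `kafka-topics.sh --describe` (sample from above).
line='Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3'
# Pull out the Replicas and Isr lists.
replicas=$(printf '%s\n' "$line" | sed -n 's/.*Replicas: \([0-9,]*\).*/\1/p')
isr=$(printf '%s\n' "$line" | sed -n 's/.*Isr: \([0-9,]*\).*/\1/p')
# If every replica is in the in-sync set, the partition is healthy.
if [ "$replicas" = "$isr" ]; then
  echo "partition fully in sync"
else
  echo "under-replicated: replicas=$replicas isr=$isr"
fi
```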

 

4.7 Produce messages

bin/kafka-console-producer.sh \
  --broker-list yyd:9092,yyd:9093,yyd:9094 \
  --topic my-replicated-topic

 

4.8 Consume messages

bin/kafka-console-consumer.sh \
  --bootstrap-server yyd:9092 \
  --from-beginning \
  --topic my-replicated-topic

 

 

 

4.9 Fault-tolerance test

Use jps to list the Java processes:

1969 QuorumPeerMain

6978 Jps

2387 Kafka

2743 Kafka

3175 Kafka

4919 ConsoleConsumer

1934 QuorumPeerMain

1903 QuorumPeerMain

Kill one or two of the Kafka processes to test.

Kafka keeps working normally.
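Picking a broker to kill can itself be scripted from the jps output. A sketch using the sample process list above (the PIDs come from this article's run; on a live system you would pipe jps directly into awk):

```shell
#!/bin/sh
# Sample jps output from above; awk selects the PID of the first process
# whose class name is Kafka.
jps_out='1969 QuorumPeerMain
2387 Kafka
2743 Kafka
3175 Kafka
4919 ConsoleConsumer'
victim=$(printf '%s\n' "$jps_out" | awk '$2 == "Kafka" { print $1; exit }')
echo "would kill broker PID $victim"
# On a live system: kill "$victim", then re-run the producer/consumer test.
```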

 

IV. Installing Elastic's Logstash

1. Introduction to Logstash

Centralize, Transform & Stash Your Data

Logstash is an open source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite stash.(Ours is Elasticsearch, naturally.)

1) Centralize, transform, and stash your data;

2) Logstash is an open-source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite "stash" (ours is Elasticsearch, naturally).

 

2. Download

https://www.elastic.co/products/logstash

 


 

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.5.4.tar.gz

 

3. Extract

tar -zxvf logstash-6.5.4.tar.gz -C /usr/local

 

4. Usage

4.1 Logstash defines inputs and outputs in a configuration file, much like Flume. Here is the sample that ships with the distribution:

/usr/local/logstash-6.5.4/config/logstash-sample.conf

# Sample Logstash configuration for creating a simple

# Beats -> Logstash -> Elasticsearch pipeline.

input {

  beats {

    port => 5044

  }

}

output {

  elasticsearch {

    hosts => ["http://localhost:9200"]

    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"

    #user => "elastic"

    #password => "changeme"

  }

}

4.2 Logstash supports many kinds of inputs and outputs; see the official documentation:

https://www.elastic.co/guide/en/logstash/current/configuration.html

https://www.elastic.co/guide/en/logstash/current/input-plugins.html

https://www.elastic.co/guide/en/logstash/current/output-plugins.html

 

V. Combining Kafka and Logstash

1. Start the Zookeeper cluster

2. Start the Kafka cluster

3. Configure Logstash

input {

  file {

    path => "/usr/local/logstash-6.5.4/test/logstash.txt"

  }

}

 

output {

  kafka {

    codec => json

    topic_id => "logstash_topic"

    bootstrap_servers => "192.168.1.104:9092,192.168.1.104:9093,192.168.1.104:9094"

    batch_size => 1

  }

}

3.1. This watches a file for changes and sends new lines to Kafka.

3.2. Create the logstash.txt file.

3.3. Reference:

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

 

4. Test

4.1. Start Logstash

bin/logstash -f config/logstash-kafka.conf


 

4.2. Create the topic

./kafka-topics.sh \
  --create \
  --zookeeper yyd:2181 \
  --replication-factor 3 \
  --partitions 1 \
  --topic logstash_topic

 

4.3. Start a Kafka consumer

./kafka-console-consumer.sh \
  --bootstrap-server yyd:9093 \
  --topic logstash_topic

 

4.4. Append to the logstash.txt file

echo 111111111111111111 >> test/logstash.txt

 

4.5. On the Kafka consumer you can see:


{"host":"yyd","@timestamp":"2019-01-09T21:31:49.768Z","@version":"1","message":"111111111111111111","path":"/usr/local/logstash-6.5.4/test/logstash.txt"}

 

4.6. Append to logstash.txt again

echo 222222222222222222 >> test/logstash.txt

4.7. On the consumer:

{"host":"yyd","@timestamp":"2019-01-09T21:31:49.768Z","@version":"1","message":"111111111111111111","path":"/usr/local/logstash-6.5.4/test/logstash.txt"}

{"host":"yyd","@timestamp":"2019-01-09T21:34:22.238Z","@version":"1","message":"22222222222222222222222","path":"/usr/local/logstash-6.5.4/test/logstash.txt"}
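Each consumed event is a single JSON object per line, so the original line written by echo comes back in the message field. A quick sketch that extracts it with sed, using the first sample event above (a real pipeline would use a JSON-aware tool such as jq):

```shell
#!/bin/sh
# One Logstash event as delivered by the Kafka console consumer (sample from above).
event='{"host":"yyd","@timestamp":"2019-01-09T21:31:49.768Z","@version":"1","message":"111111111111111111","path":"/usr/local/logstash-6.5.4/test/logstash.txt"}'
# Extract the "message" field. This naive regex assumes the value contains
# no escaped quotes, which holds for simple echoed lines like these.
msg=$(printf '%s\n' "$event" | sed -n 's/.*"message":"\([^"]*\)".*/\1/p')
echo "$msg"
```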

 

I've long wanted to talk with you, but you are far away. If you ever pass this way, spend a few more seconds and let's get acquainted!
