kafka 介绍

Posted shenling

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka 介绍相关的知识,希望对你有一定的参考价值。

一、Kafka介绍

 kafka是消息中间件的一种,一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写

 1.应用场景

 - 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
 - 消息系统:解耦和生产者和消费者、缓存消息等。
 - 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,
  或者装载到hadoop、数据仓库中做离线分析和挖掘。
 - 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
 - 流式处理:比如spark streaming和storm
 - 事件源

1.名词解译:

  • producer:生产者,。
  • consumer:消费者,。
  • topic:可以把它理解为标签,生产者每生产出来一个消息就贴上一个标签(topic),消费者可以根据TopicName选择需要消费的消息。
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。。

 

Topic写入流程

技术图片

 


消费流程:
技术图片

 


 

二、安装
1.安装JDK 
2.安装ZK
3.安装Kafka
  1.  下载安装包 http://kafka.apache.org/downloads
  2.  进入到Kafka目录
  3.  解压并进入Kafka目录,D:softwarezookeeperapache-zookeeper-3.5.5-binapache-zookeeper-3.5.5-bin
  4.  进入config目录找到文件server.properties并打开
  5.  找到并编辑log.dirs=D:Kafkakafka_2.12-0.11.0.0kafka-logs
  6.  找到并编辑zookeeper.connect=localhost:2181
  7.  Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181
  8.  进入Kafka安装目录D:Kafkakafka_2.12-0.11.0.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入: .inwindowskafka-server-start.bat .configserver.properties       运行Kafka
  9. 创建Topic 

    .inwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tes
  10. 产品Topic是否创建  .inwindowskafka-topics.bat --list --zookeeper localhost:2181
  11. 创建生产着 
    .inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic test
  12. 创建消费者
    .inwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

   13.生产者输入消息可在消费者命令这命令窗口消费到此内容

三、Java操作

1.Pom添加引用


<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.1.RELEASE</version>
</dependency>
<dependency>
   <groupId>com.github.danielwegener</groupId>
   <artifactId>logback-kafka-appender</artifactId>
   <version>0.2.0-RC1</version>
</dependency>

  


2.创建生产者

package com.answern.openplatform;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
* @description: Kafka生产者
* @author: shenling
* @create: 2019-08-08 12:44
**/

public class KafkaProducerDemo implements Runnable {

private final KafkaProducer<String, String> producer;
private final String topic;
public KafkaProducerDemo(String topicName) {
Properties props = new Properties();
//服务器地址,如果有多个Broker,地址参数中用","分割
props.put("bootstrap.servers", "127.0.0.1:9092");
//acks 参数共有三个值:0、1 和 all。
//0:生产者只要吧消息发送出去,不管消费者是否正确消费,生产者就认为消费者成功消费,
//1:只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。注意:kafka默认配置是为1
//allartition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。:
props.put("acks", "all");
//如果发送请求到Topic失败,重试次数
props.put("retries", 0);
//生产者发送数据大小基本单位,单位为Bytes
props.put("batch.size", 16384);
//对Key进行序列化操作(二进制格式)
props.put("key.serializer", StringSerializer.class.getName());
//对Value进行序列化操作(二进制格式)
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer<String, String>(props);
this.topic = topicName;
//创建生产者结束
}

@Override
public void run() {
int messageNo = 1;
try {
for(;;) {
String messageStr="你好,这是第"+messageNo+"条数据";
producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
//生产了100条就打印
if(messageNo%100==0){
System.out.println("发送的信息:" + messageStr);
}
//生产1000条就退出
if(messageNo%1000==0){
System.out.println("成功发送了"+messageNo+"条");
break;
}
messageNo++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}

public static void main(String args[]) {
KafkaProducerDemo test = new KafkaProducerDemo("applog");
Thread thread = new Thread(test);
thread.start();
}
}
 

 


2.创建消费者
package com.answern.openplatform;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
* @description: kafka消费者
* @author: shenling
* @create: 2019-08-08 12:42
**/

public class KafkaConsumerDemo implements Runnable {

private final KafkaConsumer<String, String> consumer;
private ConsumerRecords<String, String> msgList;
private final String topic;
private static final String GROUPID = "groupA";

public KafkaConsumerDemo(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092"); ////kafka地址,多个地址用逗号分割
//相同组内,一条数据只能被消费一次。比如说我起了十个消费者,同时消费一个Topic,如果这些消费者不是在一个组里面那么,
// 会存在一条数据被十个消费者消费,共消费十次的情况
props.put("group.id", GROUPID);
//是否自动提交Offset
props.put("enable.auto.commit", "true");
//然后计算下次的auto commit时间
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
this.topic = topicName;
this.consumer.subscribe(Arrays.asList(topic));
}

@Override
public void run() {
int messageNo = 1;
System.out.println("---------开始消费---------");
try {
for (;;) {
msgList = consumer.poll(1000);
if(null!=msgList&&msgList.count()>0){
for (ConsumerRecord<String, String> record : msgList) {
//消费100条就打印 ,但打印的数据不一定是这个规律的
if(messageNo%100==0){
System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
}
//当消费了1000条就退出
if(messageNo%10000==0){
break;
}
messageNo++;
}
}else{
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static void main(String args[]) {
KafkaConsumerDemo test1 = new KafkaConsumerDemo("applog");
Thread thread1 = new Thread(test1);
thread1.start();
}
}

 

 

 

参考网址:

https://www.cnblogs.com/flower1990/p/7466882.html

http://kafka.apache.org/

 

以上是关于kafka 介绍的主要内容,如果未能解决你的问题,请参考以下文章

Kafka-文件管理

kafka安装和使用远程代码进行访问 ---附踩坑记录

kafka安装和使用远程代码进行访问 ---附踩坑记录

kafka中常用API的简单JAVA代码

Android课程---Android Studio使用小技巧:提取方法代码片段

Apache Kafka教程:入门介绍