kafka+docker+python

Posted 永远的幻想

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka+docker+python相关的知识,希望对你有一定的参考价值。

昨天晚上刚刚才花3小时看完《日志:每个软件工程师都应该知道的有关实时数据的统一概念》

今天就把kafka在docker容器里运行起来,github上有几个,但都太复杂了。

我自己写个最简单的python的demo体验一下:https://github.com/xuqinghan/docker-kafka

和上周部署taiga相比,kafka不愧是大家手笔,基本无坑,简单记录一下:

首先是docker-compose.yml

version: \'3.1\'

services:
  zoo:
    image: zookeeper
    restart: always
    hostname: zookeeper
    volumes:
      #- zookeeper/conf:/conf
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog

  kafka:
    build: kafka/
    restart: always
    volumes:
      - ./kafka/config:/kafka/config
    ports:
      - "9092:9092"
    depends_on:
      - zoo

  producer:
    stdin_open: true
    tty: true
    restart: always
    build: ./app
    volumes:
      - ./app:/app
    depends_on:
      - zoo
      - kafka
    command: [\'python3\', \'producer.py\']

  consumer:
    stdin_open: true
    tty: true
    build: ./app
    restart: always
    volumes:
      - ./app:/app
    depends_on:
      - zoo
      - kafka
    command: [\'python3\', \'consumer.py\']

1共4个容器,1个zookeeper(保存日志数据,类似celery里的backend,其实更像是git),1个kafka(类似broker),然后就是生产、消费者各1个

分别说一下

1zookeeper

这个有官方镜像:    https://hub.docker.com/_/zookeeper/  。直接用就行了,不需要写build

但是要注意看一下官网的dockerfile,./data 和 /datalog 的位置,和有些文章说的不一样,不在/var/...里

本地建个文件夹,用来挂/data  和/datalog  

2kafka

根据kafka的官网教程https://kafka.apache.org/quickstart,安装非常简单,所以照着写一个简单的dockerfile

FROM java:openjdk-8-jre
LABEL author="xuqinghan"
LABEL purpose = \'kafka\'

# ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && \\
    apt-get install -y wget

RUN wget -q http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
RUN tar -xzf kafka_2.11-1.0.0.tgz -C /home
RUN mv /home/kafka_2.11-1.0.0 /kafka
WORKDIR /kafka
#CMD ["/bin/bash"]
CMD ["/kafka/bin/kafka-server-start.sh", "/kafka/config/server.properties"]

注意不要大跃进,不要把openjdk-8-jre改成openjdk-9-jre, 会报错。

然后本地也下载一下kafka的安装包,才47M,解出/config目录,在外面改配置,然后在dockercompose里挂进去

主要就是server.properties 里的这里

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zoo:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

注意因为在dockercompose创建的net bridge里,所以是连接到 dockercompose.yml里 zookeeper的名字(我这里是zoo),不是localhost

3 producer和consumer

dockerfile用一个就可以了,producer.py 和consumer.py也放一个文件夹,只是在 dockercompose.yml分别起1个service就可以了

dockerfile:

FROM python
LABEL author="xuqinghan"
LABEL purpose = \'kafka\'


RUN apt update
#RUN apt install -y nginx supervisor
RUN pip3 install setuptools
RUN pip3 install kafka-python

ENV PYTHONIOENCODING=utf-8

RUN mkdir -p /app
WORKDIR /app


CMD ["/bin/bash"]

只为了测试kafka,所以异常简单,只安装了kafka-python,有文章说这个丢数据,要用C++版的,作为萌新,暂时没必要关心这个,就用它。

然后

producer.py

from kafka import KafkaProducer
import time
#  connect to Kafka
producer = KafkaProducer(bootstrap_servers=\'kafka:9092\')


def emit():
    for i in range(100):
        print(f\'send message {i}\')
        str_res = f\'{i}\'
        producer.send(\'foobar\', str_res.encode())
        time.sleep(1)

if __name__ == \'__main__\':
    emit()

consumer.py

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=\'kafka:9092\')
#consumer.assign([TopicPartition(\'foobar\',1)])
consumer.subscribe(\'foobar\')
print(\'consumer connected\')
for msg in consumer:
    print(msg)
    res = msg.value.decode()
    print(f\'received data:{res}\')

kafka如果不配置,topic默认是这样动态创建的,并不需要在kafka那里用sh脚本创建。

注意只能发bytes字节码。json之类在文档里也有例子http://kafka-python.readthedocs.io/en/master/,略过

最后上结果截图:

 

总结

从昨晚到今天先补日志、流处理的知识,再实做下来。总的感觉。这玩意不就是给系统上了个git么。producer往里push commit,  consumer在那里pull

现在看来,一切都被记录下来(变更过程),一切都脚本化。一切都可以播放/重放

开发时:代码变更有git管起来,代码仓库包括了全部提交的变更过程;

部署时:有docker系的脚本,CI/CD系统 一样有DSL脚本,把部署过程的全部细节都记录下来;

运行时:有kafka,把原来不怎么记录的event,用户操作,都给全部记下了。各种分系统的数据库表反而自由了,如果修改了,数据丢了,重新播放一遍日志,重新生产一遍就OK。这么干,对很多应用来说,确实李菊福。

——如果连我写的矬软件系统都可以这样,那么国家和互联网巨头,肯定能把每个人的行为都全部记录下来

将来的道德、社会风貌,一定和现在迥然不同把。

 

以上是关于kafka+docker+python的主要内容,如果未能解决你的问题,请参考以下文章

在 dockerized 环境中无法从 Flask 连接到 Kafka

我的 Python/Java/Spring/Go/Whatever Client Won’t Connect to My Apache Kafka Cluster in Docker/AWS/My B

docker运行kafka示例

从 Docker 容器将 PySpark 连接到 Kafka

需要分布式模式的 jdbc Kafka 连接配置设置作为 docker 容器的参考文档或代码

docker快速安装kafka,zookeeper ,体验spring-boot-demo-mq-kafka