kafka-python: setting custom offsets when a consumer reads data, and customizing the read order
Posted by 呆萌的代Ma
For installing kafka-python, basic usage, starting ZooKeeper, and so on, see the earlier post: Connecting to Kafka with Python (使用python连接kafka).
Writing a consumer that reads from a custom offset
Note: when using custom offsets to control the read order in kafka-python, the consumer must be set up like this:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0)  # arguments: topic name, partition number
consumer.assign([tp])  # declare that this consumer's partition will be managed manually
When reading data, use:
consumer.seek(tp, 3)  # manually set the offset, e.g. start reading from offset 3
consumer.seek(tp, 50)  # start reading from offset 50
next(consumer)
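Once the partition is assigned and positioned with seek, the consumer can also be iterated like any other consumer; each record it yields advances the position by one. A minimal sketch (reusing the consumer and tp objects from above, and assuming the partition already holds data at these offsets) that reads offsets 10 through 14 in order:

consumer.seek(tp, 10)  # position the consumer at offset 10
for record in consumer:  # iterating yields ConsumerRecords and advances the position by 1 each time
    print(record.offset, record.value)
    if record.offset >= 14:  # stop after reading offsets 10..14
        break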
ATTENTION: when initializing the consumer, you cannot use the one-shot form that passes the topic (and everything else) into the constructor:
# do NOT initialize the consumer like this
consumer = KafkaConsumer("python_test", bootstrap_servers='localhost:9092', auto_offset_reset='latest', api_version=(0, 10, 2))
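The reason is that passing a topic to the constructor subscribes the consumer, and kafka-python does not allow mixing a subscription with manual partition assignment. A minimal sketch of the failure mode (based on my reading of the kafka-python docs, assign() raises IllegalStateError once a subscription exists):

from kafka import KafkaConsumer
from kafka.errors import IllegalStateError
from kafka.structs import TopicPartition

# the constructor argument subscribes the consumer to python_test
consumer = KafkaConsumer("python_test", bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
try:
    consumer.assign([TopicPartition("python_test", 0)])  # conflicts with the subscription above
except IllegalStateError as err:
    print(err)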
Example code
1. Start the producer and create data
First, start the producer:
from kafka import KafkaProducer
import datetime
import json
# start the producer
producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
my_topic = "python_test"
for i in range(100):
    data = {'num': i, 'data': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(my_topic, json.dumps(data).encode('utf-8')).get(timeout=30)
This creates a topic named python_test and writes 100 records into it.
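To confirm the writes landed, you can ask the broker for the partition's offset range; beginning_offsets() and end_offsets() are part of the KafkaConsumer API. A minimal sketch (after producing, the end offset should be 100, i.e. the next offset to be written):

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0)
print(consumer.beginning_offsets([tp]))  # expected: {TopicPartition(topic='python_test', partition=0): 0}
print(consumer.end_offsets([tp]))        # expected: {TopicPartition(topic='python_test', partition=0): 100}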
2. Consumer reads from custom offsets
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import random
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0)  # arguments: topic name, partition number
consumer.assign([tp])  # declare that this consumer's partition will be managed manually
for i in range(10):
    random_seek = random.randint(0, 99)  # random offset; randint is inclusive, and valid offsets here are 0-99
    consumer.seek(tp, random_seek)  # set the offset
    consumer_data = next(consumer)  # read one record; note: next() advances the position by 1 automatically
    print(consumer_data)
Output:
ConsumerRecord(topic='python_test', partition=0, offset=66, timestamp=1646473931338, timestamp_type=0, key=None, value=b'{"num": 61, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1900770981, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=87, timestamp=1646473931355, timestamp_type=0, key=None, value=b'{"num": 82, "data": "2022-03-05 17:52:11"}', headers=[], checksum=3522714781, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=71, timestamp=1646473931342, timestamp_type=0, key=None, value=b'{"num": 66, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1871660722, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=50, timestamp=1646473931318, timestamp_type=0, key=None, value=b'{"num": 45, "data": "2022-03-05 17:52:11"}', headers=[], checksum=4078097399, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=11, timestamp=1646473931271, timestamp_type=0, key=None, value=b'{"num": 6, "data": "2022-03-05 17:52:11"}', headers=[], checksum=2130074065, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=71, timestamp=1646473931342, timestamp_type=0, key=None, value=b'{"num": 66, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1871660722, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=83, timestamp=1646473931352, timestamp_type=0, key=None, value=b'{"num": 78, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1428608549, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=25, timestamp=1646473931285, timestamp_type=0, key=None, value=b'{"num": 20, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1811806078, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=36, timestamp=1646473931296, timestamp_type=0, key=None, value=b'{"num": 31, "data": "2022-03-05 17:52:11"}', headers=[], checksum=330696171, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=18, timestamp=1646473931278, timestamp_type=0, key=None, value=b'{"num": 13, "data": "2022-03-05 17:52:11"}', headers=[], checksum=719857123, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
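The same seek-then-next pattern supports any read order, not just a random one. For example, a minimal sketch (reusing the consumer and tp from the example above) that reads the last five records in reverse order:

for offset in range(99, 94, -1):  # offsets 99, 98, 97, 96, 95
    consumer.seek(tp, offset)
    record = next(consumer)  # reads exactly one record at that offset
    print(record.offset, record.value)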