Java project: a custom object sent through Kafka arrives as null on the consumer side
Following approaches found online, I implemented an Encoder and a Decoder. The producer's send() does not report any error, but the consumer never receives the message, and no exception is thrown either.
3. Start the services
3.1 Start ZooKeeper
There are two ways to start ZooKeeper. The first is to use the ZooKeeper bundled with Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties&
The other is to use a separate ZooKeeper instance, running on the local machine or on another host. In that case, modify the zookeeper address in config/server.properties,
for example: zookeeper.connect=10.202.4.179:2181
3.2 Start Kafka
bin/kafka-server-start.sh config/server.properties
4. Create a topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
This creates a topic named test with a single replica and a single partition.
Use the list command to check the topic you just created:
bin/kafka-topics.sh --list --zookeeper 10.202.4.179:2181
5. Start a producer and send messages
Start the producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Once it is running, type some messages to send, for example:
test
hello boy
Press Ctrl+C to stop sending messages.
6. Start a consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
After the consumer starts, the messages sent by the producer show up in its console.
Open two terminals and you can send messages in one and receive them in the other.
If all of that works, check the ZooKeeper process and the Kafka topic, and track down the cause step by step.
Answer A: Could multiple consumers be consuming concurrently?
Follow-up:
It is just a simple producer and consumer; the only difference is that an entity object is transmitted instead of raw bytes.
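In this situation the usual culprit is a (de)serializer that swallows exceptions and returns null, or a consumer configured with the wrong value deserializer. Below is a minimal sketch using the current kafka-clients Serializer/Deserializer API; the User POJO and class names are assumptions for illustration, and the same idea (fail loudly instead of returning null) applies to the older Encoder/Decoder API as well.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
/**
 * Hypothetical entity sent through Kafka.
 */
class User {
    public String name;
    public int age;
    public User() {} // no-arg constructor required by Jackson
}
/**
 * JSON-based serializer/deserializer pair. Register this class name as the
 * producer's value.serializer and the consumer's value.deserializer.
 */
public class UserJsonSerde implements Serializer<User>, Deserializer<User> {
    private final ObjectMapper mapper = new ObjectMapper();
    @Override
    public byte[] serialize(String topic, User data) {
        try {
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            // Fail loudly: silently returning null here is exactly what makes
            // the consumer see null without any exception being thrown.
            throw new RuntimeException("Failed to serialize User", e);
        }
    }
    @Override
    public User deserialize(String topic, byte[] bytes) {
        try {
            return bytes == null ? null : mapper.readValue(bytes, User.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize User", e);
        }
    }
}
Also double-check that the consumer's value.deserializer property really points at this class; a mismatch between what the producer writes and what the consumer expects to read is the other common cause of "empty" records.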
Flink: custom metrics for monitoring Kafka consumption
I. Background
Due to a business requirement, I needed to monitor from inside Flink how much data is being consumed from Kafka and how far the consumer is lagging behind. After searching online for a long time without finding code that could be used directly, I am recording my own implementation here.
Part of the code is adapted from the article: Flink监控:自定义消费延迟Metrics
II. Implementation
1. CustomerJsonDeserialization
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.IsolationLevel; // needed by partitionLag(); on older kafka-clients this enum lives in org.apache.kafka.common.requests
import org.apache.kafka.common.TopicPartition;
import java.lang.reflect.Field;
import java.util.Set;
/**
* @Author: Zhang Yuyao
* @Email:
* @Description:
* @Date 2021-08-09
*/
public class CustomerJsonDeserialization extends JSONKeyValueDeserializationSchema {
private String DT_TOPIC_GROUP = "topic";
private String DT_PARTITION_GROUP = "partition";
private String DT_TOPIC_PARTITION_LAG_GAUGE = "topic_partition_lag";
private Counter inCounter;
private Counter outCounter;
boolean firstMsg = true;
private AbstractFetcher<Row, ?> fetcher;
private ObjectMapper mapper;
private final boolean includeMetadata;
private RuntimeContext runtimeContext;
private String confName;
public CustomerJsonDeserialization(boolean includeMetadata, String confName) {
super(includeMetadata);
this.includeMetadata = includeMetadata;
this.confName = confName;
}
public void initMetric(){
this.inCounter =
runtimeContext.getMetricGroup()
.addGroup("web-streaming")
.counter(this.confName+"-"+"in-count");
this.outCounter =
runtimeContext.getMetricGroup().addGroup("web-streaming").counter(this.confName+"-"+"out-count");
}
@Override
public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
inCounter.inc();
if(firstMsg){
// Register the per-partition lag gauges only once, when the first record arrives (the fetcher has been set by then)
registerPtMetric(fetcher);
firstMsg = false;
}
if (mapper == null) {
mapper = new ObjectMapper();
}
ObjectNode node = mapper.createObjectNode();
if (record.key() != null) {
node.set("key", mapper.readValue(record.key(), JsonNode.class));
}
if (record.value() != null) {
node.set("value", mapper.readValue(record.value(), JsonNode.class));
}
if (includeMetadata) {
node.putObject("metadata")
.put("offset", record.offset())
.put("topic", record.topic())
.put("partition", record.partition());
}
outCounter.inc();
return node;
}
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
this.fetcher = fetcher;
}
public void setRuntimeContext(RuntimeContext runtimeContext){
this.runtimeContext = runtimeContext;
}
protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception {
// Use reflection to reach the underlying Kafka consumer inside the fetcher; the field path is:
// Flink: Fetcher -> KafkaConsumerThread -> KafkaConsumer ->
// Kafka consumer: KafkaConsumer -> SubscriptionState -> partitionLag()
Field consumerThreadField = ((KafkaFetcher) fetcher).getClass().getDeclaredField("consumerThread");
consumerThreadField.setAccessible(true);
KafkaConsumerThread consumerThread = (KafkaConsumerThread) consumerThreadField.get(fetcher);
Field hasAssignedPartitionsField = consumerThread.getClass().getDeclaredField("hasAssignedPartitions");
hasAssignedPartitionsField.setAccessible(true);
boolean hasAssignedPartitions = (boolean) hasAssignedPartitionsField.get(consumerThread);
if(!hasAssignedPartitions){
throw new RuntimeException("wait 50 secs, but not assignedPartitions");
}
Field consumerField = consumerThread.getClass().getDeclaredField("consumer");
consumerField.setAccessible(true);
KafkaConsumer kafkaConsumer = (KafkaConsumer) consumerField.get(consumerThread);
Field subscriptionStateField = kafkaConsumer.getClass().getDeclaredField("subscriptions");
subscriptionStateField.setAccessible(true);
SubscriptionState subscriptionState = (SubscriptionState) subscriptionStateField.get(kafkaConsumer);
Set<TopicPartition> assignedPartitions = subscriptionState.assignedPartitions();
for(TopicPartition topicPartition : assignedPartitions){
runtimeContext.getMetricGroup().addGroup(DT_TOPIC_GROUP, topicPartition.topic())
.addGroup(DT_PARTITION_GROUP, topicPartition.partition() + "")
.gauge(DT_TOPIC_PARTITION_LAG_GAUGE, (Gauge<Long>) () -> subscriptionState.partitionLag(topicPartition, IsolationLevel.READ_UNCOMMITTED));
}
}
}
2. CustomerKafkaConsumer
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.util.SerializedValue;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* @Author: Zhang Yuyao
* @Email:
* @Description:
* @Date 2021-08-09
*/
public class CustomerKafkaConsumer extends FlinkKafkaConsumer {
private static final long serialVersionUID = -1234567890L;
private CustomerJsonDeserialization customerJsonDeserialization;
public CustomerKafkaConsumer(List topics, KafkaDeserializationSchema deserializer, Properties props) {
super(topics, deserializer, props);
this.customerJsonDeserialization = (CustomerJsonDeserialization) deserializer;
}
@Override
public void run(SourceContext sourceContext) throws Exception {
customerJsonDeserialization.setRuntimeContext(getRuntimeContext());
customerJsonDeserialization.initMetric();
super.run(sourceContext);
}
@Override
protected AbstractFetcher createFetcher(SourceContext sourceContext, Map assignedPartitionsWithInitialOffsets, SerializedValue watermarkStrategy, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
AbstractFetcher fetcher = super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
customerJsonDeserialization.setFetcher(fetcher);
return fetcher;
}
}
III. Usage
DataStream dataStream = env.addSource(new CustomerKafkaConsumer(topics, new CustomerJsonDeserialization(false, this.name), props).setStartFromEarliest());
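For context, here is a fuller sketch of how this could be wired into a job. The broker address, group id, topic name, and job name below are placeholders rather than values from the original setup; the confName argument is only used as a prefix for the in/out counters.
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class LagMetricsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "lag-metrics-demo");        // placeholder group id
        List<String> topics = Collections.singletonList("test");  // placeholder topic
        CustomerKafkaConsumer consumer = new CustomerKafkaConsumer(
                topics, new CustomerJsonDeserialization(false, "lag-metrics-demo"), props);
        consumer.setStartFromEarliest();
        DataStream dataStream = env.addSource(consumer);
        dataStream.print();
        env.execute("kafka-lag-metrics-demo");
    }
}
Once the job has deserialized its first record, the web-streaming group exposes the confName-in-count and confName-out-count counters, and each assigned partition gets a topic_partition_lag gauge under its topic and partition metric groups.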