Java project: passing custom objects through Kafka, consumer side receives null


Following the approach found online, I implemented an Encoder and a Decoder. The producer's send() call does not report any error, but the consumer never receives the message, and no exception is thrown either.

3. Start the services
3.1 Start ZooKeeper
There are two ways to start zk. The first is to use the ZooKeeper bundled with Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties&
The second is to use another ZooKeeper, on this machine or on another host. In that case, edit the zookeeper address in config/server.properties,
e.g. zookeeper.connect=10.202.4.179:2181
3.2 Start Kafka
bin/kafka-server-start.sh config/server.properties
4. Create a topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
This creates a topic named "test" with one replica and one partition.
Use the list command to view the topic just created:
bin/kafka-topics.sh --list --zookeeper 10.202.4.179:2181
5. Start the producer and send messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Once the producer is up, type some messages:

test
hello boy
Press Ctrl+C to stop sending messages.
6. Start the consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
Once the consumer is up, the console shows the messages the producer sent.
Open two terminals, one to send messages and one to receive them.
If all of that works, check the ZooKeeper process, Kafka, and the topic, and troubleshoot step by step.
Reply A: Are multiple consumers consuming concurrently? Follow-up:

It is just a simple producer and consumer; the only difference is that the payload is an entity object rather than raw bytes.
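The poster's Encoder/Decoder code is not shown, so the exact bug cannot be pinpointed, but the object-to-bytes round trip can be verified in isolation before wiring it into Kafka: if this works, the null is more likely a serializer/deserializer configuration mismatch on the consumer side. A minimal sketch of the usual Java-serialization approach (the `Message` and `ObjectCodec` class names are invented here):

```java
import java.io.*;

// Hypothetical payload class. It MUST implement Serializable, otherwise
// writeObject() throws NotSerializableException; if the encoder swallows
// that exception, the consumer can end up seeing null.
class Message implements Serializable {
    private static final long serialVersionUID = 1L;
    final String body;
    Message(String body) { this.body = body; }
}

public class ObjectCodec {
    // Mirrors the Encoder side: object -> byte[]
    public static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Mirrors the Decoder side: byte[] -> object
    public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] wire = toBytes(new Message("hello boy"));
        Message back = (Message) fromBytes(wire);
        System.out.println(back.body); // hello boy
    }
}
```

If this round trip passes but the consumer still sees null, check that the consumer is actually configured with the matching Decoder and that both sides have the same class version on the classpath.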

Flink: custom metrics for monitoring Kafka consumption

I. Background

Because of a business requirement, I needed to monitor, inside Flink, the volume of data consumed from Kafka as well as the consumer lag. I searched online for a long time and could not find code that worked out of the box, so I am recording my own implementation here.

Part of the code is borrowed from: Flink监控:自定义消费延迟Metrics (Flink monitoring: custom consumption-lag metrics).
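For context, the per-partition "lag" exposed below is simple arithmetic: the distance between the partition's latest offset and the consumer's current position. A toy model of that computation (not the Kafka API, just the definition):

```java
// Toy model of what SubscriptionState.partitionLag reports under
// READ_UNCOMMITTED isolation: the distance from the consumer's current
// position to the partition's high watermark (latest produced offset).
public class LagModel {
    public static long partitionLag(long highWatermark, long consumerPosition) {
        return highWatermark - consumerPosition;
    }

    public static void main(String[] args) {
        // e.g. the broker has written up to offset 1500, the consumer has read to 1200
        System.out.println(partitionLag(1500L, 1200L)); // 300
    }
}
```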

II. Implementation

1. CustomerJsonDeserialization

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

import java.lang.reflect.Field;
import java.util.Set;

/**
 * @Author: Zhang Yuyao
 * @Email: 
 * @Description:
 * @Date 2021-08-09
 */
public class CustomerJsonDeserialization extends JSONKeyValueDeserializationSchema {
    private static final String DT_TOPIC_GROUP = "topic";
    private static final String DT_PARTITION_GROUP = "partition";
    private static final String DT_TOPIC_PARTITION_LAG_GAUGE = "topic_partition_lag";
    private Counter inCounter;
    private Counter outCounter;
    boolean firstMsg = true;

    private AbstractFetcher<Row, ?> fetcher;
    private ObjectMapper mapper;
    private final boolean includeMetadata;
    private RuntimeContext runtimeContext;
    private String confName;

    public CustomerJsonDeserialization(boolean includeMetadata, String confName) {
        super(includeMetadata);
        this.includeMetadata = includeMetadata;
        this.confName = confName;
    }

    public void initMetric(){
        this.inCounter =
                runtimeContext.getMetricGroup()
                        .addGroup("web-streaming")
                        .counter(this.confName+"-"+"in-count");

        this.outCounter =
                runtimeContext.getMetricGroup().addGroup("web-streaming").counter(this.confName+"-"+"out-count");

    }

    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        inCounter.inc();

        if(firstMsg){
            // Only invoked when the first record arrives; register the lag gauges here
            registerPtMetric(fetcher);

            firstMsg = false;
        }
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (record.key() != null) {
            node.set("key", mapper.readValue(record.key(), JsonNode.class));
        }
        if (record.value() != null) {
            node.set("value", mapper.readValue(record.value(), JsonNode.class));
        }
        if (includeMetadata) {
            node.putObject("metadata")
                    .put("offset", record.offset())
                    .put("topic", record.topic())
                    .put("partition", record.partition());
        }
        outCounter.inc();
        return node;
    }

    public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
        this.fetcher = fetcher;
    }

    public void setRuntimeContext(RuntimeContext runtimeContext){
        this.runtimeContext = runtimeContext;
    }

    protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception {
        // Use reflection to dig the Kafka consumer out of the fetcher. The field path is:
        // Flink: Fetcher -> KafkaConsumerThread -> KafkaConsumer
        // Kafka: KafkaConsumer -> SubscriptionState -> partitionLag()
        Field consumerThreadField = ((KafkaFetcher) fetcher).getClass().getDeclaredField("consumerThread");

        consumerThreadField.setAccessible(true);
        KafkaConsumerThread consumerThread = (KafkaConsumerThread) consumerThreadField.get(fetcher);

        Field hasAssignedPartitionsField = consumerThread.getClass().getDeclaredField("hasAssignedPartitions");
        hasAssignedPartitionsField.setAccessible(true);

        boolean hasAssignedPartitions = (boolean) hasAssignedPartitionsField.get(consumerThread);
        if(!hasAssignedPartitions){
            throw new RuntimeException("no partitions assigned to the consumer yet");
        }
        Field consumerField = consumerThread.getClass().getDeclaredField("consumer");
        consumerField.setAccessible(true);
        KafkaConsumer kafkaConsumer = (KafkaConsumer) consumerField.get(consumerThread);
        Field subscriptionStateField = kafkaConsumer.getClass().getDeclaredField("subscriptions");
        subscriptionStateField.setAccessible(true);
        SubscriptionState subscriptionState = (SubscriptionState) subscriptionStateField.get(kafkaConsumer);
        Set<TopicPartition> assignedPartitions = subscriptionState.assignedPartitions();
        for(TopicPartition topicPartition : assignedPartitions){
            runtimeContext.getMetricGroup().addGroup(DT_TOPIC_GROUP, topicPartition.topic())
                    .addGroup(DT_PARTITION_GROUP, topicPartition.partition() + "")
                    .gauge(DT_TOPIC_PARTITION_LAG_GAUGE,
                            (Gauge<Long>) () -> subscriptionState.partitionLag(topicPartition, IsolationLevel.READ_UNCOMMITTED));
        }
    }
}
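`registerPtMetric` above is one reflection pattern (`getDeclaredField` + `setAccessible` + `get`) repeated four times along the field path. Isolated, the pattern looks like this (`Holder` is a stand-in for a class with a private field, analogous to KafkaFetcher's private `consumerThread`):

```java
import java.lang.reflect.Field;

// Stand-in class with a private field we cannot access directly.
class Holder {
    private final String secret = "consumer-thread";
}

public class ReflectField {
    // Same pattern as registerPtMetric: look up the declared field by name,
    // open it for access, then read it off the target instance.
    public static Object readPrivate(Object target, String name) throws Exception {
        Field f = target.getClass().getDeclaredField(name);
        f.setAccessible(true);
        return f.get(target);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readPrivate(new Holder(), "secret")); // consumer-thread
    }
}
```

Note that this couples the code to Flink/Kafka internals: if a field is renamed in a newer version, `getDeclaredField` throws `NoSuchFieldException` at runtime.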

2. CustomerKafkaConsumer

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.util.SerializedValue;

import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @Author: Zhang Yuyao
 * @Email: 
 * @Description:
 * @Date 2021-08-09
 */
public class CustomerKafkaConsumer extends FlinkKafkaConsumer {
    private static final long serialVersionUID = -1234567890L;
    private CustomerJsonDeserialization customerJsonDeserialization;

    public CustomerKafkaConsumer(List topics, KafkaDeserializationSchema deserializer, Properties props) {
        super(topics, deserializer, props);
        this.customerJsonDeserialization = (CustomerJsonDeserialization) deserializer;
    }

    @Override
    public void run(SourceContext sourceContext) throws Exception {
        customerJsonDeserialization.setRuntimeContext(getRuntimeContext());
        customerJsonDeserialization.initMetric();
        super.run(sourceContext);
    }

    @Override
    protected AbstractFetcher createFetcher(SourceContext sourceContext, Map assignedPartitionsWithInitialOffsets, SerializedValue watermarkStrategy, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup, boolean useMetrics) throws Exception {
        AbstractFetcher fetcher = super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
        customerJsonDeserialization.setFetcher(fetcher);
        return fetcher;
    }
}

III. Usage

DataStream dataStream = env.addSource(new CustomerKafkaConsumer(topics, new CustomerJsonDeserialization(false, this.name), props).setStartFromEarliest());
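The `in-count` / `out-count` counters registered in `initMetric` just increment on the two sides of `deserialize()`: records entering versus records successfully emitted. A minimal stand-in sketch of that semantics (`SimpleCounter` here is an invented class, not Flink's `Counter`):

```java
// Minimal stand-in for a Flink Counter, to show what in-count/out-count
// measure. If deserialization throws, in advances but out does not, so a
// growing gap between the two flags bad records.
class SimpleCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

public class CounterSemantics {
    static final SimpleCounter in = new SimpleCounter();
    static final SimpleCounter out = new SimpleCounter();

    // Toy "deserializer": counts the record in, fails on null input,
    // and only counts it out after a successful parse.
    static String deserialize(String raw) {
        in.inc();
        if (raw == null) throw new IllegalArgumentException("bad record");
        String result = raw.trim();
        out.inc();
        return result;
    }

    public static void main(String[] args) {
        deserialize(" a ");
        try { deserialize(null); } catch (IllegalArgumentException ignored) { }
        System.out.println(in.getCount() + " / " + out.getCount()); // 2 / 1
    }
}
```

In the real class the same holds per operator subtask, so alerting on `in-count - out-count` gives a cheap bad-record signal alongside the lag gauge.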
