监听kafka消息

Posted yangyongjie

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了监听kafka消息相关的知识,希望对你有一定的参考价值。

1、在main方法中直接消费(kafka 1.0以上版本)

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal stand-alone Kafka consumer built on the {@code org.apache.kafka.clients.consumer} API.
 *
 * <p>Subscribes to one topic and prints the partition/offset, key and value of every record it
 * receives. The loop never ends on its own; the consumer is closed if the loop is left through
 * an exception.
 *
 * @author 17090889
 */
public class ConsumerSample {

    /**
     * Configures the consumer, subscribes to {@code test-topic} and prints records forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topic = "test-topic";
        Properties props = new Properties();
        // Kafka cluster addresses; separate multiple servers with commas
        props.put("bootstrap.servers", "localhost:9092");
        // Consumer group id
        props.put("group.id", "test_group1");
        // Whether the consumer commits offsets automatically
        props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits, in milliseconds
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers for the record key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            // Topics to subscribe to
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                // Fetch the next batch of records; 100 is the poll timeout in milliseconds
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Join with a separator: a bare '+' would add the int partition to the long
                    // offset and print their sum instead of the two values.
                    System.out.println(record.partition() + "-" + record.offset());
                    System.out.println(record.key());
                    System.out.println(record.value());
                }
            }
        } finally {
            // Only reached when the loop is left through an exception
            consumer.close();
        }
    }
}

 

2、Spring下kafka1.0以上版本(不依赖Spring-Kafka)

 

3、Spring下kafka 0.8版本

  1)kafka消费者抽象工厂类

/**
 * kafka消费者抽象工厂类
 * 〈功能详细描述〉
 *
 * @author
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Abstract base for Spring-managed consumers built on the ZooKeeper-based
 * {@code kafka.consumer} API.
 *
 * <p>On {@link #afterPropertiesSet()} it creates a {@link ConsumerConnector}, opens the message
 * streams described by {@code topicThreadMap} and starts one worker task per stream; every
 * received message is handed to {@link #onMessage(MessageAndMetadata)}. On {@link #destroy()}
 * the connector and the worker pool are shut down.
 */
public abstract class BaseKafkaConsumerFactory implements InitializingBean, DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerFactory.class);

    /**
     * Charset used to decode the payload for logging.
     */
    private static final String MESSAGE_CHARSET = "UTF-8";

    /**
     * Map of topic name to the number of consumer threads (streams) for that topic.
     */
    private Map<String, Integer> topicThreadMap;
    /**
     * Configuration for the consumer instance; built from the injected fields when not set.
     */
    private Properties properties;

    /**
     * Pool running the per-stream worker tasks.
     */
    private ThreadPoolExecutor taskExecutor;

    private ConsumerConnector consumerConnector;

    /**
     * Value of {@code zookeeper.connect}.
     */
    private String zkConnect;

    /**
     * Value of {@code group.id}.
     */
    @Value("${kafka.groupId}")
    private String groupId;

    /**
     * Value of {@code zookeeper.session.timeout.ms}.
     */
    @Value("${kafka.sessionTimeOut}")
    private String sessionTimeOut;

    /**
     * Value of {@code zookeeper.sync.time.ms}.
     */
    @Value("${kafka.syncTime}")
    private String syncTime;

    /**
     * Value of {@code auto.commit.interval.ms}.
     */
    @Value("${kafka.commitInterval}")
    private String commitInterval;

    /**
     * Value of {@code auto.offset.reset}.
     */
    @Value("${kafka.offsetReset}")
    private String offsetReset;


    /**
     * Creates the consumer connector, opens the streams and starts one worker per stream.
     */
    @Override
    public void afterPropertiesSet() {
        logger.info("afterPropertiesSet-start");
        // Only build the defaults when no Properties object was injected
        if (properties == null) {
            properties = buildDefaultProperties();
        }

        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(topicThreadMap);

        // A stream task loops on its iterator and does not return while the stream keeps
        // delivering, so every stream needs its own thread. With a fixed core size smaller than
        // the stream count, the surplus tasks would sit in the queue and never be consumed.
        int streamCount = 0;
        for (List<KafkaStream<byte[], byte[]>> streams : topicMessageStreams.values()) {
            streamCount += streams.size();
        }
        // ThreadPoolExecutor rejects a maximum pool size of 0, hence the lower bound of 1
        int poolSize = Math.max(1, streamCount);
        taskExecutor = new ThreadPoolExecutor(poolSize, poolSize, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(poolSize));

        for (List<KafkaStream<byte[], byte[]>> streams : topicMessageStreams.values()) {
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                taskExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        consume(stream);
                    }
                });
            }
        }

    }

    /**
     * Builds the consumer configuration from the injected fields.
     *
     * @return a new {@link Properties} holding the consumer settings
     */
    private Properties buildDefaultProperties() {
        Properties defaults = new Properties();
        defaults.put("zookeeper.connect", zkConnect);
        logger.info("zkConnect={}", zkConnect);
        // group.id identifies the consumer group
        defaults.put("group.id", groupId);
        logger.info("groupId={}", groupId);
        // ZooKeeper session timeout
        defaults.put("zookeeper.session.timeout.ms", sessionTimeOut);
        defaults.put("zookeeper.sync.time.ms", syncTime);
        defaults.put("auto.commit.interval.ms", commitInterval);
        defaults.put("auto.offset.reset", offsetReset);
        // NOTE(review): serializer.class looks like a producer setting - confirm the consumer needs it
        defaults.put("serializer.class", "kafka.serializer.StringEncoder");

        defaults.put("rebalance.max.retries", "10");
        // Back-off between two consecutive retries during a rebalance
        defaults.put("rebalance.backoff.ms", "3100");
        return defaults;
    }

    /**
     * Reads the stream until its iterator is exhausted, logging and dispatching each message.
     *
     * @param stream the stream to read from
     */
    private void consume(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            try {
                String kafkaMsg = new String(data.message(), MESSAGE_CHARSET);
                // Log the topic of this message, not every configured topic
                logger.info("来自topic:{}的消息:{}", data.topic(), kafkaMsg);
                onMessage(data);
            } catch (RuntimeException e) {
                // Keep the worker alive: one bad message must not stop the stream
                logger.error("处理消息异常.", e);
            } catch (UnsupportedEncodingException e) {
                logger.error("decode message failed", e);
            }
        }
    }

    /**
     * Handles one received message.
     *
     * @param data the message together with its metadata
     */
    protected abstract void onMessage(MessageAndMetadata<byte[], byte[]> data);

    /**
     * Shuts down the consumer connector and then the worker pool; failures are logged only.
     *
     * @throws Exception declared by {@link DisposableBean}; not thrown by this implementation
     */
    @Override
    public void destroy() throws Exception {
        try {
            if (consumerConnector != null) {
                consumerConnector.shutdown();
            }
        } catch (Exception e) {
            logger.warn("shutdown consumer failed", e);
        }
        try {
            if (taskExecutor != null) {
                taskExecutor.shutdown();
            }
        } catch (Exception e) {
            logger.warn("shutdown messageProcessExecutor failed", e);
        }
        logger.info("shutdown consumer successfully");
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    public Map<String, Integer> getTopicThreadMap() {
        return topicThreadMap;
    }

    public void setTopicThreadMap(Map<String, Integer> topicThreadMap) {
        this.topicThreadMap = topicThreadMap;
    }

    public String getZkConnect() {
        return zkConnect;
    }

    public void setZkConnect(String zkConnect) {
        this.zkConnect = zkConnect;
    }
}

  2)具体的kafka消费者实现类

import com.xxx.sfmms.common.util.JsonConvertUtil;
import com.xxx.sfmms.common.util.RedisUtil;
import com.xxx.sfmms.common.util.StringUtil;
import com.xxx.sfmms.service.intf.RecommendService;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer for real-name verification messages.
 *
 * <p>Each message is decoded as UTF-8 JSON and parsed into a map. When the Redis flag for the
 * user's account number is {@code "1"}, the verification fields are passed to
 * {@code RecommendService.checkRulesAndRiskSendMoney}.
 *
 * @author 17090889
 */
public class RealNameKafkaConsumer extends BaseKafkaConsumerFactory {

    private static final Logger LOGGER = LoggerFactory.getLogger(RealNameKafkaConsumer.class);

    private static final String STR_INVOKENO = "invokeNo";

    @Autowired
    private RecommendService recommendService;


    /**
     * Handles one message, tagging all log lines written during handling with a fresh invokeNo.
     *
     * @param data the message together with its metadata
     */
    @Override
    protected void onMessage(MessageAndMetadata<byte[], byte[]> data) {
        MDC.put(STR_INVOKENO, StringUtil.getUuid());
        try {
            process(data);
        } finally {
            // Always clear the MDC entry: the worker thread is reused for the next message,
            // and an early return or an exception must not leave a stale invokeNo behind.
            MDC.remove(STR_INVOKENO);
        }
    }

    /**
     * Decodes and parses the message, checks the Redis flag and calls the recommend service.
     *
     * @param data the message together with its metadata
     */
    private void process(MessageAndMetadata<byte[], byte[]> data) {
        String msg;
        try {
            msg = new String(data.message(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Nothing to parse without a decoded payload
            LOGGER.error("字节数组转字符串异常", e);
            return;
        }
        // NOTE(review): the payload carries realName / idNO / appToken - confirm it may be logged
        LOGGER.info("RealNameKafkaConsumer-data={},topic={}", msg, data.topic());

        // Post-event data of the real-name verification
        @SuppressWarnings("unchecked")
        Map<String, String> map = JsonConvertUtil.json2pojo(msg, Map.class);
        LOGGER.info("RealNameKafkaConsumer-map={}", map);
        if (map == null) {
            // Guard against a parser that yields null for unusable input
            return;
        }

        String userNo = map.get("eppAccountNO");
        LOGGER.info("RealNameKafkaConsumer-userNo={}", userNo);
        String flag = RedisUtil.getString("PULLNEW:RACCOUNTNO_" + userNo, "MEIS");
        // Skip users that were not invited through channel 6
        if (!"1".equals(flag)) {
            LOGGER.info("不是渠道6拉新用户");
            return;
        }

        // authenStatus: 20 = basic verification, 30 = advanced real-name verification,
        // 40 = downgraded after a real-name appeal, 50 = downgraded after advanced verification
        // expired, 60 = verification revoked (manual downgrade), 70 = downgraded after identity
        // recovery by appeal
        String authenStatus = map.get("authenStatus");
        // Real name
        String realName = map.get("realName");
        // Identity card number
        String idNo = map.get("idNO");
        // App token
        String appToken = map.get("appToken");

        // Parameters for the rule/risk check
        Map<String, String> paramMap = new HashMap<String, String>(8);
        paramMap.put("userNo", userNo);
        paramMap.put("authenStatus", authenStatus);
        paramMap.put("realName", realName);
        paramMap.put("idNo", idNo);
        paramMap.put("appToken", appToken);
        Map<String, String> resultMap = recommendService.checkRulesAndRiskSendMoney(paramMap);
        LOGGER.info("resultMap={}", resultMap);
    }
}

  3)实现类的bean注入配置

<!-- Consumer bean for the topic named by the realTopic placeholder -->
<bean id="realNameKafkaConsumer" class="com.xxx.sfmms.service.RealNameKafkaConsumer">
   <!-- topicThreadMap: topic name mapped to the number of consumer threads (5) -->
   <property name="topicThreadMap">
      <map>
         <entry key="${realTopic}" value="5"/>
      </map>
   </property>
   <!-- zkConnect: ZooKeeper connect string, taken from the realZkConnect placeholder -->
   <property name="zkConnect">
      <value>${realZkConnect}</value>
   </property>
</bean>


<!-- Consumer bean for the topic named by the rxdTopic placeholder -->
<bean id="preCreditKafkaConsumer" class="com.xxx.sfmms.service.PreCreditKafkaConsumer">
   <!-- topicThreadMap: topic name mapped to the number of consumer threads (5) -->
   <property name="topicThreadMap">
      <map>
         <entry key="${rxdTopic}" value="5"/>
      </map>
   </property>
   <!-- zkConnect: ZooKeeper connect string, taken from the rxdZkConnect placeholder -->
   <property name="zkConnect">
      <value>${rxdZkConnect}</value>
   </property>
</bean>

  4)kafka consumer参数配置

# Kafka listener configuration
# ZooKeeper connect string used by the realNameKafkaConsumer bean (presumably the real-name cluster)
realZkConnect=xxx
# Topic consumed by the realNameKafkaConsumer bean
realTopic=xxx
# ZooKeeper connect string used by the preCreditKafkaConsumer bean (presumably the 任性贷 cluster)
rxdZkConnect=xxx
# 任性贷 topic, consumed by the preCreditKafkaConsumer bean
rxdTopic=xxx
# The kafka.* keys below are injected into BaseKafkaConsumerFactory through @Value
# -> zookeeper.session.timeout.ms
kafka.sessionTimeOut=6000
# -> zookeeper.sync.time.ms
kafka.syncTime=2000
# -> auto.commit.interval.ms
kafka.commitInterval=30000
# -> auto.offset.reset
kafka.offsetReset=smallest
# -> group.id
kafka.groupId=xxx

  5)依赖包配置

<!-- Kafka 0.8.1.1 artifact (kafka_2.9.2) -->
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.9.2</artifactId>
   <version>0.8.1.1</version>
   <!-- jmxtools and jmxri are excluded from the transitive dependencies -->
   <!-- NOTE(review): presumably because they cannot be resolved from the public repository - confirm -->
   <exclusions>
      <exclusion>
         <artifactId>jmxtools</artifactId>
         <groupId>com.sun.jdmk</groupId>
      </exclusion>
      <exclusion>
         <artifactId>jmxri</artifactId>
         <groupId>com.sun.jmx</groupId>
      </exclusion>
   </exclusions>
</dependency>

 

END

以上是关于监听kafka消息的主要内容,如果未能解决你的问题,请参考以下文章

Kafka-再均衡监听器

【kafka专栏】SpringBoot下`@KafkaListener`消费监听属性详解

Kafka-文件管理

Kafka 监听器详解

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介、创建消息生产者、创建消息消费者、自定义消息通道、分组与持久化、设置 RoutingKey)(代码片段)

Spring Kafka 之 @KafkaListener 单条或批量处理消息