监听kafka消息
Posted yangyongjie
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了监听kafka消息相关的知识,希望对你有一定的参考价值。
1、main方法中(1.0以上)
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * Kafka消息消费者 * 〈功能详细描述〉 * * @author 17090889 * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class ConsumerSample { public static void main(String[] args) { String topic = "test-topic"; Properties props = new Properties(); // Kafka集群,多台服务器地址之间用逗号隔开 props.put("bootstrap.servers", "localhost:9092"); // 消费组ID props.put("group.id", "test_group1"); // Consumer的offset是否自动提交 props.put("enable.auto.commit", "true"); // 自动提交offset到zk的时间间隔,时间单位是毫秒 props.put("auto.commit.interval.ms", "1000"); // 消息的反序列化类型 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer<String, String>(props); // 订阅的话题 consumer.subscribe(Arrays.asList(topic)); // Consumer调用poll方法来轮询Kafka集群的消息,一直等到Kafka集群中没有消息或者达到超时时间100ms为止 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(record.partition() + record.offset()); System.out.println(record.key()); System.out.println(record.value()); } } } }
2、Spring下kafka1.0以上版本(不依赖Spring-Kafka)
3、Spring下kafka 0.8版本
1)kafka消费者抽象工厂类
/** * kafka消费者抽象工厂类 * 〈功能详细描述〉 * * @author * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public abstract class BaseKafkaConsumerFactory implements InitializingBean, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerFactory.class); /** * 消费的Topic与消费线程数组成的Map */ private Map<String, Integer> topicThreadMap; /** * Consumer实例所需的配置 */ private Properties properties; /** * 线程池 */ private ThreadPoolExecutor taskExecutor; private ConsumerConnector consumerConnector; /** * zkConnect */ private String zkConnect; @Value("${kafka.groupId}") private String groupId; /** * sessionTimeOut */ @Value("${kafka.sessionTimeOut}") private String sessionTimeOut; /** * syncTime */ @Value("${kafka.syncTime}") private String syncTime; /** * commitInterval */ @Value("${kafka.commitInterval}") private String commitInterval; /** * offsetReset */ @Value("${kafka.offsetReset}") private String offsetReset; @Override public void afterPropertiesSet() { logger.info("afterPropertiesSet-start"); // 初始化properties if(properties==null){ properties = new Properties(); properties.put("zookeeper.connect", zkConnect); logger.info("zkConnect={}", zkConnect); // group 代表一个消费组 properties.put("group.id", groupId); logger.info("groupId={}", groupId); // zk连接超时 properties.put("zookeeper.session.timeout.ms", sessionTimeOut); properties.put("zookeeper.sync.time.ms", syncTime); properties.put("auto.commit.interval.ms", commitInterval); properties.put("auto.offset.reset", offsetReset); // 序列化类 properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("rebalance.max.retries", "10"); // 当rebalance发生时,两个相邻retry操作之间需要间隔的时间。 properties.put("rebalance.backoff.ms", "3100"); } ConsumerConfig consumerConfig = new ConsumerConfig(properties); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(topicThreadMap); // 实际有多少个stream,就设置多少个线程处理 // int messageProcessThreadNum = 0; // for (List<KafkaStream<byte[], byte[]>> streamList : topicMessageStreams.values()) { // messageProcessThreadNum = messageProcessThreadNum + streamList.size(); // } // 创建实际处理消息的线程池 taskExecutor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000)); for (List<KafkaStream<byte[], byte[]>> streams : topicMessageStreams.values()) { for (final KafkaStream<byte[], byte[]> stream : streams) { taskExecutor.submit(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { MessageAndMetadata<byte[], byte[]> data = it.next(); try { String kafkaMsg = new String(data.message(),"UTF-8"); logger.info("来自topic:{}的消息:{}", topicThreadMap.keySet(), kafkaMsg); // 消息处理 onMessage(data); } catch (RuntimeException e) { logger.error("处理消息异常.", e); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } }); } } } /** * 消息处理类 * @param data */ protected abstract void onMessage(MessageAndMetadata<byte[], byte[]> data); @Override public void destroy() throws Exception { try { if (consumerConnector != null) { consumerConnector.shutdown(); } } catch (Exception e) { logger.warn("shutdown consumer failed", e); } try { if (taskExecutor != null) { taskExecutor.shutdown(); } } catch (Exception e) { logger.warn("shutdown messageProcessExecutor failed", e); } logger.info("shutdown consumer successfully"); } public Properties getProperties() { return properties; } public void setProperties(Properties properties) { this.properties = properties; } public Map<String, Integer> getTopicThreadMap() { return topicThreadMap; } public void setTopicThreadMap(Map<String, Integer> topicThreadMap) { this.topicThreadMap = topicThreadMap; } public String getZkConnect() { return zkConnect; } public void setZkConnect(String zkConnect) { this.zkConnect = zkConnect; } }
2)具体的kafka消费者实现类
import com.xxx.sfmms.common.util.JsonConvertUtil; import com.xxx.sfmms.common.util.RedisUtil; import com.xxx.sfmms.common.util.StringUtil; import com.xxx.sfmms.service.intf.RecommendService; import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; /** * 实名kafka消费者 * 〈功能详细描述〉 * * @author 17090889 * @see [相关类/方法](可选) * @since [产品/模块版本] (可选) */ public class RealNameKafkaConsumer extends BaseKafkaConsumerFactory { private final Logger LOGGER = LoggerFactory.getLogger(RealNameKafkaConsumer.class); private static final String STR_INVOKENO = "invokeNo"; @Autowired private RecommendService recommendService; /** * 消息处理 * @param data */ @Override protected void onMessage(MessageAndMetadata<byte[], byte[]> data) { MDC.put(STR_INVOKENO, StringUtil.getUuid()); String msg=""; try { msg=new String(data.message(),"UTF-8"); LOGGER.info("RealNameKafkaConsumer-data={},topic={}",msg,data.topic()); } catch (UnsupportedEncodingException e) { LOGGER.info("字节数组转字符串异常"); e.printStackTrace(); } // 实名的事后kafka数据 Map<String, String> map = JsonConvertUtil.json2pojo(msg, Map.class); LOGGER.info("RealNameKafkaConsumer-map={}", map); String userNo = map.get("eppAccountNO"); LOGGER.info("RealNameKafkaConsumer-userNo={}", userNo); String flag = RedisUtil.getString("PULLNEW:RACCOUNTNO_" + userNo, "MEIS"); // 不是渠道6被邀请用户 if(!"1".equals(flag)){ LOGGER.info("不是渠道6拉新用户"); return; } // 20-初级认证 30-高级实名认证 40- 实名申诉降级、50-高级到期降级 60-实名撤销(人工手动降级) 70-申诉找回身份降级 String authenStatus=map.get("authenStatus"); // 真实姓名 String realName=map.get("realName"); // 身份证号码 String idNo = map.get("idNO"); // apptoken String appToken=map.get("appToken"); // 校验任务 Map<String, String> paramMap = new HashMap<String, String>(4); paramMap.put("userNo", userNo); paramMap.put("authenStatus",authenStatus); paramMap.put("realName",realName); paramMap.put("idNo", idNo); paramMap.put("appToken",appToken); Map<String,String> resultMap=recommendService.checkRulesAndRiskSendMoney(paramMap); LOGGER.info("resultMap={}", resultMap); MDC.remove(STR_INVOKENO); } }
3)实现类的bean注入配置
<bean id="realNameKafkaConsumer" class="com.xxx.sfmms.service.RealNameKafkaConsumer"> <property name="topicThreadMap"> <map> <entry key="${realTopic}" value="5"/> </map> </property> <property name="zkConnect"> <value>${realZkConnect}</value> </property> </bean> <bean id="preCreditKafkaConsumer" class="com.xxx.sfmms.service.PreCreditKafkaConsumer"> <property name="topicThreadMap"> <map> <entry key="${rxdTopic}" value="5"/> </map> </property> <property name="zkConnect"> <value>${rxdZkConnect}</value> </property> </bean>
4)kafka consumer参数配置
#kafka监听配置 #实zk realZkConnect=xxx #topic realTopic=xxx #任zk rxdZkConnect=xxx #任性贷topic rxdTopic=xxx kafka.sessionTimeOut=6000 kafka.syncTime=2000 kafka.commitInterval=30000 kafka.offsetReset=smallest kafka.groupId=xxx
5)依赖包配置
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.1.1</version> <exclusions> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> </exclusions> </dependency>
END
以上是关于监听kafka消息的主要内容,如果未能解决你的问题,请参考以下文章
kafka专栏SpringBoot下`@KafkaListener`消费监听属性详解
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段