初识消息队列处理机框架KClient
Posted 云时代架构
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了初识消息队列处理机框架KClient相关的知识,希望对你有一定的参考价值。
KClient是一个简单易用,有效集成,高性能,高稳定的Kafka Java客户端。 此文档包含了背景介绍、功能特性、使用指南、API简介、后台监控和管理、消息处理机模板项目、架 构设计以及性能压测相关章节。如果你想使用KClient快速的构建Kafka处理机服务,请参考消息处理机模板项目章节; 如果你想了解KClient的其他使用方式、功能特性、监控和管 理等,请参考背景介绍、功能特性、使用指南、API简介、后台监控和管理等章节; 如果你想更深入的理解KClient的架构设计和性能Benchmark,请参考架构设计和性能压测章节
使用指南
KClient提供了三种使用方法,对于每一种方法,按照下面的步骤可快速构建Kafka生产者和消费者程序。
前置步骤
1).下载源代码后在项目根目录执行如下命令安装打包文件到你的Maven本地库。
mvn install
2).在你的项目pom.xml文件中添加对KClient的依赖。
<dependency> <groupId>com.robert.kafka</groupId> <artifactId>kclient-core</artifactId> <version>0.0.1</version> </dependency>
3).根据Kafka官方文档搭建Kafka环境,并创建两个Topic, test1和test2。
4).然后,从Kafka安装目录的config目录下拷贝kafka-consumer.properties和kafka-producer.properties到你的项目类路径下,通常是src/main/resources目录。
1.Java API
Java API提供了最直接,最简单的使用KClient的方法。
构建Producer示例:
KafkaProducer kafkaProducer = new KafkaProducer("kafka-producer.properties", "test");for (int i = 0; i < 10; i++) { Dog dog = new Dog(); dog.setName("Yours " + i); dog.setId(i); kafkaProducer.sendBean2Topic("test", dog); System.out.format("Sending dog: %d \n", i + 1); Thread.sleep(100); }
构建Consumer示例:
DogHandler mbe = new DogHandler();KafkaConsumer kafkaConsumer = new KafkaConsumer("kafka-consumer.properties", "test", 1, mbe);try { kafkaConsumer.startup(); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } finally { kafkaConsumer.shutdownGracefully(); }
public class DogHandler extends BeanMessageHandler<Dog> { public DogHandler() { super(Dog.class); } protected void doExecuteBean(Dog dog) { System.out.format("Receiving dog: %s\n", dog); } }
2.Spring环境集成
KClient可以与Spring环境无缝集成,你可以像使用Spring Bean一样来使用KafkaProducer和KafkaConsumer。
构建Producer示例:
ApplicationContext ac = new ClassPathXmlApplicationContext("kafka-producer.xml");KafkaProducer kafkaProducer = (KafkaProducer) ac.getBean("producer");for (int i = 0; i < 10; i++) { Dog dog = new Dog(); dog.setName("Yours " + i); dog.setId(i); kafkaProducer.send2Topic("test", JSON.toJSONString(dog)); System.out.format("Sending dog: %d \n", i + 1); Thread.sleep(100); }
<bean name="producer" class="com.robert.kafka.kclient.core.KafkaProducer" init-method="init"> <property name="propertiesFile" value="kafka-producer.properties"/> <property name="defaultTopic" value="test"/> </bean>
构建Consumer示例:
ApplicationContext ac = new ClassPathXmlApplicationContext( "kafka-consumer.xml");KafkaConsumer kafkaConsumer = (KafkaConsumer) ac.getBean("consumer");try { kafkaConsumer.startup(); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } finally { kafkaConsumer.shutdownGracefully(); }
public class DogHandler extends BeanMessageHandler<Dog> { public DogHandler() { super(Dog.class); } protected void doExecuteBean(Dog dog) { System.out.format("Receiving dog: %s\n", dog); } }
<bean name="dogHandler" class="com.robert.kafka.kclient.sample.api.DogHandler" /> <bean name="consumer" class="com.robert.kafka.kclient.core.KafkaConsumer" init-method="init"> <property name="propertiesFile" value="kafka-consumer.properties" /> <property name="topic" value="test" /> <property name="streamNum" value="1" /> <property name="handler" ref="dogHandler" /> </bean>
3.服务源码注解
KClient提供了类似Spring声明式的编程方法,使用注解声明Kafka处理器方法,所有的线程模型、异常处理、服务启动和关闭等都由后台服务自动完成,极大程度的简化了API的使用方法,提高了开发者的工作效率。
注解声明Kafka消息处理器:
@KafkaHandlerspublic class AnnotatedDogHandler { @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1) @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1") public Cat dogHandler(Dog dog) { System.out.println("Annotated dogHandler handles: " + dog); return new Cat(dog); } @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1) public void catHandler(Cat cat) throws IOException { System.out.println("Annotated catHandler handles: " + cat); throw new IOException("Man made exception."); } @ErrorHandler(exception = IOException.class, topic = "test1") public void ioExceptionHandler(IOException e, String message) { System.out.println("Annotated excepHandler handles: " + e); } }
注解启动程序:
public static void main(String[] args) { ApplicationContext ac = new ClassPathXmlApplicationContext( "annotated-kafka-consumer.xml"); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } }
注解Spring环境配置:
<bean name="kClientBoot" class="com.robert.kafka.kclient.boot.KClientBoot" init-method="init"/> <context:component-scan base-package="com.robert.kafka.kclient.sample.annotation" />
更多内容请点击文章下面“阅读原文”。
以上是关于初识消息队列处理机框架KClient的主要内容,如果未能解决你的问题,请参考以下文章