初识消息队列处理机框架KClient

Posted 云时代架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了初识消息队列处理机框架KClient相关的知识,希望对你有一定的参考价值。

KClient是一个简单易用,有效集成,高性能,高稳定的Kafka Java客户端。 此文档包含了背景介绍、功能特性、使用指南、API简介、后台监控和管理、消息处理机模板项目、架 构设计以及性能压测相关章节。如果你想使用KClient快速的构建Kafka处理机服务,请参考消息处理机模板项目章节; 如果你想了解KClient的其他使用方式、功能特性、监控和管 理等,请参考背景介绍、功能特性、使用指南、API简介、后台监控和管理等章节; 如果你想更深入的理解KClient的架构设计和性能Benchmark,请参考架构设计和性能压测章节


使用指南

KClient提供了三种使用方法,对于每一种方法,按照下面的步骤可快速构建Kafka生产者和消费者程序。

前置步骤

1).下载源代码后在项目根目录执行如下命令安装打包文件到你的Maven本地库。

mvn install

2).在你的项目pom.xml文件中添加对KClient的依赖。

<dependency>
	<groupId>com.robert.kafka</groupId>
	<artifactId>kclient-core</artifactId>
	<version>0.0.1</version>
</dependency>

3).根据Kafka官方文档搭建Kafka环境,并创建两个Topic, test1和test2。

4).然后,从Kafka安装目录的config目录下拷贝kafka-consumer.properties和kafka-producer.properties到你的项目类路径下,通常是src/main/resources目录。

1.Java API

Java API提供了最直接,最简单的使用KClient的方法。

构建Producer示例:

KafkaProducer kafkaProducer = new KafkaProducer("kafka-producer.properties", "test");for (int i = 0; i < 10; i++) {	Dog dog = new Dog();
	dog.setName("Yours " + i);
	dog.setId(i);
	kafkaProducer.sendBean2Topic("test", dog);	System.out.format("Sending dog: %d \n", i + 1);	Thread.sleep(100);
}

构建Consumer示例:

DogHandler mbe = new DogHandler();KafkaConsumer kafkaConsumer = new KafkaConsumer("kafka-consumer.properties", "test", 1, mbe);try {
	kafkaConsumer.startup();	try {		System.in.read();
	} catch (IOException e) {
		e.printStackTrace();
	}
} finally {
	kafkaConsumer.shutdownGracefully();
}
public class DogHandler extends BeanMessageHandler<Dog> {	public DogHandler() {		super(Dog.class);
	}	protected void doExecuteBean(Dog dog) {		System.out.format("Receiving dog: %s\n", dog);
	}
}

2.Spring环境集成

KClient可以与Spring环境无缝集成,你可以像使用Spring Bean一样来使用KafkaProducer和KafkaConsumer。

构建Producer示例:

ApplicationContext ac = new ClassPathXmlApplicationContext("kafka-producer.xml");KafkaProducer kafkaProducer = (KafkaProducer) ac.getBean("producer");for (int i = 0; i < 10; i++) {	Dog dog = new Dog();
	dog.setName("Yours " + i);
	dog.setId(i);
	kafkaProducer.send2Topic("test", JSON.toJSONString(dog));	System.out.format("Sending dog: %d \n", i + 1);	Thread.sleep(100);
}
<bean name="producer" class="com.robert.kafka.kclient.core.KafkaProducer" init-method="init">
	<property name="propertiesFile" value="kafka-producer.properties"/>
	<property name="defaultTopic" value="test"/>
</bean>

构建Consumer示例:

ApplicationContext ac = new ClassPathXmlApplicationContext(		"kafka-consumer.xml");KafkaConsumer kafkaConsumer = (KafkaConsumer) ac.getBean("consumer");try {
	kafkaConsumer.startup();	try {		System.in.read();
	} catch (IOException e) {
		e.printStackTrace();
	}
} finally {
	kafkaConsumer.shutdownGracefully();
}
public class DogHandler extends BeanMessageHandler<Dog> {	public DogHandler() {		super(Dog.class);
	}	protected void doExecuteBean(Dog dog) {		System.out.format("Receiving dog: %s\n", dog);
	}
}
<bean name="dogHandler" class="com.robert.kafka.kclient.sample.api.DogHandler" />

<bean name="consumer" class="com.robert.kafka.kclient.core.KafkaConsumer" init-method="init">
	<property name="propertiesFile" value="kafka-consumer.properties" />
	<property name="topic" value="test" />
	<property name="streamNum" value="1" />
	<property name="handler" ref="dogHandler" />
</bean>

3.服务源码注解

KClient提供了类似Spring声明式的编程方法,使用注解声明Kafka处理器方法,所有的线程模型、异常处理、服务启动和关闭等都由后台服务自动完成,极大程度的简化了API的使用方法,提高了开发者的工作效率。

注解声明Kafka消息处理器:

@KafkaHandlerspublic class AnnotatedDogHandler {	@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1)	@OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1")	public Cat dogHandler(Dog dog) {		System.out.println("Annotated dogHandler handles: " + dog);		return new Cat(dog);
	}	@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1)	public void catHandler(Cat cat) throws IOException {		System.out.println("Annotated catHandler handles: " + cat);		throw new IOException("Man made exception.");
	}	@ErrorHandler(exception = IOException.class, topic = "test1")	public void ioExceptionHandler(IOException e, String message) {		System.out.println("Annotated excepHandler handles: " + e);
	}
}

注解启动程序:

public static void main(String[] args) {	ApplicationContext ac = new ClassPathXmlApplicationContext(			"annotated-kafka-consumer.xml");	try {		System.in.read();
	} catch (IOException e) {
		e.printStackTrace();
	}
}

注解Spring环境配置:

<bean name="kClientBoot" class="com.robert.kafka.kclient.boot.KClientBoot" init-method="init"/>

<context:component-scan base-package="com.robert.kafka.kclient.sample.annotation" />


更多内容请点击文章下面“阅读原文”。

以上是关于初识消息队列处理机框架KClient的主要内容,如果未能解决你的问题,请参考以下文章

初识消息队列

后台消息队列处理简易框架

初识RabbitMQ系列之一:简单介绍

初识Message Queue之--基础篇

RabbitMQ系列之——初识

01-初识消息队列MQ&&Rabbit相关概念介绍