java kafka单列模式生产者客户端
Posted 小宇1994
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java kafka单列模式生产者客户端相关的知识,希望对你有一定的参考价值。
1、所需要的依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kafkaCli</groupId> <artifactId>kafkaCli</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <!--这部分可有可无,加上的话则直接生成可运行jar包--> <!--<archive>--> <!--<manifest>--> <!--<mainClass>${exec.mainClass}</mainClass>--> <!--</manifest>--> <!--</archive>--> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <version>3.0.2</version> <executions> <execution> <id>copy-dependencies</id> <phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> <encoding>GBK</encoding> </configuration> </plugin> </plugins> </build> <dependencies> <!-- webSocket所需依赖 --> <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId> <version>7.0</version> </dependency> <!-- kafka 所需依赖 --> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>RELEASE</version> </dependency> </dependencies> </project>
2、生产者代码
package com.kafka.producer; import com.kafka.systemConfig.SystemConfig; import org.apache.kafka.clients.producer.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; public class ProducerKafka { private static final Logger log = LoggerFactory.getLogger(ProducerKafka.class); public static Producer<String, String> procuder; { Properties props = new Properties(); props.put("bootstrap.servers", SystemConfig.getProperty("bootstrap.servers","10.12.1.229:9092")); props.put("acks", SystemConfig.getProperty("acks","all")); props.put("retries", SystemConfig.getProperty("retries","0")); props.put("batch.size", SystemConfig.getProperty("batch.size","16384")); props.put("linger.ms",SystemConfig.getProperty("linger.ms","1")); props.put("buffer.memory", SystemConfig.getProperty("buffer.memory","33554432")); props.put("key.serializer", SystemConfig.getProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")); props.put("value.serializer", SystemConfig.getProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")); procuder = new KafkaProducer<String,String>(props); } /** * 向kafka发送消息 * @param message * @return */ public void sendMessgae(ProducerRecord message) throws Exception{ procuder.send(message, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { log.info("向kafka发送数据返回偏移量: {}" , recordMetadata.offset()); } }); } /** * 向kafka发送消息 * @param topic 主题 * @param value 值 * @throws Exception */ public void sendMessgae(String topic, String value) throws Exception{ sendMessgae(new ProducerRecord<String, String>(topic, value)); } /** * 向kafka发送消息 * @param topic 主题 * @param value 值 * @throws Exception */ public void sendMessgae(String topic,String key, String value) throws Exception{ sendMessgae(new ProducerRecord(topic, key, value)); } /** * 刷新缓存 */ public void flush() { procuder.flush(); } /** * 关闭连接 */ public void close() { procuder.close(); } /** * 单例模式确保全局中只有一份该实例 */ private static class ProducerKafkaHolder{ private static ProducerKafka instance = new ProducerKafka(); } /** * 延迟加载,避免启动加载 * @return */ public static ProducerKafka getInstance(){ return ProducerKafkaHolder.instance; } public static void main(String []args){ try { ProducerKafka producerKafka = ProducerKafka.getInstance(); producerKafka.sendMessgae("TEST_JAVA","key","value"); producerKafka.flush(); producerKafka.close(); } catch (Exception e) { e.printStackTrace(); } } }
3、配置项代码
package com.kafka.systemConfig; import java.io.IOException; import java.io.InputStream; import java.util.Properties; public class SystemConfig { private static Properties properties = null; // private final static String FILE_PATH = System.getProperty("user.dir") + "/conf/kafkaProducer.properties"; private final static String FILE_PATH = "kafkaProducer.properties"; private SystemConfig() { System.out.println("FILE_PATH" + FILE_PATH); properties = getConfig(); } /** * Get property value. * * @param name * property name. * @return the value. */ public static String getProperty(String name) { return getProperty(name, null); } /** * Get property value. * * @param name * property name. * @param defaultValue * value if property not found. * @return the value. */ public static String getProperty(String name, String defaultValue) { String ret = null; if (properties == null) { properties = getConfig(); } if (properties != null) { ret = properties.getProperty(name); if (ret != null) { try { ret = new String(ret.getBytes("ISO-8859-1"), "GBK"); } catch (Exception e) { e.printStackTrace(); } return ret.trim(); } else{ return defaultValue; } } return defaultValue; } /** * @param name * @param defaultValue * @return */ public static int getIntProperty(String name, int defaultValue) { int res = Integer.parseInt(getProperty(name, defaultValue + "")); return res == 0 ? defaultValue : res; } private static Properties getConfig() { if (properties == null) { properties = new Properties(); InputStream is = null; try { is = SystemConfig.class.getClassLoader() .getResourceAsStream(FILE_PATH ); properties.load(is); } catch (IOException e) { } finally { if (is != null) { try { is.close(); } catch (IOException e) { } } } } return properties; } public static void main(String args[]){ // System.out.println(SystemConfig.getProperty("bootstrap.servers")); // System.out.println(FILE_PATH); System.out.println(SystemConfig.class.getClassLoader().getResourceAsStream(FILE_PATH )); ; } }
3、webSocket代码
package com.kafka.wbSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; /** * 用于 webSocket 应用相关 * **/ @ServerEndpoint("/webSocket") public class WebSocket { private static final Logger log = LoggerFactory.getLogger(WebSocket.class); private Session session; public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); /** * 建立连接。 * 建立连接时入参为session */ @OnOpen public void onOpen(Session session){ this.session = session; wbSockets.add(this); log.info("New session insert,sessionId is "+ session.getId()); } /** * 关闭连接 */ @OnClose public void onClose(){ wbSockets.remove(this); log.info("A session insert,sessionId is "+ session.getId()); } /** * 接收数据。 * */ @OnMessage public void onMessage(String message ,Session session){ log.info(message + "from " + session.getId()); } /** * 发送数据 * @param message * @throws IOException */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } }
因平台jdk只支持1.6 、kafka所需版本为1.8 顾此消息中间件展示被忽略
以上是关于java kafka单列模式生产者客户端的主要内容,如果未能解决你的问题,请参考以下文章