Java Kafka singleton-pattern producer client

Posted by 小宇1994

This post covers a singleton-pattern Kafka producer client in Java: the Maven dependencies, the producer itself, a small configuration loader, and a WebSocket endpoint. Hopefully it is of some reference value.

1. Required dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kafkaCli</groupId>
    <artifactId>kafkaCli</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <!-- Optional: uncommenting this archive section makes the build produce a directly runnable jar -->
                    <!--<archive>-->
                    <!--<manifest>-->
                    <!--<mainClass>${exec.mainClass}</mainClass>-->
                    <!--</manifest>-->
                    <!--</archive>-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.0.2</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- The target platform only provides JDK 1.6 (see the note at the end);
                         note that the Kafka 2.x client itself requires Java 8. -->
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>GBK</encoding>
                </configuration>
            </plugin>


        </plugins>

    </build>


    <dependencies>
        <!-- Dependency required for the WebSocket endpoint -->
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
        </dependency>
        <!-- Dependencies required for Kafka -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <!-- pin the client to the same version as the kafka_2.12 artifact above;
                 the RELEASE meta-version is deprecated and makes builds non-reproducible -->
            <version>2.1.1</version>
        </dependency>
    </dependencies>
</project>

2. Producer code

package com.kafka.producer;

import com.kafka.systemConfig.SystemConfig;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;

public class ProducerKafka {
    private static final Logger log = LoggerFactory.getLogger(ProducerKafka.class);
    private final Producer<String, String> producer;

    /**
     * Private constructor: the producer is configured from kafkaProducer.properties,
     * falling back to the defaults below when a key is missing.
     */
    private ProducerKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", SystemConfig.getProperty("bootstrap.servers", "10.12.1.229:9092"));
        props.put("acks", SystemConfig.getProperty("acks", "all"));
        props.put("retries", SystemConfig.getProperty("retries", "0"));
        props.put("batch.size", SystemConfig.getProperty("batch.size", "16384"));
        props.put("linger.ms", SystemConfig.getProperty("linger.ms", "1"));
        props.put("buffer.memory", SystemConfig.getProperty("buffer.memory", "33554432"));
        props.put("key.serializer", SystemConfig.getProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"));
        props.put("value.serializer", SystemConfig.getProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"));
        producer = new KafkaProducer<String, String>(props);
    }

    /**
     * Send a record to Kafka asynchronously; the callback logs the returned offset,
     * or the error if the send failed.
     * @param message the record to send
     */
    public void sendMessage(ProducerRecord<String, String> message) throws Exception {
        producer.send(message, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    log.error("Failed to send record to Kafka", e);
                } else {
                    log.info("Record sent to Kafka, offset: {}", recordMetadata.offset());
                }
            }
        });
    }

    /**
     * Send a message to Kafka.
     * @param topic the topic
     * @param value the value
     */
    public void sendMessage(String topic, String value) throws Exception {
        sendMessage(new ProducerRecord<String, String>(topic, value));
    }

    /**
     * Send a keyed message to Kafka.
     * @param topic the topic
     * @param key   the key
     * @param value the value
     */
    public void sendMessage(String topic, String key, String value) throws Exception {
        sendMessage(new ProducerRecord<String, String>(topic, key, value));
    }

    /**
     * Flush buffered records.
     */
    public void flush() {
        producer.flush();
    }

    /**
     * Close the producer.
     */
    public void close() {
        producer.close();
    }

    /**
     * Holder class: the singleton pattern guarantees a single instance per JVM.
     */
    private static class ProducerKafkaHolder {
        private static final ProducerKafka instance = new ProducerKafka();
    }

    /**
     * Lazy initialization: the instance is only created on first access,
     * not at application startup.
     */
    public static ProducerKafka getInstance() {
        return ProducerKafkaHolder.instance;
    }

    public static void main(String[] args) {
        try {
            ProducerKafka producerKafka = ProducerKafka.getInstance();
            producerKafka.sendMessage("TEST_JAVA", "key", "value");
            producerKafka.flush();
            producerKafka.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
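
Because KafkaProducer is thread-safe, the singleton returned by getInstance() can be shared across threads without extra synchronization. The following is a minimal sketch of that usage (the class name and the thread count are illustrative, and TEST_JAVA is simply the example topic from main() above):

package com.kafka.producer;

public class MultiThreadSendExample {
    public static void main(String[] args) throws Exception {
        // all threads share the same lazily-created singleton
        final ProducerKafka producer = ProducerKafka.getInstance();
        Thread[] workers = new Thread[3];
        for (int i = 0; i < workers.length; i++) {
            final int id = i;
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        // KafkaProducer handles concurrent sends internally
                        producer.sendMessage("TEST_JAVA", "key-" + id, "value-" + id);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        producer.flush();
        producer.close();
    }
}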

3. Configuration code

package com.kafka.systemConfig;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;


public class SystemConfig {
    private static Properties properties = null;
  //  private final static String FILE_PATH = System.getProperty("user.dir") + "/conf/kafkaProducer.properties";
  private final static String FILE_PATH =  "kafkaProducer.properties";


    private SystemConfig() {
        System.out.println("FILE_PATH: " + FILE_PATH);
        properties = getConfig();
    }

    /**
     * Get property value.
     *
     * @param name
     *            property name.
     * @return the value.
     */
    public static String getProperty(String name) {
        return getProperty(name, null);
    }

    /**
     * Get property value.
     *
     * @param name
     *            property name.
     * @param defaultValue
     *            value if property not found.
     * @return the value.
     */
    public static String getProperty(String name, String defaultValue) {
        String ret = null;
        if (properties == null) {
            properties = getConfig();
        }
        if (properties != null) {
            ret = properties.getProperty(name);
            if (ret != null) {
                try {
                    // Properties.load() decodes the file as ISO-8859-1; re-decode the raw
                    // bytes as GBK so that non-ASCII (e.g. Chinese) values survive.
                    ret = new String(ret.getBytes("ISO-8859-1"), "GBK");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ret.trim();
            } else{
                return defaultValue;
            }

        }
        return defaultValue;
    }

    /**
     * Get an int property value.
     * @param name property name.
     * @param defaultValue value if the property is missing or not a valid int.
     * @return the parsed value, or defaultValue.
     */
    public static int getIntProperty(String name, int defaultValue) {
        try {
            return Integer.parseInt(getProperty(name, defaultValue + ""));
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    private static Properties getConfig() {
        if (properties == null) {
            properties = new Properties();
            InputStream is = null;
            try {
                is = SystemConfig.class.getClassLoader()
                        .getResourceAsStream(FILE_PATH);
                if (is != null) {
                    properties.load(is);
                } else {
                    System.err.println("Config file not found on classpath: " + FILE_PATH);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (is != null) {
                    try {
                        is.close();
                    } catch (IOException e) {
                        // ignore failures on close
                    }
                }
            }
        }
        return properties;
    }

    public static void main(String[] args) {
        // quick check that the properties file can be resolved from the classpath
        System.out.println(SystemConfig.class.getClassLoader().getResourceAsStream(FILE_PATH));
    }
}
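
SystemConfig loads kafkaProducer.properties from the classpath (typically placed under src/main/resources). A file along the following lines would work; the values shown here are only the defaults hard-coded in ProducerKafka and should be adjusted to the actual environment:

# kafkaProducer.properties - example values only
bootstrap.servers=10.12.1.229:9092
acks=all
retries=0
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer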

4. WebSocket code

package com.kafka.wbSocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * WebSocket server endpoint; keeps track of all open sessions.
 */
@ServerEndpoint("/webSocket")
public class WebSocket {
    private static final Logger log = LoggerFactory.getLogger(WebSocket.class);
    private Session session;
    public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>();

    /**
     * Called when a connection is established; the new session is passed in.
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        wbSockets.add(this);
        log.info("New session opened, sessionId is " + session.getId());
    }

    /**
     * Called when a connection is closed.
     */
    @OnClose
    public void onClose() {
        wbSockets.remove(this);
        log.info("Session closed, sessionId is " + session.getId());
    }

    /**
     * Called when a message is received from the client.
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info(message + " from " + session.getId());
    }

    /**
     * Send a text message to this client.
     * @param message the message to send
     * @throws IOException if the send fails
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
}
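
The original article does not show how the two pieces are wired together, but one natural combination (treat this as an assumption about the intended design; the endpoint path and class name are hypothetical) is to forward every message received over the WebSocket to Kafka through the producer singleton:

package com.kafka.wbSocket;

import com.kafka.producer.ProducerKafka;

import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

/**
 * Illustrative endpoint: each incoming WebSocket message is forwarded to the
 * TEST_JAVA topic, keyed by the sender's session id.
 */
@ServerEndpoint("/webSocketToKafka")
public class WebSocketToKafka {

    @OnMessage
    public void onMessage(String message, Session session) {
        try {
            // reuse the lazily-initialized singleton producer
            ProducerKafka.getInstance().sendMessage("TEST_JAVA", session.getId(), message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}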

Note: because the target platform's JDK only supports 1.6 while the Kafka 2.x client requires Java 1.8, this message-middleware demo ended up being shelved.
