5Storm集成Kafka

Posted xidianzxm

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了5Storm集成Kafka相关的知识,希望对你有一定的参考价值。

1、pom文件依赖

<!--storm相关jar  -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!--排除相关依赖  -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-1.2-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-web</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>ring-cors</artifactId>
            <groupId>ring-cors</groupId>
        </exclusion>
    </exclusions>
    <!--<scope>provided</scope>--><!--注意本地调试和集群部署-->
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
    <!--<scope>provided</scope>--><!--注意本地调试和集群部署-->
</dependency>

<!--注:老版本使用的storm-kafka依赖已经被废弃,建议在以后使用storm-kafka-client依赖进行开发,老版本的storm-kafka依赖为:-->
<!--    <dependency> -->
<!--        <groupId>org.apache.storm</groupId> -->
<!--        <artifactId>storm-kafka</artifactId> -->
<!--        <version>1.2.2</version> -->
<!--    </dependency> -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

3、Bolt, 设计拓扑请跟根据自己的业务

public class ReadKafkaSpoutBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {

        System.out.println(input.getValues().get(4)+"消息接受bolt");
        /*
        input 获取到的值

        0索引代表kafka的topic
        1索引代表kafka的分区
        2索引代表kafka的偏移量
        3索引代表kafka的key值
        4索引代表kafka的value值
        */
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

以上是关于5Storm集成Kafka的主要内容,如果未能解决你的问题,请参考以下文章

大数据-kafka学习——集成SpringBoot

angular.js 与 apache kafka 的集成

MySQL系列:kafka停止命令

学习使用哪个 Kafka API 以将传统集成系统转换为 Apache Kafka

Kafka 应用实践与生态集成

kafka(2.2.1)(kerberos+LDAP+Sentry)访问使用