golang golang kafka

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang golang kafka相关的知识,希望对你有一定的参考价值。


import (

    "fmt"

    "github.com/Shopify/sarama"

    "log"

    "os"

    "strings"

    "sync"

)

 

var (

    wg     sync.WaitGroup

    logger = log.New(os.Stderr, "[srama]", log.LstdFlags)

)

 

func main() {

 

    sarama.Logger = logger

 

    consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil)

    if err != nil {

        logger.Println("Failed to start consumer: %s", err)

    }

 

    partitionList, err := consumer.Partitions("hello")

    if err != nil {

        logger.Println("Failed to get the list of partitions: ", err)

    }

 

    for partition := range partitionList {

        pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)

        if err != nil {

            logger.Printf("Failed to start consumer for partition %d: %s\n", partition, err)

        }

        defer pc.AsyncClose()

 

        wg.Add(1)

 

        go func(sarama.PartitionConsumer) {

            defer wg.Done()

            for msg := range pc.Messages() {

                fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))

                fmt.Println()

            }

        }(pc)

    }

 

    wg.Wait()

 

    logger.Println("Done consuming topic hello")

    consumer.Close()

}

import (

    "github.com/Shopify/sarama"

    "log"

    "os"

    "strings"

)

 

var (

    logger = log.New(os.Stderr, "[srama]", log.LstdFlags)

)

 

func main() {

    sarama.Logger = logger

 

    config := sarama.NewConfig()

    config.Producer.RequiredAcks = sarama.WaitForAll

    config.Producer.Partitioner = sarama.NewRandomPartitioner

 

    msg := &sarama.ProducerMessage{}

    msg.Topic = "hello"

    msg.Partition = int32(-1)

    msg.Key = sarama.StringEncoder("key")

    msg.Value = sarama.ByteEncoder("你好, 世界!")

 

    producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)

    if err != nil {

        logger.Println("Failed to produce message: %s", err)

        os.Exit(500)

    }

    defer producer.Close()

 

    partition, offset, err := producer.SendMessage(msg)

    if err != nil {

        logger.Println("Failed to produce message: ", err)

    }

    logger.Printf("partition=%d, offset=%d\n", partition, offset)

}

docker 配置 kafka+zookeeper,golang接入示例

参考技术A 配置zookeeper 使用kafka/bin/下自带的zk

运行 报错 卒。配置低了

docker-compose.yml

报错

换云搬瓦工的机器试一下

但是docker ps -a 发现只有zookeeper启动了,kafka失败, 检查日志 发现kafka运行需要java环境,而且对内存有要求,搬瓦工的vps不足够

因此修改docker-compose.yml 加入以下

stop 再启动

完美

测试
进入容器

查看已经建好的topic (docker-compose.yml)

发送消息

接收消息

接下来是golang接入kafka了

运行

以上是关于golang golang kafka的主要内容,如果未能解决你的问题,请参考以下文章

golang golang kafka

[Golang] kafka集群搭建和golang版生产者和消费者

golang 快速入门让Golang kafka驱动程序发布到“测试”主题,这些主题是从快速入门指南创建的http://kafka.apache.org/docum

Golang 连接Kafka

golang-kafka

golang kafka.go