golang kafka.go
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang kafka.go相关的知识,希望对你有一定的参考价值。
package main
import (
"fmt"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"log"
"os"
"os/signal"
"strings"
"syscall"
)
// logger is the shared program logger: it writes to standard output with an
// empty prefix and no timestamp or file flags.
var logger = log.New(os.Stdout, "", 0)
// trigger1 runs a consumer-group consumer (sarama-cluster) on the "test" topic
// and blocks until SIGHUP, SIGINT or SIGTERM is received, then closes the
// consumer.
//
// Consumer errors and rebalance notifications are logged through logger. Each
// message is written to stdout as "topic/partition/offset<TAB>value" and then
// marked as processed. If the consumer cannot be created, the failure is
// logged and the function returns without consuming anything.
func trigger1() {
	brokerList := "x.x.x.x:9092"
	groupID := "test-cons-1"
	topicList := "test"

	config := cluster.NewConfig()
	// Ask the library to deliver errors and rebalance notifications on the
	// channels drained by the goroutines below.
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	// Start from the oldest offset. Per the original author's note this has no
	// effect unless a different (fresh) group ID is used.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	consumer, err := cluster.NewConsumer(strings.Split(brokerList, ","), groupID, strings.Split(topicList, ","), config)
	if err != nil {
		// Printf, not Println: the message contains a format verb. Return
		// early because consumer is unusable and would panic when used below.
		logger.Printf("Failed to start consumer: %s", err)
		return
	}

	// Drain the error channel.
	go func() {
		for err := range consumer.Errors() {
			logger.Printf("Error: %s\n", err.Error())
		}
	}()

	// Drain the rebalance notification channel.
	go func() {
		for note := range consumer.Notifications() {
			logger.Printf("Rebalanced: %+v\n", note)
		}
	}()

	// Print every message and mark it as processed.
	go func() {
		for msg := range consumer.Messages() {
			fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
			consumer.MarkOffset(msg, "")
		}
	}()

	// Block until a termination signal arrives, then shut the consumer down.
	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
	<-wait

	if err := consumer.Close(); err != nil {
		logger.Println("Failed to close consumer: ", err)
	}
}
// trigger2 reads partition 0 of the "test" topic from the oldest offset using
// a plain sarama consumer (no consumer group) and logs every message together
// with its offset and the partition's high-water mark until an interrupt
// signal is received.
//
// If the consumer or the partition consumer cannot be created, the error is
// logged and the function returns. On exit the number of consumed messages is
// logged and both consumers are closed by the deferred calls.
func trigger2() {
	config := sarama.NewConfig()
	// NOTE(review): Fetch.Max = 1 looks like an experiment to limit how much is
	// fetched per request — confirm this value is intended.
	config.Consumer.Fetch.Max = 1

	cons, err := sarama.NewConsumer([]string{"x.x.x.x:9092"}, config)
	if err != nil {
		// cons is unusable on error; continuing would panic on first use.
		logger.Println("Error creating consumer ", err)
		return
	}
	defer func() {
		if err := cons.Close(); err != nil {
			logger.Fatalln(err)
		}
	}()

	pc, err := cons.ConsumePartition("test", 0, sarama.OffsetOldest)
	if err != nil {
		// pc is unusable on error; the deferred cons.Close still runs.
		logger.Println(err)
		return
	}
	defer func() {
		if err := pc.Close(); err != nil {
			logger.Fatalln(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-pc.Messages():
			logger.Printf("Message : %s\nOffset : %d, Highwatermark: %d", string(msg.Value), msg.Offset, pc.HighWaterMarkOffset())
			consumed++
		case <-signals:
			// The interrupt is consumed here; do not wait for a second one
			// after the loop, or shutdown would need two Ctrl-C presses.
			break ConsumerLoop
		}
	}

	logger.Printf("Consumed: %d\n", consumed)
}
// main is the program entry point; it runs the single-partition consumer
// demo implemented by trigger2.
func main() { trigger2() }
golang如何打印内存内容
参考技术A
golang如何打印内存内容
以上是关于golang kafka.go的主要内容,如果未能解决你的问题,请参考以下文章:
Golang 学习之路
Golang 入门
Golang入门到项目实战 第一个golang应用
golang编译androidso无法加载
golang如何打印内存内容
Golang入门到项目实战 golang匿名函数