golang kafka.go

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang kafka.go相关的知识,希望对你有一定的参考价值。

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"github.com/bsm/sarama-cluster"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
)

var (
	logger = log.New(os.Stdout, "", 0)
)

func trigger1() {
	brokerList := "x.x.x.x:9092"
	groupID := "test-cons-1"
	topicList := "test"
	config := cluster.NewConfig()

	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest // Does not work unless u use a different grp id

	consumer, err := cluster.NewConsumer(strings.Split(brokerList, ","), groupID, strings.Split(topicList, ","), config)
	if err != nil {
		logger.Println("Failed to start consumer: %s", err)
	}

	go func() {
		for err := range consumer.Errors() {
			logger.Printf("Error: %s\n", err.Error())
		}
	}()

	go func() {
		for note := range consumer.Notifications() {
			logger.Printf("Rebalanced: %+v\n", note)
		}
	}()

	go func() {
		for msg := range consumer.Messages() {
			fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
			consumer.MarkOffset(msg, "")
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
	<-wait
	if err := consumer.Close(); err != nil {
		logger.Println("Failed to close consumer: ", err)
	}
}

func trigger2() {
	config := sarama.NewConfig()
	config.Consumer.Fetch.Max = 1

	cons, err := sarama.NewConsumer([]string{"x.x.x.x:9092"}, config)
	if err != nil {
		logger.Println("Error creating consumer ", err)
	}
	defer func() {
		if err := cons.Close(); err != nil {
			logger.Fatalln(err)
		}
	}()

	pc, err := cons.ConsumePartition("test", 0, sarama.OffsetOldest)
	if err != nil {
		logger.Println(err)
	}
	defer func() {
		if err := pc.Close(); err != nil {
			logger.Fatalln(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-pc.Messages():
			{
				logger.Printf("Message : %s\nOffset : %d, Highwatermark: %d", string(msg.Value), msg.Offset, pc.HighWaterMarkOffset())
				consumed++
			}
		case <-signals:
			{
				break ConsumerLoop
			}
		}
	}
	<-signals
}

func main() {
	trigger2()
}

golang如何打印内存内容

参考技术A golang如何打印内存内容

以上是关于golang kafka.go的主要内容,如果未能解决你的问题,请参考以下文章

Golang 学习之路

Golang 入门

Golang入门到项目实战 第一个golang应用

golang编译androidso无法加载

golang如何打印内存内容

Golang入门到项目实战 golang匿名函数