golang golang kafka
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang golang kafka相关的知识,希望对你有一定的参考价值。
package main
import (
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster" //support automatic consumer-group rebalancing and offset tracking
"github.com/sdbaiguanghe/glog"
)
// topics names the Kafka topic the example producers publish to. The consumer
// splits this value on commas before subscribing.
var topics = "topics_1"
// consumer is the consumer-group example. It joins group "group-1" on the
// broker at localhost:9092, prints every message received on the configured
// topics to stdout as topic/partition/offset<TAB>value, and marks each message
// as processed. It returns if the consumer cannot be created, or once the
// message channel has been closed.
func consumer() {
	groupID := "group-1"
	config := cluster.NewConfig()
	config.Group.Return.Notifications = true
	config.Consumer.Offsets.CommitInterval = 1 * time.Second
	config.Consumer.Offsets.Initial = sarama.OffsetNewest // initially start from the newest offset
	c, err := cluster.NewConsumer(strings.Split("localhost:9092", ","), groupID, strings.Split(topics, ","), config)
	if err != nil {
		glog.Errorf("Failed open consumer: %v", err)
		return
	}
	defer c.Close()

	// Drain the error and notification channels in the background so the
	// consumer never blocks on them. A closed channel is always ready to
	// receive, so each channel is set to nil once it reports closure; without
	// that the select would spin forever (and log nil errors) after shutdown.
	// The goroutine ends when both channels have been closed.
	go func(c *cluster.Consumer) {
		errs := c.Errors()
		noti := c.Notifications()
		for errs != nil || noti != nil {
			select {
			case err, ok := <-errs:
				if !ok {
					errs = nil
					continue
				}
				glog.Errorln(err)
			case _, ok := <-noti:
				if !ok {
					noti = nil
				}
			}
		}
	}(c)

	for msg := range c.Messages() {
		fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
		// MarkOffset is not written to Kafka immediately, so offsets that have
		// not been committed yet may be lost if the program crashes.
		c.MarkOffset(msg, "")
	}
}
// syncProducer is the synchronous producer example; this style is suitable
// when concurrency is low. It connects to localhost:9092, sends one message of
// the form "sync: <random number below 10000>" to the configured topic and
// echoes the payload to stdout. Failures are logged and end the function.
func syncProducer() {
	config := sarama.NewConfig()
	// config.Producer.RequiredAcks = sarama.WaitForAll
	// config.Producer.Partitioner = sarama.NewRandomPartitioner
	// NOTE(review): sarama's Config documentation states that
	// Producer.Return.Successes must be true for a SyncProducer; setting it
	// explicitly is harmless on versions that already imply it.
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
	if err != nil {
		glog.Errorln(err)
		return
	}
	// Register Close only after the error check: on failure p is nil and a
	// deferred p.Close() would panic when the function returns.
	defer func() {
		if err := p.Close(); err != nil {
			glog.Errorln(err)
		}
	}()

	v := "sync: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
	fmt.Fprintln(os.Stdout, v)
	msg := &sarama.ProducerMessage{
		Topic: topics,
		Value: sarama.ByteEncoder(v),
	}
	if _, _, err := p.SendMessage(msg); err != nil {
		glog.Errorln(err)
		return
	}
}
// asyncProducer is the asynchronous producer example; this style must be used
// when concurrency is high. It connects to localhost:9092, queues one message
// of the form "async: <random number below 10000>" on the configured topic and
// echoes the payload to stdout. The deferred Close runs when the function
// returns.
func asyncProducer() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // this option is required
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewAsyncProducer(strings.Split("localhost:9092", ","), config)
	if err != nil {
		glog.Errorln(err)
		return
	}
	// Register Close only after the error check: on failure p is nil and a
	// deferred p.Close() would panic when the function returns.
	defer func() {
		if err := p.Close(); err != nil {
			glog.Errorln(err)
		}
	}()

	// This goroutine is required: with Return.Successes enabled both result
	// channels have to be read. A closed channel is always ready to receive,
	// so each one is set to nil once it reports closure and the goroutine
	// ends when both are closed instead of spinning forever.
	go func(p sarama.AsyncProducer) {
		errs := p.Errors()
		success := p.Successes()
		for errs != nil || success != nil {
			select {
			case err, ok := <-errs:
				if !ok {
					errs = nil
					continue
				}
				if err != nil {
					glog.Errorln(err)
				}
			case _, ok := <-success:
				if !ok {
					success = nil
				}
			}
		}
	}(p)

	v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
	fmt.Fprintln(os.Stdout, v)
	msg := &sarama.ProducerMessage{
		Topic: topics,
		Value: sarama.ByteEncoder(v),
	}
	p.Input() <- msg
}
golang 快速入门:让 Golang Kafka 驱动程序向“test”主题发布消息(该主题按官方快速入门指南创建:http://kafka.apache.org/documentation.html#quickstart)
package main
import (
"github.com/Shopify/sarama"
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"time"
)
// Command-line flags for the HTTP listener and the Kafka connection.
var (
	addr    = flag.String("addr", ":8080", "The address to bind to")
	brokers = flag.String("brokers", "localhost:9092", "The Kafka brokers to connect to, as a comma separated list") // defaults to the local Kafka instance
	verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
)

// Command-line flags for optional TLS client authentication. All of them
// default to "off"/empty.
var (
	certFile  = flag.String("certificate", "", "The optional certificate file for client authentication")
	keyFile   = flag.String("key", "", "The optional key file for client authentication")
	caFile    = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
	verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
)
// main parses the command-line flags, builds the two Kafka producers and
// serves HTTP on the configured address until the server stops. The process
// always exits with a non-zero status, either because no brokers were given
// or because the HTTP server returned.
func main() {
	flag.Parse()

	if *verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

	server := &Server{
		DataCollector:     newDataCollector(brokerList),
		AccessLogProducer: newAccessLogProducer(brokerList),
	}

	runErr := server.Run(*addr)

	// Close explicitly instead of deferring: log.Fatal exits the process via
	// os.Exit, which skips deferred calls, so a deferred Close would never
	// run and the producers would not be shut down.
	if err := server.Close(); err != nil {
		log.Println("Failed to close server", err)
	}
	log.Fatal(runErr)
}
// createTlsConfiguration builds a TLS client configuration from the
// -certificate, -key and -ca flags. It returns nil unless all three flags are
// set. Any failure to load the key pair, read the CA file or parse at least
// one CA certificate from it terminates the process via log.Fatal.
func createTlsConfiguration() (t *tls.Config) {
	if *certFile == "" || *keyFile == "" || *caFile == "" {
		// will be nil by default if nothing is provided
		return nil
	}

	cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
	if err != nil {
		log.Fatal(err)
	}

	caCert, err := ioutil.ReadFile(*caFile)
	if err != nil {
		log.Fatal(err)
	}

	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was parsed; ignoring
	// it would silently leave the root pool empty for a malformed CA file.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		log.Fatalf("no CA certificates could be parsed from %s", *caFile)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
		// NOTE(review): the -verify flag is wired straight into
		// InsecureSkipVerify, so passing -verify DISABLES chain verification,
		// which looks inverted relative to the flag's help text. Left
		// unchanged because flipping it would make the default insecure;
		// confirm the intended meaning.
		InsecureSkipVerify: *verifySsl,
	}
}
// Server bundles the two Kafka producers used by the HTTP handlers.
type Server struct {
// DataCollector synchronously publishes the "publish" query-string value of
// each request to "/".
DataCollector sarama.SyncProducer
// AccessLogProducer asynchronously publishes one access-log entry per
// handled request.
AccessLogProducer sarama.AsyncProducer
}
// Close shuts down both producers. Both shutdowns are always attempted; each
// failure is logged, and the first failure encountered is returned so that the
// caller's error check is meaningful. It returns nil when both producers
// closed cleanly.
func (s *Server) Close() error {
	var firstErr error

	if err := s.DataCollector.Close(); err != nil {
		log.Println("Failed to shut down data collector cleanly", err)
		firstErr = err
	}

	if err := s.AccessLogProducer.Close(); err != nil {
		log.Println("Failed to shut down access log producer cleanly", err)
		if firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}
// Handler returns the server's root HTTP handler: the query-string collector
// wrapped in the access-log middleware.
func (s *Server) Handler() http.Handler {
	collector := s.collectQueryStringData()
	return s.withAccessLog(collector)
}
// Run serves the server's handler over HTTP on addr and blocks until the
// listener stops, returning the error reported by ListenAndServe.
func (s *Server) Run(addr string) error {
	httpServer := new(http.Server)
	httpServer.Addr = addr
	httpServer.Handler = s.Handler()

	log.Printf("Listening for requests on %s...\n", addr)
	return httpServer.ListenAndServe()
}
// collectQueryStringData returns a handler that publishes the "publish"
// query-string value of requests to "/" to the "test" topic through the
// synchronous DataCollector. Any other path gets a 404. On success the
// response reports the topic/partition/offset of the stored message; on
// failure it responds with status 500 and the error text.
func (s *Server) collectQueryStringData() http.Handler {
	// Kept in one place so the identifier reported to the client always names
	// the topic the message was actually written to.
	const topic = "test"

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}

		// We are not setting a message key, which means that all messages will
		// be distributed randomly over the different partitions.
		// Example: visit http://localhost:8080?publish=publish_this_test_topic
		partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(r.URL.Query().Get("publish")),
		})
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Failed to store your data:, %s", err)
			return
		}

		// The tuple (topic, partition, offset) can be used as a unique identifier
		// for a message in a Kafka cluster.
		fmt.Fprintf(w, "Your data is stored with unique identifier %s/%d/%d", topic, partition, offset)
	})
}
// accessLogEntry is one access-log record. The exported fields form its JSON
// representation; the unexported fields cache the result of the first
// encoding attempt so the entry is marshalled at most once.
type accessLogEntry struct {
	Method       string  `json:"method"`
	Host         string  `json:"host"`
	Path         string  `json:"path"`
	IP           string  `json:"ip"`
	ResponseTime float64 `json:"response_time"`

	encoded []byte // cached JSON payload, nil until the first encoding
	err     error  // cached marshalling error, if any
}

// ensureEncoded marshals the entry to JSON unless a previous call already
// stored either a payload or an error.
func (e *accessLogEntry) ensureEncoded() {
	if e.encoded != nil || e.err != nil {
		return
	}
	payload, err := json.Marshal(e)
	e.encoded = payload
	e.err = err
}

// Length returns the size in bytes of the cached JSON payload, encoding the
// entry first if necessary.
func (e *accessLogEntry) Length() int {
	e.ensureEncoded()
	size := len(e.encoded)
	return size
}

// Encode returns the cached JSON payload together with any marshalling error,
// encoding the entry first if necessary.
func (e *accessLogEntry) Encode() ([]byte, error) {
	e.ensureEncoded()
	payload, err := e.encoded, e.err
	return payload, err
}
// withAccessLog wraps next so that, after each request has been served, an
// access-log entry with the request's method, host, URI, remote address and
// handling time in seconds is queued on the asynchronous AccessLogProducer
// under the "access_log" topic.
func (s *Server) withAccessLog(next http.Handler) http.Handler {
	logged := func(w http.ResponseWriter, r *http.Request) {
		begin := time.Now()
		next.ServeHTTP(w, r)
		elapsed := time.Since(begin)

		// The client's address is the message key, so all access log
		// entries of the same address end up on the same partition.
		msg := &sarama.ProducerMessage{
			Topic: "access_log",
			Key:   sarama.StringEncoder(r.RemoteAddr),
			Value: &accessLogEntry{
				Method:       r.Method,
				Host:         r.Host,
				Path:         r.RequestURI,
				IP:           r.RemoteAddr,
				ResponseTime: float64(elapsed) / float64(time.Second),
			},
		}
		s.AccessLogProducer.Input() <- msg
	}
	return http.HandlerFunc(logged)
}
// newDataCollector creates the synchronous producer used for query-string
// data. It waits for all in-sync replicas to acknowledge each message and
// retries up to 10 times; TLS is enabled when createTlsConfiguration returns a
// configuration. The process exits via log.Fatalln if the producer cannot be
// started.
func newDataCollector(brokerList []string) sarama.SyncProducer {
	// For the data collector, we are looking for strong consistency semantics.
	// Because we don't change the flush settings, sarama will try to produce messages
	// as fast as possible to keep latency low.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	// NOTE(review): sarama's Config documentation states that
	// Producer.Return.Successes must be true for a SyncProducer; setting it
	// explicitly is harmless on versions that already imply it.
	config.Producer.Return.Successes = true

	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.
	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}
	return producer
}
// newAccessLogProducer creates the asynchronous producer used for access-log
// entries. It waits only for the partition leader's acknowledgement,
// compresses with Snappy and flushes batches every 500ms; TLS is enabled when
// createTlsConfiguration returns a configuration. The process exits via
// log.Fatalln if the producer cannot be started.
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
	// The access log favours availability and throughput: batching compressed
	// messages reduces network I/O at the cost of some extra latency.
	config := sarama.NewConfig()
	if tlsConfig := createTlsConfiguration(); tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// Delivery failures are only logged. Messages show up on the error
	// channel only after all retry attempts are exhausted.
	logFailures := func(failures <-chan *sarama.ProducerError) {
		for failure := range failures {
			log.Println("Failed to write access log entry:", failure)
		}
	}
	go logFailures(producer.Errors())

	return producer
}
以上是关于golang golang kafka的主要内容,如果未能解决你的问题,请参考以下文章
[Golang] kafka集群搭建和golang版生产者和消费者
golang 快速入门让Golang kafka驱动程序发布到“测试”主题,这些主题是从快速入门指南创建的http://kafka.apache.org/docum