golang kafka


This post collects reference examples for working with Kafka from Go, using the sarama client library.

package main

import (
    "fmt"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/Shopify/sarama"
    "github.com/bsm/sarama-cluster" //support automatic consumer-group rebalancing and offset tracking
    "github.com/sdbaiguanghe/glog"
)

var (
    topics = "topics_1"
)

// consumer reads messages from the topic as part of a consumer group.
func consumer() {
    groupID := "group-1"
    config := cluster.NewConfig()
    config.Group.Return.Notifications = true
    config.Consumer.Offsets.CommitInterval = 1 * time.Second
    config.Consumer.Offsets.Initial = sarama.OffsetNewest // start consuming from the newest offset

    c, err := cluster.NewConsumer(strings.Split("localhost:9092", ","), groupID, strings.Split(topics, ","), config)
    if err != nil {
        glog.Errorf("Failed open consumer: %v", err)
        return
    }
    defer c.Close()
    go func(c *cluster.Consumer) {
        errors := c.Errors()
        noti := c.Notifications()
        for {
            select {
            case err := <-errors:
                glog.Errorln(err)
            case <-noti: // drain rebalance notifications
            }
        }
    }(c)

    for msg := range c.Messages() {
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Value)
        c.MarkOffset(msg, "") // MarkOffset does not write to Kafka immediately; offsets not yet committed can be lost if the process crashes
    }
}

// syncProducer is a synchronous producer.
// Fine when concurrency is low.
func syncProducer() {
    config := sarama.NewConfig()
    //  config.Producer.RequiredAcks = sarama.WaitForAll
    //  config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
    if err != nil {
        glog.Errorln(err)
        return
    }
    defer p.Close() // defer only after a successful open: closing a nil producer would panic

    v := "sync: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
    fmt.Fprintln(os.Stdout, v)
    msg := &sarama.ProducerMessage{
        Topic: topics,
        Value: sarama.ByteEncoder(v),
    }
    if _, _, err := p.SendMessage(msg); err != nil {
        glog.Errorln(err)
        return
    }
}

// asyncProducer is an asynchronous producer.
// Required when concurrency is high: sends are buffered rather than blocking per message.
func asyncProducer() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // required: successes must be returned so the goroutine below can drain them
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewAsyncProducer(strings.Split("localhost:9092", ","), config)
    if err != nil {
        glog.Errorln(err)
        return
    }
    defer p.Close() // defer only after a successful open

    // This goroutine is required: Errors() and Successes() must be drained,
    // otherwise the producer's internal channels fill up and block sends.
    go func(p sarama.AsyncProducer) {
        errors := p.Errors()
        success := p.Successes()
        for {
            select {
            case err := <-errors:
                if err != nil {
                    glog.Errorln(err)
                }
            case <-success: // drain acknowledged messages
            }
        }
    }(p)

    v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
    fmt.Fprintln(os.Stdout, v)
    msg := &sarama.ProducerMessage{
        Topic: topics,
        Value: sarama.ByteEncoder(v),
    }
    p.Input() <- msg
}
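
The snippet above defines consumer, syncProducer, and asyncProducer but no entry point. A minimal main placed in the same package might wire them together like this (a sketch, not from the original post; the two-second delay is an arbitrary choice to let the consumer join its group before producing):

func main() {
    go consumer() // run the group consumer in the background

    // Wait briefly so the consumer has joined the group; with
    // OffsetNewest it would otherwise miss the demo messages.
    time.Sleep(2 * time.Second)

    syncProducer()
    asyncProducer()

    // Block so the consumer keeps printing incoming messages.
    select {}
}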

Golang quickstart: have the Go Kafka driver publish to the "test" topic, created by following the quickstart guide at http://kafka.apache.org/docum
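
The server below produces to the "test" and "access_log" topics, which must already exist (the official quickstart creates "test" with Kafka's CLI tools). If you prefer to create them from Go, newer sarama releases ship a ClusterAdmin; here is a hedged sketch, where the partition and replication counts and the version pin are assumptions for a local single-broker setup:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0 // ClusterAdmin needs a reasonably recent protocol version

	admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("Failed to create cluster admin:", err)
	}
	defer admin.Close()

	for _, topic := range []string{"test", "access_log"} {
		// One partition, no extra replicas: enough for a local quickstart broker.
		detail := &sarama.TopicDetail{NumPartitions: 1, ReplicationFactor: 1}
		if err := admin.CreateTopic(topic, detail, false); err != nil {
			log.Println("CreateTopic:", err) // e.g. the topic already exists
		}
	}
}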

package main

import (
	"github.com/Shopify/sarama"

	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"strings"
	"time"
)

var (
	addr      = flag.String("addr", ":8080", "The address to bind to")
	brokers   = flag.String("brokers", "localhost:9092", "The Kafka brokers to connect to, as a comma separated list") //connecting to the local kafka instance
	verbose   = flag.Bool("verbose", false, "Turn on Sarama logging")
	certFile  = flag.String("certificate", "", "The optional certificate file for client authentication")
	keyFile   = flag.String("key", "", "The optional key file for client authentication")
	caFile    = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
	verifySsl = flag.Bool("verify", false, "Optional: verify the SSL certificate chain")
)

func main() {
	flag.Parse()

	if *verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

	server := &Server{
		DataCollector:     newDataCollector(brokerList),
		AccessLogProducer: newAccessLogProducer(brokerList),
	}
	defer func() {
		if err := server.Close(); err != nil {
			log.Println("Failed to close server", err)
		}
	}()

	log.Fatal(server.Run(*addr))
}

func createTlsConfiguration() (t *tls.Config) {
	if *certFile != "" && *keyFile != "" && *caFile != "" {
		cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
		if err != nil {
			log.Fatal(err)
		}

		caCert, err := ioutil.ReadFile(*caFile)
		if err != nil {
			log.Fatal(err)
		}

		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM(caCert)

		t = &tls.Config{
			Certificates:       []tls.Certificate{cert},
			RootCAs:            caCertPool,
			InsecureSkipVerify: !*verifySsl, // skip chain verification unless -verify is set
		}
	}
	// will be nil by default if nothing is provided
	return t
}

type Server struct {
	DataCollector     sarama.SyncProducer
	AccessLogProducer sarama.AsyncProducer
}

func (s *Server) Close() error {
	if err := s.DataCollector.Close(); err != nil {
		log.Println("Failed to shut down data collector cleanly", err)
	}

	if err := s.AccessLogProducer.Close(); err != nil {
		log.Println("Failed to shut down access log producer cleanly", err)
	}

	return nil
}

func (s *Server) Handler() http.Handler {
	return s.withAccessLog(s.collectQueryStringData())
}

func (s *Server) Run(addr string) error {
	httpServer := &http.Server{
		Addr:    addr,
		Handler: s.Handler(),
	}

	log.Printf("Listening for requests on %s...\n", addr)
	return httpServer.ListenAndServe()
}

func (s *Server) collectQueryStringData() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}

		// We are not setting a message key, which means that all messages will
		// be distributed randomly over the different partitions.
		partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
			Topic: "test",                                         //writing into the test topic
			Value: sarama.StringEncoder(r.URL.Query().Get("publish")), //visit http://localhost:8080?publish=publish_this_test_topic
		})

		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Failed to store your data:, %s", err)
		} else {
			// The tuple (topic, partition, offset) can be used as a unique identifier
			// for a message in a Kafka cluster.
			fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
		}
	})
}

type accessLogEntry struct {
	Method       string  `json:"method"`
	Host         string  `json:"host"`
	Path         string  `json:"path"`
	IP           string  `json:"ip"`
	ResponseTime float64 `json:"response_time"`

	encoded []byte
	err     error
}

func (ale *accessLogEntry) ensureEncoded() {
	if ale.encoded == nil && ale.err == nil {
		ale.encoded, ale.err = json.Marshal(ale)
	}
}

func (ale *accessLogEntry) Length() int {
	ale.ensureEncoded()
	return len(ale.encoded)
}

func (ale *accessLogEntry) Encode() ([]byte, error) {
	ale.ensureEncoded()
	return ale.encoded, ale.err
}

func (s *Server) withAccessLog(next http.Handler) http.Handler {

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started := time.Now()

		next.ServeHTTP(w, r)

		entry := &accessLogEntry{
			Method:       r.Method,
			Host:         r.Host,
			Path:         r.RequestURI,
			IP:           r.RemoteAddr,
			ResponseTime: float64(time.Since(started)) / float64(time.Second),
		}

		// We will use the client's IP address as key. This will cause
		// all the access log entries of the same IP address to end up
		// on the same partition.
		s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
			Topic: "access_log",
			Key:   sarama.StringEncoder(r.RemoteAddr),
			Value: entry,
		}
	})
}

func newDataCollector(brokerList []string) sarama.SyncProducer {

	// For the data collector, we are looking for strong consistency semantics.
	// Because we don't change the flush settings, sarama will try to produce messages
	// as fast as possible to keep latency low.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	return producer
}

func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {

	// For the access log, we are looking for AP semantics, with high throughput.
	// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
	config := sarama.NewConfig()
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}
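
To exercise both producers, start the server and send it a request. A tiny illustrative client (the query value is just an example):

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// The query-string value goes to the "test" topic via the sync producer;
	// the request itself is recorded on "access_log" by the async producer.
	resp, err := http.Get("http://localhost:8080/?publish=publish_this_test_topic")
	if err != nil {
		log.Fatalln("Request failed:", err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatalln("Reading response failed:", err)
	}
	fmt.Println(string(body)) // e.g. "Your data is stored with unique identifier important/0/12"
}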
