golang 快速入门让Golang kafka驱动程序发布到“测试”主题,这些主题是从快速入门指南创建的http://kafka.apache.org/docum

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang 快速入门让Golang kafka驱动程序发布到“测试”主题,这些主题是从快速入门指南创建的http://kafka.apache.org/docum相关的知识,希望对你有一定的参考价值。

package main

import (
	"github.com/Shopify/sarama"

	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"strings"
	"time"
)

var (
	addr      = flag.String("addr", ":8080", "The address to bind to")
	brokers   = flag.String("brokers", "localhost:9092", "The Kafka brokers to connect to, as a comma separated list") //connecting to the local kafka instance
	verbose   = flag.Bool("verbose", false, "Turn on Sarama logging")
	certFile  = flag.String("certificate", "", "The optional certificate file for client authentication")
	keyFile   = flag.String("key", "", "The optional key file for client authentication")
	caFile    = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
	verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
)

func main() {
	flag.Parse()

	if *verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

	server := &Server{
		DataCollector:     newDataCollector(brokerList),
		AccessLogProducer: newAccessLogProducer(brokerList),
	}
	defer func() {
		if err := server.Close(); err != nil {
			log.Println("Failed to close server", err)
		}
	}()

	log.Fatal(server.Run(*addr))
}

func createTlsConfiguration() (t *tls.Config) {
	if *certFile != "" && *keyFile != "" && *caFile != "" {
		cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
		if err != nil {
			log.Fatal(err)
		}

		caCert, err := ioutil.ReadFile(*caFile)
		if err != nil {
			log.Fatal(err)
		}

		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM(caCert)

		t = &tls.Config{
			Certificates:       []tls.Certificate{cert},
			RootCAs:            caCertPool,
			InsecureSkipVerify: *verifySsl,
		}
	}
	// will be nil by default if nothing is provided
	return t
}

type Server struct {
	DataCollector     sarama.SyncProducer
	AccessLogProducer sarama.AsyncProducer
}

func (s *Server) Close() error {
	if err := s.DataCollector.Close(); err != nil {
		log.Println("Failed to shut down data collector cleanly", err)
	}

	if err := s.AccessLogProducer.Close(); err != nil {
		log.Println("Failed to shut down access log producer cleanly", err)
	}

	return nil
}

func (s *Server) Handler() http.Handler {
	return s.withAccessLog(s.collectQueryStringData())
}

func (s *Server) Run(addr string) error {
	httpServer := &http.Server{
		Addr:    addr,
		Handler: s.Handler(),
	}

	log.Printf("Listening for requests on %s...\n", addr)
	return httpServer.ListenAndServe()
}

func (s *Server) collectQueryStringData() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}

		// We are not setting a message key, which means that all messages will
		// be distributed randomly over the different partitions.
		partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
			Topic: "test",                                         //writing into the test topic
			Value: sarama.StringEncoder(r.URL.Query().Get("publish")), //visit http://localhost:8080?publish=publish_this_test_topic
		})

		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Failed to store your data:, %s", err)
		} else {
			// The tuple (topic, partition, offset) can be used as a unique identifier
			// for a message in a Kafka cluster.
			fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
		}
	})
}

type accessLogEntry struct {
	Method       string  `json:"method"`
	Host         string  `json:"host"`
	Path         string  `json:"path"`
	IP           string  `json:"ip"`
	ResponseTime float64 `json:"response_time"`

	encoded []byte
	err     error
}

func (ale *accessLogEntry) ensureEncoded() {
	if ale.encoded == nil && ale.err == nil {
		ale.encoded, ale.err = json.Marshal(ale)
	}
}

func (ale *accessLogEntry) Length() int {
	ale.ensureEncoded()
	return len(ale.encoded)
}

func (ale *accessLogEntry) Encode() ([]byte, error) {
	ale.ensureEncoded()
	return ale.encoded, ale.err
}

func (s *Server) withAccessLog(next http.Handler) http.Handler {

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started := time.Now()

		next.ServeHTTP(w, r)

		entry := &accessLogEntry{
			Method:       r.Method,
			Host:         r.Host,
			Path:         r.RequestURI,
			IP:           r.RemoteAddr,
			ResponseTime: float64(time.Since(started)) / float64(time.Second),
		}

		// We will use the client's IP address as key. This will cause
		// all the access log entries of the same IP address to end up
		// on the same partition.
		s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
			Topic: "access_log",
			Key:   sarama.StringEncoder(r.RemoteAddr),
			Value: entry,
		}
	})
}

func newDataCollector(brokerList []string) sarama.SyncProducer {

	// For the data collector, we are looking for strong consistency semantics.
	// Because we don't change the flush settings, sarama will try to produce messages
	// as fast as possible to keep latency low.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	return producer
}

func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {

	// For the access log, we are looking for AP semantics, with high throughput.
	// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
	config := sarama.NewConfig()
	tlsConfig := createTlsConfiguration()
	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}

[易学易懂系列|golang语言|零基础|快速入门|]

golang编程语言,是google推出的一门语言。

主要应用在系统编程和高性能服务器编程,有广大的市场前景,目前整个生态也越来越强大,未来可能在企业应用和人工智能等领域占有越来越重要的地位。

本文章是【易学易懂系列|编程语言入门】第一篇幅,希望可以帮助对编程感兴趣的同学更好地入门。

本系列主要的核心思想是:实践实践再实践!每天编程至少一小时!

好吧,我们开始吧!let‘s Go!

安装

Golang 支持三个平台:Mac,Windows 和 Linux(译注:不只是这三个,也支持其他主流平台)。你可以在 https://golang.org/dl/ 中下载相应平台的二进制文件。(译注:因为众所周知的原因,如果下载不了,请到 https://studygolang.com/dl 下载)

Mac OS

https://golang.org/dl/ 下载安装程序。双击开始安装并且遵循安装提示,会将 Golang 安装到 /usr/local/go 目录下,同时 /usr/local/go/bin 文件夹也会被添加到 PATH 环境变量中。

Windows

https://golang.org/dl/ 下载 MSI 安装程序。双击开始安装并且遵循安装提示,会将 Golang 安装到 C:Go 目录下,同时 c:Goin 目录也会被添加到你的 PATH 环境变量中。

Linux

https://golang.org/dl/ 下载 tar 文件,并解压到 /usr/local

请添加 /usr/local/go/binPATH 环境变量中。Go 就已经成功安装在 Linux 上了。

配置GOROOT和GOPATH:

GOROOT 指向系统安装路径,GOPATH是作为编译后二进制的存放目的地和import包时的搜索路径 (其实也是你的工作目录, 你可以在src下创建你自己的go源文件, 然后开始工作)。

GOPATH允许多个目录,当有多个目录时,请注意分隔符,多个目录的时候Windows是分号; 当有多个GOPATH时默认将go get获取的包存放在第一个目录下。

详细配置步骤:

右键桌面“计算机”-> 选择“属性”-> 高级系统设置 -> 环境变量 -> 系统变量

GOROOT D:GoGo
GOPATH D:GoGoProjects
Path D:Program FilesGitcmd;D:Program FilesGitin;%GOROOT%in;%GOPATH%in;

在cmd命令行中,输入 go version 测试一下。

如果配置成功,则显示go 的版本好。

如下图:

技术分享图片

 

 

Liunx/Mac 安装

 

默认安装到 /usr/local/go (Windows系统:C:Go)

 

配置下环境变量:

 

1
2
3
export GOROOT=$HOME/go 
export GOPATH=$HOME/gopath
export PATH=$PATH:$GOROOT/bin:$GOPATH/bin

 

同样,在控制台中输入 go version 测试一下。

 如果显示版本号,则代表安装成功!

安装开发工具:

这里推荐:Goland,这是著名的jetbrains公司推出了智能化IDE(集成开发工具),是我们开发工程师的瑞士军刀!!也是快乐的源泉!!

下载地址:https://www.jetbrains.com/go/

如下图:

技术分享图片

下载完成后,按照默认安装就行。安装教程可以参考:

https://blog.csdn.net/qq_23599965/article/details/81947938

https://www.jianshu.com/p/d9cb6b3aacc6

--------------------------以下是穷学生福利----------------------------------------

Goland是jetbrains公司的好产品,好产品必然要成本!

首先声明:有钱的同学要大力支持正版软件,能买就买。

如果没有钱,又要想用这好产品,那怎么办?这时候,就体现知识就是力量,知识就是财富的大道理了!!你要是懂以下的英文,也是可以愉快地享用这个美好的Goland!!

 

 
Step 1: IMPORTANT:

add 0.0.0.0 account.jetbrains.com to your host file ( google if you don‘t know where it is )

Step 2:
then get the key from http://idea.lanyus.com/getkey

====

enjoy!!

 

 


以上是关于golang 快速入门让Golang kafka驱动程序发布到“测试”主题,这些主题是从快速入门指南创建的http://kafka.apache.org/docum的主要内容,如果未能解决你的问题,请参考以下文章

kakfa从入门到放弃: golang编程操作kafka

kakfa从入门到放弃: golang编程操作kafka

Golang | 一文带你快速入门context

Golang Module快速入门

Golang系列之单元测试快速入门

cgo快速入门之golang调用C语言