goreplay~http输出工作线程
Posted it_worker365
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了goreplay~http输出工作线程相关的知识,希望对你有一定的参考价值。
http输出工作线程
NewHTTPOutput 默认情况
if config.WorkersMin <= 0 { config.WorkersMin = 1 } if config.WorkersMin > 1000 { config.WorkersMin = 1000 } if config.WorkersMax <= 0 { config.WorkersMax = math.MaxInt32 // idealy so large } if config.WorkersMax < config.WorkersMin { config.WorkersMax = config.WorkersMin }
if config.WorkerTimeout <= 0 {
config.WorkerTimeout = time.Second * 2
}
配置后启动httpclient,然后
// Build the HTTP client from the output's configuration.
o.client = NewHTTPClient(o.config)

// Account for the minimum number of workers up front, then launch one
// goroutine per worker.
o.activeWorkers += int32(o.config.WorkersMin)
for remaining := o.config.WorkersMin; remaining > 0; remaining-- {
	go o.startWorker()
}
启动多个发送协程(goroutine),每个协程运行一个 startWorker 循环
func (o *HTTPOutput) startWorker() { for { select { case <-o.stopWorker: return case msg := <-o.queue: o.sendRequest(o.client, msg) } } }
执行发送
func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) { if !isRequestPayload(msg.Meta) { return } uuid := payloadID(msg.Meta) start := time.Now() resp, err := client.Send(msg.Data) stop := time.Now() if err != nil { Debug(1, fmt.Sprintf("[HTTP-OUTPUT] error when sending: %q", err)) return } if resp == nil { return } if o.config.TrackResponses { o.responses <- &response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()} } if o.elasticSearch != nil { o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop) } }
发送细节,各种配置生效点
func (c *HTTPClient) Send(data []byte) ([]byte, error) { var req *http.Request var resp *http.Response var err error req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data))) if err != nil { return nil, err } // we don\'t send CONNECT or OPTIONS request if req.Method == http.MethodConnect { return nil, nil } if !c.config.OriginalHost { req.Host = c.config.url.Host } // fix #862 if c.config.url.Path == "" && c.config.url.RawQuery == "" { req.URL.Scheme = c.config.url.Scheme req.URL.Host = c.config.url.Host } else { req.URL = c.config.url } // force connection to not be closed, which can affect the global client req.Close = false // it\'s an error if this is not equal to empty string req.RequestURI = "" resp, err = c.Client.Do(req) if err != nil { return nil, err } if c.config.TrackResponses { return httputil.DumpResponse(resp, true) } _ = resp.Body.Close() return nil, nil }
master 协程(workerMaster):WorkerTimeout 超时设置在此生效,每次超时后在队列为空时回收超出 WorkersMin 的工作协程
func (o *HTTPOutput) workerMaster() { var timer = time.NewTimer(o.config.WorkerTimeout) defer func() { // recover from panics caused by trying to send in // a closed chan(o.stopWorker) recover() }() defer timer.Stop() for { select { case <-o.stop: return default: <-timer.C } // rollback workers rollback: if atomic.LoadInt32(&o.activeWorkers) > int32(o.config.WorkersMin) && len(o.queue) < 1 { // close one worker o.stopWorker <- struct{}{} atomic.AddInt32(&o.activeWorkers, -1) goto rollback } timer.Reset(o.config.WorkerTimeout) } }
以上是关于goreplay~http输出工作线程的主要内容,如果未能解决你的问题,请参考以下文章
流量回放工具之GoReplay output-http 源码分析
流量回放工具之 GoReplay output-http-stats(HTTP请求统计) 源码分析
流量回放工具之 GoReplay output-http-stats(HTTP请求统计) 源码分析
goreplay 输出流量捕获数据到 elasticsearch