Storm issue log: a Python script keeps writing messages to Kafka, a spout consumes them from Kafka and emits them to a bolt, but some messages are never processed by the bolt
Posted by kane_zch
Preface: this article was compiled by the editors of 小常识网 (cha138.com).
1. Background
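A Python script continuously reads lines from a .txt file, wraps each line in a message, and sends it to Kafka as a producer. A Storm spout reads these messages from Kafka, does some processing, and emits them to a bolt, which finally writes the data to HBase in a predefined format.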
2. Problem description
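There are about 14,000 records in total. Debug output showed that the spout read, processed, and emitted all of them, but the bolt processed only part of them (a little over 2,000; the remaining ten thousand or so were clearly never processed). Only those 2,000-odd records, i.e. the ones the bolt actually received, were written to HBase.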
The last log output when the problem occurred:
OLT + acSN=210235A1AMB159000008clientMacStr = d3f4-6b29-c82eonLineTimeStr2015-07-03 17:49:18 BOLT count = 38215
472728 [Thread-14-HBASE_BOLT] INFO b.s.d.executor - BOLT ack TASK: 1 TIME: TUPLE: source: KAFKA_SPOUT:2, stream: default, id: {}, [{"acSN": "210235A1AMB159000008", "onLineTime": "2015-07-03 17:49:18", "clientMAC": "d3f4-6b29-c82e"}]
472728 [Thread-14-HBASE_BOLT] INFO b.s.d.executor - Execute done TUPLE source: KAFKA_SPOUT:2, stream: default, id: {}, [{"acSN": "210235A1AMB159000008", "onLineTime": "2015-07-03 17:49:18", "clientMAC": "d3f4-6b29-c82e"}] TASK: 1 DELTA:
472728 [Thread-14-HBASE_BOLT] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472729 [Thread-14-HBASE_BOLT] INFO b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@781650e2> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=37640}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}]> #<DataPoint [__process-latency = {KAFKA_SPOUT:default=7.727948990435706}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {KAFKA_SPOUT:default=7.951167728237792}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=37680}]>]]
472734 [Thread-14-HBASE_BOLT] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472735 [Thread-14-HBASE_BOLT] INFO b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@5a51d4b2> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=20}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
472735 [Thread-14-HBASE_BOLT] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472735 [Thread-14-HBASE_BOLT] INFO b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@32632071> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
472735 [Thread-14-HBASE_BOLT] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472736 [Thread-14-HBASE_BOLT] INFO b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@9e6f48f> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16392, capacity=16384, population=2}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
472736 [Thread-14-HBASE_BOLT] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
472736 [Thread-14-HBASE_BOLT] INFO b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@19c1dd5d> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16394, read_pos=16392, capacity=16384, population=2}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
500126 [Thread-12-__acker] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
500126 [Thread-16-__system] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
500127 [Thread-12-__acker] INFO b.s.d.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@16c17bfd> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=8, read_pos=7, capacity=16384, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
500128 [Thread-16-__system] INFO b.s.d.task - Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@16c17bfd> [#<DataPoint [__ack-count = {}]> #<DataPoint [__send-iconnection = {}]> #<DataPoint [GC/PSScavenge = {count=0, timeMs=0}]> #<DataPoint [memory/heap = {unusedBytes=48303576, usedBytes=632222248, maxBytes=1908932608, initBytes=134209408, virtualFreeBytes=1276710360, committedBytes=680525824}]> #<DataPoint [__receive = {write_pos=8, read_pos=7, capacity=16384, population=1}]> #<DataPoint [GC/PSMarkSweep = {count=0, timeMs=0}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=405520, usedBytes=63229936, maxBytes=136314880, initBytes=24576000, virtualFreeBytes=73084944, committedBytes=63635456}]> #<DataPoint [uptimeSecs = 500.127]> #<DataPoint [startTimeSecs = 1.467698245917E9]> #<DataPoint [__transfer = {write_pos=16387, read_pos=16387, capacity=32, population=0}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
506118 [Thread-14-HBASE_BOLT] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
506119 [Thread-14-HBASE_BOLT] INFO b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@21a8aeba> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16395, read_pos=16394, capacity=16384, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
514859 [Thread-10-KAFKA_SPOUT] INFO b.s.d.task - Emitting: KAFKA_SPOUT default [{"acSN": "11111111", "onLineTime": "2018-12-01 11:06:25", "clientMAC": "0000-0000-0001"}]
receive:{"acSN": "11111111", "onLineTime": "2018-12-01 11:06:25", "clientMAC": "0000-0000-0001"} spout emit:43173
514859 [Thread-10-KAFKA_SPOUT] INFO b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: KAFKA_SPOUT:2, stream: default, id: {}, [{"acSN": "11111111", "onLineTime": "2018-12-01 11:06:25", "clientMAC": "0000-0000-0001"}]
560126 [Thread-12-__acker] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
560126 [Thread-16-__system] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
560128 [Thread-12-__acker] INFO b.s.d.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@73e0f92d> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=9, read_pos=8, capacity=16384, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
560128 [Thread-16-__system] INFO b.s.d.task - Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@370734ca> [#<DataPoint [__ack-count = {}]> #<DataPoint [__send-iconnection = {}]> #<DataPoint [GC/PSScavenge = {count=1, timeMs=13}]> #<DataPoint [memory/heap = {unusedBytes=522122784, usedBytes=146868704, maxBytes=1908932608, initBytes=134209408, virtualFreeBytes=1762063904, committedBytes=668991488}]> #<DataPoint [__receive = {write_pos=9, read_pos=8, capacity=16384, population=1}]> #<DataPoint [GC/PSMarkSweep = {count=0, timeMs=0}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=331208, usedBytes=63304248, maxBytes=136314880, initBytes=24576000, virtualFreeBytes=73010632, committedBytes=63635456}]> #<DataPoint [uptimeSecs = 560.128]> #<DataPoint [startTimeSecs = 1.467698245917E9]> #<DataPoint [__transfer = {write_pos=16387, read_pos=16387, capacity=32, population=0}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
566119 [Thread-14-HBASE_BOLT] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
566120 [Thread-14-HBASE_BOLT] INFO b.s.d.task - Emitting: HBASE_BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@4f2d2517> [#<DataPoint [__ack-count = {KAFKA_SPOUT:default=0}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> #<DataPoint [__receive = {write_pos=16396, read_pos=16395, capacity=16384, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {__metrics=0}]> #<DataPoint [__execute-count = {KAFKA_SPOUT:default=0}]>]]
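The queue data points in this log (`__sendqueue`, `__receive`, `__transfer`) each report `write_pos`, `read_pos`, `capacity` and `population`. In every entry above, `population` equals `write_pos - read_pos`, i.e. the number of tuples written into the queue but not yet read. A minimal sketch for pulling those numbers out of such log lines, in plain Java with no Storm dependency. It assumes the exact `{write_pos=…, read_pos=…, capacity=…, population=…}` layout shown here, and `QueueMetrics` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts queue snapshots from Storm metrics log lines like the ones above. */
final class QueueMetrics {

    /** One queue data point, e.g. __receive or __sendqueue. */
    static final class QueueSnapshot {
        final String name;
        final long writePos, readPos, capacity, population;

        QueueSnapshot(String name, long writePos, long readPos, long capacity, long population) {
            this.name = name;
            this.writePos = writePos;
            this.readPos = readPos;
            this.capacity = capacity;
            this.population = population;
        }

        /** Tuples written but not yet consumed; equals the logged population in the sample log. */
        long backlog() {
            return writePos - readPos;
        }
    }

    // Matches e.g. "[__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}]"
    private static final Pattern QUEUE = Pattern.compile(
            "\\[(__\\w+) = \\{write_pos=(-?\\d+), read_pos=(-?\\d+), capacity=(\\d+), population=(\\d+)\\}\\]");

    /** Returns every queue data point found in one log line, in order of appearance. */
    static List<QueueSnapshot> parse(String logLine) {
        List<QueueSnapshot> out = new ArrayList<>();
        Matcher m = QUEUE.matcher(logLine);
        while (m.find()) {
            out.add(new QueueSnapshot(m.group(1),
                    Long.parseLong(m.group(2)), Long.parseLong(m.group(3)),
                    Long.parseLong(m.group(4)), Long.parseLong(m.group(5))));
        }
        return out;
    }

    public static void main(String[] args) {
        String line = "#<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=16384, population=0}]> "
                + "#<DataPoint [__receive = {write_pos=16394, read_pos=16387, capacity=16384, population=7}]>";
        for (QueueSnapshot q : parse(line)) {
            System.out.println(q.name + " backlog=" + q.backlog() + " capacity=" + q.capacity);
        }
    }
}
```

Run against the `__receive` point of the first `HBASE_BOLT` metrics entry, `backlog()` returns 7, matching the logged `population`. Scanning a whole log this way makes it easy to see which queue, if any, approaches its `capacity`.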
3. Cause
See the following article, which explains how messages are passed between Storm's internal buffers:
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
After investigation, the problem turned out to be caused by the TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE setting.
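A fixed-size send buffer becomes a problem when the spout emits faster than the downstream side can drain it. The toy discrete-time model below illustrates that mechanism under the author's explanation that messages offered to a full buffer are discarded. It is not Storm's queue code (per the article above, Storm's executor queues are built on LMAX Disruptor ring buffers); the class name, rates, and capacities are hypothetical:

```java
import java.util.ArrayDeque;

/** Toy model: a fast producer feeding a slower consumer through a bounded, drop-when-full buffer. */
final class BufferOverflowModel {

    /**
     * Simulates discrete ticks and returns how many of {@code total} messages were dropped.
     * Each tick the producer offers up to {@code emitPerTick} messages, then the consumer
     * removes up to {@code drainPerTick}.
     */
    static long dropped(int capacity, int emitPerTick, int drainPerTick, int total) {
        if (capacity <= 0 || emitPerTick <= 0 || drainPerTick <= 0) {
            throw new IllegalArgumentException("capacity and rates must be positive");
        }
        ArrayDeque<Integer> buffer = new ArrayDeque<>(capacity);
        long drops = 0;
        int sent = 0;
        while (sent < total || !buffer.isEmpty()) {
            // Producer ("spout"): offer this tick's messages; a full buffer discards them.
            for (int i = 0; i < emitPerTick && sent < total; i++, sent++) {
                if (buffer.size() < capacity) {
                    buffer.addLast(sent);
                } else {
                    drops++;
                }
            }
            // Consumer ("bolt"): process up to drainPerTick messages this tick.
            for (int i = 0; i < drainPerTick && !buffer.isEmpty(); i++) {
                buffer.pollFirst();
            }
        }
        return drops;
    }

    public static void main(String[] args) {
        int total = 14_000; // about the number of records in the problem description
        System.out.println("1024 slots, spout 10x faster : " + dropped(1024, 100, 10, total));
        System.out.println("16384 slots, spout 10x faster: " + dropped(16384, 100, 10, total));
        System.out.println("1024 slots, matched rates    : " + dropped(1024, 10, 10, total));
    }
}
```

With these hypothetical rates the first call reports 11586 dropped messages and the other two report 0. In the model a larger buffer only helps because the whole 14,000-message burst fits in it; under a sustained rate mismatch it merely postpones the drops, whereas matching the emit rate to the drain rate avoids them.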
4. Solution
See the article above.
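In short, the spout was emitting too fast: it filled up its own outgoing (send) buffer, and messages that arrived once the buffer was full were discarded.
Testing showed that either increasing TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE (the last line below) or slowing down the spout's emit rate works around the problem: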
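Config conf = new Config(); // backtype.storm.Config (pre-1.0 Storm, as in the log above)
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
// Increased executor send buffer: the change that worked around the problem
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);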
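These queues are backed by LMAX Disruptor ring buffers (the `write_pos`/`read_pos`/`capacity` fields in the log appear to be their metrics), and the Disruptor only accepts ring-buffer sizes that are powers of 2; 32 and 16384 both qualify. A hedged sketch of a guard that checks this before the values go into the config. `BufferConfig` and `putBufferSize` are hypothetical names, a plain `HashMap` stands in for `backtype.storm.Config` (which extends `HashMap<String, Object>`), and the key strings are assumed to be what the three `Config` constants resolve to:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical guard for Storm buffer settings; a HashMap stands in for backtype.storm.Config. */
final class BufferConfig {

    // Assumed string values of the Config.TOPOLOGY_*_BUFFER_SIZE constants used above.
    static final String TRANSFER_BUFFER = "topology.transfer.buffer.size";
    static final String EXECUTOR_RECEIVE_BUFFER = "topology.executor.receive.buffer.size";
    static final String EXECUTOR_SEND_BUFFER = "topology.executor.send.buffer.size";

    /** True for 1, 2, 4, 8, ...: exactly one bit set. */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Stores a buffer size, rejecting values a Disruptor ring buffer cannot use. */
    static void putBufferSize(Map<String, Object> conf, String key, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException(key + " must be a power of 2, got " + size);
        }
        conf.put(key, size);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        putBufferSize(conf, TRANSFER_BUFFER, 32);
        putBufferSize(conf, EXECUTOR_RECEIVE_BUFFER, 16384);
        putBufferSize(conf, EXECUTOR_SEND_BUFFER, 16384); // the setting raised in this post
        System.out.println(conf);
        try {
            putBufferSize(conf, EXECUTOR_SEND_BUFFER, 10000); // not a power of 2
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Calling it with a size such as 10000, as in `main`, throws an `IllegalArgumentException` that names the offending key, so a bad value is caught before the topology is submitted.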