Apache Storm java.nio.channels.ClosedChannelException: null
Posted: 2014-08-24 13:55:33
【Question】: We are trying to use Apache Storm to process a large number of (fake) messages. A sample message:
""clientName":"Sergey Bakulin","sum":12925,"group":"property","suspicious":false,"clientId":2,"dt":1404387303764,"coord":"lat":55.767842588357645,"lon":37.46920361823332".
We use Apache Kafka as the message source for the Storm cluster. Our goal is to handle at least 50k msg/sec per node. As soon as we use more than one node, we regularly run into the following error (the log snippet is from worker-*.log):
2014-07-03 15:14:47 b.s.m.n.Client [INFO] failed to send requests to ip-172-31-23-123.eu-west-1.compute.internal/172.31.23.123:6701: java.nio.channels.ClosedChannelException: null
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:381) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:349) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.6.3.Final.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
2014-07-03 15:14:47 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-ip-172-31-23-123.eu-west-1.compute.internal/172.31.23.123:6701
Our current Storm configuration (storm.yaml):
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "172.31.*.*"
storm.local.dir: "/home/*/storm/data"
nimbus.host: "127.0.0.1"
supervisor.slots.ports:
- 6701
- 6702
ui.port: 8090
worker.childopts: "-Xmx6g -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=1%ID% -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun$
supervisor.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
supervisor.worker.start.timeout.secs: 10
supervisor.worker.timeout.secs: 10
supervisor.monitor.frequency.secs: 3
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true
storm.messaging.netty.server_worker_threads: 2
storm.messaging.netty.client_worker_threads: 2
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 25
storm.messaging.netty.max_wait_ms: 1000
Our Storm topology:
Properties conf = Util.readProperties(ClientTopology.class, "storm.properties");
prepareRedisDB(conf);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka_trans_spout", getKafkaSpout(conf, conf.getProperty("kafka_trans_topic")), 3);
builder.setSpout("kafka_socevent_spout", getKafkaSpout(conf, conf.getProperty("kafka_socevent_topic")), 3);
builder.setBolt("json_to_tuple_trans_bolt", new JSONToTupleBolt(Transaction.class), 6)
.shuffleGrouping("kafka_trans_spout");
builder.setBolt("json_to_tuple_socevent_bolt", new JSONToTupleBolt(SocialEvent.class), 3)
.shuffleGrouping("kafka_socevent_spout");
builder.setBolt("alert_bolt", new AlertBolt(conf), 3)
.fieldsGrouping("json_to_tuple_trans_bolt", new Fields("cl_id"))
.fieldsGrouping("json_to_tuple_socevent_bolt", new Fields("cl_id"));
builder.setBolt("offer_bolt", new NearestOfferBolt(conf), 3)
.shuffleGrouping("json_to_tuple_trans_bolt");
run(builder, args, 6);
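The body of the run(...) helper is not part of the question; the sketch below only shows what a Storm 0.9.x submission helper of that shape typically does. The topology name and the local-versus-remote switch are assumptions, not the asker's actual code:

private static void run(TopologyBuilder builder, String[] args, int numWorkers) throws Exception {
    backtype.storm.Config stormConf = new backtype.storm.Config();
    stormConf.setNumWorkers(numWorkers); // spread the executors over this many worker JVMs
    if (args != null && args.length > 0) {
        // submit to the real cluster under the name passed on the command line
        backtype.storm.StormSubmitter.submitTopology(args[0], stormConf, builder.createTopology());
    } else {
        // run everything in-process for local testing
        backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
        cluster.submitTopology("client-topology", stormConf, builder.createTopology());
    }
}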
private static KafkaSpout getKafkaSpout(Properties conf, String topic) {
    SpoutConfig spoutConfig = new SpoutConfig(
            new ZkHosts(conf.getProperty("zk_host"), "/brokers"),
            topic,
            "/brokers",
            conf.getProperty("kafka_consumer_group_id"));
    List<String> zkServers = new ArrayList<String>();
    zkServers.add(conf.getProperty("zk_host"));
    spoutConfig.zkServers = zkServers;
    spoutConfig.zkPort = Integer.valueOf(conf.getProperty("zk_port"));
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConfig.forceFromStart = true;
    spoutConfig.fetchSizeBytes = 5 * 1024 * 1024;
    spoutConfig.bufferSizeBytes = 5 * 1024 * 1024;
    storm.kafka.KafkaSpout kafkaSpout = new storm.kafka.KafkaSpout(spoutConfig);
    return kafkaSpout;
}
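The JSONToTupleBolt used in the topology is also not shown; the following is only a rough sketch of the same idea (deserialize the Kafka message and emit a tuple keyed by the client id), assuming the Gson library and a plain Map in place of the Transaction/SocialEvent classes from the question:

import java.util.Map;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.google.gson.Gson;

public class JsonToMapBolt extends BaseBasicBolt {
    private transient Gson gson;

    @Override
    @SuppressWarnings("unchecked")
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (gson == null) {
            gson = new Gson(); // Gson is not Serializable, so create it lazily on the worker
        }
        // the KafkaSpout with StringScheme emits the raw message as the first (and only) field
        Map<String, Object> msg = gson.fromJson(input.getString(0), Map.class);
        // key the output on the client id so a downstream fieldsGrouping("cl_id")
        // keeps all events of one client on the same bolt task
        collector.emit(new Values(msg.get("clientId"), msg));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("cl_id", "payload"));
    }
}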
We are using AWS c3.2xlarge machines, Apache Storm 0.9.2-incubating, and Apache Kafka 2.9.2-0.8.1.1.
【Comments】:
You can check whether anything is actually listening on 172.31.23.123:6701: try netstat -antp | grep 6701 on that machine.
Did you find a solution? I am running into the same error now.
The exception looks like the supervisor's ports cannot be reached from the outside. Have a look at this link: gist.github.com/amontalenti/8ff0c31a7b95a6dea3d2. Have you tried telnet to that host and port?
【Answer 1】: Try generating some load first and only then run the topology; this has happened to me a few times because the topic was new and had no load on it.
【Discussion】:
【Answer 2】: Test ping and telnet: make sure that every machine running Storm can reach all of the others (all workers, Nimbus, and ZooKeeper) via ping. Try pinging by IP, hostname, and FQDN, and if that does not work, edit the hosts file (/etc/hosts).
Also telnet to the machines to check that the ports from storm.yaml (6701, 6702) and the ZooKeeper port (2181) are open.
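The telnet check suggested above can also be done with a tiny stand-alone program; a minimal sketch, using the host and worker port from the log line in the question (the class name and the 3-second timeout are arbitrary choices):

import java.net.InetSocketAddress;
import java.net.Socket;

public class WorkerPortCheck {
    public static void main(String[] args) throws Exception {
        // attempt a plain TCP connect to the worker port that fails in the log
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("172.31.23.123", 6701), 3000); // 3 s timeout
            System.out.println("worker port is reachable");
        }
    }
}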
In my test environment, the storm.yaml settings below work with this network setup:
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
storm.messaging.netty.server_worker_threads: 1
storm.messaging.transport: backtype.storm.messaging.netty.Context
【Discussion】: