Storm java.io.NotSerializableException:运行拓扑时
Posted
技术标签:
【中文标题】Storm java.io.NotSerializableException:运行拓扑时【英文标题】:Storm java.io.NotSerializableException: when running topology 【发布时间】:2016-01-28 19:32:27 【问题描述】:我终于认为我有一个在 redis 数据库上写入的拓扑。我有一个要打印的螺栓,还有一个要插入 redis 的螺栓。但是当我尝试启动拓扑时,会出现以下错误:
...5333 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
5376 [main] INFO b.s.d.supervisor - Starting supervisor with id 1917ef54-0f16-47b8-86ea-b6722aa33c68 at host amnor-A88XPLUS
5405 [main] ERROR o.a.s.s.o.a.z.s.NioserverCnxnFactory - Thread Thread[main,5,main] died
java.lang.RuntimeException: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
at backtype.storm.utils.Utils.javaSerialize(Utils.java:91) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:107) ~[storm-core-0.10.0.jar:0.10.0]
at Storm.practice.Storm.Prova.ProvaTopology.main(ProvaTopology.java:383) ~[classes/:?]
Caused by: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[?:1.7.0_91]
at backtype.storm.utils.Utils.javaSerialize(Utils.java:87) ~[storm-core-0.10.0.jar:0.10.0]
... 2 more
我认为它可能是 Spout,但我尝试了一个示例 Spout,该示例可在 Storm 示例中使用并且发生相同的情况。我的代码只是在读取的名称中添加了笑脸,例如 (John :) :) ),我只是在尝试将流真正存储到 redis 数据库中,它只是一个从文件中读取名称的小测试拓扑。之后,我正在为我大学的一个大数据项目做一个严肃的拓扑。这是我的代码(有许多未使用的导入,但那是因为我尝试了不同的方式来写入数据库):
package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
//import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
/**
* This is a basic example of a Storm topology.
*/
public class ProvaTopology
public static class ProvaBolt extends BaseRichBolt
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector)
_collector = collector;
public void execute(Tuple tuple)
_collector.emit(tuple, new Values(tuple.getString(0) + " :-)"));
_collector.ack(tuple);
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("Morts"));
public class ProvaSpout extends BaseRichSpout
SpoutOutputCollector _collector;
//Random _rand;
private String fileName;
//private SpoutOutputCollector _collector;
private BufferedReader reader;
private AtomicLong linesRead;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
_collector = collector;
try
fileName= (String)"/home/prova.txt";
reader = new BufferedReader(new FileReader(fileName));
// read and ignore the header if one exists
catch (Exception e)
throw new RuntimeException(e);
// _rand = new Random();
public void nextTuple()
Utils.sleep(100);
try
String line = reader.readLine();
if (line != null)
long id = linesRead.incrementAndGet();
System.out.println("Finished reading line, " + line);
_collector.emit(new Values((String)line));
else
System.out.println("Finished reading file, " + linesRead.get() + " lines read");
Thread.sleep(10000);
catch (Exception e)
e.printStackTrace();
public void ack(Object id)
public void fail(Object id)
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("Morts"));
public class RedisBolt implements IRichBolt
protected String channel = "Somriures";
// protected String configChannel;
protected OutputCollector collector;
// protected Tuple currentTuple;
// protected Logger log;
protected JedisPool pool;
// protected ConfigListenerThread configListenerThread;
public RedisBolt()
public RedisBolt(String channel)
// log = Logger.getLogger(getClass().getName());
// setupNonSerializableAttributes();
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector)
this.collector = collector;
pool = new JedisPool("localhost");
public void execute(Tuple tuple)
String current = tuple.getString(0);
if(current != null)
// for(Object obj: result)
publish(current);
collector.emit(tuple, new Values(current));
//
collector.ack(tuple);
public void cleanup()
if(pool != null)
pool.destroy();
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields(channel));
public void publish(String msg)
Jedis jedis = pool.getResource();
jedis.publish(channel, msg);
pool.returnResource(jedis);
protected void setupNonSerializableAttributes()
public Map getComponentConfiguration()
return null;
public class PrinterBolt extends BaseBasicBolt
public void execute(Tuple tuple, BasicOutputCollector collector)
System.out.println(tuple);
public void declareOutputFields(OutputFieldsDeclarer ofd)
public static void main(String[] args) throws Exception
TopologyBuilder builder = new TopologyBuilder();
ProvaTopology Pt = new ProvaTopology();
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost("127.0.0.1").setPort(666).build();
builder.setSpout("Morts", Pt.new ProvaSpout(), 10);//emisorTestWordSpout
builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("Morts");// de on llig?
builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy");// de on llig?
builder.setBolt("bd", Pt.new RedisBolt(), 2).shuffleGrouping("meal");// de on llig?
builder.setBolt("print", Pt.new PrinterBolt(), 2).shuffleGrouping("meal");
// builder.setBolt("StoreM", (storeMapperS));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0)
conf.setNumWorkers(5);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
//WithProgressBar
else
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
提前致谢
【问题讨论】:
异常很明显:public class ProvaTopology implements Serializable
【参考方案1】:
这里的例外很明显。如果您只是查看java.io.NotSerializableException
的文档,您会看到正在打印的消息是不可序列化的类。要解决此问题,只需让您的 Topology 类实现 Serializable
:
public class ProvaTopology implements Serializable
...
这是必需的,以便 Storm 可以序列化您的拓扑并将其发送到 Nimbus 以执行。由于您的 Bolts 和 Spout 扩展或实现了 Storm 提供的类或接口,您不必担心将它们标记为可序列化,因为这些父类和接口已经这样做了。
【讨论】:
它有效，谢谢！我是这些技术的菜鸟。现在它说该文件不存在。我看了看，它在那里。我正在使用 Eclipse 运行测试，使用 Eclipse IDE 时是否有读取文件的特定路径？再次感谢。

以上是关于 Storm java.io.NotSerializableException：运行拓扑时 的主要内容，如果未能解决你的问题，请参考以下文章