How to fix error 'Component: [x] subscribes from non-existent component [y]' in Apache Storm Trident topology

Posted: 2019-06-12 14:33:22

Question:

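I have just implemented a Trident DRPC function to process incoming messages, and I am trying to keep a count of the tuples processed in the last stage of the topology in Trident state. Here is my topology: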

topology.newDRPCStream("portfolio")
    .map(parseMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "portfolioPayload"))
    .filter(new FilterNull())
    .flatMap(splitMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "strategyCode"))
    .parallelismHint(1)
    .shuffle()
    .each(new Fields("strategyCode"), findMongoTradesFunction,
        new Fields("uitid", "id", "sourceSystem", "sourceTransactionTime", "publicationTime",
            "tradeVersion", "urn", "riskViewFrom", "riskViewTo", "authorized"))
    .parallelismHint(10)
    .shuffle()
    .filter(tradeFilterFunction)
    .parallelismHint(150)
    .groupBy(new Fields("uitid"))
    .aggregate(
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo", "uitid"), reduceAggregateFunction,
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo"))
    .parallelismHint(200)
    .groupBy(new Fields("portfolioUrn"))
    .persistentAggregate(stateFactory, new Count(), new Fields("count"));

When I try to submit this topology to Storm, I get this error:

Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
at com.citi.tm.portfolio.tps.PortfolioLauncher.main(PortfolioLauncher.java:34)
Caused by: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
... 3 more

If I remove the last two operations from the topology, I can submit it successfully. These are the operations:

.groupBy(new Fields("portfolioUrn"))
.persistentAggregate(stateFactory, new Count(), new Fields("count"));

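After running my aggregate function (aggregate()), I want to group the tuples by the 'portfolioUrn' field and save the count to MongoDB. I don't understand why the final groupBy().persistentAggregate() part causes this error. Could you help me figure out why?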
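For readers less familiar with Trident, the failing tail of the topology, groupBy(new Fields("portfolioUrn")).persistentAggregate(stateFactory, new Count(), new Fields("count")), is meant to keep a running count per portfolioUrn: within each batch the tuples are counted per key, and each partial count is then combined with the value already stored for that key. The sketch below models only those semantics in plain Java. It is a simplified, hypothetical illustration, not Storm code: the class PersistentCountSketch, its methods, and the in-memory HashMap standing in for the MongoDB-backed state are all assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of groupBy("portfolioUrn") + persistentAggregate(Count).
// The HashMap stands in for the persistent state (MongoDB in the question); this is not Storm code.
public class PersistentCountSketch {
    // Persistent state: portfolioUrn -> running count across all batches.
    private final Map<String, Long> state = new HashMap<>();

    // Process one batch: count tuples per key inside the batch (the groupBy step),
    // then combine each partial count with the stored value (the persist step).
    public void processBatch(List<String> portfolioUrns) {
        Map<String, Long> batchCounts = new HashMap<>();
        for (String urn : portfolioUrns) {
            batchCounts.merge(urn, 1L, Long::sum);
        }
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            // A key with no stored value simply takes the batch count.
            state.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    // Current persisted count for a key (0 if the key was never seen).
    public long countFor(String portfolioUrn) {
        return state.getOrDefault(portfolioUrn, 0L);
    }

    public static void main(String[] args) {
        PersistentCountSketch sketch = new PersistentCountSketch();
        sketch.processBatch(List.of("urn:a", "urn:b", "urn:a"));
        sketch.processBatch(List.of("urn:a"));
        System.out.println(sketch.countFor("urn:a")); // 3
        System.out.println(sketch.countFor("urn:b")); // 1
        System.out.println(sketch.countFor("urn:c")); // 0
    }
}
```

Counting per batch first and merging afterwards mirrors why Trident can write one state update per key per batch instead of one per tuple.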

Answer 1:

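After some research, I found this page, which seems to describe a situation similar to mine. Nathan Marz states there that DRPC topologies do not support partitionPersist (as of 2013), and I believe the same applies to my case, since persistentAggregate is built on top of partitionPersist. I think (though I have not fully verified it) that DRPC topologies in Storm 1.2.1 may not support state persistence at all.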
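The exception text itself comes from a structural check made when Nimbus receives the topology: every component's declared inputs must name a component that exists in the same topology. Here the Trident-generated bolt b-4 subscribes to $mastercoord-bg0 (judging by its name, a Trident master coordinator component), which the built DRPC topology does not contain. The sketch below is a simplified, hypothetical model of that kind of check in plain Java, not Storm's source; the class name, the map-based topology representation, and every component id other than b-4 and $mastercoord-bg0 are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of submission-time topology validation.
// A topology is represented as: component id -> ids of the components it subscribes to.
public class TopologyValidationSketch {

    static class InvalidTopologyException extends RuntimeException {
        InvalidTopologyException(String msg) { super(msg); }
    }

    // Throws if any component subscribes to a component id that is not in the topology.
    static void validate(Map<String, List<String>> topology) {
        for (Map.Entry<String, List<String>> component : topology.entrySet()) {
            for (String source : component.getValue()) {
                if (!topology.containsKey(source)) {
                    throw new InvalidTopologyException("Component: [" + component.getKey()
                            + "] subscribes from non-existent component [" + source + "]");
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> topology = new LinkedHashMap<>();
        topology.put("drpc-spout", List.of());          // hypothetical spout id
        topology.put("b-0", List.of("drpc-spout"));     // hypothetical upstream bolt
        // A state-persisting bolt that also listens to a coordinator that was never declared.
        topology.put("b-4", List.of("b-0", "$mastercoord-bg0"));
        try {
            validate(topology);
        } catch (InvalidTopologyException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints "Component: [b-4] subscribes from non-existent component [$mastercoord-bg0]". The model is also consistent with the asker's observation: when the final groupBy().persistentAggregate() is removed, the bolt that depends on the missing coordinator is never generated, so there is no dangling subscription to report.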
