How to prevent decrease of Dataflow read parallelism from Cassandra
Posted: 2018-01-19 04:39:22

You can read about my setup here. I solved the problem described there, but now I have a new one.
I am reading data from 3 tables, and I have a problem with one of them (the biggest one). I read from that table at a high speed, ~300,000 rows/s, but after ~10 hours (when reading from the other two tables has finished) it drops to ~20,000 rows/s. After 24 hours the job still has not finished.
There are a lot of suspicious lines in the logs:
I Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
I Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
I Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
I Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
I Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
I Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
I Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
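If I read these messages right, this is Dataflow's dynamic work rebalancing: the service keeps proposing to split the running read in half, and the split is refused because the reader's BoundedSource.BoundedReader#splitAtFraction returns null (the Beam base class does that by default, and the CassandraIO reader apparently does not override it). A minimal sketch of the mechanism - the reader class below is hypothetical, only the splitAtFraction hook is the real Beam API:

import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.BoundedSource;

// Hypothetical skeleton, not the real CassandraIO reader.
class UnsplittableReader<T> extends BoundedSource.BoundedReader<T> {
    private final BoundedSource<T> source;

    UnsplittableReader(BoundedSource<T> source) { this.source = source; }

    @Override public boolean start() throws IOException { return false; } // stub: no data
    @Override public boolean advance() throws IOException { return false; } // stub
    @Override public T getCurrent() throws NoSuchElementException { throw new NoSuchElementException(); }
    @Override public void close() throws IOException {}
    @Override public BoundedSource<T> getCurrentSource() { return source; }

    // Dataflow calls this on "Proposing dynamic split of work unit ... at fractionConsumed:0.5".
    // Returning null (the BoundedReader default) produces the log line
    // "Rejecting split request because custom reader returned null residual source",
    // and the remaining work stays on a single worker.
    @Override
    public BoundedSource<T> splitAtFraction(double fraction) {
        return null;
    }
}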
Update
The job ended with an exception:
(f000632be487340d): Workflow failed. Causes: (844d65bb40eb132b): S14:Read from Cassa table/Read(CassandraSource)+Transform to KV by id+CoGroupByKey id/MakeUnionTable0+CoGroupByKey id/GroupByKey/Reify+CoGroupByKey id/GroupByKey/Write failed., (c07ceebe5d95f668): A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
starterpipeline-sosenko19-01172254-4260-harness-wrdk,
starterpipeline-sosenko19-01172254-4260-harness-xrkd,
starterpipeline-sosenko19-01172254-4260-harness-hvfd,
starterpipeline-sosenko19-01172254-4260-harness-0pf5
About CoGroupByKey
There are two tables. One has about 2 billion rows, each with a unique key (1 row per key). The second has about 20 billion rows, with at most 10 rows per key.
Pipeline graph
Here is what is inside the CoGroupByKey match_id block:
Pipeline code
// Create pipeline
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
// Read data from Cassandra table opendota_player_match_by_account_id2
PCollection<OpendotaPlayerMatch> player_matches = p.apply("Read from Cassa table opendota_player_match_by_account_id2", CassandraIO.<OpendotaPlayerMatch>read()
        .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104")).withPort(9042)
        .withKeyspace("cybermates").withTable(CASSA_OPENDOTA_PLAYER_MATCH_BY_ACCOUNT_ID_TABLE_NAME)
        .withEntity(OpendotaPlayerMatch.class).withCoder(SerializableCoder.of(OpendotaPlayerMatch.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));
// Transform player_matches to KV by match_id
PCollection<KV<Long, OpendotaPlayerMatch>> opendota_player_matches_by_match_id = player_matches
        .apply("Transform player_matches to KV by match_id", ParDo.of(new DoFn<OpendotaPlayerMatch, KV<Long, OpendotaPlayerMatch>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // LOG.info(c.element().match_id.toString());
                c.output(KV.of(c.element().match_id, c.element()));
            }
        }));
// Read data from Cassandra table opendota_match
PCollection<OpendotaMatch> opendota_matches = p.apply("Read from Cassa table opendota_match", CassandraIO.<OpendotaMatch>read()
        .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104")).withPort(9042)
        .withKeyspace("cybermates").withTable(CASSA_OPENDOTA_MATCH_TABLE_NAME).withEntity(OpendotaMatch.class)
        .withCoder(SerializableCoder.of(OpendotaMatch.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));
// Read data from Cassandra table match
PCollection<OpendotaMatch> matches = p.apply("Read from Cassa table match", CassandraIO.<Match>read()
        .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104")).withPort(9042)
        .withKeyspace("cybermates").withTable(CASSA_MATCH_TABLE_NAME).withEntity(Match.class)
        .withCoder(SerializableCoder.of(Match.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL))
        .apply("Adopt match for uniform structure", ParDo.of(new DoFn<Match, OpendotaMatch>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // LOG.info(c.element().match_id.toString());
                OpendotaMatch m = new OpendotaMatch();
                // The opendota_match and match tables have slightly different schemas.
                // The field-by-field conversion is cut out here because it is long and trivial.
                c.output(m);
            }
        }));
// Union match and opendota_match
PCollectionList<OpendotaMatch> matches_collections = PCollectionList.of(opendota_matches).and(matches);
PCollection<OpendotaMatch> all_matches = matches_collections.apply("Union match and opendota_match", Flatten.<OpendotaMatch>pCollections());
// Transform matches to KV by match_id
PCollection<KV<Long, OpendotaMatch>> matches_by_match_id = all_matches
        .apply("Transform matches to KV by match_id", ParDo.of(new DoFn<OpendotaMatch, KV<Long, OpendotaMatch>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // LOG.info(c.element().players.toString());
                c.output(KV.of(c.element().match_id, c.element()));
            }
        }));
// CoGroupByKey match_id
// Replicate data
final TupleTag<OpendotaPlayerMatch> player_match_tag = new TupleTag<OpendotaPlayerMatch>();
final TupleTag<OpendotaMatch> match_tag = new TupleTag<OpendotaMatch>();
PCollection<KV<Long, PMandM>> joined_matches = KeyedPCollectionTuple
        .of(player_match_tag, opendota_player_matches_by_match_id).and(match_tag, matches_by_match_id)
        .apply("CoGroupByKey match_id", CoGroupByKey.<Long>create())
        .apply("Replicate data", ParDo.of(new DoFn<KV<Long, CoGbkResult>, KV<Long, PMandM>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                try {
                    OpendotaMatch m = c.element().getValue().getAll(match_tag).iterator().next();
                    Iterable<OpendotaPlayerMatch> pms = c.element().getValue().getAll(player_match_tag);
                    for (OpendotaPlayerMatch pm : pms) {
                        if (0 <= pm.account_id && pm.account_id < MAX_UINT) {
                            for (OpendotaPlayerMatch pm2 : pms) {
                                c.output(KV.of(pm.account_id, new PMandM(pm2, m)));
                            }
                        }
                    }
                } catch (NoSuchElementException e) {
                    LOG.error(c.element().getValue().getAll(player_match_tag).iterator().next().match_id.toString() + " " + e.toString());
                }
            }
        }));
// Transform to byte array
// Write to BQ
joined_matches
        .apply("Transform to byte array, Write to BQ", BigQueryIO.<KV<Long, PMandM>>write().to(new DynamicDestinations<KV<Long, PMandM>, String>() {
            @Override
            public String getDestination(ValueInSingleWindow<KV<Long, PMandM>> element) {
                return element.getValue().getKey().toString();
            }
            @Override
            public TableDestination getTable(String account_id_str) {
                return new TableDestination("cybrmt:" + BQ_DATASET_NAME + ".player_match_" + account_id_str,
                        "Table for user " + account_id_str);
            }
            @Override
            public TableSchema getSchema(String account_id_str) {
                List<TableFieldSchema> fields = new ArrayList<>();
                fields.add(new TableFieldSchema().setName("value").setType("BYTES"));
                return new TableSchema().setFields(fields);
            }
        }).withFormatFunction(new SerializableFunction<KV<Long, PMandM>, TableRow>() {
            @Override
            public TableRow apply(KV<Long, PMandM> element) {
                OpendotaPlayerMatch pm = element.getValue().pm;
                OpendotaMatch m = element.getValue().m;
                TableRow tr = new TableRow();
                ByteBuffer bb = ByteBuffer.allocate(114);
                // The serialization into the byte buffer is cut out here because it is long and trivial.
                tr.set("value", bb.array());
                return tr;
            }
        }));
p.run();
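As an aside, newer Beam releases expose CassandraIO.Read#withMinNumberOfSplits, which forces a minimum number of initial splits instead of relying on the dynamic splitting that this source rejects. If your Beam version has the method, the read might look like this (the split count below is an arbitrary example, not a tested value):

// Hedged sketch: assumes a Beam version where CassandraIO.Read supports
// withMinNumberOfSplits. Forcing many initial splits means read parallelism
// does not depend on dynamic (liquid) splitting, which this reader refuses.
PCollection<OpendotaPlayerMatch> player_matches = p.apply(
        "Read from Cassa table opendota_player_match_by_account_id2",
        CassandraIO.<OpendotaPlayerMatch>read()
                .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104"))
                .withPort(9042)
                .withKeyspace("cybermates")
                .withTable(CASSA_OPENDOTA_PLAYER_MATCH_BY_ACCOUNT_ID_TABLE_NAME)
                .withEntity(OpendotaPlayerMatch.class)
                .withCoder(SerializableCoder.of(OpendotaPlayerMatch.class))
                .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL)
                .withMinNumberOfSplits(400)); // arbitrary example: ~10 splits per worker at 40 workers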
UPD2. I tried to read the problem table separately
I tried to read the problem table from above on its own. The pipeline contains only a CassandraIO.Read transform and a dummy ParDo transform with some logging output. It now behaves just like the full pipeline: there is one split (the last one, I believe) that cannot complete:
I Proposing dynamic split of work unit cybrmt;2018-01-20_21_28_01-3451798636786921663;1617811313034836533 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
Here is the pipeline graph:
And here is the code:
// Create pipeline
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
// Read data from Cassandra table opendota_player_match_by_account_id2
PCollection<OpendotaPlayerMatch> player_matches = p.apply("Read from Cassa table opendota_player_match_by_account_id2", CassandraIO.<OpendotaPlayerMatch>read()
        .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104")).withPort(9042)
        .withKeyspace("cybermates").withTable(CASSA_OPENDOTA_PLAYER_MATCH_BY_ACCOUNT_ID_TABLE_NAME)
        .withEntity(OpendotaPlayerMatch.class).withCoder(SerializableCoder.of(OpendotaPlayerMatch.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));
// Print my matches
player_matches.apply("Print my matches", ParDo.of(new DoFn<OpendotaPlayerMatch, Long>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        if (c.element().account_id == 114688838) {
            LOG.info(c.element().match_id.toString());
        }
        c.output(c.element().match_id);
    }
}));
p.run();
UPD3
The small pipeline (CassandraIO.Read and a ParDo) finished successfully in 23 hours. For the first 4 hours it ran with the maximum number of workers (40) and an excellent read speed (~300,000 rows/s). After that, the number of workers was autoscaled down to 1 and the read speed dropped to ~15,000 rows/s. Here are the graphs:
And here is the end of the log:
I Proposing dynamic split of work unit cybrmt;2018-01-20_21_28_01-3451798636786921663;1617811313034836533 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
I Proposing dynamic split of work unit cybrmt;2018-01-20_21_28_01-3451798636786921663;1617811313034836533 at "fractionConsumed":0.5
I Rejecting split request because custom reader returned null residual source.
I Success processing work item cybrmt;2018-01-20_21_28_01-3451798636786921663;1617811313034836533
I Finished processing stage s01 with 0 errors in 75268.681 seconds
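For scale: 75,268 seconds is about 20.9 hours, so almost the entire 23-hour run was spent on this one work item that could not be split further.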
Comments:
Can you share a job ID?
Sure, @jkff. The job ID is 2018-01-17_22_54_11-12138573770170126316. The last exception I got (see the question) says "GroupByKey/Write failed". @jkff
"Lost contact with the service" usually means the workers are crashing for some reason, e.g. OOMs. Do you see anything like that in the Stackdriver logs? Does your job perform a join with a large amount of data per key, or something like that? It would help if you shared some code - but I believe this error is no longer related to reading from Cassandra.
I had OOM errors before (they are in the logs), but I changed the machine type to n1-highmem-2 and the errors disappeared. The error message in question is the only error I see in the logs. My job performs a CoGroupByKey operation (see the pipeline graph and code) with ~10 rows per key (~2 billion keys). @jkff
Answer 1:
I finally followed @jkff's suggestion and read the data from a table with a different partition key, which is distributed more evenly (in fact, my data schema has two tables with the same data but different partition keys).
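In code, the fix boils down to pointing the read at the second copy of the data. The table name below is a placeholder (the post does not give the real one); everything else mirrors the pipeline above:

// Hypothetical: same rows as opendota_player_match_by_account_id2, but with a
// partition key that spreads the data evenly across token ranges, so the
// initial Dataflow splits come out roughly equal in size.
PCollection<OpendotaPlayerMatch> player_matches = p.apply(
        "Read from Cassa table opendota_player_match_by_match_id",
        CassandraIO.<OpendotaPlayerMatch>read()
                .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104"))
                .withPort(9042)
                .withKeyspace("cybermates")
                .withTable("opendota_player_match_by_match_id") // placeholder table name
                .withEntity(OpendotaPlayerMatch.class)
                .withCoder(SerializableCoder.of(OpendotaPlayerMatch.class))
                .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));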