连接两个具有不同键名的表
Posted
技术标签:
【中文标题】连接两个具有不同键名的表【英文标题】:Join two tables with different key name 【发布时间】:2018-12-03 14:13:07 【问题描述】:尝试实现以下场景,
-
使用相同的键连接两个表(A、B)
过滤表(c)
加入第 1 步的结果和第 2 步的结果。这里它有不同的键名但相同的值(例如:第一个表列名是“id”第二个表列名是“Fid”但两个值相同)。
使用 Cloud Dataflow 执行代码时出现以下错误,
严重:2018-12-03T13:52:47.634Z: java.lang.IllegalStateException: 需要唯一键,但发现键 127348#null 值为 HEADER_ID=18219955, ORDER_TYPE_ID=2124, ORDER_NUMBER=729637, ORDERED_DATE=10 /29/2018 下午 4:01:25,TRANSACTIONAL_CURR_CODE=USD,CUST_PO_NUMBER=942634,SOLD_TO_ORG_ID=127348,SHIP_FROM_ORG_ID=934,PRICE_LIST_ID=7035,CREATION_DATE=2018-10-29 16:10:41 UTC,LAST_UPDATE_DATE -29 16:10:13 UTC,FLOW_STATUS_CODE=BOOKED 和 HEADER_ID=18219945,ORDER_TYPE_ID=2124,ORDER_NUMBER=729636,ORDERED_DATE=10/29/2018 3:56:05 PM,TRANSACTIONAL_CURR_CODE=USD,CUST_PO_NUMBER=941674,SOLDTO_ORGID =127348, SHIP_FROM_ORG_ID=934, PRICE_LIST_ID=7035, CREATION_DATE=2018-10-29 15:10:20 UTC, LAST_UPDATE_DATE=2018-10-29 16:10:34 UTC, FLOW_STATUS_CODE=BOOKED 在窗口 org.apache.beam .sdk.transforms.windowing.GlobalWindow@6c5cc8ee。 在 org.apache.beam.runners.dataflow.BatchViewOverrides$BatchViewAsMultimap$ToIsmRecordForMapLikeDoFn.processElement(BatchViewOverrides.java:442)
这是我尝试过的全部代码:
WithKeys<String, TableRow> headerKey = WithKeys.of( (TableRow row) -> String.format("%s",row.get("PARTY_ID"))).withKeyType(TypeDescriptors.strings());
PCollection<KV<String,TableRow>> mainInput = p.apply("ReadCustomerAccount",BigQueryIO.readTableRows().from(options.getCustAccount())).apply("WithKeys", headerKey);
PCollection<KV<String,TableRow>> sideInput = p.apply("ReadCustomerParty",BigQueryIO.readTableRows().from(options.getPartyTable())).apply("WithKeys", headerKey);
PCollection<TableRow> result = CommonFunctions.innerJoinBQTbls("InnerJoin",mainInput,sideInput);
@SuppressWarnings("serial")
PCollection<TableRow> finalResultCollection = result.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>()
@ProcessElement
public void processElement(ProcessContext c)
TableRow keyString = c.element();
TableRow mainList = (TableRow) keyString.get("main");
TableRow sideList = (TableRow) keyString.get("side");
TableRow targetRow = new TableRow();
targetRow.set("partyID", Integer.valueOf(keyString.get("key").toString()));
targetRow.set("accountNumber", mainList.get("ACCOUNT_NUMBER"));
targetRow.set("customerName", sideList.get("PARTY_NAME"));
targetRow.set("updatedDate",keyString.get("updatedDate"));
c.output(targetRow);
));
PCollection<TableRow> headerData = p.apply("ReadInvoice",BigQueryIO.readTableRows().from(options.getOrderHeaderAll()));
PCollection<TableRow> pc934Collection = headerData.apply(Filter.by(
(TableRow t) ->
String orgCode = t.get("SHIP_FROM_ORG_ID").toString();
if (orgCode.equals("934"))
return true;
return false;
));
WithKeys<String, TableRow> soltoOrg = WithKeys.of(
(TableRow row) ->
String.format("%s#%s",
row.get("SOLD_TO_ORG_ID"),
row.get("CUST_ACCOUNT_ID")))
.withKeyType(TypeDescriptors.strings());
PCollection<KV<String,TableRow>> customerHeaderAccount = pc934Collection.apply("WithKeys", soltoOrg);
PCollection<KV<String,TableRow>> customerHeaderAll = finalResultCollection.apply("WithKeys", soltoOrg);
PCollection<TableRow> secondResult = CommonFunctions.innerJoinBQTbls("InnerJoin1",customerHeaderAll,customerHeaderAccount);
@SuppressWarnings("serial")
PCollection<TableRow> secondResultCollection = secondResult.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>()
@ProcessElement
public void processElement(ProcessContext c)
TableRow keyString = c.element();
TableRow mainList = (TableRow) keyString.get("main");
TableRow sideList = (TableRow) keyString.get("side");
TableRow targetRow = new TableRow();
targetRow.set("orderNumber", mainList.get("ORDER_NUMBER"));
targetRow.set("headerId", Integer.valueOf(mainList.get("HEADER_ID").toString()));
targetRow.set("partyID", Integer.valueOf(keyString.get("key").toString()));
targetRow.set("accountNumber", mainList.get("ACCOUNT_NUMBER"));
targetRow.set("customerName", sideList.get("PARTY_NAME"));
targetRow.set("updatedDate",keyString.get("updatedDate"));
c.output(targetRow);
));
【问题讨论】:
【参考方案1】:很可能您的其中一个键为空。您可以通过不将此作为主键来解决此问题。主键不能为 NULL,或者,如果它们是复合主键,则不能包含 NULL。取而代之的是使其成为唯一索引。例如使用自动编号字段作为主键。
【讨论】:
以上是关于连接两个具有不同键名的表的主要内容,如果未能解决你的问题,请参考以下文章
使用 TSQL OPENJSON 如何从具有动态键名的 JSON 数组中提取值
在 Typescript 中访问具有动态键名的状态 [重复]