在Avro Debezium数据上创建一个基于Avro的KSQL流,会产生奇怪的模式。
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Avro Debezium数据上创建一个基于Avro的KSQL流,会产生奇怪的模式。相关的知识,希望对你有一定的参考价值。
我正在使用Avro值变换,生成如下模式(这只是一个子集,因为它太大)。
"type": "record",
"name": "Envelope",
"namespace": "mssql.dbo.InvTR_T",
"fields": [
"name": "before",
"type": [
"null",
"type": "record",
"name": "Value",
"fields": [
"name": "InvTR_ID",
"type": "int"
,
"name": "Type_CH",
"type": "string"
,
"name": "CalcType_CH",
"type": "string"
,
"name": "ER_CST_ID",
"type": "int"
,
"name": "ER_REQ_ID",
"type": "int"
,
"name": "Vendor_ID",
"type": "int"
,
"name": "VendInv_VC",
"type": "string"
,
"name": "Status_CH",
"type": "string"
,
"name": "Stage_TI",
"type":
"type": "int",
"connect.type": "int16"
,
"name": "CheckOut_ID",
"type": [
"null",
"int"
],
"default": null
,
"name": "ReCalcCk_LG",
"type": "boolean"
,
"name": "ReCalcAll_LG",
"type": "boolean"
,
"name": "PatMatch_LG",
"type": "boolean"
,
"name": "DocPatOvRd_LG",
"type": "boolean"
,
"name": "Locked_LG",
"type": [
"null",
"boolean"
],
"default": null
,
"name": "SegErrFlag_LG",
"type": "boolean"
,
"name": "Hold_LG",
"type": "boolean"
,
"name": "Reason_ID",
"type": [
"null",
"type": "int",
"connect.type": "int16"
],
"default": null
,
"name": "HoldCom_VC",
"type": [
"null",
"string"
],
"default": null
,
"name": "AllSegFin_LG",
"type": "boolean"
,
"name": "InvAmt_MN",
"type":
"type": "bytes",
"scale": 4,
"precision": 19,
"connect.version": 1,
"connect.parameters":
"scale": "4",
"connect.decimal.precision": "19"
,
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
当我运行下面的代码来创建一个流时
CREATE STREAM stream_invtr_t_json WITH (KAFKA_TOPIC='InvTR_T', VALUE_FORMAT='AVRO');
然后我描述那个流,模式的格式非常奇怪。我想使用KSQL,以便过滤出特定的信息,并适当地分散这些事件。然而我无法从Kafka Topic => KSQL Stream => Kafka Topic => Sink。如果我然后从该流中创建一个新的主题,并尝试将其消化为Sink,我得到的是
Expected Envelope for transformation, passing it unchanged
然后出现了关于PK缺失的错误。我试着去掉拆包转换,想看看它的结果如何,结果也收到了错误。
BEFORE | STRUCT<INVTR_ID INTEGER, TYPE_CH VARCHAR(STRING), CALCTYPE_CH VARCHAR(STRING), ER_CST_ID INTEGER, ER_REQ_ID INTEGER, VENDOR_ID INTEGER, VENDINV_VC VARCHAR(STRING), STATUS_CH VARCHAR(STRING), STAGE_TI INTEGER, CHECKOUT_ID INTEGER, RECALCCK_LG BOOLEAN, RECALCALL_LG BOOLEAN, PATMATCH_LG BOOLEAN, DOCPATOVRD_LG BOOLEAN, LOCKED_LG BOOLEAN, SEGERRFLAG_LG BOOLEAN, HOLD_LG BOOLEAN, REASON_ID INTEGER, HOLDCOM_VC VARCHAR(STRING), ALLSEGFIN_LG BOOLEAN, INVDATE_DT BIGINT, SHIPDATE_DT BIGINT, PDTERMS_CH VARCHAR(STRING), PMTDUE_DT BIGINT, PMTTERMS_VC VARCHAR(STRING), BILLTERMS_CH VARCHAR(STRING), JOINT_LG BOOLEAN, COMMENT_VC VARCHAR(STRING), SOURCE_CH VARCHAR(STRING), ADDBY_ID VARCHAR(STRING), ADDED_DT BIGINT, CHGBY_ID VARCHAR(STRING), CHGED_DT BIGINT, APPROVED_LG BOOLEAN, MULTIPO_VC VARCHAR(STRING), PRVAUDITED_INVTR_ID INTEGER, PRVVENDOR_ID INTEGER, TRANSITDAYS_SI INTEGER, SHIP_NUM_VC VARCHAR(STRING), PRVTRANSITDAYS_SI INTEGER, PRVJOINT_LG BOOLEAN, CLONEDFROM_INVTR_ID INTEGER, LASTCALC_DT BIGINT, TMSFMANUAL_LG BOOLEAN, FRTRATERSOURCE_CH VARCHAR(STRING), ACTPICKUP_DT BIGINT, ROUTVEND_SI INTEGER, CALCVRSN_TI INTEGER, VENDORRANK_SI INTEGER, SEQ_SI INTEGER, PRVAUDITED_DT BIGINT, FRTRATERBATCHTYPE_CH VARCHAR(STRING), CURRENCY_TYPE_CD VARCHAR(STRING), EXCHANGE_DT BIGINT, EXCHANGE_RATE_LOCKED_LG BOOLEAN, EXCHANGE_DT_LOCKED_LG BOOLEAN, CUSTAPPROVED_LG BOOLEAN, FRTRATERMATCH_INVTR_ID INTEGER, CRC_INVOICE_LG BOOLEAN, RG_ROUTVEND_SI INTEGER, RG_PRVVE
好像是关于 UnwrapFromEnvelope
解决了你的部分问题。那就只剩下小数没有通过的部分了。
看一下连接器的文档。https:/debezium.iodocumentationreference1.1connectorspostgresql.html。
我可以看到有一个 decimal.handling.mode
正如Jiri评论的那样,设置。在其默认值为 precise
看起来它将以一种ksqlDB能够识别的格式输出Avro十进制,除非使用源NUMERIC或DECIMAL类型,而不使用任何比例。在这一点上,你最终会得到 STRUCT
数据结构,其中包括一个BYTE字段。
这个规则有一个例外。当使用NUMERIC或DECIMAL类型时,没有任何比例限制,这意味着来自数据库的值有一个不同的(可变)比例。在这种情况下,使用的是io.debezium.data.VariableScaleDecimal类型,它包含了传输值的值和比例。
所以要把数据导入到ksqlDB中,你需要。
- 等到我们支持BYTES数据类型,(目前还不在我们的路线图上)。
- 改变源表的模式来定义列的比例。
- 将decimal.handling.mode改为其他设置。你也许可以使用字符串,然后在ksql中把值CAST为小数。
以上是关于在Avro Debezium数据上创建一个基于Avro的KSQL流,会产生奇怪的模式。的主要内容,如果未能解决你的问题,请参考以下文章
如何从 Debezium 创建的 avro 消息中获取字段?