从 Avro Debezium 数据创建基于 Avro 的 KSQL 流会生成奇怪的模式

Posted

技术标签:

【中文标题】从 Avro Debezium 数据创建基于 Avro 的 KSQL 流会生成奇怪的模式【英文标题】:Creating an Avro based KSQL Stream off Avro Debezium data generates strange schema 【发布时间】:2019-07-29 19:29:12 【问题描述】:

我正在使用 Avro 值转换,它生成如下模式(这只是一个子集,因为它太大了)


  "type": "record",
  "name": "Envelope",
  "namespace": "mssql.dbo.InvTR_T",
  "fields": [
    
      "name": "before",
      "type": [
        "null",
        
          "type": "record",
          "name": "Value",
          "fields": [
            
              "name": "InvTR_ID",
              "type": "int"
            ,
            
              "name": "Type_CH",
              "type": "string"
            ,
            
              "name": "CalcType_CH",
              "type": "string"
            ,
            
              "name": "ER_CST_ID",
              "type": "int"
            ,
            
              "name": "ER_REQ_ID",
              "type": "int"
            ,
            
              "name": "Vendor_ID",
              "type": "int"
            ,
            
              "name": "VendInv_VC",
              "type": "string"
            ,
            
              "name": "Status_CH",
              "type": "string"
            ,
            
              "name": "Stage_TI",
              "type": 
                "type": "int",
                "connect.type": "int16"
              
            ,
            
              "name": "CheckOut_ID",
              "type": [
                "null",
                "int"
              ],
              "default": null
            ,
            
              "name": "ReCalcCk_LG",
              "type": "boolean"
            ,
            
              "name": "ReCalcAll_LG",
              "type": "boolean"
            ,
            
              "name": "PatMatch_LG",
              "type": "boolean"
            ,
            
              "name": "DocPatOvRd_LG",
              "type": "boolean"
            ,
            
              "name": "Locked_LG",
              "type": [
                "null",
                "boolean"
              ],
              "default": null
            ,
            
              "name": "SegErrFlag_LG",
              "type": "boolean"
            ,
            
              "name": "Hold_LG",
              "type": "boolean"
            ,
            
              "name": "Reason_ID",
              "type": [
                "null",
                
                  "type": "int",
                  "connect.type": "int16"
                
              ],
              "default": null
            ,
            
              "name": "HoldCom_VC",
              "type": [
                "null",
                "string"
              ],
              "default": null
            ,
            
              "name": "AllSegFin_LG",
              "type": "boolean"
            ,
            
              "name": "InvAmt_MN",
              "type": 
                "type": "bytes",
                "scale": 4,
                "precision": 19,
                "connect.version": 1,
                "connect.parameters": 
                  "scale": "4",
                  "connect.decimal.precision": "19"
                ,
                "connect.name": "org.apache.kafka.connect.data.Decimal",
                "logicalType": "decimal"
              

当我运行以下命令以创建一个流时

CREATE STREAM stream_invtr_t_json   WITH (KAFKA_TOPIC='InvTR_T', VALUE_FORMAT='AVRO');

然后我描述了那个流,模式是一种非常奇怪的格式。我想使用 KSQL 来过滤掉特定信息并适当地分散这些事件。但是我无法从 Kafka Topic => KSQL Stream => Kafka Topic => Sink 出发。如果然后我从该流中创建一个新主题,并尝试将其消化到接收器中,我会得到 ​​p>

Expected Envelope for transformation, passing it unchanged

然后是关于缺少 PK 的错误。我试图删除展开转换只是为了看看它会如何出现并收到错误。

BEFORE  | STRUCT<INVTR_ID INTEGER, TYPE_CH VARCHAR(STRING), CALCTYPE_CH VARCHAR(STRING), ER_CST_ID INTEGER, ER_REQ_ID INTEGER, VENDOR_ID INTEGER, VENDINV_VC VARCHAR(STRING), STATUS_CH VARCHAR(STRING), STAGE_TI INTEGER, CHECKOUT_ID INTEGER, RECALCCK_LG BOOLEAN, RECALCALL_LG BOOLEAN, PATMATCH_LG BOOLEAN, DOCPATOVRD_LG BOOLEAN, LOCKED_LG BOOLEAN, SEGERRFLAG_LG BOOLEAN, HOLD_LG BOOLEAN, REASON_ID INTEGER, HOLDCOM_VC VARCHAR(STRING), ALLSEGFIN_LG BOOLEAN, INVDATE_DT BIGINT, SHIPDATE_DT BIGINT, PDTERMS_CH VARCHAR(STRING), PMTDUE_DT BIGINT, PMTTERMS_VC VARCHAR(STRING), BILLTERMS_CH VARCHAR(STRING), JOINT_LG BOOLEAN, COMMENT_VC VARCHAR(STRING), SOURCE_CH VARCHAR(STRING), ADDBY_ID VARCHAR(STRING), ADDED_DT BIGINT, CHGBY_ID VARCHAR(STRING), CHGED_DT BIGINT, APPROVED_LG BOOLEAN, MULTIPO_VC VARCHAR(STRING), PRVAUDITED_INVTR_ID INTEGER, PRVVENDOR_ID INTEGER, TRANSITDAYS_SI INTEGER, SHIP_NUM_VC VARCHAR(STRING), PRVTRANSITDAYS_SI INTEGER, PRVJOINT_LG BOOLEAN, CLONEDFROM_INVTR_ID INTEGER, LASTCALC_DT BIGINT, TMSFMANUAL_LG BOOLEAN, FRTRATERSOURCE_CH VARCHAR(STRING), ACTPICKUP_DT BIGINT, ROUTVEND_SI INTEGER, CALCVRSN_TI INTEGER, VENDORRANK_SI INTEGER, SEQ_SI INTEGER, PRVAUDITED_DT BIGINT, FRTRATERBATCHTYPE_CH VARCHAR(STRING), CURRENCY_TYPE_CD VARCHAR(STRING), EXCHANGE_DT BIGINT, EXCHANGE_RATE_LOCKED_LG BOOLEAN, EXCHANGE_DT_LOCKED_LG BOOLEAN, CUSTAPPROVED_LG BOOLEAN, FRTRATERMATCH_INVTR_ID INTEGER, CRC_INVOICE_LG BOOLEAN, RG_ROUTVEND_SI INTEGER, RG_PRVVE

【问题讨论】:

嗨,恕我直言,您应该在要创建 KSQL 流的主题上应用 UnwrapFromEnvelope SMT。另外你确定topic中的数据是Avro格式的吗? 是的,当我从 KSQL 打印该主题时,它会将其显示为 Avro,并且 OP 中的模式是自动构建的。首先展开它是完全有意义的,所以我必须创建一个连接器,将它返回到另一个展开的主题,然后在该主题上使用 KSQL? 您可以直接在源连接器上应用它,这样主题将包含未包装的版本。我还检查了 KSQL 文档,它们支持 STRUCT 数据类型。不能用这个吗?所以你的CREATE STREAM 将包含字符串字段opts_msSTRUCT 字段beforeaftersource @JiriPechanec 是的,确实有效,我在上面发布的最后一个模式是从 CREATE STREAM 自动生成的,其中包含 CDC 数据。我刚刚注意到的奇怪之处在于,如果您将 Avro 架构与 KSQL 中的架构进行比较,它会删除我所有的小数列。 - 请查看decimal.handling.mode。改变它可能会有所帮助。 【参考方案1】:

似乎关于UnwrapFromEnvelope 的cmets 解决了您的部分问题。这只剩下关于小数的部分没有通过。

查看连接器的文档:https://debezium.io/documentation/reference/1.1/connectors/postgresql.html

正如 Jiri 评论的那样,我可以看到有一个 decimal.handling.mode 设置。在其默认值precise 下,它看起来会以 ksqlDB 可以识别的格式输出 Avro 小数,除非源 NUMERIC 或 DECIMAL 类型没有使用任何比例。此时您将得到 STRUCT 数据结构,其中包含一个 BYTE 字段。

此规则有一个例外。当使用 NUMERIC 或 DECIMAL 类型而没有任何比例约束时,这意味着来自数据库的值对于每个值具有不同的(可变)比例。在这种情况下,使用 io.debezium.data.VariableScaleDecimal 类型,它包含传输值的值和比例。

所以要将数据导入 ksqlDB,您需要:

    等到我们支持 BYTES 数据类型(目前不在我们的路线图中) 更改源表的架构以定义列的比例。 将 decimal.handling.mode 更改为其他设置。您可能可以使用字符串,然后将值转换为 ksql 中的小数。

【讨论】:

嗨 Adrew Coates,我也有同样的问题。我打算按照在 debezium 连接器中更改“decimal.handling.mode=string”的方法。但是进行此更改需要删除旧主题并再次运行 debezium 连接器?

以上是关于从 Avro Debezium 数据创建基于 Avro 的 KSQL 流会生成奇怪的模式的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Debezium 创建的 avro 消息中获取字段?

Debezium 将 Avro 数据视为二进制

使用带有 Avro 序列化的 Debezium mongodb CDC 创建的模式太多

Debezium 可能会产生无效的模式

Avro 类不能转换为 org.springframework.messaging.Message 类

无法读取 Kafka 主题 avro 消息