在Avro Debezium数据上创建一个基于Avro的KSQL流,会产生奇怪的模式。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Avro Debezium数据上创建一个基于Avro的KSQL流,会产生奇怪的模式。相关的知识,希望对你有一定的参考价值。

我正在使用Avro值变换,生成如下模式(这只是一个子集,因为它太大)。


  "type": "record",
  "name": "Envelope",
  "namespace": "mssql.dbo.InvTR_T",
  "fields": [
    
      "name": "before",
      "type": [
        "null",
        
          "type": "record",
          "name": "Value",
          "fields": [
            
              "name": "InvTR_ID",
              "type": "int"
            ,
            
              "name": "Type_CH",
              "type": "string"
            ,
            
              "name": "CalcType_CH",
              "type": "string"
            ,
            
              "name": "ER_CST_ID",
              "type": "int"
            ,
            
              "name": "ER_REQ_ID",
              "type": "int"
            ,
            
              "name": "Vendor_ID",
              "type": "int"
            ,
            
              "name": "VendInv_VC",
              "type": "string"
            ,
            
              "name": "Status_CH",
              "type": "string"
            ,
            
              "name": "Stage_TI",
              "type": 
                "type": "int",
                "connect.type": "int16"
              
            ,
            
              "name": "CheckOut_ID",
              "type": [
                "null",
                "int"
              ],
              "default": null
            ,
            
              "name": "ReCalcCk_LG",
              "type": "boolean"
            ,
            
              "name": "ReCalcAll_LG",
              "type": "boolean"
            ,
            
              "name": "PatMatch_LG",
              "type": "boolean"
            ,
            
              "name": "DocPatOvRd_LG",
              "type": "boolean"
            ,
            
              "name": "Locked_LG",
              "type": [
                "null",
                "boolean"
              ],
              "default": null
            ,
            
              "name": "SegErrFlag_LG",
              "type": "boolean"
            ,
            
              "name": "Hold_LG",
              "type": "boolean"
            ,
            
              "name": "Reason_ID",
              "type": [
                "null",
                
                  "type": "int",
                  "connect.type": "int16"
                
              ],
              "default": null
            ,
            
              "name": "HoldCom_VC",
              "type": [
                "null",
                "string"
              ],
              "default": null
            ,
            
              "name": "AllSegFin_LG",
              "type": "boolean"
            ,
            
              "name": "InvAmt_MN",
              "type": 
                "type": "bytes",
                "scale": 4,
                "precision": 19,
                "connect.version": 1,
                "connect.parameters": 
                  "scale": "4",
                  "connect.decimal.precision": "19"
                ,
                "connect.name": "org.apache.kafka.connect.data.Decimal",
                "logicalType": "decimal"
              

当我运行下面的代码来创建一个流时

CREATE STREAM stream_invtr_t_json   WITH (KAFKA_TOPIC='InvTR_T', VALUE_FORMAT='AVRO');

然后我描述那个流,模式的格式非常奇怪。我想使用KSQL,以便过滤出特定的信息,并适当地分散这些事件。然而我无法从Kafka Topic => KSQL Stream => Kafka Topic => Sink。如果我然后从该流中创建一个新的主题,并尝试将其消化为Sink,我得到的是

Expected Envelope for transformation, passing it unchanged

然后出现了关于PK缺失的错误。我试着去掉拆包转换,想看看它的结果如何,结果也收到了错误。

BEFORE  | STRUCT<INVTR_ID INTEGER, TYPE_CH VARCHAR(STRING), CALCTYPE_CH VARCHAR(STRING), ER_CST_ID INTEGER, ER_REQ_ID INTEGER, VENDOR_ID INTEGER, VENDINV_VC VARCHAR(STRING), STATUS_CH VARCHAR(STRING), STAGE_TI INTEGER, CHECKOUT_ID INTEGER, RECALCCK_LG BOOLEAN, RECALCALL_LG BOOLEAN, PATMATCH_LG BOOLEAN, DOCPATOVRD_LG BOOLEAN, LOCKED_LG BOOLEAN, SEGERRFLAG_LG BOOLEAN, HOLD_LG BOOLEAN, REASON_ID INTEGER, HOLDCOM_VC VARCHAR(STRING), ALLSEGFIN_LG BOOLEAN, INVDATE_DT BIGINT, SHIPDATE_DT BIGINT, PDTERMS_CH VARCHAR(STRING), PMTDUE_DT BIGINT, PMTTERMS_VC VARCHAR(STRING), BILLTERMS_CH VARCHAR(STRING), JOINT_LG BOOLEAN, COMMENT_VC VARCHAR(STRING), SOURCE_CH VARCHAR(STRING), ADDBY_ID VARCHAR(STRING), ADDED_DT BIGINT, CHGBY_ID VARCHAR(STRING), CHGED_DT BIGINT, APPROVED_LG BOOLEAN, MULTIPO_VC VARCHAR(STRING), PRVAUDITED_INVTR_ID INTEGER, PRVVENDOR_ID INTEGER, TRANSITDAYS_SI INTEGER, SHIP_NUM_VC VARCHAR(STRING), PRVTRANSITDAYS_SI INTEGER, PRVJOINT_LG BOOLEAN, CLONEDFROM_INVTR_ID INTEGER, LASTCALC_DT BIGINT, TMSFMANUAL_LG BOOLEAN, FRTRATERSOURCE_CH VARCHAR(STRING), ACTPICKUP_DT BIGINT, ROUTVEND_SI INTEGER, CALCVRSN_TI INTEGER, VENDORRANK_SI INTEGER, SEQ_SI INTEGER, PRVAUDITED_DT BIGINT, FRTRATERBATCHTYPE_CH VARCHAR(STRING), CURRENCY_TYPE_CD VARCHAR(STRING), EXCHANGE_DT BIGINT, EXCHANGE_RATE_LOCKED_LG BOOLEAN, EXCHANGE_DT_LOCKED_LG BOOLEAN, CUSTAPPROVED_LG BOOLEAN, FRTRATERMATCH_INVTR_ID INTEGER, CRC_INVOICE_LG BOOLEAN, RG_ROUTVEND_SI INTEGER, RG_PRVVE
答案

好像是关于 UnwrapFromEnvelope 解决了你的部分问题。那就只剩下小数没有通过的部分了。

看一下连接器的文档。https:/debezium.iodocumentationreference1.1connectorspostgresql.html。

我可以看到有一个 decimal.handling.mode 正如Jiri评论的那样,设置。在其默认值为 precise 看起来它将以一种ksqlDB能够识别的格式输出Avro十进制,除非使用源NUMERIC或DECIMAL类型,而不使用任何比例。在这一点上,你最终会得到 STRUCT 数据结构,其中包括一个BYTE字段。

这个规则有一个例外。当使用NUMERIC或DECIMAL类型时,没有任何比例限制,这意味着来自数据库的值有一个不同的(可变)比例。在这种情况下,使用的是io.debezium.data.VariableScaleDecimal类型,它包含了传输值的值和比例。

所以要把数据导入到ksqlDB中,你需要。

  1. 等到我们支持BYTES数据类型,(目前还不在我们的路线图上)。
  2. 改变源表的模式来定义列的比例。
  3. 将decimal.handling.mode改为其他设置。你也许可以使用字符串,然后在ksql中把值CAST为小数。

以上是关于在Avro Debezium数据上创建一个基于Avro的KSQL流,会产生奇怪的模式。的主要内容,如果未能解决你的问题,请参考以下文章

Debezium 将 Avro 数据视为二进制

如何从 Debezium 创建的 avro 消息中获取字段?

使用带有 Avro 序列化的 Debezium mongodb CDC 创建的模式太多

Debezium 可能会产生无效的模式

无法读取 Kafka 主题 avro 消息

Avro 类不能转换为 org.springframework.messaging.Message 类