在 UDF 中使用大小写错误调用函数

Posted

技术标签:

【中文标题】在 UDF 中使用大小写错误调用函数【英文标题】:erroneous function call with case when inside UDF 【发布时间】:2021-12-17 00:42:28 【问题描述】:

当我在 google bigquery 中的一个查询中使用 UDF 中的 CASE WHEN 语句时,我注意到一些非常奇怪的行为。结果真的很奇怪,所以要么我遗漏了一些非常明显的东西,要么在查询执行中出现了一些奇怪的行为。

(旁注:如果有更有效的方法来实现下面的查询逻辑,我全神贯注,我的查询需要永远)

我正在处理一些日志行,其中每一行包含用于解码的data: stringtopics: array<string> 字段。每种类型的日志行都会有不同的topics长度,并且需要不同的解码逻辑。我在 UDF 中使用 CASE WHEN 来切换到不同的解码方法。我最初遇到了一个奇怪的错误,即对数组的索引太远。这要么意味着不符合标准的数据,要么意味着在某些时候调用了错误的解码器。我验证了所有数据都符合规范,所以肯定是后者。

我已将其范围缩小为在我的 CASE WHEN 中针对错误类型执行的错误/无关解码器。

最奇怪的是,当我插入固定值而不是解码函数时,CASE WHEN 的返回值并不表示它是错误匹配。不知何故,当我使用函数时,第一个函数被调用,但在调试时,我从第二个 WHEN 的正确值中获取值。

我从 udf 中提取了逻辑,并使用 if(..) 而不是 CASE WHEN 来实现它,并且一切都可以正常解码。我想知道这里发生了什么,如果它是 bigquery 中的错误,或者在使用 UDF 时发生了一些奇怪的事情。

这是查询的精简版

-- helper function to normalize different payloads into a flattened struct
create temporary function wrap_struct(payload array<struct<name string, value string>>) as (
    (select as struct
        decode_field_type1(field1) as field1,
        decode_field_type1(field2) as field2,
        decode_field_type2(field3) as field3,
        -- a bunch more fields
    from (select * from 
        (select p.name, p.value 
            from unnest(payload) as p) pivot(string_agg(value) for name in (
                'field1', 'field2', 'field3', --a bunch more fields
            )
        )
    ))
);

-- this topic uses the data and topics in the decoding, and has a topics array of length 4
-- this gets called from the switch with a payload from topics2, which has a shorter topics array of length 1, causing a failure
create temporary function decode_topic1(data string, topics array<string>) as
(
    wrap_struct([
        struct("field1" as name, substring(topics[offset(1)], 3) as value),
        struct("field2" as name, substring(topics[offset(2)], 3) as value),
        struct("field3" as name, substring(topics[offset(3)], 3) as value),
        struct("field4" as name, substring(data, 3, 64) as value)
    ])
);

--this uses only the data_payload, and has a topics array of length 1
create temporary function decode_topic2(data string, topics array<string>) as
(
    wrap_struct([
        struct("field1" as name, substring(data, 3, 64) as value),
        struct("field5" as name, substring(data, 67, 64) as value),
        struct("field6" as name, substring(data, 131, 64) as value)
    ])
);

create temporary function decode_event_data(data string, topics array<string>) as 
(
    -- first element of topics denotes the type of event
    case
        -- somehow the function decode_topic1 gets called when topics[0] == topic2
        -- HOWEVER, when i replaced the functions with a fixed value to debug
        -- i get the expected results, indicating a proper match.
        -- this is not unique these topics
        -- it happens with other combinations also.
        when topics[offset(0)] = 'topic1' then decode_topic1(data, topics)
        when topics[offset(0)] = 'topic2' then decode_topic2(data, topics)
        -- a bunch more topics
        else wrap_struct([])
    end
);

select
    id, data, topics,
    decode_event_data(data, topics) as decoded_payload
from (select * from mytable
where 
    topics[offset(0)] = 'topic1'
    or topics[offset(0)] = 'topic2'

当我将基本查询更改为:

select
    id, data, topics, decode_topic2(data, topics)
from (select * from mytable
where 
topics[offset(0)] = 'topic2'

它解码得很好。

CASE WHEN 怎么了?

编辑:这是对可能产生问题的公共数据集的查询:

    concat('0x', substring(raw, 25, 40))
);

create temporary function decode_amount(raw string) as (
    concat('0x', raw)
);

create temporary function wrap_struct(payload array<struct<name string, value string>>) as (
    (select as struct
        decode_address(sender) as reserve,
        decode_address(`to`) as `to`,
        decode_amount(amount1) as amount1,
        decode_amount(amount2) as amount2,
    from (select * from 
        (select p.name, p.value 
            from unnest(payload) as p) pivot(string_agg(value) for name in (
                'sender', 'to', 'amount1', 'amount2'
            )
        )
    ))
);

create temporary function decode_mint(data_payload string, topics array<string>) as
(
    wrap_struct([
        struct("sender" as name, substring(topics[offset(1)], 3) as value),
        struct("amount1" as name, substring(data_payload, 3, 64) as value),
        struct("amount2" as name, substring(data_payload, 67, 64) as value)
    ])
);

create temporary function decode_burn(data_payload string, topics array<string>) as
(
    wrap_struct([
        struct("sender" as name, substring(topics[offset(1)], 3) as value),
        struct("amount1" as name, substring(data_payload, 3, 64) as value),
        struct("amount2" as name, substring(data_payload, 67, 64) as value),
        struct("to" as name, substring(topics[offset(2)], 67, 64) as value)
    ])
); 

select
    *,
    case
        when topics[offset(0)] = '0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f' then decode_mint(data, topics)
        when topics[offset(0)] = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822' then decode_burn(data, topics)
    end as decoded_payload
from `public-data-finance.crypto_ethereum_kovan.logs` 
where
    array_length(topics) > 0
    and (
        (array_length(topics) = 2 and topics[offset(0)] = '0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f')
        or (array_length(topics) = 3 and topics[offset(0)] = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822')
    )

【问题讨论】:

我无法在测试数据上复制问题。我使用了相同级别的嵌套临时函数,所以这似乎与数据有关。您能否提供一些产生问题的示例数据? 您好,感谢您的回复。我编辑了我的初始帖子,以包含对可以重现问题的公共数据集的查询(在帖子的底部)。 代码似乎不完整...我试图修复decode_address函数,但它无法正常运行,显示Array index 2 is out of bounds (overflow)... 是的,我减少了很多查询,因为那里没有我的业务逻辑的详细信息。 这个数组问题是我发布的问题。使用 case 语句执行 UDF 的方式有些奇怪。也许所有都被执行,但它只选择与 WHEN 匹配的执行。 where 子句应该保证我们永远不会有一个与解码器期望的不匹配的主题长度。无论如何,我现在运行多个作业,每个主题行 1 个,它工作正常。当我尝试一次运行所有主题时,数组超出范围。 【参考方案1】:

offset(nn) 正在杀死您的函数,将其更改为 safe_offset(nn) 即可解决问题。此外,to 字段在decode_burn 中通常为空,或者在decode_mint 时为空,因此,至少对于这些数据,它只是导致问题。

以下工作并解决此问题:

create temporary function decode_burn(data_payload string, topics array<string>) as
(
    wrap_struct([
        struct("sender" as name, substring(topics[offset(1)], 3) as value),
        struct("amount1" as name, substring(data_payload, 3, 64) as value),
        struct("amount2" as name, substring(data_payload, 67, 64) as value),
        struct("to" as name, substring(topics[SAFE_OFFSET(2)], 67, 64) as value)
    ])
); 


编辑1:

我已经详细分析了发送到功能和步骤的数据,并且您对过滤器及其工作方式有一定的了解。接缝你已经达到了特定的角落案例(或错误?)

就处理步骤所能理解的而言,BQ 正在优化您的函数,因为它们非常相似,只是其中一个字段中有一个附加字段。

因此,BQ 引擎对这两个数据使用相同的优化函数,当输入为 topics[OFFSET(0)] = '0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f' 行的数据时会导致异常

当这种情况发生时:

使用safe_offset 仍然是一个不错的电话; 或者,只创建一个函数并对其使用条件,在这种情况下,查询将被正确处理:
CREATE TEMPORARY FUNCTION decode(data_payload string, topics ARRAY<string>) AS ( 
  wrap_struct([ 
    STRUCT("sender" AS name,
    SUBSTRING(topics[OFFSET(1)], 3) AS value),
    struct("amount1" as name, substring(data_payload, 3, 64) as value),
    struct("amount2" as name, substring(data_payload, 67, 64) as value),
    if(topics[OFFSET(0)] = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822',
        STRUCT("to" AS name, topics[OFFSET(2)] AS value),null )])
      
);

select *, decode(DATA,topics) ...
同时,您可以在issue tracker 上打开一个案例

【讨论】:

我不久前通过将 1 个单一作业更改为许多较小的作业来修复它,每个作业都处理一个案例。我会看看这是否有效。 我很好奇为什么会有用。你能详细说明一下offset与safe_offset吗?考虑到我将它放在基本查询中的条件,我不明白如何使用超出范围的偏移量调用偏移量? 在位置 (2) 中发送到函数 offsef 的某些数据超出范围,您向您展示的故障排除。函数 safe_offset 将与函数 offset 做同样的事情,但如果访问超出范围,它将返回一个空值而不是错误。 Here is the documentation参考 我输入的条件是:(array_length(topics) = 2 and topics[offset(0)] = '0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f')(array_length(topics) = 3 and topics[offset(0)] = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822') 我不明白我们如何进入使用简短主题数组解码后者的函数。任何符合条件进入解码器的主题也应该有正确数量的主题。 嗨@sprw121,你是对的,它不应该发生。我在回复中添加了有关我所做的最后一次分析的更多信息。希望对您的理解有所帮助。

以上是关于在 UDF 中使用大小写错误调用函数的主要内容,如果未能解决你的问题,请参考以下文章

调用 UDF 时出现“此函数不带参数”错误

从 ms 访问调用标量 UDF 函数

在另一个 UDF 中调用 UDF

在Spark中如何使用UDO作为参数调用UDF以避免二进制错误

mysql udf函数怎么调用

在 Java 中调用 SQL Server 用户定义函数 (UDF)