如何在 Jython Evaluator 中获取 StreamSets 记录字段类型

Posted

技术标签:

【中文标题】如何在 Jython Evaluator 中获取 StreamSets 记录字段类型【英文标题】:How to get StreamSets Record Fields Type inside Jython Evaluator 【发布时间】:2019-07-23 16:42:36 【问题描述】:

我有一个 StreamSets 管道,我使用 JDBC 组件作为源从远程 SQL Server 数据库中读取数据,并将数据放入 Hive 和 Kudu Data Lake。

我在使用 Binary Columns 类型时遇到了一些问题,因为 Impala 中不支持 Binary 类型,我使用它来访问 Hive 和 Kudu。

我决定将 Binary 类型的列(在管道中以 Byte_Array 类型流动)转换为 String 并像这样插入。

我尝试使用字段类型转换器元素将所有 Byte_Array 类型转换为字符串,但没有成功。所以我使用了一个 Jython 组件将所有 arr.arr 类型转换为 String。它工作正常,直到我在该字段上获得 Null 值,因此 Jython 类型为 None.type 并且我无法检测到 Byte_Array 类型并且无法将其转换为 String。所以我无法将它插入 Kudu。

任何帮助?或者针对我面临的问题有什么建议的解决方法?

【问题讨论】:

【参考方案1】:

您需要使用sdcFunctions.getFieldNull() 来测试该字段是否为NULL_BYTE_ARRAY。例如:

import array

def convert(item):
  return ':-)'

def is_byte_array(record, k, v):
  # getFieldNull expect a field path, so we need to prepend the '/'
  return (sdcFunctions.getFieldNull(record, '/'+k) == NULL_BYTE_ARRAY 
          or (type(v) == array.array and v.typecode == 'b'))

for record in records:
  try:
    record.value = k: convert(v) if is_byte_array(record, k, v) else v 
                    for k, v in record.value.items()
    output.write(record)

  except Exception as e:
    error.write(record, str(e))

【讨论】:

感谢您抽出宝贵时间回答这个问题【参考方案2】:

所以这是我的最终解决方案:

您可以使用以下逻辑通过使用 NULL_CONSTANTS 来检测 Jython 组件内的任何 StreamSets 类型:

NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG, 
NULL_FLOAT, NULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL, 
NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP

想法是将字段的值保存在临时变量中,将字段的值设置为 None 并使用函数 sdcFunctions.getFieldNull 通过比较它来了解 StreamSets 类型NULL_CONSTANTS 之一。

导入 binascii def toByteArrayToHexString(值): 如果值为无: 返回 NULL_STRING 值 = '0x'+binascii.hexlify(值).upper() 返回值 记录在案: 尝试: 对于 colName,record.value.items() 中的值: temp = 记录值 [colName] 记录值 [colName] = 无 如果 sdcFunctions.getFieldNull(record,'/'+colName) 为 NULL_BYTE_ARRAY: temp = toByteArrayToHexString(temp) 记录值[colName] = temp output.write(记录) 例外为 e error.write(记录,str(e))

限制: 上面的代码只有在 Date 类型有值时(当它不为 NULL 时)才将 Date 类型转换为 Datetime 类型

【讨论】:

很好奇您为什么不使用(type(v) == array.array and v.typecode == 'b') 测试来查看该值是否为字节数组? 因为我想在不涉及 Jython 转换的情况下获得 StreamSets 中使用的确切类型。如果您已经拥有 StreamSets 类型,我认为 Jython 类型是多余的信息。另外,我不确定 StreamSets 到 Jython 类型的转换是否是一对一的映射。你怎么看?另一件事,在“record.value[colName] = None”这一行之后,Jython 类型将为 None.type,我不想检查它。

以上是关于如何在 Jython Evaluator 中获取 StreamSets 记录字段类型的主要内容,如果未能解决你的问题,请参考以下文章

自定义插值器与Evaluator

如何在 Jython 中安装各种 Python 库?

如何在 Jython 中安装 ODBC 库

如何使用在 jython 上运行的 django 创建图像缩略图?

如何在 jmeter 中将其他软件包安装到 jython jar?

如何使用训练有素的模型预测 Cupy 数组?