pentaho 数据集成中新插入或更新的行数
Posted
技术标签:
【中文标题】pentaho 数据集成中新插入或更新的行数【英文标题】:Newly inserted or updated row count in pentaho data integration 【发布时间】:2015-10-20 07:18:40 【问题描述】:我是 Pentaho 数据集成的新手;我需要将一个数据库作为 ETL 作业集成到另一个位置。我想计算 ETL 作业期间的插入/更新次数,并将该计数插入另一个表。谁能帮我解决这个问题?
【问题讨论】:
【参考方案1】:我认为没有内置功能可以返回迄今为止 PDI 中插入/更新步骤的受影响的行数。
尽管如此,大多数数据库供应商都能够为您提供从给定操作中获取受影响的行数的能力。
例如,在 PostgreSQL 中,它看起来像这样:
/* Count affected rows from INSERT */
WITH inserted_rows AS (
INSERT INTO ...
VALUES
...
RETURNING 1
)
SELECT count(*) FROM inserted_rows;
/* Count affected rows from UPDATE */
WITH updated_rows AS (
UPDATE ...
SET ...
WHERE ...
RETURNING 1
)
SELECT count(*) FROM updated_rows;
但是,您的目标是在 PDI 作业中执行此操作,因此我建议您尝试达到可以控制 SQL 脚本的程度。
建议: 将源数据保存在目标数据库服务器上的文件中,然后使用它(可能带有批量加载功能)来插入/更新,然后将受影响的行数保存到PDI 变量。请注意,您可能需要在 Job 的范围内使用 SQL 脚本步骤。
编辑:实施是选择设计的问题,因此建议的解决方案是众多解决方案之一。在非常高的层次上,您可以执行以下操作。
转换 I - 从源中提取数据 从源中获取数据,无论是数据库还是其他任何东西 以适合目标数据库结构的方式准备输出 使用文件系统上的文本文件输出步骤保存 CSV 文件 父作业 如果 PDI 服务器与目标 DB 服务器相同: 使用执行 SQL 脚本步骤: 从文件中读取数据并执行 INSERT/UPDATE 将受影响的行数写入表中(理想情况下,该表还可以包含操作的时间戳,以便您跟踪事物) 如果 PDI 服务器与目标 DB 服务器不同: 将源数据文件上传到服务器,例如使用 FTP/SFTP 文件上传步骤 使用执行 SQL 脚本步骤: 从文件中读取数据并执行 INSERT/UPDATE 将受影响的行数写入表中编辑 2:另一个建议的解决方案
正如@user3123116 所建议的那样,您可以使用比较字段 步骤(如果不是您的环境的一部分,请检查市场)。
我看到的唯一缺点是您必须在插入/更新之前查询目标数据库,这当然会降低性能。
最终它可能看起来像这样(请注意,这只是比较和计数部分):
还请注意,您可以拆分源数据流的输入(COPY,而不是 DISTRIBUTE),然后进行插入/更新,但此流必须等待字段比较的流结束对目标数据库的查询,否则可能会得到错误的统计信息。
【讨论】:
【参考方案2】:“比较字段”步骤将 2 个流作为输入进行比较,其输出是 4 个不同的流,分别代表“相同”、“已更改”、“已添加”和“已删除”记录。您可以计算这 4 个,并且然后使用插入/更新处理“已更改”、“已添加”和“已删除”记录。
【讨论】:
正确的@user3123116,也有那个解决方案。有些版本没有开箱即用的这一步,您还应该注意,这需要对目标数据进行额外查询,因此性能会降低。【参考方案3】:您可以通过转换设置中的日志记录选项来执行此操作。请按照以下步骤操作:
-
点击编辑菜单 --> 设置
切换到日志记录标签
从左侧菜单中选择步骤
提供日志连接和日志表名(说StepLog)
选择记录所需的字段(LINES_OUTPUT - 用于插入计数和 LINES_UPDATED - 用于更新计数)
点击SQL按钮并通过点击Execute按钮创建表
现在所有步骤都将记录到日志表中(StepLog),您可以将其用于进一步的操作。
享受
【讨论】:
以上是关于pentaho 数据集成中新插入或更新的行数的主要内容,如果未能解决你的问题,请参考以下文章
Pentaho 数据集成,插入/更新步骤即使在发现一些错误后仍继续