将 PL/SQL ETL 流程翻译成 HiveQL

Posted

技术标签:

【中文标题】将 PL/SQL ETL 流程翻译成 HiveQL【英文标题】:Translate PL/SQL ETL process into HiveQL 【发布时间】:2013-06-25 08:17:35 【问题描述】:

我正在尝试在 HiveQL 中翻译不同的 PL/SQL 脚本。

这些不同的脚本 fts 用于 ETL 过程,以从不同的表导入数据。

我正在尝试使用 HiveQL 在 Hadoop/Hive 中做同样的事情

但是,这个脚本之一给我带来了一些问题。

这是我的 PL/SQL 脚本:

BEGIN

  -- Mise a jour au niveau magasin et famille
  MERGE INTO KPI.THM_CA_RGRP_PRODUITS_JOUR cible USING (
    SELECT
      in_co_societe                                               as CO_SOCIETE,
      in_dt_jour                                                  as DT_JOUR,
      'MAG'                                                       as TYPE_ENTITE,
      m.co_magasin                                                as CODE_ENTITE,
      'FAM'                                                       as TYPE_RGRP_PRODUITS,
      sourceunion.CO_RGRP_PRODUITS                                as CO_RGRP_PRODUITS,
      SUM(MT_CA_NET_TTC)                                          as MT_CA_NET_TTC,
      SUM(MT_OBJ_CA_NET_TTC)                                      as MT_OBJ_CA_NET_TTC,
      SUM(NB_CLIENTS)                                             as NB_CLIENTS,
      SUM(MT_CA_NET_TTC_COMP)                                     as MT_CA_NET_TTC_COMP,
      SUM(MT_OBJ_CA_NET_TTC_COMP)                                 as MT_OBJ_CA_NET_TTC_COMP,
      SUM(NB_CLIENTS_COMP)                                        as NB_CLIENTS_COMP
    FROM (
      -- Mise a jour du CA
      SELECT
        mtransf.id_mag_transfere             as ID_MAGASIN,
        v.co_famille                         as CO_RGRP_PRODUITS,
        sum(v.mt_ca_net_ttc)                 as MT_CA_NET_TTC,
        0                                    as MT_OBJ_CA_NET_TTC,
        0                                    as NB_CLIENTS,
        sum(v.mt_ca_net_ttc * DECODE(mtransf.flag_mag_comp, 'NC', 0, 1))
                                             as MT_CA_NET_TTC_COMP,
        0                                    as MT_OBJ_CA_NET_TTC_COMP,
        0                                    as NB_CLIENTS_COMP
      FROM themis.VENTES_FAM v
      INNER JOIN kpi.kpi_magasin mtransf
      ON  mtransf.co_societe = CASE WHEN v.co_societe = 1 THEN 1 ELSE 2 END
      AND mtransf.id_magasin = v.id_magasin
      WHERE
          mtransf.co_societe    = in_co_societe
      AND v.dt_jour             = in_dt_jour
      GROUP BY
        mtransf.id_mag_transfere,
        v.co_famille
      UNION
      -- Mise a jour des Objectifs ->Non car les objectifs ne sont pas d¿¿finis ¿¿ la famille
      -- Mise a jour du Nombre de clients
      SELECT
        mtransf.id_mag_transfere             as ID_MAGASIN,
        v.co_famille                         as CO_RGRP_PRODUITS,
        0                                    as MT_CA_NET_TTC,
        0                                    as MT_OBJ_CA_NET_TTC,
        sum(nb_client)                       as NB_CLIENTS,
        0                                    as MT_CA_NET_TTC_COMP,
        0                                    as MT_OBJ_CA_NET_TTC_COMP,
        sum(nb_client * DECODE(mtransf.flag_mag_comp, 'NC', 0, 1))
                                             as NB_CLIENTS_COMP
      FROM ods.nb_clients_mag_fam_j v
      INNER JOIN kpi.kpi_magasin mtransf
      ON  mtransf.co_societe = CASE WHEN v.co_societe = 1 THEN 1 ELSE 2 END
      AND mtransf.id_magasin = v.id_magasin
      WHERE
          mtransf.co_societe    = in_co_societe
      AND v.dt_jour             = in_dt_jour
      GROUP BY
        mtransf.id_mag_transfere,
        v.co_famille
    ) sourceunion
    INNER JOIN kpi.kpi_magasin m
    ON  m.co_societe = in_co_societe
    AND m.id_magasin = sourceunion.id_magasin
    GROUP BY
      m.co_magasin,
      sourceunion.CO_RGRP_PRODUITS
  ) source
  ON (
        cible.co_societe  = source.co_societe
    and cible.dt_jour     = source.dt_jour
    and cible.type_entite = source.type_entite
    and cible.code_entite = source.code_entite
    and cible.type_rgrp_produits = source.type_rgrp_produits
    and cible.co_rgrp_produits = source.co_rgrp_produits
  )
 WHEN NOT MATCHED THEN
    INSERT (
      cible.CO_SOCIETE,
      cible.DT_JOUR,
      cible.TYPE_ENTITE,
      cible.CODE_ENTITE,
      cible.TYPE_RGRP_PRODUITS,
      cible.CO_RGRP_PRODUITS,
      cible.MT_CA_NET_TTC,
      cible.MT_OBJ_CA_NET_TTC,
      cible.NB_CLIENTS,
      cible.MT_CA_NET_TTC_COMP,
      cible.MT_OBJ_CA_NET_TTC_COMP,
      cible.NB_CLIENTS_COMP
    )
    VALUES (
      source.CO_SOCIETE,
      source.DT_JOUR,
      source.TYPE_ENTITE,
      source.CODE_ENTITE,
      source.TYPE_RGRP_PRODUITS,
      source.CO_RGRP_PRODUITS,
      source.MT_CA_NET_TTC,
      source.MT_OBJ_CA_NET_TTC,
      source.NB_CLIENTS,
      source.MT_CA_NET_TTC_COMP,
      source.MT_OBJ_CA_NET_TTC_COMP,
      source.NB_CLIENTS_COMP
    );

有没有办法用 Hive 做到这一点?

感谢您的帮助。

【问题讨论】:

我建议你从简单的例子开始。它会帮助你理解问题。您给出的示例很复杂..无法直接回答。 【参考方案1】:

对于这样一个一般性问题,您问题中的 PL/SQL 语句有点太长了。我可能无法跟踪它,但我的理解是您正在将某些查询的结果插入 KPI.THM_CA_RGRP_PRODUITS_JOUR 表中,除非它们与现有行匹配。

Hadoop 不支持附加到现有 HDFS 文件,但您可以告诉 Hive 将某些 HDFS 目录视为分区。

你的表名中的“JOUR”一词让我觉得里面的数据可以自然地按天分区。我建议在您的源系统中执行 E 和 T 步骤,即生成一个带有 SELECT 结果的 CSV 文件。然后将其加载到 HDFS 中。如果您进行每日导出并且可以缩小要在源端插入的记录,您只需告诉 Hive 您正在向表中添加一个新分区。

如果您必须过滤掉表中已经存在的记录,这可能是您使用 MERGE 而不是直接插入的原因,您可能需要编写一个简单的 Map/Reduce 作业来合并新的与现有的数据。

【讨论】:

以上是关于将 PL/SQL ETL 流程翻译成 HiveQL的主要内容,如果未能解决你的问题,请参考以下文章

如何将 nvarchar 从 T-SQL 方言转换为 hiveQL?

PL/SQL 异常翻译

更好的 HiveQL 语法将一列结构分解成一个表,每个结构成员一列?

PL/SQL流程控制

Hive的概念工作流程优劣势架构介绍

PLSQL简介翻译