通过 talend 从 oracle 加载到 greenplum 时的数据不能很好地处理 CLOB 列

Posted

技术标签:

【中文标题】通过 talend 从 oracle 加载到 greenplum 时的数据不能很好地处理 CLOB 列【英文标题】:Data when loaded from oracle to greenplum via talend is not handling CLOB columns well 【发布时间】:2016-09-02 10:05:02 【问题描述】:

我正在使用 tOracleConnection 连接到 oracle 源并使用 tOracleInput 来获取数据,然后使用 tMap 然后将数据写入到使用 tfileoutputdelimited 的文件(自定义组件)

我正在使用 tGreenPlumLoad 从文件将数据加载到 Greenplum。

所有数据都被很好地加载,只有一列具有 CLOB 数据。 需要加载的数据

(CLOB)2*C111*SUB_TYPE_CD2*N118*Tool Investigation2*O10

但列中的数据说

oracle.sql.CLOB@6513543e

对于少数记录,此问题正在发生,对于其他记录,CLOB 数据加载正常。

我希望将 CLOB 数据加载到所有记录的列中。

【问题讨论】:

我在使用动态类型时看到过这种情况,因为我不得不修改 talend 附带的代码。我已经向他们提出了同样的要求。您使用的是哪个 Talend 版本? @BalazsGunics 我使用的是 Talend Studio 企业版 5.6.1。 【参考方案1】:

Talend 5.6.1 有很多错误,这是其中之一。当我尝试使用动态类型时,我也遇到了这个问题。

找到这个文件:

Talend-5.6.1\plugins\org.talend.desinger.routines.tisprovider_5.6.1.20141207_1530\resources\java\routines\system\DynamicUtils.java

修改如下例程:

    public static void readColumnsFromDatabase(Dynamic column, java.sql.ResultSet rs, int fixedColumnCount) throws Exception 
    column.clearColumnValues();
    for (int i = 0; i < column.getColumnCount(); i++) 
        DynamicMetadata dcm = column.getColumnMetadata(i);
        if ("id_String".equals(dcm.getType()) && !"BLOB".equals(dcm.getDbType()) ) 
            column.addColumnValue(rs.getString(fixedColumnCount + i + 1));
         else if ("id_Date".equals(dcm.getType())) 
            if (DBMSConstants.MSSQL.getDBmsId().equalsIgnoreCase(column.getDbmsId())
                    && !(dcm.getDbType().toLowerCase().indexOf("timestamp") < 0)) 
                column.addColumnValue(rs.getString(fixedColumnCount + i + 1));
             else if (DBMSConstants.NETEZZA.getDBmsId().equalsIgnoreCase(column.getDbmsId())
                    && "time".equalsIgnoreCase(dcm.getDbType())) 
                column.addColumnValue(rs.getTime(fixedColumnCount + i + 1));
             else 
                column.addColumnValue(rs.getTimestamp(fixedColumnCount + i + 1));
            
         else if ("id_Integer".equals(dcm.getType()) || "id_Long".equals(dcm.getType()) || "id_Double".equals(dcm.getType())
                || "id_Byte".equals(dcm.getType()) || "id_byte[]".equals(dcm.getType()) || "BLOB".equals(dcm.getDbType())
                || "CLOB".equals(dcm.getDbType()) ) 

            //oracle ...
            if ( "LONG RAW".equals(dcm.getDbType()) ) 
                java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
                byte[] buf = new byte[1024];
                java.io.InputStream in = rs.getBinaryStream(fixedColumnCount + i + 1);
                if (in == null) 
                    column.addColumnValue(null);
                    continue;
                
                int n = 0;
                while ((n=in.read(buf))>=0)
                
                   baos.write(buf, 0, n);
                
                in.close();
                column.addColumnValue(baos.toByteArray());
                continue;
            

            if (rs.getObject(fixedColumnCount + i + 1) == null) 
                column.addColumnValue(null);
                continue;
            
            if ("id_Integer".equals(dcm.getType())) 
                column.addColumnValue(rs.getInt(fixedColumnCount + i + 1));
             else if ("id_Long".equals(dcm.getType())) 
                column.addColumnValue(rs.getLong(fixedColumnCount + i + 1));
             else if ("id_Double".equals(dcm.getType())) 
                column.addColumnValue(rs.getDouble(fixedColumnCount + i + 1));
             else if ("id_Byte".equals(dcm.getType())) 
                column.addColumnValue(rs.getByte(fixedColumnCount + i + 1));
             else if ("id_byte[]".equals(dcm.getType())) 
                       column.addColumnValue(rs.getBytes(fixedColumnCount + i + 1));
                     else if ( "BLOB".equals(dcm.getDbType()) ) 
               column.addColumnValue(rs.getBytes(fixedColumnCount + i + 1));
             else if ( "CLOB".equals(dcm.getDbType()) ) 
               column.addColumnValue(rs.getString(fixedColumnCount + i + 1));
            
         else 
            column.addColumnValue(rs.getObject(fixedColumnCount + i + 1));
        
    

这也包含对 oracle long raw 类型的修复。如果您不使用动态类型,我不完全确定需要做什么,但基于此您应该检查 talend 如何从该列中提取数据。

它应该使用:rs.getString(n) 而不是rs.getObject(n)

询问您的 Talend 代表,因为他们在一年左右的时间里就针对这种行为提供了补丁。

【讨论】:

嗨,我怎么知道我的工作使用的是什么程序?有没有办法找出 talend 如何从列中提取数据? 您可以在此处发布您向 talend 团队报告此错误的链接吗? 我打算添加一个java例程并调用tmap。但是是否可以调整您在 talend 中提到的代码? 我不关注你,我为你提供了例程的位置以及如何修改它。我还能做什么? :) 案例编号 00009467 Oracle CLOB 未映射到 id_String 您必须修改输入组件架构,并将 CLOB 映射到 java String。

以上是关于通过 talend 从 oracle 加载到 greenplum 时的数据不能很好地处理 CLOB 列的主要内容,如果未能解决你的问题,请参考以下文章

记使用talend从oracle抽取数据时,数字变为0的问题

如何使用 talend 中的 tmysql 行将唯一行从数据库加载到数据仓库中

Talend tCreateTable 错误:NullPointerException

使用 Talend 循环遍历 .csv 文件

哪个ETL最适合Hbase

复制到运行良好但不加载数据 gzip 文件