如何从 pcollection 将多个值写入红移表

Posted

技术标签:

【中文标题】如何从 pcollection 将多个值写入红移表【英文标题】:How do I write multiple values to a redshift table from a pcollection 【发布时间】:2019-07-16 05:48:53 【问题描述】:

所以我有一个模板,可以将单个字符串作为记录写入红移表。

public static void main(String[] args) throws Exception 
        // Step 1: Create Options
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

        // Step 2: Create Pipeline
        Pipeline pipeline = Pipeline.create(options);

        // Step 3: Create PCollection from array of random words <Strings>
        PCollection<String> collection = pipeline
                .apply(Create.of(Arrays.asList("start", "test", "case", "single", "end")))
                .setCoder(StringUtf8Coder.of());

        // Step 4: Execute transforms on the collection. This transform writes the string value to a table named 'test'
        collection.apply(JdbcIO.<String>write()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                        .create("com.amazon.redshift.jdbc42.Driver", options.getRedshiftUrl())
                        .withUsername(options.getUser()).withPassword(options.getPassword()))
                .withStatement("insert into example_schema.test values (?)")
                .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() 
                    public void setParameters(String element, PreparedStatement query) throws SQLException 
                        query.setString(1, element);
                    
                ));

        pipeline.run().waitUntilFinish();
    

我想调整它以写入由整数、双精度和字符串组成的多个字段。

我认为我的方法存在很多问题,但我觉得我可能会在没有完全理解流程的情况下随机尝试正确的实现

public static void main(String[] args) throws Exception 
        // Step 1: Create Options
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

        String insertQuery = "insert into sample.mytable (item_int, item_string, item_double" +
                "values (?, ?, ?)";

        CustomObj custom_obj = new CustomObj(1, "", 0.5);

        // Step 2: Create Pipeline
        Pipeline pipeline = Pipeline.create(options);

        // Step 3: Create PCollection from array of random words <Strings>
        PCollection<CustomObj> collection = pipeline
                .apply(Create.of());

        // Step 4: Execute transforms on the collection. This transform writes the string value to a table named 'test'
        collection.apply(JdbcIO.<CustomObj>write()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                        .create("com.amazon.redshift.jdbc42.Driver", options.getRedshiftUrl())
                        .withUsername(options.getUser()).withPassword(options.getPassword()))
                .withStatement(insertQuery)
                .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<CustomObj>() 
                    public void setParameters(CustomObj element, PreparedStatement query) throws SQLException 
                        query.setInt(1, element.intVal);
                        query.setString(2, element.stringVal);
                        query.setDouble(3, element.doubleVal);
                    
                ));
        pipeline.run().waitUntilFinish();
    


    public static class CustomObj
    
        private Integer intVal;
        private String stringVal;
        private Double doubleVal;

        public CustomObj (Integer intVal, String stringVal, Double doubleVal)
        
            this.intVal = intVal;
            this.stringVal = stringVal;
            this.doubleVal = doubleVal;
        
    

到目前为止,我了解我需要为我的 PCollection 设置适当的编码器,鉴于我正在使用的对象类型,我不确定。

我也未能正确使用 PreparedStatementSetter,但是当我搜索清楚时,我得到了完全使用不同方法的示例。

我知道我的问题可能有点模糊,但如果我能被引导到一个可以更清楚地说明我上面展示的方法的来源,我将不胜感激。

这产生的输出是

 no suitable method found for of(no arguments)
[ERROR]     method org.apache.beam.sdk.transforms.Create.<T>of(java.lang.Iterable<T>) is not applicable
[ERROR]       (cannot infer type-variable(s) T
[ERROR]         (actual and formal argument lists differ in length))
[ERROR]     method org.apache.beam.sdk.transforms.Create.<T>of(T,T...) is not applicable
[ERROR]       (cannot infer type-variable(s) T
[ERROR]         (actual and formal argument lists differ in length))
[ERROR]     method org.apache.beam.sdk.transforms.Create.<K,V>of(java.util.Map<K,V>) is not applicable
[ERROR]       (cannot infer type-variable(s) K,V
[ERROR]         (actual and formal argument lists differ in length))
[ERROR]
[ERROR] -> [Help 1]

【问题讨论】:

好的,所以从原始模板开始工作,我选择将查询参数组合成一个字符串,然后在拆分该字符串后设置查询参数时适当地解析它们。我确信有更好的方法可以做到这一点,所以我会继续研究它 【参考方案1】:

错误表示编译器未能选择正确的 Create.of() 重载。如果您查看Create 的文档,则没有采用零参数的重载,您必须传递带有非可选第一个参数的可迭代、映射或可变参数。您的意思可能是Create.of(custom_obj),它应该可以正常工作(在这种情况下,它将创建一个包含单个元素的PCollection&lt;CustomObj&gt;)。

语句设置器也应该像你一样工作,这是一个做同样事情的例子:https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java#L479

【讨论】:

以上是关于如何从 pcollection 将多个值写入红移表的主要内容,如果未能解决你的问题,请参考以下文章

Python/SQLAlchemy:如何将巨大的红移表保存到 CSV?

提高效率的红移表设计

从红移表中获取上次更新时间戳

我们可以使用复制命令使用访问密钥和秘密密钥将数据从 S3 加载到红移表中吗(不使用 IAM 角色)

以追加模式写入pyspark中的红移

Spark Redshift:使用火花读取红移表时出错