CassandraIO 无法保存时间戳

Posted

技术标签:

【中文标题】CassandraIO 无法保存时间戳【英文标题】:CassandraIO didn't work for saving Timestamp 【发布时间】:2019-11-03 12:22:50 【问题描述】:

这是我的简单代码,它从 pubsub 订阅中读取,并将消息正文保存到带有当前时间戳的 Cassandra 表中。

消息从订阅消费,但没有记录插入到表中,也没有错误消息。

但是,如果我在 TestTable 类中将日期类型“时间戳”更改为 Long,则此代码正在运行并将记录插入到表中。

这里是创建表的脚本。

DROP TABLE IF EXISTS test_table;

CREATE TABLE IF NOT EXISTS test_table(
    post_index int,
    ingestion_time TIMESTAMP,
    body text,
    PRIMARY KEY ((post_index))
);
@Table(keyspace = "keyspace_name", name = "table_name",
        readConsistency = "LOCAL_QUORUM",
        writeConsistency = "LOCAL_QUORUM",
        caseSensitiveKeyspace = false,
        caseSensitiveTable = false)
class TestTable implements Serializable 
  @PartitionKey
  @Column(name="post_index")
  Integer postIndex;
  @Column(name="ingestion_time")
  Timestamp ingestionTime;
  @Column(name = "body")
  String body;

    public Integer getPostIndex() 
        return postIndex;
    

    public void setPostIndex(Integer postIndex) 
        this.postIndex = postIndex;
    

    public Timestamp getIngestionTime() 
        return ingestionTime;
    

    public void setIngestionTime(Timestamp ingestionTime) 
        this.ingestionTime = ingestionTime;
    

    public String getBody() 
        return body;
    

    public void setBody(String body) 
        this.body = body;
    

    public TestTable(Integer postIndex, Timestamp ingestionTime, String body) 
      this.body = body;
      this.ingestionTime = ingestionTime;
      this.postIndex = postIndex;
  
  public TestTable() 
        this.body = "";
        this.ingestionTime = Timestamp.from(Instant.now());
        this.postIndex = 0;
  


public class TestCassandraJobJava 

    public static void main(String[] args) 

        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

       PCollection<String> data = pipeline.apply("ReadStrinsFromPubsub",
                PubsubIO.readStrings().fromSubscription("projects/project_id/subscriptions/subscription_name"))
                .apply("window", Window.into(FixedWindows.of(Duration.standardSeconds(5))))
        .apply("CreateMutation", ParDo.of(new DoFn<String, TestTable>() 
            @ProcessElement
            public void processElement(@Element String word, OutputReceiver<TestTable> out) 
                TestTable t = new TestTable(new Random().nextInt(), java.sql.Timestamp.from(Instant.now()), word);
                out.output(t);
            
        )).apply(CassandraIO.<TestTable>write()
                        .withHosts(Arrays.asList("127.0.0.1"))
                        .withPort(9042)
                        .withKeyspace("keyspace")
                        .withLocalDc("Cassandra")
                        .withEntity(TestTable.class)
                );
        pipeline.run().waitUntilFinish();
    

【问题讨论】:

我们可以查看您尝试处理的消息示例吗? Timestamp 的完整类名是什么? 消息很简单 json "body":"abc" Timestamp的全类名是java.sql.Timestamp 【参考方案1】:

要使其正常工作,您需要在 Cassandra 的 timestampjava.sql.Timestamp 之间有一个编解码器。默认情况下,在 Java 驱动程序 3.x 中,timestamp 被转换为 java.util.Date(请参阅mapping),尽管您也可以通过 extra codecs 使用 Joda Time 或 Java 8.x 时间 API。而在 Java 驱动程序 4.x 中,Instant 用于表示时间戳。

java.sql.Timestamp 没有内置编解码器,但实现您自己的编解码器应该不难 - documentation 详细描述了自定义编解码器的创建和使用过程。

【讨论】:

以上是关于CassandraIO 无法保存时间戳的主要内容,如果未能解决你的问题,请参考以下文章

时间戳作为“时间戳”保存到 Firestore 是不是正常?

使用 C# 使用 Interop 在 en excel 文件中添加日期时间戳

bigQuery 不支持毫秒时间戳

CMPedometer 保存步骤时间戳

使用php在mysql表中保存时间戳

往返于 json 的 Pandas 时间戳