CassandraIO 无法保存时间戳
Posted
技术标签:
【中文标题】CassandraIO 无法保存时间戳【英文标题】:CassandraIO didn't work for saving Timestamp 【发布时间】:2019-11-03 12:22:50 【问题描述】:这是我的简单代码,它从 pubsub 订阅中读取,并将消息正文保存到带有当前时间戳的 Cassandra 表中。
消息从订阅消费,但没有记录插入到表中,也没有错误消息。
但是,如果我在 TestTable 类中将日期类型“时间戳”更改为 Long,则此代码正在运行并将记录插入到表中。
这里是创建表的脚本。
DROP TABLE IF EXISTS test_table;
CREATE TABLE IF NOT EXISTS test_table(
post_index int,
ingestion_time TIMESTAMP,
body text,
PRIMARY KEY ((post_index))
);
@Table(keyspace = "keyspace_name", name = "table_name",
readConsistency = "LOCAL_QUORUM",
writeConsistency = "LOCAL_QUORUM",
caseSensitiveKeyspace = false,
caseSensitiveTable = false)
class TestTable implements Serializable
@PartitionKey
@Column(name="post_index")
Integer postIndex;
@Column(name="ingestion_time")
Timestamp ingestionTime;
@Column(name = "body")
String body;
public Integer getPostIndex()
return postIndex;
public void setPostIndex(Integer postIndex)
this.postIndex = postIndex;
public Timestamp getIngestionTime()
return ingestionTime;
public void setIngestionTime(Timestamp ingestionTime)
this.ingestionTime = ingestionTime;
public String getBody()
return body;
public void setBody(String body)
this.body = body;
public TestTable(Integer postIndex, Timestamp ingestionTime, String body)
this.body = body;
this.ingestionTime = ingestionTime;
this.postIndex = postIndex;
public TestTable()
this.body = "";
this.ingestionTime = Timestamp.from(Instant.now());
this.postIndex = 0;
public class TestCassandraJobJava
public static void main(String[] args)
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
PCollection<String> data = pipeline.apply("ReadStrinsFromPubsub",
PubsubIO.readStrings().fromSubscription("projects/project_id/subscriptions/subscription_name"))
.apply("window", Window.into(FixedWindows.of(Duration.standardSeconds(5))))
.apply("CreateMutation", ParDo.of(new DoFn<String, TestTable>()
@ProcessElement
public void processElement(@Element String word, OutputReceiver<TestTable> out)
TestTable t = new TestTable(new Random().nextInt(), java.sql.Timestamp.from(Instant.now()), word);
out.output(t);
)).apply(CassandraIO.<TestTable>write()
.withHosts(Arrays.asList("127.0.0.1"))
.withPort(9042)
.withKeyspace("keyspace")
.withLocalDc("Cassandra")
.withEntity(TestTable.class)
);
pipeline.run().waitUntilFinish();
【问题讨论】:
我们可以查看您尝试处理的消息示例吗?Timestamp
的完整类名是什么?
消息很简单 json "body":"abc"
Timestamp的全类名是java.sql.Timestamp
【参考方案1】:
要使其正常工作,您需要在 Cassandra 的 timestamp
和 java.sql.Timestamp
之间有一个编解码器。默认情况下,在 Java 驱动程序 3.x 中,timestamp
被转换为 java.util.Date
(请参阅mapping),尽管您也可以通过 extra codecs 使用 Joda Time 或 Java 8.x 时间 API。而在 Java 驱动程序 4.x 中,Instant
用于表示时间戳。
java.sql.Timestamp
没有内置编解码器,但实现您自己的编解码器应该不难 - documentation 详细描述了自定义编解码器的创建和使用过程。
【讨论】:
以上是关于CassandraIO 无法保存时间戳的主要内容,如果未能解决你的问题,请参考以下文章
时间戳作为“时间戳”保存到 Firestore 是不是正常?