如何使用Apache Flink阅读Cassandra?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Apache Flink阅读Cassandra?相关的知识,希望对你有一定的参考价值。

我的flink程序应该对每个输入记录进行Cassandra查找,并根据结果进行一些进一步的处理。

但我目前仍在阅读Cassandra的数据。这是我到目前为止提出的代码片段。

ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                    .build();
        }
    };

    for (int i=1; i<5; i++) {
        CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
                new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
        cassandraInputFormat.configure(null);
        cassandraInputFormat.open(null);
        Tuple2<String, String> out = new Tuple8<>();
        cassandraInputFormat.nextRecord(out);
        System.out.println(out);
    }

但问题是,每次查找需要将近10秒,换句话说,这个for循环需要50秒才能执行。

我如何加快这项操作?另外,还有其他方法可以在Flink查找Cassandra吗?

答案

我想出了一个在使用流数据查询Cassandra时相当快的解决方案。对具有相同问题的人有用。

首先,可以用尽可能少的代码查询Cassandra,

Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");

但问题是,创建Session是一个非常耗时的操作,应该在每个关键空间完成一次。您创建一次Session并将其重用于所有读取查询。

现在,由于Session不是Java Serializable,它不能作为参数传递给flink运算符,如MapProcessFunction。有几种方法可以解决这个问题,您可以使用RichFunction并在其Open方法中初始化它,或使用Singleton。我将使用第二种解决方案。

在我们创建Session的地方创建一个Singleton类。

public class CassandraSessionSingleton {
    private static CassandraSessionSingleton cassandraSessionSingleton = null;

    public Session session;

    private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
        Cluster cluster = clusterBuilder.getCluster();
        session = cluster.connect();
    }

    public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
        if (cassandraSessionSingleton == null)
            cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
        return cassandraSessionSingleton;
    }

}

然后,您可以将此会话用于将来的所有查询。在这里,我使用ProcessFunction作为示例进行查询。

public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
    ClusterBuilder secureCassandraSinkClusterBuilder;

    // Constructor
    public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
        this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;
    }

    @Override
    public void  ProcessElement (Object obj) throws Exception {
        ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
        return resultSet;
    }
}

请注意,您可以将ClusterBuilder传递给ProcessFunction,因为它是Serializable。现在我们执行查询的cassandraLookUp方法。

public class CassandraLookUp {
    public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
        CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
        Session session = cassandraSessionSingleton.session;
        ResultSet resultSet = session.execute(query);
        return resultSet;
    }
}

单例对象仅在第一次运行查询时创建,之后,同一对象被重用,因此查找没有延迟。

以上是关于如何使用Apache Flink阅读Cassandra?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink源码阅读环境搭建

Apache Flink源码阅读环境搭建

Apache Flink TaskExecutor关闭

2021年大数据Flink(四十六):扩展阅读  异步IO

Flink源码阅读--Checkpoint触发机制

2021年大数据Flink(四十五):​​​​​​扩展阅读 双流Join