将 Apache Cassandra 与 Apache Ignite 集成

Posted

技术标签:

【中文标题】将 Apache Cassandra 与 Apache Ignite 集成【英文标题】:Integrating Apache Cassandra with Apache Ignite 【发布时间】:2018-10-04 09:16:19 【问题描述】:

我正在尝试将 Apache Ignite 与 Apache Cassandra(3.11.2) 集成,因为我想使用 Ignite 缓存现有 Cassandra 数据库中的数据。

浏览了网上的资源,到现在我做了以下工作:

    已下载Apache Ignite。 将“libs/optional/”中存在的所有文件夹复制到“libs/”(我不知道 Cassandra 需要哪些文件夹)。 在 config 文件夹中创建了 3 个 xml,即“cassandra-config.xml”、“connection-settings.xml”和“persistance-settings.xml”。目前我为 Cassandra 和 Ignite 使用相同的节点 (172.16.129.68)。

cassandra-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Cassandra connection settings -->
<import resource="connection-settings.xml" />

<!-- Persistence settings for 'cache1' -->
<bean id="cache1_persistence_settings" class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
        <constructor-arg type="org.springframework.core.io.Resource" value="file:/home/cass/apache_ignite/apache-ignite-fabric-2.4.0-bin/config/persistance-settings.xml" />
</bean>
<!-- Ignite configuration -->
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="cacheConfiguration">
        <list>
            <!-- Configuring persistence for "cache1" cache -->
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="cache1"/>
                <property name="readThrough" value="true"/>
                <property name="writeThrough" value="true"/>
                <property name="writeBehindEnabled" value="true"/>
                <property name="cacheStoreFactory">
                    <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
                        <property name="dataSourceBean" value="cassandraAdminDataSource"/>
                        <property name="persistenceSettingsBean" value="cache1_persistence_settings"/>
                    </bean>
                </property>
            </bean>
        </list>
    </property>
    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                <!--
                    Ignite provides several options for automatic discovery that can be used
                    instead os static IP based discovery. For information on all options refer
                    to our documentation: http://apacheignite.readme.io/docs/cluster-config
                -->
                <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                    <property name="addresses">
                        <list>
                            <!-- In distributed environment, replace with actual host IP address. -->
                            <value>172.16.129.68:47500..47509</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>

connection-settings.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="loadBalancingPolicy" class="com.datastax.driver.core.policies.TokenAwarePolicy">
    <constructor-arg type="com.datastax.driver.core.policies.LoadBalancingPolicy">
        <bean class="com.datastax.driver.core.policies.RoundRobinPolicy"/>
    </constructor-arg>
</bean>

<bean id="cassandraAdminDataSource" class="org.apache.ignite.cache.store.cassandra.datasource.DataSource">
    <property name="port" value="9042"/>
    <property name="contactPoints" value="172.16.129.68"/>
    <property name="readConsistency" value="ONE"/>
    <property name="writeConsistency" value="ONE"/>
    <property name="loadBalancingPolicy" ref="loadBalancingPolicy"/>
</bean>

persistance-settings.xml

<persistence keyspace="test" table="epc_table">
    <keyPersistence class="java.lang.String" strategy="PRIMITIVE" column="imsi"/>
    <valuePersistence strategy="BLOB"/>
</persistence>

    我运行以下命令从 bin 文件夹启动 Ignite。

    ignite.sh ../config/cassandra-config.xml

现在,我想通过 sqlline 查看 cassandra 表。我尝试了以下方法:

./sqlline.sh -u jdbc:cassandra://172.16.129.68:9042/test //(test是keyspace的名字)

我得到以下输出:

No known driver to handle "jdbc:cassandra://172.16.129.68:9042/test". Searching for known drivers...
java.lang.NullPointerException
sqlline version 1.3.0
0: jdbc:cassandra://172.16.129.68:9042/test>

我也试过了:

./sqlline.sh -u jdbc:ignite:thin://172.16.129.68

但是当我使用“!tables”时,我看不到任何表格。

究竟缺少了什么?如何使用 sqlline 访问/修改 Cassandra 中存在的表?

操作系统:RHEL 6.5

【问题讨论】:

【参考方案1】:

Apache Ignite 是一个键值对数据库,默认情况下没有创建您可以使用 JDBC 连接器查看的表。 CacheStore 是一种将 Ignite 与外部数据库或任何其他存储集成的方法,它将数据作为键值对加载。

在您的配置中,您说 Ignite 要在 Cassandra 中存储和加载条目,但 Ignite 仍然不知道条目结构(顺便说一句,Ignite 真的不在乎放入了哪些对象)。

为了能够列出表并对其进行查询,您需要创建表。为此,您需要在 /lib 目录中进行 ignite-indexing 并设置 QueryEntity 或 indexed types 如果您已注释 POJO。以下是此类配置的示例:

<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="mycache"/>
<!-- Configure query entities -->
<property name="queryEntities">
    <list>
        <bean class="org.apache.ignite.cache.QueryEntity">
            <property name="keyType" value="java.lang.Long"/>
            <property name="valueType" value="org.apache.ignite.examples.Person"/>

            <property name="fields">
                <map>
                    <entry key="id" value="java.lang.Long"/>
                    <entry key="orgId" value="java.lang.Long"/>
                    <entry key="firstName" value="java.lang.String"/>
                    <entry key="lastName" value="java.lang.String"/>
                    <entry key="resume" value="java.lang.String"/>
                    <entry key="salary" value="java.lang.Double"/>
                </map>
            </property>

            <property name="indexes">
                <list>
                    <bean class="org.apache.ignite.cache.QueryIndex">
                        <constructor-arg value="id"/>
                    </bean>
                    <bean class="org.apache.ignite.cache.QueryIndex">
                        <constructor-arg value="orgId"/>
                    </bean>
                    <bean class="org.apache.ignite.cache.QueryIndex">
                        <constructor-arg value="salary"/>
                    </bean>
                </list>
            </property>
        </bean>
    </list>
</property>

如果您进行了配置,您将能够通过SQLine 登记和查询该表。 (请注意,您无法查询未加载到 Ignite 中的数据。要加载它们,您可以使用 IgniteCache.get() 启用 readThrough 选项或 IgniteCache.loadCache() 从 Cassandra 表中加载所有内容)。

要使用 JDBC 查询 Cassandra,您需要一个 JDBC 驱动程序,例如DBSchema。

【讨论】:

persistance-settings.xml(在问题中)怎么样?我基本上已经在那里描述了我的表的模式。那么它是用来做什么的呢?另外,我应该为您提供的代码创建一个新的 xml,还是应该将其包含在现有的 xml 中? 另外,究竟如何使用“IgniteCache.get() 启用 readThrough 选项或 IgniteCache.loadCache() 从 Cassandra 表加载所有内容)”?我必须为此编写一个java程序还是需要修改一个xml? 持久性设置不依赖于 Ignite 表,它说明了 Ignite 应该如何序列化/反序列化键和值以从 Cassandra 存储/读取。您可以保持原样,但 SQL 查询是另一个需要额外配置的模块。要调用 loadCache(),您需要编写 java 代码并使用 Ignite 的 java API,但 get() 您可以使用 REST API apacheignite.readme.io/docs/rest-api

以上是关于将 Apache Cassandra 与 Apache Ignite 集成的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam - 将 BigQuery TableRow 写入 Cassandra

将 Apache Cassandra 连接到 DataGrip

使用 Apache Sqoop 将数据从 Mongo/Cassandra 导出到 HDFS

Apache Cassandra 时区问题

无法将地图列表插入到 Apache Cassandra

CSV 到 RDD 到 Apache Spark 中的 Cassandra 存储