Spring数据Cassandra可以与动态模​​型和键空间一起使用吗

Posted

技术标签:

【中文标题】Spring数据Cassandra可以与动态模​​型和键空间一起使用吗【英文标题】:Can Spring data Cassandra work with Dynamic model and Keyspaces 【发布时间】:2020-03-09 20:06:59 【问题描述】:

我需要构建一个应用程序,从 Kafka 获取数据,将数据持久化到一些数据库中,并将转换后的数据发送到另一个 Kafka 主题。

就我而言,我需要解析从 Kafka 收到的消息。数据将来自不同的设备和提要,因此它们将具有不同的架构。所以我需要能够在运行时使用消息元数据创建键空间,并在运行时使用接收到的模式插入该键空间。

我不知道 Spring data Cassandra 是否能够做到这一点。像某种动态模型而不是固定模式,这样我就可以使用包含键值对的映射,而不是固定的域类。

谢谢

【问题讨论】:

你不能界定将数据注入主题的设备类型吗?毕竟,在保存到 Cassandra 之前,您需要知道数据的架构,尤其是要知道读取 Cassandra 数据时哪个是正确的架构,您必须定义分区键和 Cassandra 需要具有线性基数读取的其他内容。 我在考虑以下问题,如果您有一个包含可以注入数据的不同类型设备的注册表,这意味着您可以保存为 json 格式或您想要的方案,然后有了这个方案,想想在 Cassandra 中哪个是最好的读写方式。 当然,如果你真的有这么多不同类型(数十、数百或数千)的设备将使用不同的 json 方案将数据注入 Cassandra,那么,是的,尝试制作原型并测量性能,看看是什么。 【参考方案1】:

我不确定你在问什么,但你可以定义持有 Map<String, Object> 类型的实体/表的属性/字段,这样你就可以保留从 Kafka 接收到的任何数据结构。

如果这对您有用,请告诉我,我将给出一个完整的示例如何设置它。

编辑

动态定义键空间的最佳方法是使用带有显式键空间的CassandraTemplate/ReactiveCassandraTemplate 查询:

@Autowired
private CassandraTemplate cassandraTemplate;

public List<SomeClass> findByKeySpaceAndPartition(String keyspace, String id) 
  return cassandraTemplate.select(
      select()
          .from(keyspace, "table_name")
          .where(eq("id", id)),
           SomeClass.class);
 

为属性转换和映射定义一个实体:

@Table("some_class")
public class SomeClass 

    @PrimaryKey
    private SomeClassPk pk;

    @CassandraType(type = DataType.Name.TEXT)
    private Map<String, Object> parameters;

然后定义转换器(一个用于写入,一个用于读取):

public class MapToStringConverter implements Converter<Map<String, Object>, String> 

    private final ObjectMapper objectMapper;

    @Override
    public String convert(@Nonnull Map<String, Object> source) 
        try 
            // do flattening here or whatever you need to do then return it stringified
            return objectMapper.writeValueAsString(source);
         catch (JsonProcessingException e) 
            log.error("Error occurred while serializing map to JSON: ", source, e);
        
        return "";
    



public class StringToMapConverter implements Converter<String, Map<String, Object>> 

    private final ObjectMapper objectMapper;

    @Override
    public Map<String, Object> convert(@Nonnull String json) 
        try 
            return objectMapper.readValue(json, new TypeReference<Map<String, Object>>() );
         catch (IOException e) 
            log.error("Problem while parsing JSON: ", json, e);
        
        return new HashMap<>();
    


那么剩下的唯一事情就是在你的 Cassandra 配置中连接转换器:

@Override
public CustomConversions customConversions() 
    List<Converter<?, ?>> converters = new ArrayList<>();
    converters.add(new StringToMapConverter(objectMapper));
    converters.add(new MapToStringConverter(objectMapper));
    return new CassandraCustomConversions(converters);

您应该在扩展 AbstractReactiveCassandraConfigurationAbstractCassandraConfiguration 的配置类中添加此代码,具体取决于您使用的是同步还是反应式 Cassandra 驱动程序

【讨论】:

所以基本上我需要一个表级多租户应用程序,我不知道有多少租户,以及架构是什么样的。例如,我收到一条来自 Kafka 的消息,格式为 tenantId: "device1", metadata1: d1, metadata2: d2, data: names:["column1", column2], type::["system.string", "system.sring"], values:["value1", "value2"] 如果不存在,我将创建一个名为“tenandId”的表名,并插入带有column1和comun2的行。 @BuggyBug 应该可以工作,但您应该注意 Cassandra 的限制。使用不是表的分区键的条件来查询表几乎是不可能的,因此没有花哨的复杂 where 条件。因此,对于数据分析和 BI,您需要将 Cassandra 中的数据倒入其他基础或使用其他技术。它主要用于存储大量数据而不会丢失数据(多节点、无主架构等)。 我知道我需要分区键和集群键。它将使用元数据生成,column1 和 column2 只是该复合键下的有效负载。我的问题是,Spring data cassandra 是否能够执行诸如定义模型类之类的操作,例如具有 primary-key, clustering-key, Map 属性的类,并且 column1 和 column2 将在 Map,框架插入到 (partition-key, clustering-key),column1,column2,...。这些列是扁平的,而不是在 Map 对象中,因为其他应用程序(不是我实现的)只会使用扁平数据。 另外,如果您有大量的键空间,性能可能会出现问题,请参阅:groups.google.com/a/lists.datastax.com/forum/#!searchin/… - 此线程中的解决方案是使用具有完全限定名称的 CQL 查询,但这可能会否定 Spring Cassandra 的大部分便利功能。

以上是关于Spring数据Cassandra可以与动态模​​型和键空间一起使用吗的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 和 Spring Data with Cassandra:在数据库连接失败时继续

Spring Boot 1.4:Spring Data Cassandra 1.4.2 与 Cassandra 3.0 不兼容?

Spring Boot 1.5.x with Spring Data Cassandra 1.5.x 与 Cassandra 2.1 不兼容

Spring Data Cassandra如何设置有限的连接重试次数?

如何从Cassandra DB获取/导出所有数据

具有响应式Cassandra 的Spring Data