用于 TimeUUID 列的 Cassandra Map Reduce

Posted

技术标签:

【中文标题】用于 TimeUUID 列的 Cassandra Map Reduce【英文标题】:Cassandra Map Reduce for TimeUUID columns 【发布时间】:2012-11-11 12:07:24 【问题描述】:

我最近设置了 4 个节点的 Cassandra 集群,用于使用一个包含时间序列数据的列族进行学习。

Key -> column name: timeUUID, column value: csv log line, ttl: 1year,我使用Netflix Astyanax java客户端加载了大约100万条日志行。

我还将 Hadoop 配置为使用 1 个名称节点和 4 个数据节点运行 map-reduce 作业,以对 Cassandra 数据运行一些分析。

互联网上所有可用的示例都使用列名作为 Hadoop 作业配置的 SlicePredicate,其中我有 timeUUID 作为列,我如何有效地将 Cassandra 数据提供给 Hadoop 作业配置器,一次批处理 1000 列。

此测试数据中某些行的列数超过 10000 列,预计实际数据中会更多。


我将我的工作配置为

public int run(String[] arg0) throws Exception 
    Job job = new Job(getConf(), JOB_NAME);
Job.setJarByClass(LogTypeCounterByDate.class);
job.setMapperClass(LogTypeCounterByDateMapper.class);       
job.setReducerClass(LogTypeCounterByDateReducer.class);

job.setInputFormatClass(ColumnFamilyInputFormat.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

ConfigHelper.setRangeBatchSize(getConf(), 1000);


SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]), 
    ByteBuffer.wrap(new byte[0]), true, 1000);

SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(sliceRange);


ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputRpcPort(job.getConfiguration(), INPUT_RPC_PORT);
ConfigHelper.setInputInitialAddress(job.getConfiguration(), INPUT_INITIAL_ADRESS);
    ConfigHelper.setInputPartitioner(job.getConfiguration(), INPUT_PARTITIONER);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), slicePredicate);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;

但是我无法理解我是如何定义 Mapper 的,请您提供 Mapper 类的模板。

public static class LogTypeCounterByDateMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>

    private Text key = null;
    private LongWritable value = null;

    @Override
    protected void setup(Context context)

    

    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
        //String[] lines = columns.;

    

【问题讨论】:

有人可以回复我的问题吗,从过去 2 天开始就对此感到震惊。提前致谢。 【参考方案1】:
ConfigHelper.setRangeBatchSize(getConf(), 1000)
...
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(TimeUUID.asByteBuffer(startValue), TimeUUID.asByteBuffer(endValue), false, 1000))
ConfigHelper.setInputSlicePredicate(conf, predicate)

【讨论】:

感谢您的回复,抱歉回复延迟。 SlicePredicate 会将获取的列数限制为 1000,还是会获取批量大小为 1000 的所有列? ConfigHelper.setRangeBatchSize(getConf(), 1000) 设置批量大小,而SlicePredicate 限制列数。 我想读取列族中每个键的所有列,但 SlicePredicate 限制在上述示例中每行仅返回 1000 条记录,但列数超过了该限制。我从文档中了解到,如果我将限制设置为高值,那么 Thrift 将在返回给客户端之前将整个结果带入内存。实际上,我想在每批中为相同的行键获取 1000 个列值。 您通常有多少列?你真的遇到内存问题了吗? 我预计 100000-500000 范围内的列在 1 年内,目前我没有达到内存限制,因为示例数据限制为一行最多 1000 列。

以上是关于用于 TimeUUID 列的 Cassandra Map Reduce的主要内容,如果未能解决你的问题,请参考以下文章

Cassandra:使用 DataStax Java 驱动程序选择一系列 TimeUUID

如何在 Pig 中过滤 Cassandra TimeUUID/UUID

如何按数据范围查询 Cassandra 中以 TimeUUID 为第一个组件的复合列?

Datastax C# 驱动程序中的 Cassandra timeuuid

Cassandra,查询主键,跳过聚类列(timeuuid)

TimeUUID 列上的切片查询