基于Morphia实现MongoDB按小时按天聚合操作

Posted jason1990

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Morphia实现MongoDB按小时按天聚合操作相关的知识,希望对你有一定的参考价值。

MongoDB按照天数或小时聚合

需求

最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询.
涉及到的技术栈分别为:Spring Boot,MongoDB,Morphia.

数据模型

@Data
@Builder
@Entity(value = "rawDevStatus", noClassnameStored = true)
// 设备状态索引
@Indexes(
        // 设置数据超时时间(TTL,MongoDB根据TTL在后台进行数据删除操作)
        @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
        @Index(fields = @Field("userId"), @Field(value = "time", type = IndexType.DESC))
)
public class RawDevStatus 

    @Id
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private ObjectId objectId;

    private String userId;

    private Instant time;

    @Embedded("points")
    List<Point> protocolPoints;

    @Data
    @AllArgsConstructor
    public static class Point 
        /**
         * 协议类型
         */
        private Protocol protocol;

        /**
         * 设备总数
         */
        private Integer total;

        /**
         * 设备在线数目
         */
        private Integer onlineNum;

        /**
         * 处于启用状态设备数目
         */
        private Integer enableNum;
    

上述代码是设备状态实体类,其中设备状态数据是按照设备所属协议进行区分的.

@Data
@Builder
@Entity(value = "aggregationDevStatus", noClassnameStored = true)
@Indexes(
        @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
        @Index(fields = @Field("userId"), @Field(value = "time", type = IndexType.DESC))
)
public class AggregationDevStatus 

    @Id
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private ObjectId objectId;

    /**
     * 用户ID
     */
    private String userId;

    /**
     * 设备总数
     */
    private Double total;

    /**
     * 设备在线数目
     */
    private Double onlineNum;

    /**
     * 处于启用状态设备数目
     */
    private Double enableNum;

    /**
     * 聚合类型(按照小时还是按照天聚合)
     */
    @Property("aggDuration")
    private AggregationDuration aggregationDuration;

    private Instant time;

    /**
     * 动态设置文档过期时间
     */
    private Instant expireAt;

上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.

聚合操作符介绍

聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.
此次聚合主要涉及以下操作:

  • $project:指定输出文档中的字段.
  • $unwind:拆分数据中的数组;
  • match:选择要处理的文档数据;
  • group:根据key分组聚合结果.

原始聚合语句

db.getCollection('raw_dev_status').aggregate([
    $match:
        
            time:$gte: ISODate("2019-06-27T00:00:00Z"),
        
    ,
    $unwind: "$points",
    $project:
        
            userId:1,points:1,
            tmp: $dateToString:  format: "%Y:%m:%dT%H:00:00Z", date: "$time"  
        
    ,
    $project:
        
            userId:1,points:1,
            groupTime: $dateFromString:  dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ",  
        
    ,
    $group:
        
            _id:user_id:'$userId', cal_time:'$groupTime',
            devTotal:'$avg':'$points.total',
            onlineTotal:'$avg':'$points.onlineNum',
            enableTotal:'$avg':'$points.enableNum'
        
    ,
])

上述代码是按小时聚合数据,以下来逐步介绍处理思路:

(1) $match

根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.

(2) $unwind

raw_dev_status中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;

(3) $project

    $project:
        
            userId:1,points:1,
            tmp: $dateToString:  format: "%Y:%m:%dT%H:00:00Z", date: "$time"  
        
    

选择需要输出的数据,分别为:userId,points以及tmp.
需要注意,为了按照时间聚合,对$time属性进行操作,提取%Y:%m:%dT%H时信息至$tmp作为下一步的聚合依据.

如果需要按天聚合,则format数据可修改为:%Y:%m:%dT00:00:00Z即可满足要求.

(4) $project

    $project:
        
            userId:1,points:1,
            groupTime: $dateFromString:  dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ",  
        
    

因为上一步project操作中,tmp为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
因此,此处对$tmp进行操作,转换为时间类型数据,即groupTime.

(5) $group

对聚合结果进行分类操作,并生成最终输出结果.

    $group:
        
            # 根据_id进行分组操作,依据是`user_id`以及`$groupTime`
            _id:user_id:'$userId', cal_time:'$groupTime',
            # 求设备总数平均值
            devTotal:'$avg':'$points.total',
            # 求设备在线数平均值
            onlineTotal:'$avg':'$points.onlineNum',
            # ...
            enableTotal:'$avg':'$points.enableNum'
        
    

代码编写

此处ODM选择Morphia,亦可以使用MongoTemplate,原理类似.

    /**
     * 创建聚合条件
     *
     * @param pastTime     过去时间段
     * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)
     * @return 聚合条件
     */
    private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) 
        Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);
        return datastore.createAggregation(RawDevStatus.class)
                .match(query.field("time").greaterThanOrEq(pastTime))
                .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
                .match(query.field("points.protocol").equal("ALL"))
                .project(Projection.projection("userId"),
                        Projection.projection("points"),
                        Projection.projection("convertTime",
                                Projection.expression("$dateToString",
                                        new BasicDBObject("format", dateToString)
                                                .append("date", "$time"))
                        )
                )
                .project(Projection.projection("userId"),
                        Projection.projection("points"),
                        Projection.projection("convertTime",
                                Projection.expression("$dateFromString",
                                        new BasicDBObject("format", stringToDate)
                                                .append("dateString", "$convertTime"))
                        )
                )
                .group(
                        Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
                        Group.grouping("total", Group.average("points.total")),
                        Group.grouping("onlineNum", Group.average("points.onlineNum")),
                        Group.grouping("enableNum", Group.average("points.enableNum"))
                );
    

    /**
     * 获取聚合结果
     *
     * @param pipeline 聚合条件
     * @return 聚合结果
     */
    private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) 
        List<AggregationMidDevStatus> statuses = new ArrayList<>();
        Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(
                AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
        while (resultIterator.hasNext()) 
            statuses.add(resultIterator.next());
        
        return statuses;
    

    //......................................................................................
    // 获取聚合结果(省略若干代码)
    AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
    List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
    if (CollectionUtils.isEmpty(midStatuses)) 
        log.warn("Can not get dev status aggregation result.");
        return;
    

PS:
如果您觉得我的文章对您有帮助,可以扫码领取下红包或扫码支持(随意多少,一分钱都是爱),谢谢!

支付宝红包 支付宝 微信
技术图片 技术图片 技术图片

以上是关于基于Morphia实现MongoDB按小时按天聚合操作的主要内容,如果未能解决你的问题,请参考以下文章

如何按小时聚合 MongoDB 文档?

Mongodb 聚合 - 今天按小时分组

如何在主表上加入按天聚合的数据

Morphia - mongodb之ORM框架

如何按天聚合熊猫数据框

Django如何按天 小时等查询统计?