在 Siddhi 中相互比较长度批次的数据

Posted

技术标签:

【中文标题】在 Siddhi 中相互比较长度批次的数据【英文标题】:Compare length batches of data with each other in Siddhi 【发布时间】:2019-10-15 13:06:42 【问题描述】:

我在下面定义了输入流。 Datetime 字符串类似于 2010-09-01 06:59:00.000,结果是 double,例如 157,382,而 UnixDateTime 是 long 类型,例如 1283324340111

define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);

我想为 100 个事件创建长度批次,显示 result 列的平均值,并且我想将这些批次相互比较。我想对接下来的 5 个批次(每个批次都包含 100 个事件)进行滑动比较。所以我想比较第一批(0-100 事件)和第二批(101-200),直到第六批(501-600)。我希望第二批比较到第七批。我想通过比较实现的是,当 4 个或更多(来自 5 个)批次的批次平均结果全部大于或全部小于 1(与原始批次的平均结果相比)时,我想记录有关原始批次的信息。

我的代码如下。问题我不知道确切的语法。我查看了 WSO2 和 Siddhi 的教程和文档,但我无法解决问题。

@info(name = 'MovingAverageQuery')
from every e1=HStream, e2=HStream[e1.avg(Result) <= avg(Result))+, e2=HStream[e2[last].avg(Result) <= avg(Result)]
select ID, DateTime, Result, 
avg(Result), UnixDateTime
output last every 100 events
insert into OutputStream;

@sink(type='log', prefix='LOGGER')
define stream OutputStream(Nr ID, DateTime String, Result double, Avg double, UnixDateTime long);

【问题讨论】:

【参考方案1】:

您必须对需求使用两个查询,一个是计算平均值(Average100Query),另一个是比较平均值(IdentifyIncreaseingTrend)。

@App:name("AverageSequence")
@App:description("Identify the average increase trend")

define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);

@sink(type='log', prefix='LOGGER')
define stream OutputStream(ID int, DateTime String, avgResult double, UnixDateTime long);

@info(name = 'Average100Query')
from HStream#window.lengthBatch(100)
select ID, DateTime, avg(Result) as avgResult, UnixDateTime 
insert into AverageStream;

@info(name='IdentifyIncreaseingTrend')
from every e1=AverageStream, e2=AverageStream[e2.avgResult >= (e1.avgResult + 1)],  e3=AverageStream[e3.avgResult >= (e2.avgResult + 1)],  e4=AverageStream[e4.avgResult >= (e3.avgResult + 1)], e5=AverageStream[e5.avgResult >= (e4.avgResult + 1)]
select e1.ID, e1.DateTime, e1.avgResult, e1.UnixDateTime 
insert into OutputStream;

我注意到的一些语法问题是在计算完成后,例如 sum(result) 您必须使用 as 关键字将该属性命名为 sum(result) as totalResult 。在序列中,您不能使用平均函数,因为它需要对多个事件执行,但您可以使用重命名的属性totalResult

【讨论】:

感谢您的回答。我已经实现了您的代码,但是当我运行应用程序时出现此错误:AverageSequence.siddhi - Siddhi AppAverageSequence is in faulty state. 我在 Siddhi Tooling 5.1.0 中进行了测试,siddhi.io/en/v5.1/docs/quick-start 它正在运行 在启动应用程序之前,该应用程序可能尚未部署。等到你看到日志,`Siddhi App AverageSequence successfully deploy.`,然后再启动 siddhi 应用 我做错了什么。这不是悉地的错误。谢谢你的帮助。我对你写的查询还有另一个问题。你写的查询只看 avgResult 是否大 1,我是否也可以输入它寻找小 1 的相同查询?还是我必须为此单独查询? 您必须为此添加单独的查询,因为您需要检查所有 5 个连续事件

以上是关于在 Siddhi 中相互比较长度批次的数据的主要内容,如果未能解决你的问题,请参考以下文章

如何在 TensorFlow 中处理具有可变长度序列的批次?

Storm集成Siddhi

WSO2流处理器与mysql数据库集成无法正常工作

Siddhi初探

FLINK SIDDHI ADDON 学习笔记

Siddhi中卡夫卡源初始化的问题