在 Siddhi 中相互比较长度批次的数据
Posted
技术标签:
【中文标题】在 Siddhi 中相互比较长度批次的数据【英文标题】:Compare length batches of data with each other in Siddhi 【发布时间】:2019-10-15 13:06:42 【问题描述】:我在下面定义了输入流。 Datetime 字符串类似于 2010-09-01 06:59:00.000
,结果是 double,例如 157,382
,而 UnixDateTime 是 long 类型,例如 1283324340111
。
define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);
我想为 100 个事件创建长度批次,显示 result
列的平均值,并且我想将这些批次相互比较。我想对接下来的 5 个批次(每个批次都包含 100 个事件)进行滑动比较。所以我想比较第一批(0-100 事件)和第二批(101-200),直到第六批(501-600)。我希望第二批比较到第七批。我想通过比较实现的是,当 4 个或更多(来自 5 个)批次的批次平均结果全部大于或全部小于 1(与原始批次的平均结果相比)时,我想记录有关原始批次的信息。
我的代码如下。问题我不知道确切的语法。我查看了 WSO2 和 Siddhi 的教程和文档,但我无法解决问题。
@info(name = 'MovingAverageQuery')
from every e1=HStream, e2=HStream[e1.avg(Result) <= avg(Result))+, e2=HStream[e2[last].avg(Result) <= avg(Result)]
select ID, DateTime, Result,
avg(Result), UnixDateTime
output last every 100 events
insert into OutputStream;
@sink(type='log', prefix='LOGGER')
define stream OutputStream(Nr ID, DateTime String, Result double, Avg double, UnixDateTime long);
【问题讨论】:
【参考方案1】:您必须对需求使用两个查询,一个是计算平均值(Average100Query),另一个是比较平均值(IdentifyIncreaseingTrend)。
@App:name("AverageSequence")
@App:description("Identify the average increase trend")
define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);
@sink(type='log', prefix='LOGGER')
define stream OutputStream(ID int, DateTime String, avgResult double, UnixDateTime long);
@info(name = 'Average100Query')
from HStream#window.lengthBatch(100)
select ID, DateTime, avg(Result) as avgResult, UnixDateTime
insert into AverageStream;
@info(name='IdentifyIncreaseingTrend')
from every e1=AverageStream, e2=AverageStream[e2.avgResult >= (e1.avgResult + 1)], e3=AverageStream[e3.avgResult >= (e2.avgResult + 1)], e4=AverageStream[e4.avgResult >= (e3.avgResult + 1)], e5=AverageStream[e5.avgResult >= (e4.avgResult + 1)]
select e1.ID, e1.DateTime, e1.avgResult, e1.UnixDateTime
insert into OutputStream;
我注意到的一些语法问题是在计算完成后,例如 sum(result) 您必须使用 as
关键字将该属性命名为 sum(result) as totalResult
。在序列中,您不能使用平均函数,因为它需要对多个事件执行,但您可以使用重命名的属性totalResult
。
【讨论】:
感谢您的回答。我已经实现了您的代码,但是当我运行应用程序时出现此错误:AverageSequence.siddhi - Siddhi AppAverageSequence is in faulty state.
我在 Siddhi Tooling 5.1.0 中进行了测试,siddhi.io/en/v5.1/docs/quick-start 它正在运行
在启动应用程序之前,该应用程序可能尚未部署。等到你看到日志,`Siddhi App AverageSequence successfully deploy.`,然后再启动 siddhi 应用
我做错了什么。这不是悉地的错误。谢谢你的帮助。我对你写的查询还有另一个问题。你写的查询只看 avgResult 是否大 1,我是否也可以输入它寻找小 1 的相同查询?还是我必须为此单独查询?
您必须为此添加单独的查询,因为您需要检查所有 5 个连续事件以上是关于在 Siddhi 中相互比较长度批次的数据的主要内容,如果未能解决你的问题,请参考以下文章