Azure 流分析:多个 Windows + JOINS

Posted

技术标签:

【中文标题】Azure 流分析:多个 Windows + JOINS【英文标题】:Azure Stream Analytics: Multiple Windows + JOINS 【发布时间】:2018-05-22 22:11:45 【问题描述】:

我的架构:

1 个带有 8 个分区和 2 个 TPU 的 EventHub 1 个流分析作业 基于相同输入的 6 个窗口(从 1mn 到 6mn)

样本数据:

side: 'BUY', ticker: 'MSFT', qty: 1, price: 123, tradeTimestamp: 10000000000
side: 'SELL', ticker: 'MSFT', qty: 1, price: 124, tradeTimestamp:1000000000

EventHub PartitionKeyticker

我想每秒发出以下数据:

(Total quantity bought / Total quantity sold) in the last minute, last 2mn, last 3mn and more

我尝试了什么:

WITH TradesWindow AS (
    SELECT
        windowEnd = System.Timestamp,
        ticker,
        side,
        totalQty = SUM(qty)
    FROM [Trades-Stream] TIMESTAMP BY tradeTimestamp PARTITION BY PartitionId
    GROUP BY ticker, side, PartitionId, HoppingWindow(second, 60, 1)
),
TradesRatio1MN AS (
    SELECT 
        ticker = b.ticker,
        buySellRatio = b.totalQty / s.totalQty
    FROM TradesWindow b /* SHOULD I PARTITION HERE TOO ? */
    JOIN TradesWindow s /* SHOULD I PARTITION HERE TOO ? */
    ON s.ticker = b.ticker AND s.side = 'SELL'
    AND DATEDIFF(second, b, s) BETWEEN 0 AND 1
    WHERE b.side = 'BUY'
)

 /* .... More windows.... */

/* FINAL OUTPUT: Joining all the windows */
SELECT
   buySellRatio1MN = bs1.buySellRatio,
   buySellRatio2MN = bs2.buySellRatio
   /* more windows */
INTO [output]
FROM buySellRatio1MN bs1 /* SHOULD I PARTITION HERE TOO ? */
JOIN buySellRatio2MN bs2 /* SHOULD I PARTITION HERE TOO ? */
ON bs2.ticker = bs1.ticker
AND DATEDIFF(second, bs1, bs2) BETWEEN 0 AND 1

问题:

这需要6个EventHub消费者组(每个只能有5个读者),为什么?我的输入没有 5x6 SELECT 语句,那为什么? 输出似乎不一致(我不知道我的 JOIN 是否正确)。 有时作业根本不输出(可能是一些分区问题?请参阅代码中关于分区的 cmets)

简而言之,有没有更好的方法来实现这一点?我在文档和示例中找不到任何关于拥有多个窗口并加入它们然后仅从 1 个输入加入先前加入的结果的示例。

【问题讨论】:

【参考方案1】:

对于第一个问题,这取决于横向扩展逻辑的内部实现。详情见here。

对于连接的输出,我看不到整个查询,但是如果您将一个 1 分钟窗口的查询与一个 2 分钟窗口和 1 秒时间“缓冲区”的查询连接起来,那么每个2分钟。 UNION 运算符对此会更好。

根据您的示例和目标,我认为使用 UDA(用户定义的聚合)编写此查询有一种更简单的方法。

为此,我将首先定义一个名为“ratio”的 UDA 函数:

function main() 
this.init = function () 
    this.sumSell = 0.0;
    this.sumBuy  = 0.0;


this.accumulate = function (value, timestamp) 
    if (value.side=="BUY") this.sumBuy+=value.qty;
    if (value.side=="SELL") this.sumSell+=value.qty;
   

this.computeResult = function () 
    if(this.sumSell== 0) 
        result = 0;
    
    else 
        result =  this.sumBuy/this.sumSell;
    
    return result;



然后我可以简单地将这个 SQL 查询用于 60 秒的窗口:

SELECT
  windowEnd = System.Timestamp,
  ticker,
  uda.ratio(iothub) as ratio
FROM iothub PARTITION BY PartitionId
GROUP BY ticker, PartitionId, SlidingWindow(second, 60)

【讨论】:

以上是关于Azure 流分析:多个 Windows + JOINS的主要内容,如果未能解决你的问题,请参考以下文章

Azure 流分析:如何使用两个 Azure 机器学习函数

如何在 Microsoft Azure 流分析上从多个设备中分离数据

Azure 流分析过滤多个输入列以输出到 SQL 表,卡在 5 个接收器的限制

Windows IoT、IoT 中心、流分析、Azure SQL、Power BI - 啥是正确的路径? [关闭]

Azure 流分析时间窗口查询

Azure 流分析输出到 azure 函数应用