Azure Stream Analytics query too complex? Not giving proper output and now an error. What to do next?

【Posted】: 2020-08-23 17:02:20 【Question】:

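The query I'm using is below. I'm using Event Hubs to pull JSON-formatted messages from a push API, and the JSON messages contain some arrays. Now I'm getting an error like: "We cannot connect to Event Hub partition [0] because the maximum number of allowed receivers per partition in a consumer group has been reached. Ensure that other Stream Analytics jobs or Service Bus Explorer are not using the same consumer group. The following information may be helpful in identifying the connected receivers: Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5".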

I want the output to go to Azure Table storage, and that works as long as the query itself works properly.

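I had it working before, when the query wasn't as complex, but now I can't seem to join all the required results into the main serviceProblem output. Sometimes the query runs successfully, but some of the outputs are blank even though they shouldn't be. Should I split the query into more ASA jobs, or push some of the output through another event hub to avoid the error? Thanks for your help.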

with OrigQ as (
        select
        *
        from tmfsnowgstm Partition BY PartitionId --INTO 4
        where substring(tmfsnowgstm.eventType,1,14) = 'serviceProblem'
    )
    ,ReaderQuery as (
        select
            PartitionId,
            event.serviceProblem.*
        from OrigQ Partition BY PartitionId --INTO 4
    )
    ,comment as ( 
        SELECT
            'comment' as partitionKey,
            concat(rq.id ,'-',comment.ArrayValue.date,'-',comment.ArrayValue.[@type]) as rowKey,
            rq.PartitionId,
            rq.id as ServiceProblemId,
            comment.ArrayValue.date as date,
            comment.ArrayValue.system as system,
            comment.ArrayValue.author as author,
            comment.ArrayValue.[@type] as type,
            comment.ArrayValue.text as text
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(rq.comment) AS comment
    )
    ,trackingRecord as (
        select 
            rq.PartitionId,
            rq.id,
            trackingRecord.ArrayValue.extensionInfo        
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(rq.trackingRecord) AS trackingRecord
    ),
    trackingRecordDtls as (
        SELECT
            trackingRecord.PartitionId,
            trackingRecord.id as id,
            extensionInfo.ArrayValue.name as name,
            extensionInfo.ArrayValue.value as value
        from trackingRecord Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(extensionInfo) AS extensionInfo
    ),
    relatedParty as (
        SELECT
            rq.PartitionId,
            rq.id as id,
            relatedParty.ArrayValue.role as name,
            relatedParty.ArrayValue.name as value
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(rq.relatedParty) AS relatedParty
    )
    ,relatedPartyDtls as (
        select
            rq.PartitionId,
            rq.id as id,
            rqp.value as requestedBy,
            cal.value as caller,
            atg.value as assignedToGroup,
            onob.value as ownerNocOpenedBy,
            onc.value as ownerNoc,
            ast.value as assignedTo
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        LEFT JOIN relatedParty rqp ON rqp.name = 'requestedBy' and rqp.id = rq.id AND DATEDIFF(minute,rq,rqp) BETWEEN 0 AND 0 and rq.PartitionId = rqp.PartitionId
        LEFT JOIN relatedParty cal ON cal.name = 'caller' and cal.id = rq.id AND DATEDIFF(minute,rq,cal) BETWEEN 0 AND 0 and rq.PartitionId = cal.PartitionId
        LEFT JOIN relatedParty atg ON atg.name = 'assignedToGroup' and atg.id = rq.id AND DATEDIFF(minute,rq,atg) BETWEEN 0 AND 0 and rq.PartitionId = atg.PartitionId
        LEFT JOIN relatedParty onob ON onob.name = 'ownerNocOpenedBy,openedBy' and onob.id = rq.id AND DATEDIFF(minute,rq,onob) BETWEEN 0 AND 0 and rq.PartitionId = onob.PartitionId
        LEFT JOIN relatedParty ast ON ast.name = 'assignedTo' and ast.id = rq.id AND DATEDIFF(minute,rq,ast) BETWEEN 0 AND 0 and rq.PartitionId = ast.PartitionId
        LEFT JOIN relatedParty onc ON onc.name = 'ownerNoc' and onc.id = rq.id AND DATEDIFF(minute,rq,onc) BETWEEN 0 AND 0 and rq.PartitionId = onc.PartitionId
    )
    ,extensionInfo as (
        SELECT
            rq.PartitionId,
            rq.id as id,
            extensionInfo.ArrayValue.name as name,
            extensionInfo.ArrayValue.value as value
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(rq.extensionInfo) AS extensionInfo
    )
    ,extensionInfoDtls3 as (
        select
            rq.PartitionId,
            rq.id as id,
            rq.requestedBy,
            rq.caller,
            rq.assignedToGroup,
            rq.ownerNocOpenedBy,
            rq.ownerNoc,
            rq.assignedTo,
            cc.value as closureCode,
            ooi.value as originOfIssue,
            rec.value as resolutionCode,
            rc.value as rootCause,
            src.value as subRootCause
        from relatedPartyDtls rq Partition BY PartitionId --INTO 4
        LEFT JOIN trackingRecordDtls cc ON cc.name = 'closureCode' and cc.id = rq.id AND DATEDIFF(minute,rq,cc) BETWEEN 0 AND 0 and rq.PartitionId = cc.PartitionId
        LEFT JOIN trackingRecordDtls ooi ON ooi.name = 'originOfIssue' and ooi.id = rq.id AND DATEDIFF(minute,rq,ooi) BETWEEN 0 AND 0 and rq.PartitionId = ooi.PartitionId
        LEFT JOIN trackingRecordDtls rec ON rec.name = 'resolutionCode' and rec.id = rq.id AND DATEDIFF(minute,rq,rec) BETWEEN 0 AND 0 and rq.PartitionId = rec.PartitionId
        LEFT JOIN trackingRecordDtls rc ON rc.name = 'rootCause' and rc.id = rq.id AND DATEDIFF(minute,rq,rc) BETWEEN 0 AND 0 and rq.PartitionId = rc.PartitionId
        LEFT JOIN trackingRecordDtls src ON src.name = 'subRootCause' and src.id = rq.id AND DATEDIFF(minute,rq,src) BETWEEN 0 AND 0 and rq.PartitionId = src.PartitionId
    )
    ,extensionInfoDtls1 as (
        select
            rq.PartitionId,
            rq.id as id,
            rq.closureCode,
            rq.originOfIssue,
            rq.resolutionCode,
            rq.rootCause,
            rq.subRootCause,
            rq.requestedBy,
            rq.caller,
            rq.assignedToGroup,
            rq.ownerNocOpenedBy,
            rq.ownerNoc,
            rq.assignedTo,
            wrif.value as weatherRelatedIssueFlag,
            ncf.value as notifyCustomerFlag,
            mdu.value as monitoringDuration,
            cot.value as contactType,
            cpr.value as calculatedPriority
        from extensionInfoDtls3 rq Partition BY PartitionId --INTO 4
        LEFT JOIN extensionInfo wrif ON wrif.name = 'weatherRelatedIssueFlag' and wrif.id = rq.id AND DATEDIFF(minute,rq,wrif) BETWEEN 0 AND 0 and rq.PartitionId = wrif.PartitionId
        LEFT JOIN extensionInfo ncf ON ncf.name = 'notifyCustomerFlag' and ncf.id = rq.id AND DATEDIFF(minute,rq,ncf) BETWEEN 0 AND 0 and rq.PartitionId = ncf.PartitionId
        LEFT JOIN extensionInfo mdu ON mdu.name = 'monitoringDuration' and mdu.id = rq.id AND DATEDIFF(minute,rq,mdu) BETWEEN 0 AND 0 and rq.PartitionId = mdu.PartitionId
        LEFT JOIN extensionInfo cpr ON cpr.name = 'calculatedPriority' and cpr.id = rq.id AND DATEDIFF(minute,rq,cpr) BETWEEN 0 AND 0 and rq.PartitionId = cpr.PartitionId
        LEFT JOIN extensionInfo cot ON cot.name = 'contactType' and cot.id = rq.id AND DATEDIFF(minute,rq,cot) BETWEEN 0 AND 0 and rq.PartitionId = cot.PartitionId
    )
    ,affectedResource as (
    SELECT
        rq.PartitionId,
        'affectedResource' as partitionKey,
        concat(rq.id,'-',case when len(affectedResource.ArrayValue.id) < 3 then affectedResource.ArrayValue.role else affectedResource.ArrayValue.id end) as rowKey,
        'serviceProblem' as sourceType,
        rq.id as sourceTypeId,
        affectedResource.ArrayValue.id as name,
        affectedResource.ArrayValue.name as value,
        affectedResource.ArrayValue.role as role,
        affectedResource.ArrayValue.[@referredType] as type
    from ReaderQuery rq Partition BY PartitionId --INTO 4
    CROSS APPLY GetArrayElements(rq.affectedResource) AS affectedResource
    )
    ,affectedService as (
    SELECT
        rq.PartitionId,
        'affectedService' as partitionKey,
        concat(rq.id,'-',affectedService.ArrayValue.id) as rowKey,
        'serviceProblem' as sourceType,
        rq.id as sourceTypeId,
        affectedService.ArrayValue.id as name,
        affectedService.ArrayValue.name as value,
        'affectedService' as role,
        'Service' as type
    from ReaderQuery rq Partition BY PartitionId --INTO 4
    CROSS APPLY GetArrayElements(rq.affectedService) AS affectedService
    )
    ,relatedObject as (
        SELECT
            rq.PartitionId,
            rq.id as id,
            relatedObject.ArrayValue.[@referredType] as name,
            relatedObject.ArrayValue.name  as value
        from tmfsnowgstm rq Partition BY PartitionId --INTO 2
        CROSS APPLY GetArrayElements(rq.relatedObject) AS relatedObject
    )
    ,relatedObjectDtls as (
        select
            'relatedObject' as partitionKey,
            rq.id as rowKey,
            rq.PartitionId,
            rq.id as id,
            prd.value as product,
            chr.value as changeRequest,
            ser.value as service,
            res.value as resource,
            rccr.value as rootCauseChangeRequest
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        LEFT JOIN relatedObject prd ON prd.name = 'product' and prd.id = rq.id AND DATEDIFF(minute,rq,prd) BETWEEN 0 AND 0 and rq.PartitionId = prd.PartitionId
        LEFT JOIN relatedObject chr ON chr.name = 'changeRequest' and chr.id = rq.id AND DATEDIFF(minute,rq,chr) BETWEEN 0 AND 0 and rq.PartitionId = chr.PartitionId
        LEFT JOIN relatedObject ser ON ser.name = 'service' and ser.id = rq.id AND DATEDIFF(minute,rq,ser) BETWEEN 0 AND 0 and rq.PartitionId = ser.PartitionId
        LEFT JOIN relatedObject res ON res.name = 'resource' and res.id = rq.id AND DATEDIFF(minute,rq,res) BETWEEN 0 AND 0 and rq.PartitionId = res.PartitionId
        LEFT JOIN relatedObject rccr ON rccr.name = 'rootCauseChangeRequest' and rccr.id = rq.id AND DATEDIFF(minute,rq,rccr) BETWEEN 0 AND 0 and rq.PartitionId = rccr.PartitionId
    )
    , serviceProblem as (
        Select
        rq.PartitionId,
        'serviceProblem' as partitionKey,
        rq.id as rowKey,
        rq.id,
        rq.name,
        rq.statusChangeReason,
        rq.reason,
        rq.resolutionDate,
        rq.responsibleParty,
        rq.description,
        rq.underlyingProblem,
        rq.statusChangeDate,
        rq.problemEscalation,
        rq.associatedSLAViolation,
        rq.timeChanged,
        rq.severity,
        rq.impactPatterns,
        rq.timeRaised,
        rq.underlyingAlarm,
        rq.rootCauseService,
        rq.relatedEvent,
        rq.originatorParty,
        rq.priority,
        rq.firstAlert,
        rq.originatingSystem,
        --rq.associatedTroubleTicket,
        rq.affectedNumberOfServices,
        rq.impactImportanceFactor,
        rq.category,
        --rq.parentProblem,
        rq.affectedLocation,
        rq.status,
        rq.rootCauseResource,
        case when GetArrayLength(rq.associatedTroubleTicket) < 1 then '' else associatedTroubleTicket.ArrayValue.Id end as associatedTroubleTicket,
        parentProblem.ArrayValue.Id as parentProblemId,
        parentProblem.ArrayValue.correlationId as correlationId
        --,rpdtls.*
    from ReaderQuery rq Partition BY PartitionId --INTO 4
    --left join relatedPartyDtls rpdtls on rq.id = rpdtls.id AND DATEDIFF(minute,rq,rpdtls) BETWEEN 0 AND 0 and rq.PartitionId = rpdtls.PartitionId
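    -- OUTER APPLY keeps service problems whose arrays are empty (with NULL array values);
    -- CROSS APPLY would drop those rows, so the GetArrayLength(...) < 1 branch above could never fire
    OUTER APPLY GetArrayElements(rq.associatedTroubleTicket) AS associatedTroubleTicket
    OUTER APPLY GetArrayElements(rq.parentProblem) AS parentProblem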
    )
    
    select
        'serviceProblem' as partitionKey,
        rq.id as rowKey,
        rq.id,
        rq.name,
        rq.statusChangeReason,
        rq.reason,
        rq.resolutionDate,
        rq.responsibleParty,
        rq.description,
        rq.underlyingProblem,
        rq.statusChangeDate,
        rq.problemEscalation,
        rq.associatedSLAViolation,
        rq.timeChanged,
        rq.severity,
        rq.impactPatterns,
        rq.timeRaised,
        rq.underlyingAlarm,
        rq.rootCauseService,
        rq.relatedEvent,
        rq.originatorParty,
        rq.priority,
        rq.firstAlert,
        rq.originatingSystem,
        rq.affectedNumberOfServices,
        rq.impactImportanceFactor,
        rq.category,
        rq.affectedLocation,
        rq.status,
        rq.rootCauseResource,
        rq.associatedTroubleTicket,
        resnt.text resolutionNote,
        svcdesc.text serviceProblemDescription,
        rq.parentProblemId,
        rq.correlationId
    --    eidtls3.*,
    --    rpdtls.*
    into tsserviceProblem
    from serviceProblem rq Partition BY PartitionId
    left join comment resnt on rq.id = resnt.serviceProblemId AND DATEDIFF(minute,rq,resnt) BETWEEN 0 AND 0 and rq.PartitionId = resnt.PartitionId and resnt.type = 'resolutionNote'
    left join comment svcdesc on rq.id = svcdesc.serviceProblemId AND DATEDIFF(minute,rq,svcdesc) BETWEEN 0 AND 0 and rq.PartitionId = svcdesc.PartitionId and svcdesc.type = 'serviceProblemDescription'
    left join relatedObjectDtls rpdtls on rq.id = rpdtls.id AND DATEDIFF(minute,rq,rpdtls) BETWEEN 0 AND 0 and rq.PartitionId = rpdtls.PartitionId
    
    SELECT rq.* into tsextensionInfo from extensionInfoDtls1 rq Partition BY PartitionId
    SELECT * into tscomment from comment PARTITION BY PartitionId where type <> 'serviceProblemDescription' and type <> 'resolutionNote'
    SELECT * into tsrelatedInfo from affectedResource PARTITION BY PartitionId UNION SELECT * from affectedService PARTITION BY PartitionId
    select * into veritaslake from OrigQ PARTITION BY PartitionId
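For contrast with the step-to-step joins in the query above: in Stream Analytics, a JOIN between two streaming inputs or steps has to bound the time difference between matching events with DATEDIFF in the ON clause, whereas a JOIN against a reference data input (a static lookup table) needs no such bound. The sketch below assumes it is appended after the WITH steps of the query above; `closureCodeLookup` (with columns `code` and `description`) and the output aliases `tsresolutionNote` and `tsclosureCode` are hypothetical names, not part of the original job.

```sql
-- Stream-to-stream join between two steps of the query above.
-- The DATEDIFF condition is required: it limits how far apart in time matching
-- rows may be. BETWEEN 0 AND 0 is a zero-width window, so in practice only rows
-- that share a timestamp (e.g. rows derived from the same input event) match.
SELECT
    sp.id,
    c.text AS resolutionNote
INTO tsresolutionNote -- hypothetical output alias
FROM serviceProblem sp PARTITION BY PartitionId
LEFT JOIN comment c
    ON sp.id = c.serviceProblemId
    AND c.type = 'resolutionNote'
    AND DATEDIFF(minute, sp, c) BETWEEN 0 AND 0
    AND sp.PartitionId = c.PartitionId

-- Stream-to-reference-data join: closureCodeLookup is a hypothetical reference
-- data input (e.g. a lookup file in Blob storage). Reference data is static,
-- so no DATEDIFF condition is needed.
SELECT
    e.id,
    e.closureCode,
    r.description AS closureCodeDescription
INTO tsclosureCode -- hypothetical output alias
FROM extensionInfoDtls3 e PARTITION BY PartitionId
LEFT JOIN closureCodeLookup r
    ON e.closureCode = r.code
```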

【Comments】:

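Stream Analytics works on streams of events as they arrive, not on tables. Each CTE is a separate step in a processing pipeline. Once you look at it that way, JOINing between the different steps doesn't make much sense. An event stream may never end: if a CTE is still active, where would the JOIN find the data it needs? Which step feeds which? You end up with a graph of interconnected steps instead of a pipeline, all of them processing a potentially infinite stream of events. The JOIN documentation makes it clear that JOIN can be used to combine data from multiple streams, but only within a specific time range specified by the ON DATEDIFF clause. If the source is static, e.g. a lookup table, that clause isn't needed.

【Answer 1】: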

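Based on the error message, Event Hubs allows only 5 receivers per partition per consumer group. When a complex query uses operators such as UNION or self-joins, the job may create multiple input receivers. You can follow the troubleshooting document on the maximum number of allowed receivers per partition to restructure your query and resolve this error.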
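One way to restructure along the lines of that document is to read the Event Hubs input once in a WITH step and make every other step, and every SELECT ... INTO, read from that step rather than from the input alias. In the question's query, `OrigQ` already plays this role, but the `relatedObject` step still reads `tmfsnowgstm` directly, which adds another reference to the input. A trimmed sketch using the question's names (most steps and columns omitted; `tsrelatedObject` is a hypothetical output alias, and the sketch assumes `relatedObject` is nested under `event.serviceProblem` like the other arrays):

```sql
-- Read the Event Hubs input exactly once.
WITH OrigQ AS (
    SELECT *
    FROM tmfsnowgstm PARTITION BY PartitionId
    WHERE SUBSTRING(tmfsnowgstm.eventType, 1, 14) = 'serviceProblem'
),
ReaderQuery AS (
    SELECT PartitionId, event.serviceProblem.*
    FROM OrigQ PARTITION BY PartitionId
),
-- Built from ReaderQuery instead of tmfsnowgstm, so it adds no new reference
-- to the input. Assumes relatedObject is nested under event.serviceProblem.
relatedObject AS (
    SELECT
        rq.PartitionId,
        rq.id AS id,
        ro.ArrayValue.[@referredType] AS name,
        ro.ArrayValue.name AS value
    FROM ReaderQuery rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.relatedObject) AS ro
)

-- Every output reads from a step, never from tmfsnowgstm itself.
SELECT * INTO veritaslake FROM OrigQ PARTITION BY PartitionId
SELECT * INTO tsrelatedObject FROM relatedObject PARTITION BY PartitionId -- hypothetical output alias
```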

【Comments】:

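Thanks for sharing the link, Julia. For some reason I hadn't found it before. For now, I've created different consumer groups and separate jobs to split the workload. That works now, but I'll also try the UNION suggestion from the link, as well as different inputs with different consumer groups within the same job.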
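Using several inputs bound to different consumer groups within one job, as mentioned in the comment above, could be sketched as follows. `tmfsnowgstmCg2` is a hypothetical second input alias pointing at the same event hub as `tmfsnowgstm` but configured with its own consumer group; the consumer group is selected in each input's configuration, not in the query text. Each input alias is read once, in its own WITH step, and the outputs are divided between them so that no single consumer group has to serve every reader.

```sql
-- Input tmfsnowgstm: assumed to be configured with consumer group A.
WITH OrigQ AS (
    SELECT *
    FROM tmfsnowgstm PARTITION BY PartitionId
    WHERE SUBSTRING(tmfsnowgstm.eventType, 1, 14) = 'serviceProblem'
),
-- Input tmfsnowgstmCg2 (hypothetical): same event hub, consumer group B.
OrigQ2 AS (
    SELECT *
    FROM tmfsnowgstmCg2 PARTITION BY PartitionId
    WHERE SUBSTRING(tmfsnowgstmCg2.eventType, 1, 14) = 'serviceProblem'
),
ReaderQuery2 AS (
    SELECT PartitionId, event.serviceProblem.*
    FROM OrigQ2 PARTITION BY PartitionId
),
-- Flatten the comment array from the second input only.
comment AS (
    SELECT
        'comment' AS partitionKey,
        CONCAT(rq.id, '-', c.ArrayValue.date, '-', c.ArrayValue.[@type]) AS rowKey,
        rq.PartitionId,
        rq.id AS ServiceProblemId,
        c.ArrayValue.[@type] AS type,
        c.ArrayValue.text AS text
    FROM ReaderQuery2 rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.comment) AS c
)

-- The raw archive output is served from consumer group A ...
SELECT * INTO veritaslake FROM OrigQ PARTITION BY PartitionId

-- ... while the comment output is served from consumer group B.
SELECT * INTO tscomment FROM comment PARTITION BY PartitionId
WHERE type <> 'serviceProblemDescription' AND type <> 'resolutionNote'
```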
