并行运行异步查询 (IAsyncEnumerable)

Posted

技术标签:

【中文标题】并行运行异步查询 (IAsyncEnumerable)【英文标题】:Running asynchronous queries parallelly (IAsyncEnumerable) 【发布时间】:2021-04-30 11:22:09 【问题描述】:

下面的代码连接到数据库并查询文章信息,然后将它们的计数存储在混乱的存储中。我想并行运行这些查询。现在QueryItems 首先运行,因为我先等待那个foreach。只有这样QueryCount才能运行。

我的问题是:如何并行运行这些异步查询?

internal async void InitializeItems()

    await foreach (Item i in this.QueryItems())
    
        this.items.Add(i);
        this.view.ComboBoxArticle.Items.Add(i.ArticleNumber);
    

    await foreach (ValueTuple<int, string> t in this.QueryCount())
        this.items.Where(it => it.ArticleNumber.Equals(t.Item2))
            .FirstOrDefault().Count = t.Item1;


private async IAsyncEnumerable<Item> QueryItems()

    using (OdbcConnection conItems = new OdbcConnection(
        Properties.Settings.Default.PaConnString))
    using (OdbcCommand cmdItems = new OdbcCommand())
    
        cmdItems.Connection = conItems;
        cmdItems.CommandText = @"SELECT PUB.S_Artikel.Artikel AS ArticleNumber, 
        PUB.S_ArtikelSpr.Bezeichnung AS ArticleName, 
        PUB.S_ArtKtoGr.KontenGruppe AS CapacityGroup, 
        PUB.S_ArtKtoGr.Verbrauchskonto AS CostCenter, 
        PUB.S_KontoSpr.Bezeichnung AS CostName, 
        PUB.S_MengenEinheitSpr.Bezeichnung AS Unit
    FROM PUB.S_Artikel 
        INNER JOIN PUB.S_ArtikelSpr 
            ON PUB.S_Artikel.Artikel = PUB.S_ArtikelSpr.Artikel 
                AND PUB.S_ArtikelSpr.Sprache = 'H' 
        INNER JOIN PUB.SBM_ValueFlowGroup 
            ON PUB.S_Artikel.SBM_ValueFlowGroup_Obj
                = PUB.SBM_ValueFlowGroup.SBM_ValueFlowGroup_Obj 
        INNER JOIN PUB.S_ArtKtoGr 
            ON PUB.SBM_ValueFlowGroup.S_ArtKtoGr_Obj = PUB.S_ArtKtoGr.S_ArtKtoGr_Obj
        INNER JOIN PUB.S_KontoSpr 
            ON PUB.S_ArtKtoGr.Verbrauchskonto = PUB.S_KontoSpr.Konto
                AND PUB.S_KontoSpr.Sprache = 'H'
        INNER JOIN PUB.S_MengenEinheitSpr 
            ON PUB.S_Artikel.LagerME = PUB.S_MengenEinheitSpr.MengenEinheit 
                AND PUB.S_MengenEinheitSpr.Sprache = 'H' " +
    "WHERE PUB.S_Artikel.Firma = '100' " + "AND PUB.S_Artikel.archiviert = 0 " +
    "ORDER BY PUB.S_Artikel.Artikel";

        conItems.Open();
        if (!conItems.State.Equals(ConnectionState.Open))
            throw new Exception(
                "Nem sikerült a cikkinformációk lekérdezése (a PA szerverről).");

        using (DbDataReader readerItems = await Task.Run(
            () => cmdItems.ExecuteReader()))
        
            while (await Task.Run(() => readerItems.ReadAsync()))
            
                yield return new Item
                    (
                        readerItems.GetString(0),
                        readerItems.GetString(1),
                        readerItems.GetString(2),
                        readerItems.GetString(3),
                        readerItems.GetString(4),
                        readerItems.GetString(5)
                    );
            
            await Task.Run(() => MessageBox.Show("ITEMS END"));
        
    


private async IAsyncEnumerable<ValueTuple<int, string>> QueryCount()

    using (OdbcConnection conCount = new OdbcConnection(
        Properties.Settings.Default.PaConnString))
    using (OdbcCommand cmdCount = new OdbcCommand())
    
        cmdCount.Connection = conCount;
        cmdCount.CommandText = @"SELECT SUM(PUB.MP_ArtPlatz.Bestand),
            PUB.MP_ArtPlatz.Artikel AS ArticleNumber
        FROM PUB.MP_ArtPlatz
        GROUP BY PUB.MP_ArtPlatz.Artikel";

        conCount.Open();
        if (!conCount.State.Equals(ConnectionState.Open))
            throw new Exception("Nem skerült a ProAlpha adatbázisához"
                + " kapcsolódni a darabszám lekérdezése érdekében.");

        using (DbDataReader readerCount = await Task.Run(
            () => cmdCount.ExecuteReaderAsync()))
            while (await Task.Run(() => readerCount.ReadAsync()))
            
                //
                //    Item i = this.items.Where(
                //        it => it.ArticleNumber.Equals(artNum)).FirstOrDefault();
                //    i.Count = count;
                //
                yield return new ValueTuple<int, string>(
                    readerCount.GetInt32(0), readerCount.GetString(1));
            
    

在 UserControl 的加载方法中,我这样称呼它们:

private void MaterialRequestUC_Load(object sender, EventArgs e)

    try
    
        //...
        this.presenter.InitializeItems();
        //...
    
    catch (Exception ex)  MessageBox.Show(
        "Hiba: " + ex.Message, "Hiba", MessageBoxButtons.OK, MessageBoxIcon.Error); 

【问题讨论】:

“caotic storage” 嗨,Ferenc。这是什么意思? SQL Server 是多线程的,所有查询都会根据机器中的内核数自动并行运行。 杂乱无章的存储意味着并非每个项目在存储中都有固定的位置(行、列和楼层)。但是,一种类型的项目可以同时位于多个位置。但是,这些数据可以明确地从数据库中检索。而且我必须处理所有项目(QueryItems())并且需要存储中每个项目的计数(QueryCount)。但是在我的软件中,我放置的所有断点都证明,首先,查询项目并填充组合框。并且只有在第一次 foreach 完成后才会查询项目的总和。 this.items 字段的类型是什么? 整个项目很大,复制到这里... QueryItems的结果集的每条记录都是从类型Item(自己的类型)开始的。有几个数据成员,你可以在代码中看到它。 【参考方案1】:

如果您的意思是希望查询计数并行工作,请使用Task.Run 而不是await

internal async void InitializeItems()

    await foreach (Item i in this.QueryItems())
    
        this.items.Add(i);
        this.view.ComboBoxArticle.Items.Add(i.ArticleNumber);
    

    Task.Run(async () => await foreach (ValueTuple<int, string> t in this.QueryCount())
        this.items.Where(it => it.ArticleNumber.Equals(t.Item2))
            .FirstOrDefault().Count = t.Item1);

【讨论】:

以上是关于并行运行异步查询 (IAsyncEnumerable)的主要内容,如果未能解决你的问题,请参考以下文章

JAVA并行框架Fork/Join:同步和异步

SPA 可以针对单个 GraphQL 服务器发出并行异步查询吗?

并发并行同步和异步

使用异步的并行函数调用 [重复]

仅使用一个回调并行运行异步

在 iOS swift 中异步/并发/并行运行任务