如何在 C# 中提高数据流插入 Bigquery 表的性能

Posted

技术标签:

【中文标题】如何在 C# 中提高数据流插入 Bigquery 表的性能【英文标题】:How to improve the performance of data stream insert into Bigquery table in C# 【发布时间】:2015-07-22 22:40:30 【问题描述】:

我正在使用 C# .Net 客户端库将插入数据从本地文件流式传输到 Bigquery 数据库。我用我的代码成功插入了所有数据,但是性能(插入速度)非常慢。我的加载速度为 20KB/秒。例如,我有一个大小为 323MB 的文件,加载程序大约需要 20 分钟。我测试了我的带宽,上传速度是 3MB/秒,我想知道是否有办法提高性能?(我想它应该比那快得多)

这是我的代码:

namespace BigQuery_Test

    class StreamDatacs
    


        private static ServiceAccountCredential credential;

        public StreamDatacs()
        
            Credential2().Wait();


        

        public async Task Credential2()
        
            string certificateFile = "C:\\xxxxxx.p12";
            string serviceAccountEmail = "xxxxxxxxxxxx@developer.gserviceaccount.com";
            var certificate = new X509Certificate2(certificateFile, "notasecret", X509KeyStorageFlags.Exportable);
            credential = new ServiceAccountCredential(
               new ServiceAccountCredential.Initializer(serviceAccountEmail)
               
                   Scopes = new[]  StorageService.Scope.DevstorageReadWrite,
                   BigqueryService.Scope.Bigquery 
               .FromCertificate(certificate));

            Console.WriteLine(credential.Token);
            Console.WriteLine(credential.TokenServerUrl);
        
        public void ReadData(string path)        
        

            var logs = new List<TableDataInsertAllRequest.RowsData>();
            using (StreamReader sr = new StreamReader(path))
            
                string line = "";
                string [] aLog = new string[1000];

                Log Loga = new Log();
                int count = 0;
                int i = 0;
                while((line = sr.ReadLine()) != null)
                
                    var theLog = new TableDataInsertAllRequest.RowsData();              
                    string[] token = line.Split('\t');
                    theLog.Json = new Dictionary<string, object>();
                    theLog.Json.Add("Timestamp", token[0]);
                    theLog.Json.Add("ClientIpAddress", token[1]);
                    theLog.Json.Add("Username", token[2]);
                    theLog.Json.Add("GroupID", token[3]);
                    theLog.Json.Add("CompanyID", token[4]);
                    theLog.Json.Add("FullOrSiteLogging", token[5]);
                    theLog.Json.Add("PolicyFlags", token[6]);
                    theLog.Json.Add("ActionsTaken", token[7]);
                    theLog.Json.Add("ResponseStatus", token[8]);                       
                    logs.Add(theLog);
                    count++;

                    if (count > 20000)
                    

                        BQStreamInsert(logs);
                        logs.Clear();
                        count = 0;

                    
                

            

            Task.WaitAll(tasks.ToArray());
        
        public void BQStreamInsert(List<TableDataInsertAllRequest.RowsData> rows)
        
            string projectId= "ws-2015-logs";
            string datasetId = "CompanyGroup1";
            string tableId ="RawLogsTest7";
            var service = new BigqueryService(new BaseClientService.Initializer()
            
                HttpClientInitializer = credential,

                ApplicationName = "BQ test"
            );
            try
            
                var content = new TableDataInsertAllRequest();
                content.Rows = rows;
                content.Kind = "bigquery#tableDataInsertAllRequest";
                content.IgnoreUnknownValues = true;
                content.SkipInvalidRows = true;

                var insertTask = service.Tabledata.InsertAll(content, projectId, datasetId, tableId);
                TableDataInsertAllResponse response = insertTask.Execute();

            
            catch (Exception ex)
            

                Console.WriteLine(ex.Message);
            


        
    

在program.cs中:

 class Program
    
        static void Main(string[] args)
        

            var sw = Stopwatch.StartNew();
            StreamDatacs sd = new StreamDatacs();
            sd.ReadData(@"C:\2015071600.csv");   
            Console.WriteLine(sw.ElapsedMilliseconds);
            sw.Stop();
            Console.Read();

        

在代码中,我逐行读取文件,每20,000行停止并插入BQ,直到所有行都插入为止。我在这里尝试了不同的缓冲区大小(500行,1000行,50,000行),但没有看到很大的不同。 我很感激你的任何建议。提前非常感谢。

【问题讨论】:

【参考方案1】:

由于流的有效负载大小有限,请参阅Quota policy 更容易谈论时间,因为有效负载对我们双方的限制方式相同,但我也会提到其他副作用。

您需要正确实施限制:

最大行大小:1 MB HTTP 请求大小限制:10 MB 每个请求的最大行数:500

否则你会得到错误。因此它被称为流式插入,因此您可以运行大量并行进程而不是一项大工作。

我们测量每个流式传输请求的时间在 1200-2500 毫秒之间,这在上个月是一致的,如图所示。

虽然我们看到了一些副作用:

请求随机失败,类型为“后端错误” 请求随机失败,类型为“连接错误” 请求随机失败,类型为“超时”(注意这里,因为只有一些行失败,而不是整个有效负载) 其他一些错误消息是非描述性的,它们非常模糊,对您没有帮助,请重试。 我们每天都会看到数百个此类故障,因此它们几乎是恒定的,与云运行状况无关。

对于所有这些,我们在付费的 Google 企业支持中打开了案例,但不幸的是,他们没有解决问题。接缝推荐的选项是重试的指数退避,即使是被告知这样做的支持。就个人而言,这并不让我高兴。


如果您选择的方法需要几个小时,这意味着 it does not scale,并且不会扩展。您需要重新考虑使用async processes 的方法。为了更快地完成,您需要并行运行多个工作人员,流式传输性能将是相同的。只要有 10 个并行工作人员,就意味着时间将减少 10 倍。

在后台处理 IO 绑定或 cpu 绑定任务现在是大多数 Web 应用程序的常见做法。有很多软件可以帮助构建后台作业,其中一些基于 Beanstalkd 这样的消息传递系统。

基本上,您需要在封闭的网络中分配插入作业,确定它们的优先级,并使用(运行)它们。嗯,这正是 Beanstalkd 提供的。

Beanstalkd 提供了在管中组织作业的可能性,每个管对应于一种作业类型。

您需要一个 API/生产者,可以将作业放在管子上,比方说行的 json 表示。这是我们用例的杀手级功能。因此,我们有一个 API 可以获取行并将它们放置在管上,这只需几毫秒,因此您可以实现快速响应时间。

另一方面,您现在在一些电子管上有很多工作。你需要一个代理。代理/消费者可以预订工作。

它还可以帮助您进行作业管理和重试:成功处理作业后,消费者可以从试管中删除该作业。在失败的情况下,消费者可以埋葬工作。这项工作不会被推回试管,但可供进一步检查。

消费者可以发布一个工作,Beanstalkd 会将这个工作推回管中,并使其可供另一个客户端使用。

Beanstalkd 客户端可以在大多数常用语言中找到,web interface 可用于调试。

【讨论】:

感谢您对错误处理和作业监控应用程序的推荐。但是,我现在真正担心的是流插入的速度。我必须在 10 分钟内将一组内存缓冲区(2~3GB)插入 Bigquery,但我有 20 分钟的时间来处理 1 个缓冲区(即 300MB)。这对我的情况来说是完全不可接受的。此外,我确实将文件分成小块,它并没有给我带来很大的不同。我只是想确定我的代码中是否存在问题(例如块大小、流插入方式等)或者是 Bigquery 的方式。 并行化调用,我看不到任何并发。您应该至少并行运行 50 个调用 我遇到了类似的问题,将数据流式传输到 bigquery 非常慢(每秒 2KB!)

以上是关于如何在 C# 中提高数据流插入 Bigquery 表的性能的主要内容,如果未能解决你的问题,请参考以下文章

使用 cdata ado.net 数据提供程序在 Google BigQuery 中插入 Json

使用 C# 提高在 MySQL 数据库中插入数据的速度 [重复]

如何使用带有 nodejs 的 bigquery 插入方法插入地理类型数据

如何安全地为 bigquery 节点插入转义用户输入?可以在 bigquery.insert 节点库上使用参数化查询吗?

BigQuery 流式插入如何计费?

如何从另一个 bigquery 响应将值插入 mysql 表