如何在 C# 中提高数据流插入 Bigquery 表的性能
Posted
技术标签:
【中文标题】如何在 C# 中提高数据流插入 Bigquery 表的性能【英文标题】:How to improve the performance of data stream insert into Bigquery table in C# 【发布时间】:2015-07-22 22:40:30 【问题描述】:我正在使用 C# .Net 客户端库将插入数据从本地文件流式传输到 Bigquery 数据库。我用我的代码成功插入了所有数据,但是性能(插入速度)非常慢。我的加载速度为 20KB/秒。例如,我有一个大小为 323MB 的文件,加载程序大约需要 20 分钟。我测试了我的带宽,上传速度是 3MB/秒,我想知道是否有办法提高性能?(我想它应该比那快得多)
这是我的代码:
namespace BigQuery_Test
class StreamDatacs
private static ServiceAccountCredential credential;
public StreamDatacs()
Credential2().Wait();
public async Task Credential2()
string certificateFile = "C:\\xxxxxx.p12";
string serviceAccountEmail = "xxxxxxxxxxxx@developer.gserviceaccount.com";
var certificate = new X509Certificate2(certificateFile, "notasecret", X509KeyStorageFlags.Exportable);
credential = new ServiceAccountCredential(
new ServiceAccountCredential.Initializer(serviceAccountEmail)
Scopes = new[] StorageService.Scope.DevstorageReadWrite,
BigqueryService.Scope.Bigquery
.FromCertificate(certificate));
Console.WriteLine(credential.Token);
Console.WriteLine(credential.TokenServerUrl);
public void ReadData(string path)
var logs = new List<TableDataInsertAllRequest.RowsData>();
using (StreamReader sr = new StreamReader(path))
string line = "";
string [] aLog = new string[1000];
Log Loga = new Log();
int count = 0;
int i = 0;
while((line = sr.ReadLine()) != null)
var theLog = new TableDataInsertAllRequest.RowsData();
string[] token = line.Split('\t');
theLog.Json = new Dictionary<string, object>();
theLog.Json.Add("Timestamp", token[0]);
theLog.Json.Add("ClientIpAddress", token[1]);
theLog.Json.Add("Username", token[2]);
theLog.Json.Add("GroupID", token[3]);
theLog.Json.Add("CompanyID", token[4]);
theLog.Json.Add("FullOrSiteLogging", token[5]);
theLog.Json.Add("PolicyFlags", token[6]);
theLog.Json.Add("ActionsTaken", token[7]);
theLog.Json.Add("ResponseStatus", token[8]);
logs.Add(theLog);
count++;
if (count > 20000)
BQStreamInsert(logs);
logs.Clear();
count = 0;
Task.WaitAll(tasks.ToArray());
public void BQStreamInsert(List<TableDataInsertAllRequest.RowsData> rows)
string projectId= "ws-2015-logs";
string datasetId = "CompanyGroup1";
string tableId ="RawLogsTest7";
var service = new BigqueryService(new BaseClientService.Initializer()
HttpClientInitializer = credential,
ApplicationName = "BQ test"
);
try
var content = new TableDataInsertAllRequest();
content.Rows = rows;
content.Kind = "bigquery#tableDataInsertAllRequest";
content.IgnoreUnknownValues = true;
content.SkipInvalidRows = true;
var insertTask = service.Tabledata.InsertAll(content, projectId, datasetId, tableId);
TableDataInsertAllResponse response = insertTask.Execute();
catch (Exception ex)
Console.WriteLine(ex.Message);
在program.cs中:
class Program
static void Main(string[] args)
var sw = Stopwatch.StartNew();
StreamDatacs sd = new StreamDatacs();
sd.ReadData(@"C:\2015071600.csv");
Console.WriteLine(sw.ElapsedMilliseconds);
sw.Stop();
Console.Read();
在代码中,我逐行读取文件,每20,000行停止并插入BQ,直到所有行都插入为止。我在这里尝试了不同的缓冲区大小(500行,1000行,50,000行),但没有看到很大的不同。 我很感激你的任何建议。提前非常感谢。
【问题讨论】:
【参考方案1】:由于流的有效负载大小有限,请参阅Quota policy 更容易谈论时间,因为有效负载对我们双方的限制方式相同,但我也会提到其他副作用。
您需要正确实施限制:
最大行大小:1 MB HTTP 请求大小限制:10 MB 每个请求的最大行数:500否则你会得到错误。因此它被称为流式插入,因此您可以运行大量并行进程而不是一项大工作。
我们测量每个流式传输请求的时间在 1200-2500 毫秒之间,这在上个月是一致的,如图所示。
虽然我们看到了一些副作用:
请求随机失败,类型为“后端错误” 请求随机失败,类型为“连接错误” 请求随机失败,类型为“超时”(注意这里,因为只有一些行失败,而不是整个有效负载) 其他一些错误消息是非描述性的,它们非常模糊,对您没有帮助,请重试。 我们每天都会看到数百个此类故障,因此它们几乎是恒定的,与云运行状况无关。对于所有这些,我们在付费的 Google 企业支持中打开了案例,但不幸的是,他们没有解决问题。接缝推荐的选项是重试的指数退避,即使是被告知这样做的支持。就个人而言,这并不让我高兴。
如果您选择的方法需要几个小时,这意味着 it does not scale
,并且不会扩展。您需要重新考虑使用async processes
的方法。为了更快地完成,您需要并行运行多个工作人员,流式传输性能将是相同的。只要有 10 个并行工作人员,就意味着时间将减少 10 倍。
在后台处理 IO 绑定或 cpu 绑定任务现在是大多数 Web 应用程序的常见做法。有很多软件可以帮助构建后台作业,其中一些基于 Beanstalkd 这样的消息传递系统。
基本上,您需要在封闭的网络中分配插入作业,确定它们的优先级,并使用(运行)它们。嗯,这正是 Beanstalkd 提供的。
Beanstalkd 提供了在管中组织作业的可能性,每个管对应于一种作业类型。
您需要一个 API/生产者,可以将作业放在管子上,比方说行的 json 表示。这是我们用例的杀手级功能。因此,我们有一个 API 可以获取行并将它们放置在管上,这只需几毫秒,因此您可以实现快速响应时间。
另一方面,您现在在一些电子管上有很多工作。你需要一个代理。代理/消费者可以预订工作。
它还可以帮助您进行作业管理和重试:成功处理作业后,消费者可以从试管中删除该作业。在失败的情况下,消费者可以埋葬工作。这项工作不会被推回试管,但可供进一步检查。
消费者可以发布一个工作,Beanstalkd 会将这个工作推回管中,并使其可供另一个客户端使用。
Beanstalkd 客户端可以在大多数常用语言中找到,web interface 可用于调试。
【讨论】:
感谢您对错误处理和作业监控应用程序的推荐。但是,我现在真正担心的是流插入的速度。我必须在 10 分钟内将一组内存缓冲区(2~3GB)插入 Bigquery,但我有 20 分钟的时间来处理 1 个缓冲区(即 300MB)。这对我的情况来说是完全不可接受的。此外,我确实将文件分成小块,它并没有给我带来很大的不同。我只是想确定我的代码中是否存在问题(例如块大小、流插入方式等)或者是 Bigquery 的方式。 并行化调用,我看不到任何并发。您应该至少并行运行 50 个调用 我遇到了类似的问题,将数据流式传输到 bigquery 非常慢(每秒 2KB!)以上是关于如何在 C# 中提高数据流插入 Bigquery 表的性能的主要内容,如果未能解决你的问题,请参考以下文章
使用 cdata ado.net 数据提供程序在 Google BigQuery 中插入 Json
使用 C# 提高在 MySQL 数据库中插入数据的速度 [重复]
如何使用带有 nodejs 的 bigquery 插入方法插入地理类型数据
如何安全地为 bigquery 节点插入转义用户输入?可以在 bigquery.insert 节点库上使用参数化查询吗?