将操作插入到 bigquery 表中
Posted
技术标签:
【中文标题】将操作插入到 bigquery 表中【英文标题】:insert operation into a bigquery table 【发布时间】:2014-05-27 14:33:49 【问题描述】:我想将 SQL 服务器表的所有行插入到具有相同架构的 Bigquery 表中。 逐行流式插入非常慢:要插入 1000 行,执行下面的代码大约需要 10 分钟。 在这段代码中,我循环遍历某个文件夹中的前 10 个文件,并将该文件的内容插入到唯一的 SQL Server 表中。遍历所需文件后,我会遍历 SQL Server 表(其中包含所有文件的所有行),然后将内容逐行插入 Bigquery 表中。最后我删除了这些文件并清空了 sql server 表
这个操作很慢。
有人可以有更好的解决方案来自动(通过代码)将 SQL 服务器表的内容插入到 Bigquery 表中吗?例如,将 sql server 表中的所有内容插入到 one 块中的 bigquery 表中(而不是逐行)。
谢谢
这是我的代码(在coldfusion中):
<cfsilent>
<cfinclude template="app_locals.cfm" />
<cfinclude template="act_BigqueryApiAccess.cfm" />
</cfsilent>
<!--- 1er traitement BQ: Insertion des colis traités --->
<!--- enregistrement du début du 1er traitement BQ (TShipping)--->
<cfset BigqueryTShipping_StartDate=now()>
<cfset QueryName = "InsertBigqueryLogTShippingStartDate">
<cfinclude template="qry_item.cfm">
<cfdirectory action="list" directory="#FileRoot#\_data\_Bigquery\TShipping" listinfo="all" type="file" name="FList" sort="datelastmodified">
<cfset FileList = Valuelist(FList.name)>
<cfoutput><h3>FileList: #FileList#</h3></cfoutput>
<cfif len(trim(FileList))>
<!--- traiter les 10 derniers fichiers (les MaxNbFile moins récents) --->
<cfset FileLoop = 1>
<cfloop list="#FileList#" index="FileName">
<cfset PathFile="#FileRoot#\_data\_Bigquery\TShipping\#FileName#">
<cfset QueryName = "InsertTShipping">
<cfinclude template="qry_item.cfm">
<cfset FileLoop = FileLoop+1>
<cfif FileLoop GT Attributes.MaxNbFile>
<cfbreak />
</cfif>
</cfloop>
</cfif>
<!--- instancier un objet de type (class) TableRow --->
<cfobject action="create" type="java" class="com.google.api.services.bigquery.model.TableRow" name="row">
<!--- <cfdump var="#row#"> --->
<cfset QueryName = "GetParcels">
<cfinclude template="qry_item.cfm">
<cfloop query="GetParcels">
<cfset row.set("Tracking_Date",mid(Tracking_Date,6,19))>
<cfset row.set("TShipping_ID", TShipping_ID)>
<cfset row.set("TShipping_Tracking", TShipping_Tracking)>
<cfset row.set("Shipper_ID", Shipper_ID)>
<cfset rows.setInsertId(sys.currentTimeMillis())>
<cfset rows.setJson(row)>
<cfset rowList.add(rows)>
<cfset content=rqst.setRows(rowList)>
<cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID, content).execute()>
</cfloop>
<!---vider la table TShipping_BQ--->
<cfset QueryName = "DeleteOldTShipping_BQParcels">
<cfinclude template="qry_item.cfm">
<!--- Suppression des fichiers traités --->
<cfif len(trim(FileList))>
<cfset TShippingFileNb=len(trim(FileList))>
<cfset FileLoop = 1>
<cfloop list="#FileList#" index="FileName">
<cfset PathFile="#FileRoot#\_data\_Bigquery\TShipping\#FileName#">
<cffile action="move" source="#PathFile#" destination="#FileRoot#\_data\_Bigquery\TShippingArchive">
<!--- <cffile action="delete" file="#PathFile#"> --->
<cfset FileLoop = FileLoop+1>
<cfif FileLoop GT Attributes.MaxNbFile>
<cfbreak />
</cfif>
</cfloop>
<cfelse>
<cfset TShippingFileNb=0>
</cfif>
<!--- enregistrement du nb de fichiers TShipping traités --->
<cfset QueryName = "InsertBigqueryLogTShippingNb">
<cfinclude template="qry_item.cfm">
<!--- enregistrement de la fin du 1er traitement BQ--->
<cfset BigqueryTShipping_EndDate=now()>
<cfset QueryName = "InsertBigqueryLogTShippingEndDate">
<cfinclude template="qry_item.cfm">
【问题讨论】:
这是实际将数据插入到 bigquery 表中的代码:insertAll()
移到循环之外吗?
从 sql server 获取数据的查询是什么?我看到的只是一个 cfinclude 标记。
【参考方案1】:
您应该能够将insertAll()
移到循环之外。可能有一点你试图插入太多的记录,你需要在那个时候将它们批量化。即一旦你达到 1000 条记录,插入它们并重置你的 rowList 数组
<cfloop query="GetParcels">
<cfset row = something()><!--- you need to re-create row for each loop or else you're updating a reference with each loop --->
<cfset row.set("Tracking_Date",mid(Tracking_Date,6,19))>
<cfset row.set("TShipping_ID", TShipping_ID)>
<cfset row.set("TShipping_Tracking", TShipping_Tracking)>
<cfset row.set("Shipper_ID", Shipper_ID)>
<cfset rows.setInsertId(sys.currentTimeMillis())>
<cfset rows.setJson(row)>
<cfset rowList.add(rows)>
</cfloop>
<cfset content=rqst.setRows(rowList)>
<cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID,content).execute()>
我所说的批处理的一个例子
<cfloop query="GetParcels">
<cfset row.set("Tracking_Date",mid(Tracking_Date,6,19))>
<cfset row.set("TShipping_ID", TShipping_ID)>
<cfset row.set("TShipping_Tracking", TShipping_Tracking)>
<cfset row.set("Shipper_ID", Shipper_ID)>
<cfset rows.setInsertId(sys.currentTimeMillis())>
<cfset rows.setJson(row)>
<cfset rowList.add(rows)>
<cfif arrayLen(rowList) EQ 1000>
<cfset content=rqst.setRows(rowList)>
<cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID,content).execute()>
<cfset rowList = []>
</cfif>
</cfloop>
<!--- add this check in case there are exactly an increment of 1000 rows --->
<cfif ! arrayIsEmpty(rowList)>
<cfset content=rqst.setRows(rowList)>
<cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID,content).execute()>
</cfif>
【讨论】:
感谢您的建议。正如您所说,当我将 insertall() 移到循环之外时,性能(响应时间)要好得多。但我现在有另一个问题:rowList 包含 n 次最后一行。因此,如果我保留:row
,因此您会在每次迭代中不断更新对row
的引用。添加row = new Object()
或者您最初创建它应该可以解决您的问题以上是关于将操作插入到 bigquery 表中的主要内容,如果未能解决你的问题,请参考以下文章
流插入,然后定期合并到 Dataflow 管道中的 BigQuery [关闭]
通过 Google Cloud Dataflow 将 PubSub 消息插入 BigQuery