将操作插入到 bigquery 表中

Posted

技术标签:

【中文标题】将操作插入到 bigquery 表中【英文标题】:insert operation into a bigquery table 【发布时间】:2014-05-27 14:33:49 【问题描述】:

我想将 SQL 服务器表的所有行插入到具有相同架构的 Bigquery 表中。 逐行流式插入非常慢:要插入 1000 行,执行下面的代码大约需要 10 分钟。 在这段代码中,我循环遍历某个文件夹中的前 10 个文件,并将该文件的内容插入到唯一的 SQL Server 表中。遍历所需文件后,我会遍历 SQL Server 表(其中包含所有文件的所有行),然后将内容逐行插入 Bigquery 表中。最后我删除了这些文件并清空了 sql server 表

这个操作很慢。

有人可以有更好的解决方案来自动(通过代码)将 SQL 服务器表的内容插入到 Bigquery 表中吗?例如,将 sql server 表中的所有内容插入到 one 块中的 bigquery 表中(而不是逐行)。

谢谢

这是我的代码(在coldfusion中):

<cfsilent>
    <cfinclude template="app_locals.cfm" />
    <cfinclude template="act_BigqueryApiAccess.cfm" />
</cfsilent>

<!--- 1er traitement BQ: Insertion des colis traités --->
 <!--- enregistrement du début du 1er traitement BQ (TShipping)--->
<cfset BigqueryTShipping_StartDate=now()>
<cfset QueryName = "InsertBigqueryLogTShippingStartDate">
<cfinclude template="qry_item.cfm"> 

<cfdirectory action="list" directory="#FileRoot#\_data\_Bigquery\TShipping" listinfo="all" type="file" name="FList" sort="datelastmodified">
<cfset FileList = Valuelist(FList.name)>
<cfoutput><h3>FileList: #FileList#</h3></cfoutput>

<cfif len(trim(FileList))>
    <!--- traiter les 10 derniers fichiers (les MaxNbFile moins récents) --->
    <cfset FileLoop = 1>
    <cfloop list="#FileList#" index="FileName"> 
        <cfset PathFile="#FileRoot#\_data\_Bigquery\TShipping\#FileName#">
        <cfset QueryName = "InsertTShipping">
        <cfinclude template="qry_item.cfm"> 
        <cfset FileLoop = FileLoop+1>
        <cfif FileLoop GT Attributes.MaxNbFile>
            <cfbreak />
        </cfif>
    </cfloop>
</cfif>

<!--- instancier un objet de type (class) TableRow --->
<cfobject action="create" type="java" class="com.google.api.services.bigquery.model.TableRow" name="row">
<!--- <cfdump var="#row#"> --->

<cfset QueryName = "GetParcels">
<cfinclude template="qry_item.cfm"> 
<cfloop query="GetParcels"> 
    <cfset row.set("Tracking_Date",mid(Tracking_Date,6,19))>
    <cfset row.set("TShipping_ID", TShipping_ID)>
    <cfset row.set("TShipping_Tracking", TShipping_Tracking)>
    <cfset row.set("Shipper_ID", Shipper_ID)>

    <cfset rows.setInsertId(sys.currentTimeMillis())>
    <cfset rows.setJson(row)>

    <cfset rowList.add(rows)>

    <cfset content=rqst.setRows(rowList)>

    <cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID, content).execute()>  
</cfloop>

<!---vider la table TShipping_BQ--->
<cfset QueryName = "DeleteOldTShipping_BQParcels">
<cfinclude template="qry_item.cfm">

<!--- Suppression des fichiers traités ---> 
<cfif len(trim(FileList))>
    <cfset TShippingFileNb=len(trim(FileList))>
    <cfset FileLoop = 1>
    <cfloop list="#FileList#" index="FileName"> 
        <cfset PathFile="#FileRoot#\_data\_Bigquery\TShipping\#FileName#">
        <cffile action="move" source="#PathFile#" destination="#FileRoot#\_data\_Bigquery\TShippingArchive">
        <!--- <cffile action="delete" file="#PathFile#"> --->
        <cfset FileLoop = FileLoop+1>
        <cfif FileLoop GT Attributes.MaxNbFile>
            <cfbreak />
        </cfif>
    </cfloop>
<cfelse>
    <cfset TShippingFileNb=0>
</cfif>


<!--- enregistrement du nb de fichiers TShipping traités --->
<cfset QueryName = "InsertBigqueryLogTShippingNb">
<cfinclude template="qry_item.cfm"> 
 <!--- enregistrement de la fin du 1er traitement BQ--->
<cfset BigqueryTShipping_EndDate=now()>
<cfset QueryName = "InsertBigqueryLogTShippingEndDate">
<cfinclude template="qry_item.cfm">

【问题讨论】:

这是实际将数据插入到 bigquery 表中的代码: 你能把你的insertAll()移到循环之外吗? 从 sql server 获取数据的查询是什么?我看到的只是一个 cfinclude 标记。 【参考方案1】:

您应该能够将insertAll() 移到循环之外。可能有一点你试图插入太多的记录,你需要在那个时候将它们批量化。即一旦你达到 1000 条记录,插入它们并重置你的 rowList 数组

<cfloop query="GetParcels"> 
  <cfset row = something()><!--- you need to re-create row for each loop or else you're updating a reference with each loop --->
  <cfset row.set("Tracking_Date",mid(Tracking_Date,6,19))>
  <cfset row.set("TShipping_ID", TShipping_ID)>
  <cfset row.set("TShipping_Tracking", TShipping_Tracking)>
  <cfset row.set("Shipper_ID", Shipper_ID)>
  <cfset rows.setInsertId(sys.currentTimeMillis())>
  <cfset rows.setJson(row)>
  <cfset rowList.add(rows)>
</cfloop>
<cfset content=rqst.setRows(rowList)>
<cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID,content).execute()>  

我所说的批处理的一个例子

<cfloop query="GetParcels"> 
  <cfset row.set("Tracking_Date",mid(Tracking_Date,6,19))>
  <cfset row.set("TShipping_ID", TShipping_ID)>
  <cfset row.set("TShipping_Tracking", TShipping_Tracking)>
  <cfset row.set("Shipper_ID", Shipper_ID)>
  <cfset rows.setInsertId(sys.currentTimeMillis())>
  <cfset rows.setJson(row)>
  <cfset rowList.add(rows)>
  <cfif arrayLen(rowList) EQ 1000>
    <cfset content=rqst.setRows(rowList)>
    <cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID,content).execute()>  
    <cfset rowList = []>
  </cfif>
</cfloop>
<!--- add this check in case there are exactly an increment of 1000 rows --->
<cfif ! arrayIsEmpty(rowList)>
  <cfset content=rqst.setRows(rowList)>
  <cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID,content).execute()>  
</cfif>

【讨论】:

感谢您的建议。正如您所说,当我将 insertall() 移到循环之外时,性能(响应时间)要好得多。但我现在有另一个问题:rowList 包含 n 次最后一行。因此,如果我保留:,则只会在 bigquery 表中插入 1 行。如果我删除它,同一行(最后一行)将被插入 n 次(n=looplength=sql server 表中的行数)。我以这种方式实例化对象 rowList:。你看到铅了吗? 您不会为每个循环重新创建row,因此您会在每次迭代中不断更新对row 的引用。添加row = new Object() 或者您最初创建它应该可以解决您的问题

以上是关于将操作插入到 bigquery 表中的主要内容,如果未能解决你的问题,请参考以下文章

无法将数据插入现有 BigQuery 表?

如何将值插入到python中的嵌套表中?

流插入,然后定期合并到 Dataflow 管道中的 BigQuery [关闭]

通过 Google Cloud Dataflow 将 PubSub 消息插入 BigQuery

BigQuery AEAD 功能的密钥集管理最佳实践 [关闭]

流式传输到 BigQuery 表的数据何时可用于 Query 操作?