BigQuery code failing due to MemoryError using pandas python client library

Posted: 2019-03-27 14:55:23

Question:

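I am using the Python client library to access Google BigQuery, converting the result to a pandas DataFrame, and finally writing it to a CSV file. The code fails with a memory error. As of now the table holds 74,567 rows, which grows to roughly 1.8 million rows after unnesting.

These are the steps I currently run with the Python client library: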

from google.cloud import bigquery
import pandas as pd
import pandas_gbq
import os
from google.oauth2 import service_account



credentials = service_account.Credentials.from_service_account_file(json_key)

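def query_to_dataframe(sql_statement):
    # Runs the query and loads the whole result set into a single DataFrame.
    # json_key and project_id are defined elsewhere (not shown in the post).
    return pd.read_gbq(sql_statement,
                       project_id=project_id,
                       credentials=credentials,
                       dialect='standard')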


sql_statement = """SELECT visitorId,
visitNumber,
visitId,
visitStartTime,
date,
totals.visits,
totals.hits,
totals.pageviews,
totals.timeOnSite,
totals.bounces,
totals.transactions,
totals.transactionRevenue,
totals.newVisits,
totals.screenviews,
totals.uniqueScreenviews,
totals.timeOnScreen,
totals.totalTransactionRevenue,
totals.sessionQualityDim,
trafficSource.referralPath,
trafficSource.campaign,
trafficSource.source,
trafficSource.medium,
trafficSource.keyword,
trafficSource.adContent,
trafficSource.adwordsClickInfo.campaignId,
trafficSource.adwordsClickInfo.adGroupId,
trafficSource.adwordsClickInfo.creativeId,
trafficSource.adwordsClickInfo.criteriaId,
trafficSource.adwordsClickInfo.page,
trafficSource.adwordsClickInfo.slot,
trafficSource.adwordsClickInfo.criteriaParameters,
trafficSource.adwordsClickInfo.gclId,
trafficSource.adwordsClickInfo.customerId,
trafficSource.adwordsClickInfo.adNetworkType,
trafficSource.adwordsClickInfo.targetingCriteria.boomUserlistId,
trafficSource.adwordsClickInfo.isVideoAd,
trafficSource.isTrueDirect,
trafficSource.campaignCode,
device.browser,
device.browserVersion,
device.browserSize,
device.operatingSystem,
device.operatingSystemVersion,
device.isMobile,
device.mobileDeviceBranding,
device.mobileDeviceModel,
device.mobileInputSelector,
device.mobileDeviceInfo,
device.mobileDeviceMarketingName,
device.flashVersion,
device.javaEnabled,
device.language,
device.screenColors,
device.screenResolution,
device.deviceCategory,
geoNetwork.continent,
geoNetwork.subContinent,
geoNetwork.country,
geoNetwork.region,
geoNetwork.metro,
geoNetwork.city,
geoNetwork.cityId,
geoNetwork.networkDomain,
geoNetwork.latitude,
geoNetwork.longitude,
geoNetwork.networkLocation,
cd.index,
cd.value,
h.hitNumber,
h.time,
h.hour,
h.minute,
h.isSecure,
h.isInteraction,
h.isEntrance,
h.isExit,
h.referer,
h.page.pagePath,
h.page.hostname,
h.page.pageTitle,
h.page.searchKeyword,
h.page.searchCategory,
h.page.pagePathLevel1,
h.page.pagePathLevel2,
h.page.pagePathLevel3,
h.page.pagePathLevel4,
h.transaction.transactionId,
h.transaction.transactionRevenue as tRevenue,
h.transaction.transactionTax,
h.transaction.transactionShipping,
h.transaction.affiliation,
h.transaction.currencyCode,
h.transaction.localTransactionRevenue,
h.transaction.localTransactionTax,
h.transaction.localTransactionShipping,
h.transaction.transactionCoupon,
h.item.transactionId as tId,
h.item.productName,
h.item.productCategory,
h.item.productSku,
h.item.itemQuantity,
h.item.itemRevenue,
h.item.currencyCode as cCode,
h.item.localItemRevenue,
h.contentInfo.contentDescription,
h.appInfo.name,
h.appInfo.version,
h.appInfo.id,
h.appInfo.installerId,
h.appInfo.appInstallerId,
h.appInfo.appName,
h.appInfo.appVersion,
h.appInfo.appId,
h.appInfo.screenName,
h.appInfo.landingScreenName,
h.appInfo.exitScreenName,
h.appInfo.screenDepth,
h.exceptionInfo.description,
h.exceptionInfo.isFatal,
h.exceptionInfo.exceptions,
h.exceptionInfo.fatalExceptions,
h.eventInfo.eventCategory,
h.eventInfo.eventAction,
h.eventInfo.eventLabel,
h.eventInfo.eventValue,
hp.productSKU as pSKU,
hp.v2ProductName,
hp.v2ProductCategory,
hp.productVariant,
hp.productBrand,
hp.productRevenue,
hp.localProductRevenue,
hp.productPrice,
hp.localProductPrice,
hp.productQuantity,
hp.productRefundAmount,
hp.localProductRefundAmount,
hp.isImpression,
hp.isClick,
hpc.index as hpcIndex,
hpc.value as hpcValue,
hpCustomMetrics.index as cusomMetricsIndex,
hpCustomMetrics.value as cusomMetricsValue,
hp.productListName,
hp.productListPosition,
hp.productCouponCode,
hpromotion.promoId, 
hpromotion.promoName,
hpromotion.promoCreative,
hpromotion.promoPosition,
h.promotionActionInfo.promoIsView,
h.promotionActionInfo.promoIsClick,
h.refund.refundAmount,
h.refund.localRefundAmount,
h.eCommerceAction.action_type,
h.eCommerceAction.step,
h.eCommerceAction.option,
hExperiment.experimentId,
hExperiment.experimentVariant,
h.publisher.dfpClicks,
h.publisher.dfpImpressions,
h.publisher.dfpMatchedQueries,
h.publisher.dfpMeasurableImpressions,
h.publisher.dfpQueries,
h.publisher.dfpRevenueCpm,
h.publisher.dfpRevenueCpc,
h.publisher.dfpViewableImpressions,
h.publisher.dfpPagesViewed,
h.publisher.adsenseBackfillDfpClicks,
h.publisher.adsenseBackfillDfpImpressions,
h.publisher.adsenseBackfillDfpMatchedQueries,
h.publisher.adsenseBackfillDfpMeasurableImpressions,
h.publisher.adsenseBackfillDfpQueries,
h.publisher.adsenseBackfillDfpRevenueCpm,
h.publisher.adsenseBackfillDfpRevenueCpc,
h.publisher.adsenseBackfillDfpViewableImpressions,
h.publisher.adsenseBackfillDfpPagesViewed,
h.publisher.adxBackfillDfpClicks,
h.publisher.adxBackfillDfpImpressions,
h.publisher.adxBackfillDfpMatchedQueries,
h.publisher.adxBackfillDfpMeasurableImpressions,
h.publisher.adxBackfillDfpQueries,
h.publisher.adxBackfillDfpRevenueCpm,
h.publisher.adxBackfillDfpRevenueCpc,
h.publisher.adxBackfillDfpViewableImpressions,
h.publisher.adxBackfillDfpPagesViewed,
h.publisher.adxClicks,
h.publisher.adxImpressions,
h.publisher.adxMatchedQueries,
h.publisher.adxMeasurableImpressions,
h.publisher.adxQueries,
h.publisher.adxRevenue,
h.publisher.adxViewableImpressions,
h.publisher.adxPagesViewed,
h.publisher.adsViewed,
h.publisher.adsUnitsViewed,
h.publisher.adsUnitsMatched,
h.publisher.viewableAdsViewed,
h.publisher.measurableAdsViewed,
h.publisher.adsPagesViewed,
h.publisher.adsClicked,
h.publisher.adsRevenue,
h.publisher.dfpAdGroup,
h.publisher.dfpAdUnits,
h.publisher.dfpNetworkId,
hcustomVariables.index as hcustomVariableIndex,
hcustomVariables.customVarName,
hcustomVariables.customVarValue,
hcustomDimensions.index as customDimensionsIndex,
hcustomDimensions.value as customDimensionsvalue,
hcustomMetrics.index as hcustoMetricsIndex,
hcustomMetrics.value as hcustomMetricsValue,
h.type,
h.social.socialInteractionNetwork,
h.social.socialInteractionAction,
h.social.socialInteractions,
h.social.socialInteractionTarget,
h.social.socialNetwork,
h.social.uniqueSocialInteractions,
h.social.hasSocialSourceReferral,
h.social.socialInteractionNetworkAction,
h.latencyTracking.pageLoadSample,
h.latencyTracking.pageLoadTime,
h.latencyTracking.pageDownloadTime,
h.latencyTracking.redirectionTime,
h.latencyTracking.speedMetricsSample,
h.latencyTracking.domainLookupTime,
h.latencyTracking.serverConnectionTime,
h.latencyTracking.serverResponseTime,
h.latencyTracking.domLatencyMetricsSample,
h.latencyTracking.domInteractiveTime,
h.latencyTracking.domContentLoadedTime,
h.latencyTracking.userTimingValue,
h.latencyTracking.userTimingSample,
h.latencyTracking.userTimingVariable,
h.latencyTracking.userTimingCategory,
h.latencyTracking.userTimingLabel,
sourcePropertyInfo.sourcePropertyDisplayName,   
sourcePropertyInfo.sourcePropertyTrackingId,
h.contentGroup.contentGroup1,
h.contentGroup.contentGroup2,
h.contentGroup.contentGroup3,
h.contentGroup.contentGroup4,
h.contentGroup.contentGroup5,
h.contentGroup.previousContentGroup1,
h.contentGroup.previousContentGroup2,
h.contentGroup.previousContentGroup3,
h.contentGroup.previousContentGroup4,
h.contentGroup.previousContentGroup5,
h.contentGroup.contentGroupUniqueViews1,
h.contentGroup.contentGroupUniqueViews2,
h.contentGroup.contentGroupUniqueViews3,
h.contentGroup.contentGroupUniqueViews4,
h.contentGroup.contentGroupUniqueViews5,
h.dataSource,
hpublisher.dfpClicks as hpublisherDfpclicks,
hpublisher.dfpImpressions as hpublisherDfpImpressions,
hpublisher.dfpMatchedQueries as hpublisherDfpMatchedQueries,
hpublisher.dfpMeasurableImpressions as hpublisherDfpMeasurableImpressions,
hpublisher.dfpQueries as hpublisherDfpQueries,
hpublisher.dfpRevenueCpm as hpublisherDfpRevenueCpm,
hpublisher.dfpRevenueCpc as hpublisherDfpRevenueCpc,
hpublisher.dfpViewableImpressions as hpublisherDfpViewableImpressions,
hpublisher.dfpPagesViewed as hpublisherDfpPagesViewed,
hpublisher.adsenseBackfillDfpClicks as hpublisherAdsenseBackfillDfpClicks,
hpublisher.adsenseBackfillDfpImpressions as hpublisherAdsenseBackfillDfpImpressions,
hpublisher.adsenseBackfillDfpMatchedQueries as hpublisherAdsenseBackfillDfpMatchedQueries,
hpublisher.adsenseBackfillDfpMeasurableImpressions as hpublisherAdsenseBackfillDfpMeasurableImpressions,
hpublisher.adsenseBackfillDfpQueries as hpublisherAdsenseBackfillDfpQueries,
hpublisher.adsenseBackfillDfpRevenueCpm as hpublisherAdsenseBackfillDfpRevenueCpm,
hpublisher.adsenseBackfillDfpRevenueCpc as hpublisherAdsenseBackfillDfpRevenueCpc,
hpublisher.adsenseBackfillDfpViewableImpressions as hpublisherAdsenseBackfillDfpViewableImpressions,
hpublisher.adsenseBackfillDfpPagesViewed as hpublisherAdsenseBackfillDfpPagesViewed,
hpublisher.adxBackfillDfpClicks as hpublisherAdxBackfillDfpClicks,
hpublisher.adxBackfillDfpImpressions as hpublisherAdxBackfillDfpImpressions,
hpublisher.adxBackfillDfpMatchedQueries as hpublisherAdxBackfillDfpMatchedQueries,
hpublisher.adxBackfillDfpMeasurableImpressions as hpublisherAdxBackfillDfpMeasurableImpressions,
hpublisher.adxBackfillDfpQueries as hpublisherAdxBackfillDfpQueries,
hpublisher.adxBackfillDfpRevenueCpm as hpublisherAdxBackfillDfpRevenueCpm,
hpublisher.adxBackfillDfpRevenueCpc as hpublisherAdxBackfillDfpRevenueCpc,
hpublisher.adxBackfillDfpViewableImpressions as hpublisherAdxBackfillDfpViewableImpressions,
hpublisher.adxBackfillDfpPagesViewed as hpublisherAdxBackfillDfpPagesViewed,
hpublisher.adxClicks as hpublisherAdxClicks,
hpublisher.adxImpressions as hpublisherAdxImpressions,
hpublisher.adxMatchedQueries as hpublisherAdxMatchedQueries,
hpublisher.adxMeasurableImpressions as hpublisherAdxMeasurableImpressions,
hpublisher.adxQueries as hpublisherAdxQueries,
hpublisher.adxRevenue as hpublisherAdxRevenue,
hpublisher.adxViewableImpressions as hpublisherAdxViewableImpressions,
hpublisher.adxPagesViewed as hpublisherAdxPagesViewed,
hpublisher.adsViewed as hpublisherAdsViewed,
hpublisher.adsUnitsViewed as hpublisherAdsUnitsViewed,
hpublisher.adsUnitsMatched as hpublisherAdsUnitsMatched,
hpublisher.viewableAdsViewed as hpublisherViewableAdsViewed,
hpublisher.measurableAdsViewed as hpublisherMeasurableAdsViewed,
hpublisher.adsPagesViewed as hpublisherAdsPagesViewed,
hpublisher.adsClicked as hpublisherAdsClicked,
hpublisher.adsRevenue as hpublisherAdsRevenue,
hpublisher.dfpAdGroup as hpublisherDfpAdGroup,
hpublisher.dfpAdUnits as hpublisherDfpAdUnits,
hpublisher.dfpNetworkId as hpublisherDfpNetworkId,
fullVisitorId,
userId,
clientId,
channelGrouping,
socialEngagementType


FROM `project_id.dataset.table`
LEFT JOIN UNNEST(customDimensions) as cd
LEFT JOIN UNNEST(hits) as h
LEFT JOIN UNNEST(h.product) as hp
LEFT JOIN UNNEST(hp.customDimensions) as hpc
LEFT JOIN UNNEST(hp.customMetrics) as hpCustomMetrics
LEFT JOIN UNNEST(h.promotion) as hpromotion
LEFT JOIN UNNEST(h.experiment) as hExperiment
LEFT JOIN UNNEST(h.customVariables) as hcustomVariables
LEFT JOIN UNNEST(h.customDimensions) as hcustomDimensions
LEFT JOIN UNNEST(h.customMetrics) as hcustomMetrics
LEFT JOIN UNNEST(h.publisher_infos) as hpublisher"""

df = query_to_dataframe(sql_statement)
df.to_csv("sample1.csv")

Error:

ConnectionError                           Traceback (most recent call last)
<ipython-input-60-a51dc6a6e704> in <module>()
    340 LEFT JOIN UNNEST(h.publisher_infos) as hpublisher"""
    341 
--> 342 df = query_to_dataframe(sql_statement)
    343 df.to_csv("sample1.csv")
    344 

<ipython-input-60-a51dc6a6e704> in query_to_dataframe(sql_statement)
     16                      project_id=project_id,
     17                      credentials=credentials,
---> 18                      dialect='standard')
     19 
     20 

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\gbq.pyc in read_gbq(query, project_id, index_col, col_order, reauth, auth_local_webserver, dialect, location, configuration, credentials, private_key, verbose)
    147         auth_local_webserver=auth_local_webserver, dialect=dialect,
    148         location=location, configuration=configuration,
--> 149         credentials=credentials, verbose=verbose, private_key=private_key)
    150 
    151 

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas_gbq\gbq.pyc in read_gbq(query, project_id, index_col, col_order, reauth, auth_local_webserver, dialect, location, configuration, credentials, verbose, private_key)
    834         private_key=private_key,
    835     )
--> 836     schema, rows = connector.run_query(query, configuration=configuration)
    837     final_df = _parse_data(schema, rows)
    838 

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas_gbq\gbq.pyc in run_query(self, query, **kwargs)
    484         except self.http_error as ex:
    485             self.process_http_error(ex)
--> 486         result_rows = list(rows_iter)
    487         total_rows = rows_iter.total_rows
    488         schema = 

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\api_core\page_iterator.pyc in _items_iter(self)
    202     def _items_iter(self):
    203         """Iterator for each item returned."""
--> 204         for page in self._page_iter(increment=False):
    205             for item in page:
    206                 self.num_results += 1

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\api_core\page_iterator.pyc in _page_iter(self, increment)
    239                 self.num_results += page.num_items
    240             yield page
--> 241             page = self._next_page()
    242 
    243     @abc.abstractmethod

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\api_core\page_iterator.pyc in _next_page(self)
    359         """
    360         if self._has_next_page():
--> 361             response = self._get_next_page_response()
    362             items = response.get(self._items_key, ())
    363             page = Page(self, items, self.item_to_value)

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\cloud\bigquery\table.pyc in _get_next_page_response(self)
   1304             params["maxResults"] = self._page_size
   1305         return self.api_request(
-> 1306             method=self._HTTP_METHOD, path=self.path, query_params=params
   1307         )
   1308 

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\cloud\bigquery\client.pyc in _call_api(self, retry, **kwargs)
    379         if retry:
    380             call = retry(call)
--> 381         return call()
    382 
    383     def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY):

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\api_core\retry.pyc in retry_wrapped_func(*args, **kwargs)
    268                 sleep_generator,
    269                 self._deadline,
--> 270                 on_error=on_error,
    271             )
    272 

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\api_core\retry.pyc in retry_target(target, predicate, sleep_generator, deadline, on_error)
    177     for sleep in sleep_generator:
    178         try:
--> 179             return target()
    180 
    181         # pylint: disable=broad-except

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\cloud\_http.pyc in api_request(self, method, path, query_params, data, content_type, headers, api_base_url, api_version, expect_json, _target_object)
    313             content_type=content_type,
    314             headers=headers,
--> 315             target_object=_target_object,
    316         )
    317 

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\cloud\_http.pyc in _make_request(self, method, url, data, content_type, headers, target_object)
    190         headers["User-Agent"] = self.USER_AGENT
    191 
--> 192         return self._do_request(method, url, headers, data, target_object)
    193 
    194     def _do_request(

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\cloud\_http.pyc in _do_request(self, method, url, headers, data, target_object)
    219         :returns: The HTTP response.
    220         """
--> 221         return self.http.request(url=url, method=method, headers=headers, data=data)
    222 
    223     def api_request(

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\google\auth\transport\requests.pyc in request(self, method, url, data, headers, **kwargs)
    206 
    207         response = super(AuthorizedSession, self).request(
--> 208             method, url, data=data, headers=request_headers, **kwargs)
    209 
    210         # If the response indicated that the credentials needed to be

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\requests\sessions.pyc in request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
    531         
    532         send_kwargs.update(settings)
--> 533         resp = self.send(prep, **send_kwargs)
    534 
    535         return resp

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\requests\sessions.pyc in send(self, request, **kwargs)
    644 
    645         # Send the request
--> 646         r = adapter.send(request, **kwargs)
    647 
    648         # Total elapsed time of the request (approximately)

C:\Users\asmohammad\AppData\Local\Continuum\anaconda3\lib\site-packages\requests\adapters.pyc in send(self, request, stream, timeout, verify, cert, proxies)
    496 
    497         except (ProtocolError, socket.error) as err:
--> 498             raise ConnectionError(err, request=request)
    499 
    500         except MaxRetryError as e:

ConnectionError: ('Connection aborted.', error("(10060, 'WSAETIMEDOUT')",))

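Comments:

In general, if you suspect the result set is too large to fit in memory, put a LIMIT on the query to prove it. If that turns out to be the case, try a cursor-based interface that fetches one row at a time, processes it, and then fetches the next.

@Zhang I tried with a smaller table of about 800,000 records and the conversion succeeded, but it fails when I increase the size. I am not familiar with cursors. Could you help me understand?

This github post should resolve the issue.

Answer 1: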

It seems pd.read_gbq() is trying to load everything into memory, which causes the MemoryError.
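A rough back-of-the-envelope estimate shows why holding the full result in memory is a problem. The sketch below assumes about 306 selected columns (one per line of the SELECT list in the question) and a best case of 8 bytes per cell; `estimate_dataframe_bytes` is a hypothetical helper, and string columns stored as Python objects would take considerably more.

```python
def estimate_dataframe_bytes(n_rows, n_cols, bytes_per_cell=8):
    """Lower-bound size estimate: every cell stored as one fixed-width value."""
    return n_rows * n_cols * bytes_per_cell


# Figures taken from the question: about 1.8 million rows after unnesting.
# The column count (306) is an assumption based on the SELECT list.
approx = estimate_dataframe_bytes(1800000, 306)
print("{:.1f} GiB".format(approx / 1024 ** 3))  # prints 4.1 GiB
```

Even this optimistic figure is in the range of several gigabytes, before counting the intermediate list of row objects that the traceback shows being built with `list(rows_iter)`.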

Could you try iterating over the results of the same query using only google-cloud-bigquery? For example:

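from google.cloud.bigquery import Client

client = Client()
# sql holds the query text (sql_statement in the question)
query = client.query(sql)
# Iterate row by row; only the running count is kept
row_count = 0
for row in query:
    row_count += 1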

You can also try converting directly to a DataFrame with google-cloud-bigquery:

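from google.cloud.bigquery import Client

client = Client()
# sql holds the query text (sql_statement in the question)
query = client.query(sql)
# Builds the whole DataFrame in memory in one step
dataframe = query.to_dataframe()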
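If a DataFrame is still wanted but the full result does not fit in memory, one option is to group the iterated rows into fixed-size batches and handle one batch at a time. The following is a minimal sketch with plain dictionaries standing in for BigQuery rows; `iter_batches` is a hypothetical helper, not part of google-cloud-bigquery or pandas.

```python
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield lists of at most batch_size items from any iterable of rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Stand-in data; with BigQuery the rows would come from iterating the query job.
sample_rows = [{"visitId": i, "hits": i * 2} for i in range(5)]
batches = list(iter_batches(sample_rows, 2))
print([len(batch) for batch in batches])
```

Each batch could then be turned into a small DataFrame (for example with `pd.DataFrame(batch)`) and appended to the CSV file with `to_csv(..., mode="a")`, so that only one batch is held in memory at a time.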

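Comments:

Thanks for the input, Zhang. I will try the approaches above and let you know.

@Zhang, I tried the approaches above as well, but no luck. This also fails with a memory error.

@sam, if what you tried was query.to_dataframe(), the post has a patch, though I don't know whether it has been officially released in the BQ API. The first one should work fine, though?

@Zhang I was able to run the first program, but it is not what I asked for. It only gives the row count.

@sam, that means you just need a way to convert the rows into a DataFrame. Try checking the source code of to_dataframe to see how the library does it.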
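Since the end goal in the question is a CSV file, another option is to skip the DataFrame entirely and write each row to disk as it is iterated. The sketch below uses only the standard `csv` module; `write_rows_to_csv` is a hypothetical helper, and it assumes each row exposes an `items()` method yielding (column, value) pairs, as a plain dict does. The stand-in generator replaces the BigQuery row iterator here.

```python
import csv
import os
import tempfile


def write_rows_to_csv(rows, path):
    """Stream mapping-like rows to a CSV file; return the number of rows written."""
    count = 0
    writer = None
    with open(path, "w", newline="", encoding="utf-8") as handle:
        for row in rows:
            record = dict(row.items())
            if writer is None:
                # Take the header from the first row's column names.
                writer = csv.DictWriter(handle, fieldnames=list(record))
                writer.writeheader()
            writer.writerow(record)
            count += 1
    return count


# Stand-in rows produced lazily, so only one row is in memory at a time.
demo_path = os.path.join(tempfile.mkdtemp(), "sample1.csv")
written = write_rows_to_csv(
    ({"visitId": i, "hits": i * 2} for i in range(3)), demo_path
)
print(written)
```

With the loop from the answer, the `rows` argument would be the query object that is being iterated, which keeps memory use roughly constant regardless of the result size.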
