Adwords API 并行报错(多处理断管)

Posted

技术标签:

【中文标题】Adwords API 并行报错(多处理断管)【英文标题】:Adwords API parallel report error (multiprocessing broken pipe) 【发布时间】:2016-09-20 11:41:28 【问题描述】:

希望有人可以帮助我。我正在尝试使用 Python 通过 Google AdWords API 提取数据。我需要提取存储在单个 MCC 下的多个帐户的数据。

Google 提供了并行下载的示例代码 (https://github.com/googleads/googleads-python-lib/blob/master/examples/adwords/v201607/reporting/parallel_report_download.py)。

但是示例代码错误并出现以下错误:

Traceback (most recent call last):
File         "C:/Users/casper.nygaard/Documents/WebTv/youtube/YouTube_ParallelDataRetrieval.py", line 226, in <module>
main(adwords_client, REPORT_DOWNLOAD_DIRECTORY)
File "C:/Users/casper.nygaard/Documents/WebTv/youtube/YouTube_ParallelDataRetrieval.py", line 85, in main
process.start()
File "C:\Users\casper.nygaard\AppData\Local\Programs\Python\Python35-32\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Users\casper.nygaard\AppData\Local\Programs\Python\Python35-32\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Users\casper.nygaard\AppData\Local\Programs\Python\Python35-32\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\Users\casper.nygaard\AppData\Local\Programs\Python\Python35-32\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\Users\casper.nygaard\AppData\Local\Programs\Python\Python35-32\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe

我正在运行 python 3.5,并且我认为示例代码是用 2.7 编写的(我必须添加打印括号并更改异常处理语法)。但我不知道我的错误是否相关。

据我所知,这些行的代码错误:

for process in processes:
process.start()

我不是编程专家,老实说,我不知道如何调试这个特定问题,因此非常感谢任何帮助。

我不需要并行运行,所以如果有人有 AdWords API 的示例代码,可以在没有多重处理的情况下在多个帐户上运行,那么这也是受欢迎的帮助。

【问题讨论】:

【参考方案1】:

这里有各种报告示例:https://github.com/googleads/googleads-python-lib/tree/master/examples/adwords/v201609/reporting

有点乱,但我正在运行此代码,基于 download_criteria_report_with_awql.py 示例。最后输入 MCC 账号,它会爬取所有子账号。广告系列和广告组报告有不同的功能。

#!/usr/bin/python

from googleads import adwords
import datetime

# Specify where to download the file here.
d = datetime.date.today() - datetime.timedelta(days=1)
file_date = int(d.strftime('%Y%m%d'))
FILE_DIRECTORY = './'
RESULT_FILE_NAME = ' ' + str(file_date) + '.csv'
PAGE_SIZE = 500
REPORT_PERIOD = 'LAST_7_DAYS'
# TODAY | YESTERDAY | LAST_7_DAYS | THIS_WEEK_SUN_TODAY | THIS_WEEK_MON_TODAY | LAST_WEEK |
# LAST_14_DAYS | LAST_30_DAYS | LAST_BUSINESS_WEEK | LAST_WEEK_SUN_SAT | THIS_MONTH | 20170101,20171231


def run_report(adwords_client, client_list):
import csv

print('Pulling AdWords API pull...')
first_run = True

for client in client_list:
    print('Reporting for ' + str(client))
    report = get_adgroup_report(adwords_client, client)
    report_array = report['report'].split('\n')
    reader = csv.reader(report_array, delimiter=',')

    print('Processing data to file...')
    if first_run:
        with open(FILE_DIRECTORY + '/' + RESULT_FILE_NAME, 'w', newline='\n', encoding='utf8') as csv_file:
            writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
            writer.writerow(report['headers'])
            first_run = False
            csv_file.close()
    else:
        do_cleaning(reader, report['headers'])

    # print('Report saved.')
print('Done pulling from AdWords API.')


def do_cleaning(reader, header):
import csv

# Fields that Google reports in micro amounts that need to be 'x / 1000000'
micro_array = ['AverageCpc', 'Cost', 'CostPerConversion', 'AverageCpc', 'TargetCpa', 'CpcBid']
# Fields that need to be cleaned and converted to percentages
percent_array = ['CampaignDesktopBidModifier', 'CampaignMobileBidModifier', 'CampaignTabletBidModifier',
                 'AdGroupDesktopBidModifier', 'AdGroupMobileBidModifier', 'AdGroupTabletBidModifier',
                 'SearchExactMatchImpressionShare', 'SearchExactMatchImpressionShare', 'SearchImpressionShare',
                 'SearchRankLostImpressionShare']

with open(FILE_DIRECTORY + '/' + RESULT_FILE_NAME, 'a', newline='\n', encoding='utf8') as csv_file:
    for row in reader:
        if len(row) > 1:
            writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)

            i = 0
            post_row = []
            for field in row:
                if field == ' --':
                    field = ''

                if header[i] in percent_array and field != '':
                    field = field.replace('%', '').replace('<', '').replace('>', '').replace(' ', '')
                    field = float(field) / 100

                if header[i] in micro_array and field != '':
                    field = int(field) / 1000000

                if isinstance(field, float):
                    field = round(field, 2)

                post_row.append(field)
                i += 1
            writer.writerow(post_row)
    csv_file.close()


def get_account_hierarchy(client, client_id):
# Initialize appropriate service.
client.SetClientCustomerId(client_id)
managed_customer_service = client.GetService('ManagedCustomerService', version='v201607')

# Construct selector to get all accounts.
offset = 0
selector = 
        'fields': ['CustomerId', 'Name'],
        'paging': 
                'startIndex': str(offset),
                'numberResults': str(PAGE_SIZE)
        

more_pages = True
accounts = 
child_links = 
parent_links = 
root_account = None
# Shell account numbers (accounts that only contain other accounts)
mcc_accounts = []

while more_pages:
    # Get serviced account graph.
    page = managed_customer_service.get(selector)
    if 'entries' in page and page['entries']:
        # Create map from customerId to parent and child links.
        if 'links' in page:
            for link in page['links']:
                if link['managerCustomerId'] not in child_links:
                    child_links[link['managerCustomerId']] = []
                child_links[link['managerCustomerId']].append(link)
                if link['clientCustomerId'] not in parent_links:
                    parent_links[link['clientCustomerId']] = []
                parent_links[link['clientCustomerId']].append(link)
        # Map from customerID to account.
        for account in page['entries']:
            if account['customerId'] not in mcc_accounts:
                accounts[account['customerId']] = account
    offset += PAGE_SIZE
    selector['paging']['startIndex'] = str(offset)
    more_pages = offset < int(page['totalNumEntries'])

# Find the root account.
for customer_id in accounts:
    if customer_id not in parent_links:
        root_account = accounts[customer_id]

return accounts


def get_campaign_report(client, client_id):
client.SetClientCustomerId(client_id)
report_downloader = client.GetReportDownloader(version='v201609')

columns = ['Date', 'AccountDescriptiveName', 'DayOfWeek', 'CampaignName', 'CampaignId', 'Device', 'Impressions',
           'Clicks', 'Conversions', 'AverageCpc', 'Cost', 'AveragePosition', 'CostPerConversion',
           'BiddingStrategyType', 'BiddingStrategyId', 'BiddingStrategyName', 'CampaignDesktopBidModifier',
           'CampaignMobileBidModifier', 'CampaignTabletBidModifier', 'SearchExactMatchImpressionShare',
           'SearchImpressionShare', 'SearchRankLostImpressionShare', 'CampaignTrialType', 'BaseCampaignId']
separator = ','
report_query = ('SELECT ' + separator.join(columns) + ' FROM CAMPAIGN_PERFORMANCE_REPORT DURING ' + REPORT_PERIOD)

# with open('temp.csv', 'w') as output_file:
#    report_downloader.DownloadReportWithAwql(report_query, 'CSV', output_file, skip_report_header=True,
#                                             skip_column_header=False, skip_report_summary=True)

report = report_downloader.DownloadReportAsStringWithAwql(report_query, 'CSV', skip_report_header=True,
                                                          skip_column_header=True, skip_report_summary=True)
payload = 'report': report,
           'headers': columns
           
# print('Report done.')
return payload


def get_adgroup_report(client, client_id):
client.SetClientCustomerId(client_id)
report_downloader = client.GetReportDownloader(version='v201609')

columns = ['Date', 'AccountDescriptiveName', 'DayOfWeek', 'CampaignName', 'CampaignId', 'AdGroupName', 'AdGroupId',
           'Device', 'Impressions', 'Clicks', 'Conversions', 'Cost', 'AverageCpc', 'CostPerConversion',
           'AveragePosition', 'BiddingStrategyType', 'BiddingStrategyId', 'BiddingStrategyName', 'CpcBid',
           'TargetCpa', 'AdGroupDesktopBidModifier', 'AdGroupMobileBidModifier', 'AdGroupTabletBidModifier',
           'BaseCampaignId', 'BaseAdGroupId']
separator = ','
report_query = ('SELECT ' + separator.join(columns) + ' FROM ADGROUP_PERFORMANCE_REPORT DURING ' + REPORT_PERIOD)

# with open('temp.csv', 'w') as output_file:
#    report_downloader.DownloadReportWithAwql(report_query, 'CSV', output_file, skip_report_header=True,
#                                             skip_column_header=False, skip_report_summary=True)

report = report_downloader.DownloadReportAsStringWithAwql(report_query, 'CSV', skip_report_header=True,
                                                          skip_column_header=True, skip_report_summary=True)

payload = 'report': report,
           'headers': columns
           
# print('Report done.')
return payload


if __name__ == '__main__':
mcc_client_customer_id = 'MCC account number'
adwords_client =  adwords.AdWordsClient.LoadFromStorage('adwords_credentials.yaml')
accounts = get_account_hierarchy(adwords_client, mcc_client_customer_id)
run_report(adwords_client, accounts)

【讨论】:

以上是关于Adwords API 并行报错(多处理断管)的主要内容,如果未能解决你的问题,请参考以下文章

expdp在rac下并行报错

HBuilder运行-手机运行报错,怎么解决?

启动ngnix之后,配置文件告知第几行报错,快速查看行号小技巧~

javascript [adw-update ad params by ad groups] #adwords_scripts

laravel 命令行报错

sh脚本except行报错问题