I just want to load 5GB from MySql into BigQuery

Posted: 2015-01-20 07:04:12

【Question】:

Long time no see. I want to get 5GB of data from mysql into BigQuery. My best bet seems to be some kind of CSV export / import, which does not work for various reasons, see these failed load jobs:

agile-coral-830:splitpapers1501200518aa150120052659
agile-coral-830:splitpapers1501200545aa150120055302
agile-coral-830:splitpapers1501200556aa150120060231

This is probably because I don't have the right MySql incantation to produce a perfect CSV according to RFC 4180. However, rather than argue about RFC 4180 minutiae, this whole loading business could be solved in five minutes by supporting customizable multi-character field delimiters and multi-character line delimiters. I'm pretty sure my data contains neither ### nor @@@, so the following would work like a charm:

mysql> select * from $TABLE_NAME 
into outfile '$DATA.csv' 
fields terminated by '###' 
enclosed by ''
lines terminated by '@@@'

$ bq load  --nosync -F '###' -E '@@@' $TABLE_NAME $DATA.csv $SCHEMA.json

Edit: the fields contain '\n', '\r', ',' and '"'. They also contain NULLs, which MySql represents as [escape]N, i.e. "N in the sample. Sample row:

"10.1.1.1.1483","5","9074080","Candidate high myopia loci on chromosomes 18p and 12q do not play a major role in susceptibility to common myopia","Results
There was no strong evidence of linkage of common myopia to these candidate regions: all two-point and multipoint heterogeneity LOD scores were < 1.0 and non-parametric linkage p-values were > 0.01. However, one Amish family showed slight evidence of linkage (LOD>1.0) on 12q; another 3 Amish families each gave LOD >1.0 on 18p; and 3 Jewish families each gave LOD >1.0 on 12q.
Conclusions
Significant evidence of linkage (LOD> 3) of myopia was not found on chromosome 18p or 12q loci in these families. These results suggest that these loci do not play a major role in the causation of common myopia in our families studied.","2004","BMC MEDICAL GENETICS","JOURNAL","N,"5","20","","","","0","1","USER","2007-11-19 05:00:00","rep1","PDFLib TET","0","2009-05-24 20:33:12"
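For the record, one workaround worth sketching (untested against this exact table; col1/col2 stand in for the real columns, and NULLs come through as empty strings rather than true NULLs) is to do the RFC 4180 quoting in the SELECT itself, turn off MySQL's own escaping, and then let bq accept quoted newlines:

mysql> select
           concat('"', replace(ifnull(col1, ''), '"', '""'), '"'),
           concat('"', replace(ifnull(col2, ''), '"', '""'), '"')
           -- ... one expression per column
       from $TABLE_NAME
       into outfile '$DATA.csv'
       fields terminated by ',' enclosed by '' escaped by ''
       lines terminated by '\n';

$ bq load --nosync --allow_quoted_newlines $TABLE_NAME $DATA.csv $SCHEMA.json

Doubling embedded double quotes and wrapping each value in quotes is what RFC 4180 expects, and --allow_quoted_newlines lets BigQuery cope with the '\n' and '\r' characters that then sit inside the quoted fields.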

【Comments】:

Please post a couple of sample lines from your CSV and your JSON schema. Also, last time I hit a bug with MySQL's output: ***.com/questions/24610691/…

【Answer 1】:

I found loading through CSV quite difficult, with more restrictions and complications. I have been messing around this morning with moving data from MySQL into BigQuery.

Below is a Python script that will build the table decorator and stream the data directly into the BigQuery table.

My database is in the cloud (it connects over the Cloud SQL unix socket), so you may need to change the connection string. Fill in the missing values for your particular setup, then call it with:

SQLToBQBatch(tableName, limit)

I put the limit in for testing. For my final test I sent 999999999 as the limit and everything worked fine.

I would recommend using a backend module to run this over the full 5GB.

Use "RowToJSON" to clean out invalid characters (i.e. anything that is not valid UTF-8).
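If the hand-picked byte replacements in RowToJSON don't cover everything in your data, a more general cleanup (a sketch in the same Python 2 style as the script below, not part of the original answer) is to decode with replacement:

def CleanString(value):
    # Decode as UTF-8, replacing any invalid byte sequences, then re-encode,
    # instead of substituting known-bad bytes one at a time.
    if isinstance(value, str):
        value = value.decode('utf-8', 'replace').encode('utf-8')
    return value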

I haven't tested it on 5GB, but it was able to do 50k rows in about 20 seconds. The same load via CSV took over 2 minutes.

I wrote this for testing, so please excuse the bad coding practices and mini hacks. It works, so feel free to clean it up for any production-level work.

import MySQLdb
import logging
from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials
import httplib2

OAUTH_SCOPE = 'https://www.googleapis.com/auth/bigquery'



PROJECT_ID = ""          # fill in your project, dataset and table
DATASET_ID = ""
TABLE_ID = ""

SQL_DATABASE_NAME = ""   # Cloud SQL instance name used in the unix socket path below
SQL_DATABASE_DB = ""
SQL_USER = ""
SQL_PASS = ""


def Connect():
    return MySQLdb.connect(unix_socket='/cloudsql/' + SQL_DATABASE_NAME, db=SQL_DATABASE_DB, user=SQL_USER, passwd=SQL_PASS)


def RowToJSON(cursor, row, fields):
    newData = {}
    for i, value in enumerate(row):
        try:
            if fields[i]["type"] == bqTypeDict["int"]:
                value = int(value)
            else:
                value = float(value)
        except:
            if value is not None:
                value = value.replace("\x92", "'") \
                                .replace("\x96", "'") \
                                .replace("\x93", '"') \
                                .replace("\x94", '"') \
                                .replace("\x97", '-') \
                                .replace("\xe9", 'e') \
                                .replace("\x91", "'") \
                                .replace("\x85", "...") \
                                .replace("\xb4", "'") \
                                .replace('"', '""')

        newData[cursor.description[i][0]] = value
    return newData


def GetBuilder():
    return build('bigquery', 'v2',http = AppAssertionCredentials(scope=OAUTH_SCOPE).authorize(httplib2.Http()))

bqTypeDict = { 'int' : 'INTEGER',
               'varchar' : 'STRING',
               'double' : 'FLOAT',
               'tinyint' : 'INTEGER',
               'decimal' : 'FLOAT',
               'text' : 'STRING',
               'smallint' : 'INTEGER',
               'char' : 'STRING',
               'bigint' : 'INTEGER',
               'float' : 'FLOAT',
               'longtext' : 'STRING'
             }

def BuildFeilds(table):
    conn = Connect()
    cursor = conn.cursor()
    cursor.execute("DESCRIBE %s;" % table)
    tableDecorator = cursor.fetchall()
    fields = []

    for col in tableDecorator:
        field = {}
        field["name"] = col[0]
        colType = col[1].split("(")[0]
        if colType not in bqTypeDict:
            logging.warning("Unknown type detected, using string: %s", str(col[1]))
        field["type"] = bqTypeDict.get(colType, "STRING")
        if col[2] == "YES":
            field["mode"] = "NULLABLE"
        fields.append(field)
    return fields


def SQLToBQBatch(table, limit=3000):
    logging.info("****************************************************")
    logging.info("Starting SQLToBQBatch. Got: Table: %s, Limit: %i" % (table, limit))   
    bqDest = GetBuilder()
    fields = BuildFeilds(table)

    try:
        responce = bqDest.datasets().insert(projectId=PROJECT_ID,
                                            body={'datasetReference': {'datasetId': DATASET_ID}}).execute()
        logging.info("Added Dataset")
        logging.info(responce)
    except Exception, e:
        logging.info(e)
        if ("Already Exists: " in str(e)):
            logging.info("Dataset already exists")
        else:
            logging.error("Error creating dataset: " + str(e), "Error")

    try:
        responce = bqDest.tables().insert(projectId=PROJECT_ID, datasetId=DATASET_ID,
                                          body={'tableReference': {'projectId': PROJECT_ID,
                                                                   'datasetId': DATASET_ID,
                                                                   'tableId': TABLE_ID},
                                                'schema': {'fields': fields}}).execute()
        logging.info("Added Table")
        logging.info(responce)
    except Exception, e:
        logging.info(e)
        if ("Already Exists: " in str(e)):
            logging.info("Table already exists")
        else:
            logging.error("Error creating table: " + str(e), "Error")

    conn = Connect()
    cursor = conn.cursor()

    logging.info("Starting load loop")
    count = -1
    cur_pos = 0
    total = 0
    batch_size = 1000

    while count != 0 and cur_pos < limit:
        count = 0
        if batch_size + cur_pos > limit:
            batch_size = limit - cur_pos
        sqlCommand = "SELECT * FROM %s LIMIT %i, %i" % (table, cur_pos, batch_size) 
        logging.info("Running: %s", sqlCommand)
        cursor.execute(sqlCommand)
        data = []
        for _, row in enumerate(cursor.fetchall()):
            data.append({"json": RowToJSON(cursor, row, fields)})
            count += 1
        logging.info("Read complete")

        if count != 0:

            logging.info("Sending request")   
            insertResponse = bqDest.tabledata().insertAll(
                                                        projectId=PROJECT_ID,
                                                        datasetId=DATASET_ID,
                                                        tableId=TABLE_ID,
                                                        body={"rows": data}).execute()
            cur_pos += batch_size
            total += count
            logging.info("Done %i, Total: %i, Response: %s", count, total, insertResponse)
            if "insertErrors" in insertResponse:
                logging.error("Error inserting data index: %i", insertResponse["insertErrors"]["index"])
                for error in insertResponse["insertErrors"]["errors"]:
                    logging.error(error)
        else:
            logging.info("No more rows")

【Comments】:

Thanks for the script. Uploading 80 million rows will take the better part of a day, and we haven't even gotten to error handling / retries yet.

Agreed. If you run into issues with the script, let me know and I'll see whether I can rework it. I also have a version that writes the data to a GCS bucket as a newline-delimited JSON file; not sure whether that works better for you. I wrote it after reading about your situation.

@Ryan thanks for the script! I tried to update it (to the latest Google Cloud API), added a simple command line interface and changed the MySQL-loading mechanism to improve performance. You can find the script here: github.com/inkrement/MySQLbq. But I have the following problem: I want to load more than 300 million rows, and there is a quota limit (only 1000 load operations are allowed per table per day). Is there a way around this?

It looks like you are streaming, so you should be fine, since streaming inserts use a different quota: cloud.google.com/bigquery/quota-policy#streaminginserts

【Answer 2】:

•	Generate a Google service account key:
	o	IAM & Admin > Service accounts > Create service account
	o	Once created, create a key, download it, and save it to the project folder on your local machine as google_key.json
•	Run the code in a PyCharm environment after installing the required packages.
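If you prefer the command line to the console clicks above, roughly equivalent commands would be the following (the service account name is a placeholder, and the package list is an assumption based on the imports in the script below):

gcloud iam service-accounts create bq-loader
gcloud iam service-accounts keys create google_key.json \
    --iam-account bq-loader@YOUR_PROJECT.iam.gserviceaccount.com
pip install mysqlclient mysql-connector-python google-cloud-bigquery click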



NOTE: The table data in MySQL remains intact. Also, if you use the preview in BQ you may not see the freshly streamed rows right away; go to the console and run a query instead.


•	Code:

import MySQLdb
from google.cloud import bigquery
import mysql.connector
import logging
import os
from MySQLdb.converters import conversions
import click
import MySQLdb.cursors
from google.cloud.exceptions import ServiceUnavailable
import sys

bqTypeDict = {'int': 'INTEGER',
              'varchar': 'STRING',
              'double': 'FLOAT',
              'tinyint': 'INTEGER',
              'decimal': 'FLOAT',
              'text': 'STRING',
              'smallint': 'INTEGER',
              'char': 'STRING',
              'bigint': 'INTEGER',
              'float': 'FLOAT',
              'longtext': 'STRING',
              'datetime': 'TIMESTAMP'
              }


def conv_date_to_timestamp(str_date):
    import time
    import datetime

    date_time = MySQLdb.times.DateTime_or_None(str_date)
    unix_timestamp = (date_time - datetime.datetime(1970, 1, 1)).total_seconds()

    return unix_timestamp


def Connect(host, database, user, password):
    # Use the values passed in from the command line options
    return mysql.connector.connect(host=host,
                                   database=database,
                                   user=user,
                                   password=password)


def BuildSchema(host, database, user, password, table):
    logging.debug('build schema for table %s in database %s' % (table, database))
    conn = Connect(host, database, user, password)
    cursor = conn.cursor()
    cursor.execute("DESCRIBE %s;" % table)

    tableDecorator = cursor.fetchall()
    schema = []

    for col in tableDecorator:
        colType = col[1].split("(")[0]
        if colType not in bqTypeDict:
            logging.warning("Unknown type detected, using string: %s", str(col[1]))

        field_mode = "NULLABLE" if col[2] == "YES" else "REQUIRED"
        field = bigquery.SchemaField(col[0], bqTypeDict.get(colType, "STRING"), mode=field_mode)

        schema.append(field)

    return tuple(schema)


def bq_load(table, data, max_retries=5):
    logging.info("Sending request")
    uploaded_successfully = False
    num_tries = 0

    while not uploaded_successfully and num_tries < max_retries:
        try:
            insertResponse = table.insert_data(data)

            for row in insertResponse:
                if 'errors' in row:
                    logging.error('not able to upload data: %s', row['errors'])

            uploaded_successfully = True
        except ServiceUnavailable as e:
            num_tries += 1
            logging.error('insert failed with exception trying again retry %d', num_tries)
        except Exception as e:
            num_tries += 1
            logging.error('not able to upload data: %s', str(e))


@click.command()
@click.option('-h', '--host', default='tempus-qa.hashmapinc.com', help='MySQL hostname')
@click.option('-d', '--database', required=True, help='MySQL database')
@click.option('-u', '--user', default='root', help='MySQL user')
@click.option('-p', '--password', default='docker', help='MySQL password')
@click.option('-t', '--table', required=True, help='MySQL table')
@click.option('-i', '--projectid', required=True, help='Google BigQuery Project ID')
@click.option('-n', '--dataset', required=True, help='Google BigQuery Dataset name')
@click.option('-l', '--limit', default=0, help='max num of rows to load')
@click.option('-s', '--batch_size', default=1000, help='number of rows per streaming insert batch')
@click.option('-k', '--key', default='key.json',help='Location of google service account key (relative to current working dir)')
@click.option('-v', '--verbose', default=0, count=True, help='verbose')
def SQLToBQBatch(host, database, user, password, table, projectid, dataset, limit, batch_size, key, verbose):
    # set to max verbose level
    verbose = verbose if verbose < 3 else 3
    loglevel = logging.ERROR - (10 * verbose)

    logging.basicConfig(level=loglevel)

    logging.info("Starting SQLToBQBatch. Got: Table: %s, Limit: %i", table, limit)
    ## set env key to authenticate application

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.path.join(os.getcwd(), key)
    print('file found')
    # Instantiates a client
    bigquery_client = bigquery.Client()
    print('Project id created')

    try:

        bq_dataset = bigquery_client.dataset(dataset)
        bq_dataset.create()
        logging.info("Added Dataset")
    except Exception as e:
        if ("Already Exists: " in str(e)):
            logging.info("Dataset already exists")
        else:
            logging.error("Error creating dataset: %s Error", str(e))



    bq_table = bq_dataset.table(table)
    bq_table.schema = BuildSchema(host, database, user, password, table)
    print('Creating schema using build schema')
    bq_table.create()
    logging.info("Added Table %s", table)

    conn = Connect(host, database, user, password)
    cursor = conn.cursor()

    logging.info("Starting load loop")
    cursor.execute("SELECT * FROM %s" % (table))

    cur_batch = []
    count = 0

    for row in cursor:
        count += 1

        if limit != 0 and count >= limit:
            logging.info("limit of %d rows reached", limit)
            break

        cur_batch.append(row)

        if count % batch_size == 0 and count != 0:
            bq_load(bq_table, cur_batch)

            cur_batch = []
            logging.info("processed %i rows", count)

    # send last elements
    bq_load(bq_table, cur_batch)
    logging.info("Finished (%i total)", count)
    print("table created")


if __name__ == '__main__':
    # run the command
    SQLToBQBatch()
    
•	Command to run the file: python mysql_to_bq.py -d 'recommendation_spark' -t temp_market_store -i inductive-cocoa-250507 -n practice123 -k key.json

【Comments】:
