I just want to load 5GB from MySql into BigQuery
[Title]: I just want to load 5GB from MySql into BigQuery
[Posted]: 2015-01-20 07:04:12
[Question]: Long time, no see. I want to get 5 GB of data out of MySQL and into BigQuery. My best bet seems to be some sort of CSV export / import, which does not work for various reasons, see:
agile-coral-830:splitpapers1501200518aa150120052659
agile-coral-830:splitpapers1501200545aa150120055302
agile-coral-830:splitpapers1501200556aa150120060231
This could well be because I don't have the right MySQL incantation to produce perfect CSV per RFC 4180. However, instead of arguing over RFC 4180 minutiae, this whole load business could be solved in five minutes by supporting customizable multi-character field separators and multi-character line separators. I'm pretty sure my data contains neither ### nor @@@, so the following would work like a charm:
mysql> select * from $TABLE_NAME
into outfile '$DATA.csv'
fields terminated by '###'
enclosed by ''
lines terminated by '@@@'
$ bq load --nosync -F '###' -E '@@@' $TABLE_NAME $DATA.csv $SCHEMA.json
Edit: The fields contain '\n', '\r', ',' and '"'. They also contain NULLs, which MySQL represents as [escape]N, which shows up as "N in the sample. Sample row:
"10.1.1.1.1483","5","9074080","Candidate high myopia loci on chromosomes 18p and 12q do not play a major role in susceptibility to common myopia","Results
There was no strong evidence of linkage of common myopia to these candidate regions: all two-point and multipoint heterogeneity LOD scores were < 1.0 and non-parametric linkage p-values were > 0.01. However, one Amish family showed slight evidence of linkage (LOD>1.0) on 12q; another 3 Amish families each gave LOD >1.0 on 18p; and 3 Jewish families each gave LOD >1.0 on 12q.
Conclusions
Significant evidence of linkage (LOD> 3) of myopia was not found on chromosome 18p or 12q loci in these families. These results suggest that these loci do not play a major role in the causation of common myopia in our families studied.","2004","BMC MEDICAL GENETICS","JOURNAL","N,"5","20","","","","0","1","USER","2007-11-19 05:00:00","rep1","PDFLib TET","0","2009-05-24 20:33:12"
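One way to sidestep the INTO OUTFILE quoting limitations is to let Python's csv module produce the RFC 4180 quoting: it wraps fields containing '\n', '\r', ',' or '"' in quotes and doubles embedded quotes. The sketch below is illustrative only; the connection details and table name are placeholders, it uses mysql.connector (as in the second answer further down), and SQL NULLs are written as empty fields rather than \N.

import csv
import mysql.connector

# Placeholder connection details; adjust for your environment
conn = mysql.connector.connect(host='127.0.0.1', database='mydb',
                               user='me', password='secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM papers")  # table name is illustrative

with open('papers.csv', 'w', newline='', encoding='utf-8') as f:
    # QUOTE_MINIMAL quotes any field containing the delimiter, quote char or newlines
    writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
    for row in cursor:
        # Write SQL NULL as an empty field instead of MySQL's \N escape
        writer.writerow(['' if v is None else v for v in row])

The resulting file should then load with bq load using --allow_quoted_newlines, since the abstract fields contain embedded line breaks.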
[Comments]:
Please post a few sample lines from your CSV, and the JSON schema. Also, I hit a bug with MySQL output the last time around: ***.com/questions/24610691/…
[Answer 1]: I found loading through CSV very difficult; more restrictions and complications. This morning I have been busy moving data from MySQL into BigQuery.
Below is a Python script that will build the table decorator and stream the data directly into a BigQuery table.
My database is in the cloud, so you may need to change the connection string. Fill in the missing values for your particular situation, then call it with:
SQLToBQBatch(tableName, limit)
I put the limit in for testing. For my final test I sent 999999999 as the limit and everything worked fine.
I would recommend using a backend module to run this over the 5 GB.
Use 'RowToJSON' to clean out invalid characters (i.e. anything that is not UTF-8).
I have not tested it on 5 GB, but it was able to do 50k rows in about 20 seconds; the same load via CSV took over 2 minutes.
I wrote this for testing, so please excuse the bad coding practices and mini hacks. It works, so feel free to clean it up for any production-level work.
import MySQLdb
import logging
from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials
import httplib2

OAUTH_SCOPE = 'https://www.googleapis.com/auth/bigquery'

# Fill these in for your environment
PROJECT_ID = ''
DATASET_ID = ''
TABLE_ID = ''
SQL_DATABASE_NAME = ''
SQL_DATABASE_DB = ''
SQL_USER = ''
SQL_PASS = ''


def Connect():
    return MySQLdb.connect(unix_socket='/cloudsql/' + SQL_DATABASE_NAME, db=SQL_DATABASE_DB, user=SQL_USER, passwd=SQL_PASS)
def RowToJSON(cursor, row, fields):
    newData = {}
    for i, value in enumerate(row):
        try:
            if fields[i]["type"] == bqTypeDict["int"]:
                value = int(value)
            else:
                value = float(value)
        except:
            if value is not None:
                value = value.replace("\x92", "'") \
                             .replace("\x96", "'") \
                             .replace("\x93", '"') \
                             .replace("\x94", '"') \
                             .replace("\x97", '-') \
                             .replace("\xe9", 'e') \
                             .replace("\x91", "'") \
                             .replace("\x85", "...") \
                             .replace("\xb4", "'") \
                             .replace('"', '""')
        newData[cursor.description[i][0]] = value
    return newData
def GetBuilder():
    return build('bigquery', 'v2', http=AppAssertionCredentials(scope=OAUTH_SCOPE).authorize(httplib2.Http()))


bqTypeDict = {'int': 'INTEGER',
              'varchar': 'STRING',
              'double': 'FLOAT',
              'tinyint': 'INTEGER',
              'decimal': 'FLOAT',
              'text': 'STRING',
              'smallint': 'INTEGER',
              'char': 'STRING',
              'bigint': 'INTEGER',
              'float': 'FLOAT',
              'longtext': 'STRING'
              }
def BuildFeilds(table):
    conn = Connect()
    cursor = conn.cursor()
    cursor.execute("DESCRIBE %s;" % table)
    tableDecorator = cursor.fetchall()
    fields = []

    for col in tableDecorator:
        field = {}
        field["name"] = col[0]
        colType = col[1].split("(")[0]
        if colType not in bqTypeDict:
            logging.warning("Unknown type detected, using string: %s", str(col[1]))
        field["type"] = bqTypeDict.get(colType, "STRING")
        if col[2] == "YES":
            field["mode"] = "NULLABLE"
        fields.append(field)
    return fields
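# For reference, each field dict built above is shaped like the BigQuery
# schema JSON, e.g. (column name here is illustrative):
#   {"name": "title", "type": "STRING", "mode": "NULLABLE"}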
def SQLToBQBatch(table, limit=3000):
    logging.info("****************************************************")
    logging.info("Starting SQLToBQBatch. Got: Table: %s, Limit: %i" % (table, limit))

    bqDest = GetBuilder()
    fields = BuildFeilds(table)

    try:
        responce = bqDest.datasets().insert(projectId=PROJECT_ID,
                                            body={'datasetReference': {'datasetId': DATASET_ID}}).execute()
        logging.info("Added Dataset")
        logging.info(responce)
    except Exception, e:
        logging.info(e)
        if ("Already Exists: " in str(e)):
            logging.info("Dataset already exists")
        else:
            logging.error("Error creating dataset: " + str(e))

    try:
        responce = bqDest.tables().insert(projectId=PROJECT_ID, datasetId=DATASET_ID,
                                          body={'tableReference': {'projectId': PROJECT_ID,
                                                                   'datasetId': DATASET_ID,
                                                                   'tableId': TABLE_ID},
                                                'schema': {'fields': fields}
                                                }).execute()
        logging.info("Added Table")
        logging.info(responce)
    except Exception, e:
        logging.info(e)
        if ("Already Exists: " in str(e)):
            logging.info("Table already exists")
        else:
            logging.error("Error creating table: " + str(e))

    conn = Connect()
    cursor = conn.cursor()

    logging.info("Starting load loop")
    count = -1
    cur_pos = 0
    total = 0
    batch_size = 1000

    while count != 0 and cur_pos < limit:
        count = 0
        if batch_size + cur_pos > limit:
            batch_size = limit - cur_pos
        sqlCommand = "SELECT * FROM %s LIMIT %i, %i" % (table, cur_pos, batch_size)
        logging.info("Running: %s", sqlCommand)

        cursor.execute(sqlCommand)
        data = []
        for _, row in enumerate(cursor.fetchall()):
            data.append({"json": RowToJSON(cursor, row, fields)})
            count += 1
        logging.info("Read complete")

        if count != 0:
            logging.info("Sending request")
            insertResponse = bqDest.tabledata().insertAll(
                projectId=PROJECT_ID,
                datasetId=DATASET_ID,
                tableId=TABLE_ID,
                body={"rows": data}).execute()

            cur_pos += batch_size
            total += count
            logging.info("Done %i, Total: %i, Response: %s", count, total, insertResponse)
            if "insertErrors" in insertResponse:
                # insertAll reports failures as a list of {index, errors} objects
                for insertError in insertResponse["insertErrors"]:
                    logging.error("Error inserting data index: %i", insertError["index"])
                    for error in insertError["errors"]:
                        logging.error(error)
        else:
            logging.info("No more rows")
[Comments]:
Thanks for the script. Uploading 80 million rows is going to take the better part of a day, and we have not even gotten to error handling / retries yet.
Agreed. If you hit problems with the script, let me know and I will see whether I can rework it. I also have a version that writes the data out as a newline-delimited JSON file to a GCS bucket; not sure whether that suits you better. I wrote it after reading about your situation.
@Ryan thanks for the script! I tried to update it (to the latest Google Cloud API), added a simple command-line interface and changed the MySQL-loading mechanism for better performance. You can find the script here: github.com/inkrement/MySQLbq. But I have the following problem: I want to load more than 300 million rows, and there is a quota limit (only 1000 load operations per day and table are allowed). Is there a way around this?
It looks like you are streaming, so you should be fine, because streaming inserts use a different quota: cloud.google.com/bigquery/quota-policy#streaminginserts
[Answer 2]:
• Generate google service account key
o IAM & Admin > Service account > create_Service_account
o Once created then create key , download and save It to the project folder on local machine – google_key.json
• Run the code in pycharm environment after installing the packages.
NOTE : The table data in mysql remains intact. Also , if one uses preview in BQ to see that you won’t see. Go to console and fire the query.
o CODE:

import MySQLdb
from google.cloud import bigquery
import mysql.connector
import logging
import os
from MySQLdb.converters import conversions
import click
import MySQLdb.cursors
from google.cloud.exceptions import ServiceUnavailable
import sys
bqTypeDict = {'int': 'INTEGER',
              'varchar': 'STRING',
              'double': 'FLOAT',
              'tinyint': 'INTEGER',
              'decimal': 'FLOAT',
              'text': 'STRING',
              'smallint': 'INTEGER',
              'char': 'STRING',
              'bigint': 'INTEGER',
              'float': 'FLOAT',
              'longtext': 'STRING',
              'datetime': 'TIMESTAMP'
              }
def conv_date_to_timestamp(str_date):
    # Helper for DATETIME columns: convert a MySQL datetime string to epoch seconds
    import time
    import datetime

    date_time = MySQLdb.times.DateTime_or_None(str_date)
    unix_timestamp = (date_time - datetime.datetime(1970, 1, 1)).total_seconds()
    return unix_timestamp


def Connect(host, database, user, password):
    return mysql.connector.connect(host=host,
                                   database=database,
                                   user=user,
                                   password=password)
def BuildSchema(host, database, user, password, table):
    logging.debug('build schema for table %s in database %s' % (table, database))
    conn = Connect(host, database, user, password)
    cursor = conn.cursor()

    cursor.execute("DESCRIBE %s;" % table)
    tableDecorator = cursor.fetchall()
    schema = []

    for col in tableDecorator:
        colType = col[1].split("(")[0]
        if colType not in bqTypeDict:
            logging.warning("Unknown type detected, using string: %s", str(col[1]))

        field_mode = "NULLABLE" if col[2] == "YES" else "REQUIRED"
        field = bigquery.SchemaField(col[0], bqTypeDict.get(colType, "STRING"), mode=field_mode)

        schema.append(field)

    return tuple(schema)
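# For reference, a nullable VARCHAR column ends up as e.g.
#   bigquery.SchemaField('title', 'STRING', mode='NULLABLE')
# (the column name here is illustrative).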
def bq_load(table, data, max_retries=5):
    logging.info("Sending request")
    uploaded_successfully = False
    num_tries = 0

    while not uploaded_successfully and num_tries < max_retries:
        try:
            insertResponse = table.insert_data(data)

            for row in insertResponse:
                if 'errors' in row:
                    logging.error('not able to upload data: %s', row['errors'])

            uploaded_successfully = True
        except ServiceUnavailable as e:
            num_tries += 1
            logging.error('insert failed with exception trying again retry %d', num_tries)
        except Exception as e:
            num_tries += 1
            logging.error('not able to upload data: %s', str(e))
@click.command()
@click.option('-h', '--host', default='tempus-qa.hashmapinc.com', help='MySQL hostname')
@click.option('-d', '--database', required=True, help='MySQL database')
@click.option('-u', '--user', default='root', help='MySQL user')
@click.option('-p', '--password', default='docker', help='MySQL password')
@click.option('-t', '--table', required=True, help='MySQL table')
@click.option('-i', '--projectid', required=True, help='Google BigQuery Project ID')
@click.option('-n', '--dataset', required=True, help='Google BigQuery Dataset name')
@click.option('-l', '--limit', default=0, help='max num of rows to load')
@click.option('-s', '--batch_size', default=1000, help='max num of rows to load per batch')
@click.option('-k', '--key', default='key.json', help='Location of google service account key (relative to current working dir)')
@click.option('-v', '--verbose', default=0, count=True, help='verbose')
def SQLToBQBatch(host, database, user, password, table, projectid, dataset, limit, batch_size, key, verbose):
    # set to max verbose level
    verbose = verbose if verbose < 3 else 3
    loglevel = logging.ERROR - (10 * verbose)
    logging.basicConfig(level=loglevel)
    logging.info("Starting SQLToBQBatch. Got: Table: %s, Limit: %i", table, limit)

    ## set env key to authenticate application
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.path.join(os.getcwd(), key)
    print('file found')

    # Instantiates a client
    bigquery_client = bigquery.Client()
    print('Project id created')

    try:
        bq_dataset = bigquery_client.dataset(dataset)
        bq_dataset.create()
        logging.info("Added Dataset")
    except Exception as e:
        if ("Already Exists: " in str(e)):
            logging.info("Dataset already exists")
        else:
            logging.error("Error creating dataset: %s Error", str(e))

    bq_table = bq_dataset.table(table)
    bq_table.schema = BuildSchema(host, database, user, password, table)
    print('Creating schema using build schema')
    bq_table.create()
    logging.info("Added Table %s", table)

    conn = Connect(host, database, user, password)
    cursor = conn.cursor()

    logging.info("Starting load loop")
    cursor.execute("SELECT * FROM %s" % (table))

    cur_batch = []
    count = 0

    for row in cursor:
        count += 1

        if limit != 0 and count >= limit:
            logging.info("limit of %d rows reached", limit)
            break

        cur_batch.append(row)

        if count % batch_size == 0 and count != 0:
            bq_load(bq_table, cur_batch)

            cur_batch = []
            logging.info("processed %i rows", count)

    # send last elements
    bq_load(bq_table, cur_batch)
    logging.info("Finished (%i total)", count)
    print("table created")


if __name__ == '__main__':
    # run the command
    SQLToBQBatch()
o Command to run the file: python mysql_to_bq.py -d 'recommendation_spark' -t temp_market_store -i inductive-cocoa-250507 -n practice123 -k key.json
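The bq_load function above gives up after max_retries back-to-back attempts. If streaming inserts hit intermittent ServiceUnavailable errors on a large load, a variant with exponential backoff between retries might help. This is a sketch built on the same table.insert_data call used in the script (and reusing its logging and ServiceUnavailable imports); the delay policy is an assumption, not part of the original script:

import time

def bq_load_with_backoff(table, data, max_retries=5, base_delay=1.0):
    # Same retry idea as bq_load above, but sleep base_delay * 2**attempt
    # seconds between attempts instead of retrying immediately.
    for attempt in range(max_retries):
        try:
            errors = table.insert_data(data)
            for row in errors:
                if 'errors' in row:
                    logging.error('not able to upload data: %s', row['errors'])
            return True
        except ServiceUnavailable:
            delay = base_delay * (2 ** attempt)
            logging.error('insert failed, retrying in %.1f seconds', delay)
            time.sleep(delay)
    return False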
[Comments]: