最近一次数据迁移,需要将mysql的数据导出、处理后导入到新表和ES。这里做个简单记录,方便后续查询。
注: 为了写文章方便及隐私安全,实际内容会有所简化。例如表结构简化、数据库连接部分全部用 xxx 表示、目录及文件名均为化名等。
实践过程
原表:
book_db 库
- b_book(id,create_time,update_time,price,title,intro)
新表:
book 库
- book(id,price,title,create_time,update_time)
- book_ext(id,book_id,intro,create_time)
MySQL导出
mkdir -p /tmp/
# 导出原始数据
mysql -hxxx -uxxx -pxxx book_db --default-character-set=utf8 -e \'select id,create_time,update_time,price,title,intro from b_book\' | sed \'s/NULL//g\' > /tmp/b_book.csv
sed \'s/NULL//g\'
是因为导出的数据有些字段存的NULL,新表不需要存储NULL,所以去掉。
导出的数据每行默认以\\t
分隔,第一行包含字段名。这里我们删掉第一行:
sed -i \'1d\' /tmp/b_book.csv
数据处理
cd /tmp/
# 处理create_time,update_time,price,并生成文件 book.csv
cat b_book.csv | awk -F \'\\t\' -v OFS=\' @@@ \' \'{gsub(/[-:]/," ",$2); $2=mktime($2);gsub(/[-:]/,"",$3);$3=mktime($3);$4=$4*100;$6="";print $0}\' > book.csv
# 生成文件 book_ext.csv
cat b_book.csv | awk -F \'\\t\' -v OFS=\' @@@ \' \'{print $1,$6}\' > book_ext.csv
# 生成文件 book_es.csv
cat b_book.csv | awk -F \'\\t\' -v OFS=\' @@@ \' \'{$4=$4*100;print $0}\' > book_es.csv
因为原表里时间都是datetime格式,新表是时间戳格式,这里处理成时间戳格式。价格原表是以元为单位,这里*100
是为了处理成以分为单位。
-v OFS=\' @@@ \'
表示输出的时候每列以@@@
为分隔符。原因是原表里的intro
字段存储的是html,可能包含常用转义字符,这里使用@@@
确保能正确分隔每列。
导入到MySQL
mysql -hxxx -uxxx -pxxx book
Load Data LOCAL InFile \'/tmp/book.csv\' Into Table book
character set utf8
Fields Terminated By \' @@@ \' Enclosed By \'\' Escaped By \'\' Lines Terminated By \'\\n\'
(id,create_time,update_time,price,title);
Load Data LOCAL InFile \'/tmp/book_ext.csv\' Into Table book_ext
character set utf8
Fields Terminated By \' @@@ \' Enclosed By \'\' Escaped By \'\' Lines Terminated By \'\\n\'
(book_id,intro);
说明:
- Terminated 字段分隔符(列分隔符)。一般是空格或者
\\t
- Enclosed 字段括起字符。没有为空字符即可
- Escaped 转义字符。没有为空字符即可
- Terminated 记录分隔符(行结束符)
Into Table
代表插入,记录已存在(唯一键约束)则失败不再往下执行。Replace Into Table
代表覆盖,记录已存在则覆盖(是整条记录覆盖,没有列出的字段给默认值)。Ignore Into Table
遇到已存在直接跳过。
导入到ES
由于生产的book_es.csv
文件比较大,所以这里按20000条生成一个文件,防止文件过大,ES导入失败。
cd /tmp/
awk \'{filename = "book_es.csv." int((NR-1)/20000) ".csv"; print >> filename}\' book_es.csv
ConvertBookToEs.php
是PHP脚本,生成ES批量导入的文件。见附录。执行后生成很多book_es.csv.*.csv.json
文件。
php ConvertBookToEs.php
importToEs.sh
是ES批量导入脚本,如下:
#!/bin/bash
for file in `ls /tmp/book_es.csv.*.csv.json`
do
echo $file;
curl -XPOST http://xxx:9200/book/doc/_bulk -H "Content-Type: application/json" --data-binary "@$file" >> importToEs.log
done
执行脚本:
sh importToEs.sh
等待数分钟,便执行完毕了。
CASE WHEN 按字段更新批量更新
格式示例:
更新单值:
UPDATE categories SET
display_order = CASE id
WHEN 1 THEN 3
WHEN 2 THEN 4
WHEN 3 THEN 5
END
WHERE id IN (1,2,3)
更新多值:
UPDATE categories SET
display_order = CASE id
WHEN 1 THEN 3
WHEN 2 THEN 4
WHEN 3 THEN 5
END,
title = CASE id
WHEN 1 THEN \'New Title 1\'
WHEN 2 THEN \'New Title 2\'
WHEN 3 THEN \'New Title 3\'
END
WHERE id IN (1,2,3)
PHP封装:
/**
* 批量更新函数
* @param $data array 待更新的数据,二维数组格式
* @param array $params array 值相同的条件,键值对应的一维数组
* @param string $field string 值不同的条件,默认为id
* @return bool|string
*/
function batchUpdate($data, $field, $table, $params = [])
{
if (!is_array($data) || !$field || !is_array($params)) {
return false;
}
//in条件
$in_fields = array_column($data, $field);
$in_fields = implode(\',\', array_map(function ($value) {
return "\'" . $value . "\'";
}, $in_fields));
$updates = parseUpdate($data, $field);
$where = parseParams($params);
$sql = sprintf("UPDATE `%s` SET %s WHERE `%s` IN (%s) %s;\\n", $table, $updates, $field, $in_fields, $where);
return $sql;
}
/**
* 将二维数组转换成CASE WHEN THEN的批量更新条件
* @param $data array 二维数组
* @param $field string 列名
* @return string sql语句
*/
function parseUpdate($data, $field)
{
$sql = \'\';
$keys = array_keys(current($data));
foreach ($keys as $column) {
if ($column == $field) {//去掉ID主键
continue;
}
$sql .= sprintf("`%s` = CASE `%s` \\n", $column, $field);
foreach ($data as $line) {
$sql .= sprintf("WHEN \'%s\' THEN \'%s\' \\n", $line[$field], $line[$column]);
}
$sql .= "END,";
}
return rtrim($sql, \',\');
}
/**
* 解析where条件
* @param $params
* @return array|string
*/
function parseParams($params)
{
$where = [];
foreach ($params as $key => $value) {
$where[] = sprintf("`%s` = \'%s\'", $key, $value);
}
return $where ? \' AND \' . implode(\' AND \', $where) : \'\';
}
调用示例:
$data = [
[\'id\' => 1, \'parent_id\' => 100, \'title\' => \'A\', \'sort\' => 1],
[\'id\' => 2, \'parent_id\' => 100, \'title\' => \'A\', \'sort\' => 3],
[\'id\' => 3, \'parent_id\' => 100, \'title\' => \'A\', \'sort\' => 5],
[\'id\' => 4, \'parent_id\' => 100, \'title\' => \'B\', \'sort\' => 7],
[\'id\' => 5, \'parent_id\' => 101, \'title\' => \'A\', \'sort\' => 9],
];
echo batchUpdate($data, \'id\', "post");
生成的SQL:
UPDATE `post` SET parent_id` = CASE `id`
WHEN \'1\' THEN \'100\'
WHEN \'2\' THEN \'100\'
WHEN \'3\' THEN \'100\'
WHEN \'4\' THEN \'100\'
WHEN \'5\' THEN \'101\'
END,`title` = CASE `id`
WHEN \'1\' THEN \'A\'
WHEN \'2\' THEN \'A\'
WHEN \'3\' THEN \'A\'
WHEN \'4\' THEN \'B\'
WHEN \'5\' THEN \'A\'
END,`sort` = CASE `id`
WHEN \'1\' THEN \'1\'
WHEN \'2\' THEN \'3\'
WHEN \'3\' THEN \'5\'
WHEN \'4\' THEN \'7\'
WHEN \'5\' THEN \'9\'
END WHERE `id` IN (\'1\',\'2\',\'3\',\'4\',\'5\') ;
实现MySQL LOAD DATA按字段更新
为了将大量数据加载到MySQL中,LOAD DATA INFILE
是迄今为止最快的选择。但是,虽然这可以以INSERT IGNORE
或REPLACE
的方式使用,但目前不支持ON DUPLICATE KEY UPDATE
。
如果我们想批量更新某个字段,ON DUPLICATE KEY UPDATE
如何使用LOAD DATA INFILE
模拟?
stackoverflow 上有网友给了答案。步骤是:
1)创建一个新的临时表。
CREATE TEMPORARY TABLE temporary_table LIKE target_table;
2)从临时表中删除所有索引以加快速度。(可选)
SHOW INDEX FROM temporary_table;
DROP INDEX `PRIMARY` ON temporary_table;
DROP INDEX `some_other_index` ON temporary_table;
3)将CSV加载到临时表中
LOAD DATA INFILE \'your_file.csv\'
INTO TABLE temporary_table
Fields Terminated By \'\\t\' Enclosed By \'\' Escaped By \'\' Lines Terminated By \'\\n\'
(field1, field2);
4)使用ON DUPLICATE KEY UPDATE
复制数据
SHOW COLUMNS FROM target_table;
INSERT INTO target_table
SELECT * FROM temporary_table
ON DUPLICATE KEY UPDATE field1 = VALUES(field1), field2 = VALUES(field2);
MySQL将假定=
之前的部分引用INSERT INTO
子句中指定的列,第二部分引用SELECT
列。
5)删除临时表
DROP TEMPORARY TABLE temporary_table;
使用SHOW INDEX FROM
和SHOW COLUMNS FROM
此过程可以针对任何给定的表自动执行。
注:官方文档里
INSERT ... SELECT ON DUPLICATE KEY UPDATE
语句被标记为基于语句的复制不安全。所以上述方案请在充分测试后再实施。详见:
https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html
附录
ConvertBookToEs.php
<?php
/**
* 转换wish_book为ES 批量格式(json)
*/
//id,create_time,update_time,price,title,intro
function dealBook($file)
{
$fp = fopen($file, \'r\');
while (!feof($fp)) {
$line = explode(\' @@@ \', fgets($fp, 65535));
if ($line && isset($line[1])) {
$arr_head = [
\'index\' => [
\'_id\' => (int)$line[0]
]
];
$arr = [
\'id\' => (int)$line[0],
\'create_time\' => strtotime($line[1]),
\'update_time\' => strtotime($line[2]),
\'price\' => intval($line[3]),
\'title\' => (string)$line[4],
\'intro\' => (string)$line[18],
];
file_put_contents($file . \'.json\', json_encode($arr_head, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND);
file_put_contents($file . \'.json\', json_encode($arr, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND);
}
}
}
try {
//处理CSV文件为es bluk json格式
//参考 https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html
$files = glob("/tmp/book_es.csv.*.csv");
if (false === $files) {
exit("can not find csv file");
}
$pids = [];
foreach ($files as $i => $file) {
$pid = pcntl_fork();
if ($pid < 0) {
exit("could not fork");
}
if ($pid > 0) {
$pids[$pid] = $pid;
} else {
echo time() . " new process, pid:" . getmypid() . PHP_EOL;
dealBook($file);
exit();
}
}
while (count($pids)) {
foreach ($pids as $key => $pid) {
$res = pcntl_waitpid($pid, $status, WNOHANG);
if ($res == -1 || $res > 0) {
echo \'Child process exit,pid \' . $pid . PHP_EOL;
unset($pids[$key]);
}
}
sleep(1);
}
} catch (Exception $e) {
$message = $e->getFile() . \':\' . $e->getLine() . \' \' . $e->getMessage();
echo $message;
}
参考
1、Linux命令行文本工具 - 飞鸿影~ - 博客园
https://www.cnblogs.com/52fhy/p/5836429.html
2、mysqldump 导出 csv 格式 --fields-terminated-by=, :字段分割符; - superhosts的专栏 - CSDN博客
https://blog.csdn.net/superhosts/article/details/26054997
3、Batch Processing | Elasticsearch Reference [6.4] | Elastic
https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html
4、mysql导入数据load data infile用法整理 - conanwang - 博客园
https://www.cnblogs.com/conanwang/p/5890753.html
5、MySQL LOAD DATA INFILE with ON DUPLICATE KEY UPDATE - Stack Overflow
https://stackoverflow.com/questions/15271202/mysql-load-data-infile-with-on-duplicate-key-update
6、mysql - INSERT INTO ... SELECT FROM ... ON DUPLICATE KEY UPDATE - Stack Overflow
https://stackoverflow.com/questions/2472229/insert-into-select-from-on-duplicate-key-update
7、MySQL :: MySQL 5.6参考手册:: 13.2.5.2 INSERT ... ON DUPLICATE KEY UPDATE语法
https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html
8、复制表结构和数据SQL语句 - becket - 博客园
https://www.cnblogs.com/zhengxu/articles/2206894.html
9、MySQL批量更新数据 - 梦想_行人 - 博客园
https://www.cnblogs.com/ldj3/p/9288187.html