mongodb拆库分表脚本
Posted 安大叔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mongodb拆库分表脚本相关的知识,希望对你有一定的参考价值。
脚本功能:
1. 将指定的报告文件按照指定的字段、切库切表策略切分
2. 将切分后的文件并发导入到对应的Mongodb中
3. 生成日志文件和done标识文件
使用手册:
-h 打印帮助信息,并退出";
-f 需要切分的数据文件";
-g 清理昨日或历史全部数据: 1 昨日数据 2 历史全部数据";
-k 拆分字段在文件中列数,从1开始";
-o 需要切分的数据文件格式 tsv或csv ";
-d 切分的库数目";
-t 切分的表数目";
-m 切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_01";
-c 切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_0102";
-a 入库fieldFile";
-p 配置文件",
使用步骤:
1. 在配置文件中设置日志、切割后数据临时路径$LOG_HOME 和 $DATA_SPLIT_HOME目录,如果不存在,则手动创建;
在配置文件中设置目标Mongodb参数信息,用来作为导入数据的目标库;
在配置文件中设置Mongodb程序的主目录$MONGO;
2. 按照具体的参数意义,仿照下面的格式执行脚本:
举例:./mongo-split-importer.sh -f /data/shell/test.ata -g 1 -o tsv -k 3 -d 3 -t 3 -m idea -c idea -p ../conf/demeter_conf_qa.sh -a ../conf/idea-head-file
-f 切分目标文件 -o 文件格式 tsv -k 切割字段,第三个 -d 切割成3个库 -t 每个库3个表
-m 导入的mongodb未拆分名称idea -c 导入的mongodb未拆分表名idea -p 环境配置文件 -a 导入目标表的fieldFile文件 -g 清理昨日数据
mongo-split-importer.sh执行脚本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
#!/bin/bash
SPLITFILE=
""
#目标切割文件
FILEFORMAT=
""
# 目标切割文件格式 , \t
FILEFORMATNAME=
""
#切割目标文件格式名称 csv tsv
SPLITKEY=1
SPLITDBNUM=
""
#目标切割库数目
SPLITTBNUM=
""
#目标切割表数目
IMPORTDBNAME=
""
# 目标入库未分割库名
IMPORTTBNAME=
""
#目标入库未切割表名
PROFILE=
""
#配置文件
FIELDFILE=
""
#入库fieldFile
CLEAN=0
#清理数据, 0:默认不清理, 1 : 清理昨日的数据 2: 清理所有以前的数据
SPILTTMPDIR=
""
#目标切割文件存放临时目录
FULLPATH=$(cd `dirname $0`;pwd -P)
SCRIPTFILE=`basename $0`
TOTLE_RECORD_NUM=0
#文件切割前的记录条目
SUBFILE_RECORD_NUM=0
#切割后所有文件汇总的记录条目
_mongo_count=
"-1"
#------------------------------------------------函数---------------------------------------------------------------
function usage(){
echo
"$SCRIPTFILE - 分库分表后将数据导数据到mongodb"
echo
"SYNOPSIS"
echo
"OPTIONS"
echo
" -h 打印帮助信息,并退出"
;
echo
" -f 需要切分的数据文件"
;
echo
" -g 是否清理历史数据,默认不清理 1:清理昨日数据 2:清理以前所有数据"
;
echo
" -k 拆分字段在文件中列数,从1开始"
;
echo
" -o 需要切分的数据文件格式 tsv或csv "
;
echo
" -d 切分的库数目"
;
echo
" -t 切分的表数目"
;
echo
" -m 切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_01"
;
echo
" -c 切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_0102"
;
echo
" -a 入库fieldFile"
;
echo
" -p 配置文件,绝对或相对路径文件"
,
exit
}
function setFileFormat(){
FILEFORMATNAME=$1
case $1
in
csv) FILEFORMAT=
","
;;
tsv) FILEFORMAT=
"\t"
;;
*) echo
"unknow profile -o $1"
; usage;;
esac
}
while
getopts
‘:hf:g:o:k:d:t:a:p:m:c:‘
OPTION
do
case
$OPTION
in
h) usage;;
f) SPLITFILE=
$OPTARG
;;
g)CLEAN=
$OPTARG
;;
o) setFileFormat
$OPTARG
;;
k) SPLITKEY=
$OPTARG
;;
d) SPLITDBNUM=
$OPTARG
;;
t) SPLITTBNUM=
$OPTARG
;;
a) FIELDFILE=
$OPTARG
;;
p) PROFILE=
$OPTARG
;;
m) IMPORTDBNAME=
$OPTARG
;;
c) IMPORTTBNAME=
$OPTARG
;;
:) echo
"选项 \"-$OPTARG\" 后面缺少对应值, 将使用默认值"
;;
\?)echo
" 错误的选项 -$OPTARG, 将退出"
; usage;;
esac
done
#记录日志信息
function logInfo(){
echo
"[`date +"
%Y
-
%m
-
%d
%H
:
%M
:
%S
"`] [email protected] "
| tee -a
$LOGFILE
}
function checkError(){
if
[ $? -ne 0 ]; then
echo
"[`date +"
%Y
-
%m
-
%d
%H
:
%M
:
%S
,
%s
"`][$SCRIPTFILE, $$] ERROR OCCURS! - $1"
| tee -a
$ERRORFILE
exit
1;
fi
}
function check_ready() {
tmp_done_file=`
printf
"$reportDoneFile"
"$TABLE"
"$1"
`
while
[
"$isok"
=
"false"
];
do
rsync --list-only ${tmp_done_file}
if
[ $? -eq 0 ]; then
isok=
"true"
;
break;
fi
if
[
"$isok"
=
"false"
]; then
sleep
300
fi
time_now=`date +
%s
`
if
[ `expr ${time_now} - ${time_start}` -ge
$max_interval
]; then
return
255;
fi
done
return
0;
}
#从数据库列表里选择主库
function selectMongoMaster(){
tmp=
"TARGET_MONGO_HOST_LIST_0$1"
TMP_HOST=${!tmp}
echo
$TMP_HOST
#replica set
for
DUBHE_MONGO_HOST in
$TMP_HOST
;
do
if
[ $? -eq 0 ] ; then
break;
fi
done
# single server
#for DUBHE_MONGO_HOST in $TMP_HOST; do
#TARGET_MONGO_HOST=$DUBHE_MONGO_HOST
#echo $TARGET_MONGO_HOST
#done
}
#切割
function
split
() {
logInfo
"spilt data file"
echo
"split db num"
$SPLITDBNUM
echo
"split tb num"
$SPLITTBNUM
echo
"Start to split file: "
$SPLITFILE
awk ‘
BEGIN {
FS=
"‘${FILEFORMAT}‘"
;
}
ARGIND==1{
#分库分表
DBN=$
‘${SPLITKEY}‘
%
‘${SPLITDBNUM}‘
+ 1;
TBN=
int
($
‘${SPLITKEY}‘
/
‘${SPLITDBNUM}‘
)
TBN=TBN %
‘${SPLITTBNUM}‘
+ 1;
DBN=
"0"
DBN;
TBN=
"0"
TBN;
print
$0 >
"‘${SPILTTMPDIR}‘"
"/"
"‘${IMPORTTBNAME}‘"
"_"
DBN
""
TBN
}
END {
}
‘ ${SPLITFILE};
ls
$SPILTTMPDIR
echo
"Split file successfully : "
$SPLITFILE
}
#导入
function
import
() {
#importData
local
iter=1;
while
[
$iter
-le
$SPLITDBNUM
];
do
thread_import
$iter
&
iter=`expr
$iter
+ 1`
done
#wait for child-threads
wait
;
}
#导入子线程
function thread_import() {
local
num=1;
targetFileName=
$IMPORTTBNAME
"_0"
$1
"0"
$num
targetFile=
$SPILTTMPDIR
/
$IMPORTTBNAME
"_0"
$1
"0"
$num
targetDB=
$IMPORTDBNAME
"_0"
$1
targetCollection=
$IMPORTTBNAME
"_0"
$1
"0"
$num
if
[ ! -f
$targetFile
]; then
logInfo
"spilt file does not exits : "
$targetFile
num=`expr
$num
+ 1`
continue
fi
user=
"TARGET_MONGO_USER_0"
$1
TMP_USER=${!user}
password=
"TARGET_MONGO_PWD_0"
$1
TMP_PASSWORD=${!password}
#选择master
selectMongoMaster $1;
#clean dirty data
if
[
$CLEAN
-gt 0 ]; then
logInfo
"$qdate $targetDB.$targetCollection cleaning up dirty data in mongodb"
clean_dirty_data
checkError
"whether error occurs during cleaning dirty data from mongodb"
fi
#import data
import2mongo $1
$targetFile
$targetDB
$targetCollection
#record done file
statusfile=
"$STATUS_LOG_HOME/$targetFileName.done.`date -d $qdate +"
%Y
-
%m
-
%d
"`"
touch
$statusfile
num=`expr
$num
+ 1`
done
logInfo
"thread $1 ends"
}
#把指定的文件导到指定的库指定的表,并建立索引,mongodb自身会判断索引是否存在
#不存在的情况下才创建新索引
function import2mongo(){
if
[
"$FIELDFILE"
!=
""
]; then
MONGO_FIELD_FILE=
$FIELDFILE
else
MONGO_FIELD_FILE=
$FULLPATH
/../conf/${IMPORTTBNAME}-head-file
fi
DATAFILE=$2
if
[ ! -f
$DATAFILE
]; then
logInfo
"mongodb [${DB}.${COLL}] imported 0 objects"
return
0
fi
TMPLOGFILE=
$INFO_LOG_HOME
/
$DB
.
$COLL
.tmp.
log
tmp=$?
if
[
"$tmp"
!=
"0"
]; then
return
$tmp
fi
#data check
_mongo_count=`tail
$TMPLOGFILE
|
grep
imported`
_mongo_count=`expr 0
$_mongo_count
+ 0`
#start to ensure index
ensureIndex
logInfo
"mongodb [${DB}.${COLL}] imported $_mongo_count objects"
return
$tmp
}
function ensureIndex(){
}
#垃圾数据清理
function clean_dirty_data(){
day=`date -d ${1:-
‘ -1day‘
} +
"%y%m%d"
`
if
[
$CLEAN
-eq 1 ]; then
_mongo_condition=
"{\"_id\":{\"\$gte\":\"${day}_0\",\"\$lte\":\"${day}_9\"}}"
else
_mongo_condition=
"{\"_id\":{\"\$lte\":\"${day}_9\"}}"
fi
logInfo
"waiting for the clean task.."
echo
$_mongo_condition
tmp=$?
if
[
"$tmp"
!=
"0"
]; then
return
$tmp
fi
sleep
5s
logInfo
"dirty data cleaned: "
$targetDB
$targetCollection
$dirtyCount
echo
"dirty data cleaned: "
$targetDB
$targetCollection
$dirtyCount
return
$tmp
}
#parameter check
function checkParams() {
if
[ 1 -ne
$CLEAN
-a 2 -ne
$CLEAN
]; then
logInfo
"-g the parameter clean is not in [1, 2] : "
$CLEAN
return
1;
fi
if
[
$FILEFORMAT
!=
","
-a
$FILEFORMAT
!=
"\t"
]; then
logInfo
"-o the parameter file format is not in [csv, tsv] : "
$FILEFORMAT
return
1;
fi
if
[
$SPLITKEY
-lt 1 ]; then
logInfo
"-k split key must not be less than 1 : "
$SPLITKEY
return
1;
fi
if
[
$SPLITDBNUM
-lt 1 ]; then
logInfo
"-d database number must not be less than 1 : "
$SPLITDBNUM
return
1;
fi
if
[
$SPLITTBNUM
-lt 1 ]; then
logInfo
"-t collection number must not be less than 1 : "
$SPLITTBNUM
return
1;
fi
if
[ ! -f
$FIELDFILE
]; then
logInfo
"-a field file is not a common file or not exits : "
$FIELDFILE
return
1;
fi
if
[
""
=
$IMPORTDBNAME
] ; then
logInfo
"-m import database name is empty : "
$IMPORTDBNAME
return
1;
fi
if
[
""
=
$IMPORTTBNAME
] ; then
logInfo
"-m import table name is empty : "
$IMPORTTBNAME
return
1;
fi
}
#主函数
function main() {
set +x
echo
"check split file and profile: "
$SPLITFILE
$PROFILE
if
[ ! -f
$SPLITFILE
]; then
echo
"-f split file is not a common file or not exits : "
$SPLITFILE
return
1;
fi
if
[ ! -f
$PROFILE
]; then
echo
"-p profile file is not a common file or not exits : "
$PROFILE
return
1;
fi
source
$PROFILE
qdate=`date +
"%Y-%m-%d"
`
last_day=`date -d
"-1day"
+
"%Y-%m-%d"
`
BASEFILENAME=$(basename
$SPLITFILE
)
echo
"base split file name is : "
$BASEFILENAME
if
[ ! -d
$LOG_HOME
] ; then
logInfo
" log home is not a common directory or not exits : "
$LOG_HOME
return
1;
fi
LOGFILE=
$INFO_LOG_HOME
/
$BASEFILENAME
.
$qdate
.
log
if
[ -f
$LOGFILE
]; then
mv
$LOGFILE
$LOGFILE
.
$last_day
fi
touch
$LOGFILE
ERRORFILE=
$ERROR_LOG_HOME
/
$BASEFILENAME
.error.
log
if
[ -f
$ERRORFILE
]; then
mv
$ERRORFILE
$ERRORFILE
.
$last_day
fi
touch
$ERRORFILE
#空行
echo
echo
logInfo
"start to check parameters!"
checkParams
checkError
"whether error occurs during check parameters : $SPLITFILE"
#空行
echo
echo
logInfo
"start to split file: "
$SPLITFILE
if
[ ! -d
$DATA_SPLIT_HOME
] ; then
logInfo
" data split home is not a common directory or not exits : "
$DATA_SPLIT_HOME
return
1;
fi
SPILTTMPDIR=
$DATA_SPLIT_HOME
/
$BASEFILENAME
echo
"split temple directory : "
$SPILTTMPDIR
if
[ -d ${SPILTTMPDIR} ]; then
rm -rf ${SPILTTMPDIR}
fi
mkdir
-p ${SPILTTMPDIR}
split
checkError
"whether error occurs during split data : $SPLITFILE"
logInfo
"split data completely : $SPLITFILE"
statusfile=
$STATUS_LOG_HOME
/
$BASEFILENAME
".split.done."
$qdate
touch ${statusfile}
#空行
echo
echo
logInfo
"start to import split file to mongodb"
import
logInfo
"import data completely : $SPLITFILE"
statusfile=
$STATUS_LOG_HOME
/
$BASEFILENAME
".import.done."
$qdate
touch ${statusfile}
#空行
echo
echo
#remove temple directory
# if [ -d ${SPILTTMPDIR} ]; then
# rm -rf ${SPILTTMPDIR}
# fi
}
#-------------------------------------------------入口----------------------------------------------------------------
source /etc/profile
demeter_conf_cpc_qa.sh 脚本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/bin/bash
source /etc/profile
#logger path
INFO_LOG_HOME=
"${LOG_HOME}/info"
STATUS_LOG_HOME=
"${LOG_HOME}/status"
if
[ ! -d
$ERROR_LOG_HOME
]; then
if
[ ! -d
$INFO_LOG_HOME
]; then
mkdir
-p
$INFO_LOG_HOME
fi
if
[ ! -d
$STATUS_LOG_HOME
]; then
mkdir
-p
$STATUS_LOG_HOME
fi
if
[ ! -d
$DATA_HOME
]; then
mkdir
-p
$DATA_HOME
fi
#data path for source and target data path
DATA_SPLIT_HOME=/data/demeter/sdata
#import target mongodbs
TARGET_MONGO_PORT_01=XXX
TARGET_MONGO_USER_01=XXX
TARGET_MONGO_PWD_01=XXX
TARGET_MONGO_HOST_LIST_01="test01.mongodb01:
$TARGET_MONGO_PORT_01
test01.mongodb02:
$TARGET_MONGO_PORT_01
test01.mongodb03:$
TARGET_MONGO_PORT_01"
TARGET_MONGO_PORT_02=XXX
TARGET_MONGO_USER_02=XXX
TARGET_MONGO_PWD_02=XXX
TARGET_MONGO_HOST_LIST_02="testt02.mongodb01:
$TARGET_MONGO_PORT_02
test02.mongodb02:
$TARGET_MONGO_PORT_02
test02.mongodb03:$
TARGET_MONGO_PORT_02"
TARGET_MONGO_PORT_03=XXX
TARGET_MONGO_USER_03=XXX
TARGET_MONGO_PWD_03=XXX
TARGET_MONGO_HOST_LIST_03="test03.mongodb01:
$TARGET_MONGO_PORT_03
test03.mongodb02:
$TARGET_MONGO_PORT_03
test03.mongodb03:$
TARGET_MONGO_PORT_03"
#mongodb utils
MONGO=/opt/mongodb
xuri-cpc-head-file
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
a
b
c
d
e
f
g
h
i
j
k
l
m
n
0
p
q
r
s
host:
1
2
3
4
5
6
7
8
9
XX.XX.XX.XX test01.mongodb01
XX.XX.XX.XX test01.mongodb02
XX.XX.XX.XX testt01.mongodb03
XX.XX.XX.XX test02.mongodb01
XX.XX.XX.XX test02.mongodb02
XX.XX.XX.XX test02.mongodb03
XX.XX.XX.XX test03.mongodb01
XX.XX.XX.XX test03.mongodb02
XX.XX.XX.XX test03.mongodb03
以上是关于mongodb拆库分表脚本的主要内容,如果未能解决你的问题,请参考以下文章
mybatis-plus小技能: 分表策略(按年分表和按月分表)