Mysql 从库部署

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Mysql 从库部署相关的知识,希望对你有一定的参考价值。

  1 #!/usr/bin/env python
  2 # -*- coding: utf-8 -*-
  3 
  4 import paramiko,os,sys,datetime,time,mysqldb
  5 
  6 def mysql_conn(sql):
  7     try:
  8         conn = MySQLdb.connect(host = ,user = ,passwd = ,connect_timeout=5)
  9         cursor = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor)
 10         cursor.execute(sql)
 11         alldata = cursor.fetchall()
 12         cursor.close()
 13         conn.close()
 14         if len(alldata)==0:
 15             return 0
 16         return alldata[0]
 17     except MySQLdb.Error,e:
 18         return 0
 19 
 20 def ssh_connect(ip,password,cmd):
 21         port=63008
 22         user="root"
 23 
 24         ssh=paramiko.SSHClient()
 25         ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 26         ssh.connect(ip,port,user,password)
 27         stdin,stdout,stderr=ssh.exec_command(cmd)
 28         data=stdout.read().strip()
 29         ssh.close()
 30         return data
 31     
 32 
 33 class Database:
 34     def __init__(self,dbport,slave_ip,stat_ip,master_ip):
 35         self.user=root
 36         self.password=
 37         self.port=63008
 38         self.today=datetime.date.today().strftime(%Y%m%d)
 39         self.dbport=dbport
 40         self.slave_ip=slave_ip
 41         self.host=master_ip
 42         self.main_id=0
 43         self.dist_id=0
 44         self.data_dir=‘‘
 45         self.version=‘‘
 46         self.m_version=‘‘
 47         self.app_name=‘‘
 48         self.m_app_name=‘‘
 49         self.conf_name=‘‘
 50         self.m_conf_name=‘‘
 51         self.STAT_IP = stat_ip
 52         self.m_dbport=0
 53 
 54     def log_w(self,text):
 55         logfile = "/home/create_slave_%s.log" %self.dbport
 56         now = time.strftime("%Y-%m-%d %H:%M:%S")
 57         tt = str(now) + "\t" + str(text) + "\n"
 58         f = open(logfile,a+)
 59         f.write(tt)
 60         f.close()
 61 
 62     def find_ip(self):#走中心应用确认本服务器是否用于从库
 63         text = "正在从中心应用确认本服务器是否用于从库,稍等......".decode("utf-8").encode("GBK")
 64         self.log_w(text)
 65         print "\033[1;32;40m%s\033[0m" % text
 66         sql = "SELECT c.main_id,c.data_dir,c.version,c.dist_id,c.ip,c.app_name FROM center_app.main_category a, center_app.sub_category b, center_app.app_info c WHERE a.id = b.main_id AND b.dist_id = c.dist_id AND b.main_id = c.main_id  AND c.flag=1 AND c.del_info!=0 AND  c.ip=‘%s‘ AND c.port=%s" %(self.slave_ip,self.dbport)
 67         alldata = mysql_conn(sql)
 68         if alldata==0:
 69             text = "    This server is not matched server,server Error,please check it !!"
 70             self.log_w(text)
 71             print "\033[1;31;40m%s\033[0m" % text
 72             sys.exit()
 73         if alldata["app_name"].find("_s") != -1:
 74             text = "    This server is slave server,server OK !!"
 75             self.log_w(text)
 76             print "\033[1;32;40m%s\033[0m" % text
 77             self.dist_id=alldata["dist_id"]
 78             self.main_id=alldata[main_id]
 79             self.version=alldata[version]
 80             self.app_name=alldata[app_name]
 81             self.m_app_name=self.app_name.split(_)[0]
 82             self.data_dir=alldata[data_dir]
 83             self.conf_name=%s-%s-%s-%s %(self.main_id,self.dist_id,self.m_app_name,self.dbport)
 84             print "STAT_IP: %s" %(self.STAT_IP)
 85             sql = "SELECT c.version FROM center_app.main_category a, center_app.sub_category b, center_app.app_info c WHERE a.id = b.main_id AND b.dist_id = c.dist_id AND b.main_id = c.main_id  AND c.main_id=%s AND c.flag=‘1‘ AND c.del_info!=0 AND c.dist_id=%s AND c.app_name=‘%s‘" %(self.main_id,self.dist_id,self.m_app_name)
 86             self.m_version=mysql_conn(sql)["version"]
 87 
 88         else:
 89             text = "    This server is not used slave server,server Error,please check it !!"
 90             self.log_w(text)
 91             print "\033[1;31;40m%s\033[0m" % text
 92             sys.exit()
 93 
 94     def check_mysql(self):#检查从库mysql数据库服务是否运行,如在运行则pkill掉,然后跳过权限表启动,为导入数据做准备
 95         text = "Check mysql now,Please wait...."
 96         self.log_w(text)
 97         print "\033[1;32;40m%s\033[0m" % text
 98         ret=os.popen("cd /root/ && rm -f /root/deploy_mysql_instance_gao.sh && wget http://208.asktao.com/multi/deploy_mysql_instance_gao.sh && echo $?").read().strip()
 99         if ret != 0:
100             text = "    mysql deploy script download fail !"
101             self.log_w(text)
102             print "\033[1;31;40m%s\033[0m" % text
103             sys.exit()
104         ret=os.popen("sh /root/deploy_mysql_instance_gao.sh %s %s %s %s %s %s %s" %(self.main_id,self.dist_id,self.app_name,self.dbport,self.version,self.data_dir,self.STAT_IP)).read().strip()
105         os.popen("rm -f /root/deploy_mysql_instance_gao.sh")
106         if ret.find(Deployment failure) != -1:
107             text = "    mysql instance exists,please check !"
108             self.log_w(text)
109             print "\033[1;31;40m%s\033[0m" % text
110             sys.exit()
111         
112         if not os.path.isdir("/usr/local/mysql%s" %self.version):
113             text = "    Mysql not install,Please install mysql !"
114             self.log_w(text)
115             print "\033[1;31;40m%s\033[0m" % text
116             sys.exit()
117         if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() != 0:
118             os.popen("/usr/local/mysql%s/bin/mysqladmin --socket=/tmp/mysql-%s.sock shutdown"%(self.version,self.dbport))
119             time.sleep(10)
120         while 1:
121             if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() == 0:
122                 conm = "/usr/local/mysql%s/bin/mysqld_safe --defaults-file=/home/mysql/etc/%s.cnf  --user=mysql --skip-grant-tables &"%(self.version,self.conf_name)
123                 os.popen(conm)
124                 break
125             else:
126                 text = "Mysql not stop,please wait..."
127                 self.log_w(text)
128                 print text
129                 time.sleep(5)
130         a = 0
131         while 1:
132             if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() != 0:
133                 text = "Mysql restart success !!"
134                 self.log_w(text)
135                 print "\033[1;32;40m%s\033[0m" % text
136                 break
137             else:
138                 if a == 12:
139                     text = "Mysql not Running,please start with ‘--skip-grant-tables‘ !"
140                     self.log_w(text)
141                     print "\033[1;31;40m%s\033[0m" % text
142                     sys.exit()
143                 else:
144                     a= a + 1
145                     time.sleep(5)
146 
147     def export_table(self):#导出当前主库的表结构
148         text = "Export master table and back mysql,Please wait ...."
149         self.log_w(text)
150         print "\033[1;32;40m%s\033[0m" % text
151         try:
152             s=paramiko.SSHClient()
153             s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
154             s.connect(self.host,self.port,self.user,self.password)
155             conm = "/usr/local/mysql%s/bin/mysqldump --socket=/tmp/mysql-%s.sock --add-drop-table -udump -pKPt1IVNd4BnswQpc -d -A|bzip2 -2 > /data1/script/db_back/%s_%s_table_%s_%s.bz2  && echo $?" % (self.m_version,self.m_dbport,self.main_id,self.app_name.split(_)[0],self.dist_id,self.today)
156             stdin,stdout,stderr=s.exec_command(conm)
157             result = stdout.readlines()[-1].strip()
158             s.close()
159             if result == 0:
160                 text = "    Export_table success !"
161                 self.log_w(text)
162                 print text
163             else:
164                 text = "Export_table Error !"
165                 self.log_w(text)
166                 print "\033[1;31;40m%s\033[0m" % text
167                 sys.exit()
168         except Exception,e:
169             text = "SSH connect Error haha!"
170             print e
171             self.log_w(text)
172             print "\033[1;31;40m%s\033[0m" % text
173         #    sys.exit()
174     
175     def down_back(self):#拷贝主库当天的数据库备份和表结构
176         os.popen("mkdir -p /data1/tmp/%s"%self.m_app_name)
177         local_dir= "/data1/tmp/%s/"%self.m_app_name
178         remote_dir=/data1/script/db_back/
179         try:
180             t=paramiko.Transport((self.host,self.port))
181             t.connect(username=self.user,password=self.password)
182             sftp=paramiko.SFTPClient.from_transport(t)
183             files=sftp.listdir(remote_dir)
184             text = "Download back file,Please wait ...."
185             self.log_w(text)
186             print "\033[1;32;40m%s\033[0m" % text
187             text = "    Beginning to download file  from %s  %s " % (self.host,datetime.datetime.now())
188             self.log_w(text)
189             print text
190             for f in files:
191                 if f.find(self.today) != -1 and f.find(str(self.dist_id)) != -1 and f.find("%s_%s"%(self.main_id,self.app_name.split(_)[0])) != -1:
192                     text = "        Downloading file:%s:%s" % (self.host,os.path.join(remote_dir,f))
193                     self.log_w(text)
194                     print text
195                     sftp.get(os.path.join(remote_dir,f),os.path.join(local_dir,f))
196                     #sftp.put(os.path.join(local_dir,f),os.path.join(remote_dir,f))
197             t.close()
198             text =     Download All back file success %s  % datetime.datetime.now()
199             self.log_w(text)
200             print text
201         except Exception,e:
202             text = "SFTP connect Error !"
203             self.log_w(text)
204             print "\033[1;31;40m%s\033[0m" % text
205             print e
206             sys.exit()
207     
208     def unbz2(self):#解压拷贝的数据库备份和表结构bz2包
209         text = "Decompression file,Please wait ...."
210         self.log_w(text)
211         print "\033[1;32;40m%s\033[0m" % text
212         text =     Beginning to Decompression file  from %s % datetime.datetime.now()
213         self.log_w(text)
214         print text
215         conm = bzip2 -dfk /data1/tmp/%s/%s_%s*%s_%s*.bz2 && echo $? %(self.m_app_name,self.main_id,self.app_name.split(_)[0],self.dist_id,self.today)
216         bz = os.popen(conm).read().strip()
217         if bz == 0:
218             text =     Decompression file  success %s % datetime.datetime.now()
219             self.log_w(text)
220             print text
221         else:
222             text = "Decompression Error !"
223             self.log_w(text)
224             print "\033[1;31;40m%s\033[0m" % text
225             sys.exit()
226     
227     def restart_mysql(self):
228         text = "Restart mysql Now,Please wait ...."
229         self.log_w(text)
230         print "\033[1;32;40m%s\033[0m" % text
231         os.popen("sed -i ‘s/#log-bin/log-bin/‘ /home/mysql/etc/%s.cnf" % (self.conf_name))
232         os.popen("sed -i ‘s/#binlog_format/binlog_format/‘ /home/mysql/etc/%s.cnf" % (self.conf_name))
233         os.popen("/usr/local/mysql%s/bin/mysqladmin   --socket=/tmp/mysql-%s.sock shutdown"%(self.version,self.dbport))
234         while 1:
235             if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() == 0:
236                 time.sleep(10)
237                 os.popen("/usr/local/mysql%s/bin/mysqld_safe --defaults-file=/home/mysql/etc/%s.cnf   --user=mysql &"%(self.version,self.conf_name))
238                 break
239             else:
240                 text = "Mysql not stop,please wait..."
241                 self.log_w(text)
242                 print text
243                 time.sleep(5)
244         a = 0
245         while 1:
246             if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() != 0:
247                 text = "Mysql restart success !!"
248                 self.log_w(text)
249                 print "\033[1;32;40m%s\033[0m" % text
250                 break
251             else:
252                 if a == 12:
253                     text = "Mysql restart Error,please check !!"
254                     self.log_w(text)
255                     print "\033[1;31;40m%s\033[0m" % text
256                     sys.exit()
257                 else:
258                     a= a + 1
259                     time.sleep(5)
260 
261         if self.app_name=="adb_s" and self.version=="5.1" and self.m_version=="4.0":
262             os.popen("/usr/local/mysql%s/bin/mysql_upgrade --socket=/tmp/mysql-%s.sock -udev2 -pPs6iW7-Jp39Rx5"%(self.version,self.dbport))
263     
264     def import_date(self):#导入表结构和备份数据库
265         text = "Slave import master date,Please wait ...."
266         self.log_w(text)
267         print "\033[1;32;40m%s\033[0m" % text
268         #导入表结构
269         dir =  "/data1/tmp/%s/"%self.m_app_name
270         table = %s_%s_table_%s_%s %(self.main_id,self.app_name.split(_)[0],self.dist_id,self.today)
271         conm = "/usr/local/mysql%s/bin/mysql --socket=/tmp/mysql-%s.sock  < %s%s && echo $?" % (self.version,self.dbport,dir,table)
272         result = os.popen(conm).read().strip()
273         if result == 0:
274             text = "    Import %s success !" % table
275             self.log_w(text)
276             print text
277         else:
278             text = "Import Table structure Error !"
279             self.log_w(text)
280             print "\033[1;31;40m%s\033[0m" % text
281             sys.exit()
282 
283         for f in os.listdir(dir):#导入数据库
284             if os.path.isfile(os.path.join(dir,f)) and (f.find(bz2) == -1) and (f.find(table) == -1):
285                 if f.find("%s_%s"%(self.main_id,self.app_name.split(_)[0])) != -1 and f.find(self.today) != -1 and (f.find(str(self.dist_id)) != -1):
286                     conm = "/usr/local/mysql%s/bin/mysql --socket=/tmp/mysql-%s.sock < %s && echo $?" % (self.version,self.dbport,os.path.join(dir,f))
287                     result = os.popen(conm).read().strip()
288                     if result == 0:
289                         text = "    Import %s success !" % f
290                         self.log_w(text)
291                         print text
292                     else:
293                         text = "Import Database %s Error !" %self.app_name.split(_)[0]
294                         self.log_w(text)
295                         print "\033[1;31;40m%s\033[0m" % text
296                         sys.exit()
297     
298     def slave_start(self):#启动salve
299         text = "Settings Slave,Please wait ...."
300         self.log_w(text)
301         print "\033[1;32;40m%s\033[0m" % text
302         binlog,log_pos=self.bin_pos()
303         sql = "change master to master_host=‘%s‘,master_user=‘repl‘,master_password=‘H7RYbCkGHmm_P1XO‘,master_port=%s,master_log_file=‘%s‘,master_log_pos=%s;" % (self.host,self.m_dbport,binlog,log_pos)
304         try:
305             conn = MySQLdb.connect(host = 127.0.0.1,port=self.dbport,user = ,passwd = ,connect_timeout=5)
306             cursor = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor)
307             cursor.execute(sql)
308             cursor.execute("slave start;")
309             time.sleep(3)
310             cursor.execute("show slave status;")
311             alldata = cursor.fetchall()[0]
312             if alldata["Slave_IO_Running"] == "Yes" and alldata["Slave_SQL_Running"] == "Yes":
313                 text = "    Settings Slave success!"
314                 self.log_w(text)
315                 print "\033[1;32;40m%s\033[0m" % text
316                 for key in alldata:
317                     print "%21s :" % key + \t + str(alldata[key])
318                 time.sleep(5)
319                 print
320                 print "******************************************"
321                 print
322                 cursor.execute("show slave status;")
323                 alldata = cursor.fetchall()[0]
324                 for key in alldata:
325                     print "%21s :" % key + \t + str(alldata[key])
326             else:
327                 text = "    Settings Slave Error,Please check it!"
328                 self.log_w(text)
329                 print "\033[1;31;40m%s\033[0m" % text
330             cursor.close()
331             conn.close()
332         except MySQLdb.Error,e:
333             self.log_w(e)
334             print e
335             sys.exit()
336 
337     
338     def bin_pos(self):#获取主库备份前的一个bin-log文件以及它的第一个pos位置
339         dir =  "/data1/tmp/%s/"%self.m_app_name
340         for f in os.listdir(dir):
341             if os.path.isfile(os.path.join(dir,f)) and (f.find(bz2) == -1) and (f.find(table) == -1):
342                 if f.find("%s_%s"%(self.main_id,self.app_name.split(_)[0])) != -1 and f.find(self.today) != -1 and (f.find(str(self.dist_id)) != -1):
343                     binlog = f.split(_)[4]
344                     log_pos = f.split(_)[5]
345                     return binlog,log_pos
346     ‘‘‘    
347     def eth1_ip(self):
348         eth0_ip = self.m_ip()
349         try:
350             conm = "ifconfig | grep Mask | grep  192.168 | awk -F ‘:‘ ‘{print $2}‘ | awk -F ‘ ‘ ‘{print $1}‘"
351             result = ssh_connect(eth0_ip,self.password,conm)
352             if result != ‘‘:
353                 text = "    成功获取主库内网地址 :%s".decode("utf-8").encode("GBK") % result
354                 self.log_w(text)
355                 print "\033[1;32;40m%s\033[0m" % text
356                 #self.host=result
357     
358             else:
359                 text = "    无法获取主库内网地址".decode("utf-8").encode("GBK")
360                 self.log_w(text)
361                 print "\033[1;31;40m%s\033[0m" % text
362                 sys.exit()
363         except Exception,e:
364             text = "SSH connect Error !"
365             self.log_w(text)
366             print "\033[1;31;40m%s\033[0m" % text
367             sys.exit()
368     ‘‘‘
369 
370     def m_ip(self):#走中心应用确认本服务器的主库IP地址
371         text = "走中心应用确认本服务器的主库信息,稍等......".decode("utf-8").encode("GBK")
372         self.log_w(text)
373         print "\033[1;32;40m%s\033[0m" % text
374         
375         sql = "SELECT c.dist_id,c.ip,c.app_name,c.port FROM center_app.main_category a, center_app.sub_category b, center_app.app_info c WHERE a.id = b.main_id AND b.dist_id = c.dist_id AND b.main_id = c.main_id AND c.main_id=%s AND c.dist_id=%s AND c.app_name = ‘%s‘ AND c.flag=‘1‘ AND c.del_info!=0" % (self.main_id,self.dist_id,self.m_app_name)
376         alldata = mysql_conn(sql)
377         self.m_dbport=int(alldata["port"])
378         self.m_conf_name=%s-%s-%s-%s %(self.main_id,self.dist_id,self.m_app_name,self.m_dbport)
379         if alldata != 0: 
380             return alldata["ip"]
381         else:
382             text = "    中心应用没有找到本服务器主库的相关信息,程序退出!".decode("utf-8").encode("GBK")
383             self.log_w(text)
384             print "\033[1;31;40m%s\033[0m" % text
385             sys.exit()
386     
387     
388     def drop_dump_file(self):
389         text = "    删除游戏 %s %s dump文件".decode("utf-8").encode("GBK")  %(self.main_id,self.app_name.split(_)[0])
390         self.log_w(text)
391         print "\033[1;32;40m%s\033[0m" % text
392         conm="rm -f /data1/tmp/%s/%s_%s*%s_%s*" %(self.m_app_name,self.main_id,self.app_name.split(_)[0],self.dist_id,self.today)
393         os.popen(conm)
394 
395 if __name__=="__main__":
396     dbport = raw_input(Enter :Slave_dbport :)
397     print "\033[1;31;40m%s\033[0m" %(‘‘‘
398 注意事项:1、确认脚本中用于服务器连接的用户名、密码和端口之类的没有错
399           2、确认主库已经按照要求修改配置文件打开了bin-log,设置了相关参数
400           3、确认从库已经安装和主库一样版本的mysql
401           4、运行之后删除/data1/tmp/目录下的压缩包
402           5、运行格式:python create_slave.py
403           6、从库创建完成并且数据与主库追齐之后要用脚本验证数据的一致性,数据一致后整个创建从库过程完成
404           7、用于创建主从
405           8、/home/create_slave_%s.log为脚本运行日志
406           9、在执行check_mysql时,会出现Broken pipe的错误,这个是由于python调用系统命令关闭和打开mysql时显示的信息没有正确的显示在终端造成的,没有影响,暂时没有找到不让显示此类信息的方法,亟待解决
407 ‘‘‘%dbport).decode("utf-8").encode("GBK")
408     dbport=int(dbport)
409     slave_ip = raw_input(Enter :slave outer_ip :)
410     stat_ip = raw_input(Enter :stat_ip :)
411     master_ip = raw_input(Enter :master_ip :)
412     boss = Database(dbport,slave_ip,stat_ip,master_ip)
413     boss.find_ip()
414     boss.m_ip()
415     boss.check_mysql()
416     boss.export_table()
417     boss.down_back()
418     boss.unbz2()
419     boss.import_date()
420     boss.restart_mysql()
421     boss.slave_start()
422     boss.drop_dump_file()

 

以上是关于Mysql 从库部署的主要内容,如果未能解决你的问题,请参考以下文章

部署mysql主从同步

MySQL新增从库

MySQL主从复制实践与部署

MySQL主从同步原理

CentOS7.X安装部署mysql5.7主从环境

代码连MySQL从库报“Table ‘performance_schema.session_var”