[Sync Script] mysql-elasticsearch sync

Posted wilderness

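The search feature in our company's project is built on elasticsearch, so keeping the data in MySQL and elasticsearch in sync is a problem that has to be solved.

I found several packages online, but each had its own drawbacks, so in the end I decided to write a script myself. The rough approach is: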

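1. In an infinite loop, repeatedly SELECT from the specified tables.

2. Read every row whose update time is later than a given point in time (initialized to "1970-01-01 00:00:00").

3. Write the required fields to elasticsearch.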
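The three steps above can be sketched as a single polling pass. This is only an illustration under stated assumptions: an in-memory sqlite3 database stands in for MySQL, a plain dict stands in for the elasticsearch index, and the `articles` table and its columns are hypothetical names that do not come from the original project.

```python
import sqlite3

EPOCH = "1970-01-01 00:00:00"


def sync_once(conn, index, last_time):
    """Copy rows updated after last_time into index and return the new watermark."""
    rows = conn.execute(
        "SELECT id, title, updated_at FROM articles "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_time,),
    ).fetchall()
    for row_id, title, updated_at in rows:
        # Stand-in for the call that would index a document in elasticsearch.
        index[row_id] = {"title": title}
        last_time = updated_at
    return last_time


def demo():
    # Hypothetical table; sqlite3 replaces MySQL so the sketch runs anywhere.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE articles (id INTEGER PRIMARY KEY, title TEXT, updated_at TEXT)"
    )
    conn.executemany(
        "INSERT INTO articles VALUES (?, ?, ?)",
        [
            (1, "first", "2020-01-01 10:00:00"),
            (2, "second", "2020-01-02 09:30:00"),
        ],
    )
    index = {}
    watermark = sync_once(conn, index, EPOCH)   # first pass copies both rows
    again = sync_once(conn, index, watermark)   # second pass finds nothing new
    return index, watermark, again
```

Timestamps in the "YYYY-MM-DD HH:MM:SS" format sort correctly as plain strings, which is why the sketch can compare them directly. Because the filter is a strict `>`, a row committed later with an update time equal to the saved one would be skipped; whether that matters depends on how update times are written.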

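Note: 1. The script may be interrupted or restarted, so the last update time is recorded in a fixed txt file.

         2. To keep the script generic, so that supporting another table does not mean rewriting large parts of it, names are resolved dynamically using locals and globals.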
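Both notes can be illustrated with a small sketch. It is an alternative under stated assumptions, not the script's own approach: instead of locals and globals it keeps handlers in an explicit dict, and the names `MONITORS`, `monitor`, `load_checkpoint`, `save_checkpoint` and `run_once` are hypothetical.

```python
import os
import tempfile

EPOCH = "1970-01-01 00:00:00"
MONITORS = {}  # table name -> handler function


def monitor(table):
    """Register the decorated function as the handler for `table`."""
    def register(func):
        MONITORS[table] = func
        return func
    return register


def load_checkpoint(path):
    """Return the last saved timestamp, or EPOCH if nothing was saved yet."""
    if not os.path.isfile(path):
        return EPOCH
    with open(path) as f:
        lines = f.read().splitlines()
    return lines[-1] if lines else EPOCH


def save_checkpoint(path, timestamp):
    """Append the newest timestamp so that a restart resumes from it."""
    with open(path, "a") as f:
        f.write(timestamp + "\n")


@monitor("table1")
def monitor_table1(last_time):
    # Hypothetical handler: pretend rows up to this time were synced.
    return "2024-05-01 12:00:00"


def run_once(table, checkpoint_dir):
    """Run one sync pass for `table`, persisting progress on change."""
    if table not in MONITORS:
        raise KeyError("no monitor registered for {}".format(table))
    path = os.path.join(checkpoint_dir, "monitor_{}.txt".format(table))
    last_time = load_checkpoint(path)
    new_time = MONITORS[table](last_time)
    if new_time != last_time:
        save_checkpoint(path, new_time)
    return new_time
```

Calling `run_once("table1", tempfile.mkdtemp())` writes the returned timestamp to `monitor_table1.txt` in that directory, and a later call starts from it. An explicit registry makes a missing handler fail with a clear error and does not depend on how the interpreter treats the dict returned by `locals()`.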

The code is as follows:

#!/usr/bin/env python
# coding=utf-8
import sys
# Make the project packages importable; adjust this path to your own checkout.
sys.path.append('/Users/cangyufu/work_jbkj/elabels-flask')
from modules.utils.commons import app, redispool, db_master, db_slave
from sqlalchemy import text
import os
import datetime
import time
from service.myelasticsearch.index import es
from modules.utils.mysqldb import db_obj_dict

# Seconds to sleep when a full pass finds nothing new
CONST_SLEEP = 3

WORK_INDEX = 'test'

#https://stackoverflow.com/questions/136168/get-last-n-lines-of-a-file-with-python-similar-to-tail
def tail(f, lines=1):
    # f must be opened in binary mode; the result is returned as text.
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven't yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from beginning
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count(b'\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = b''.join(reversed(blocks))
    # Decode here so callers always work with text timestamps.
    return b'\n'.join(all_read_text.splitlines()[-total_lines_wanted:]).decode('utf-8')


def is_file_exists(filename):
    # Create the checkpoint file with the initial timestamp if it is missing.
    if not os.path.isfile(filename):
        with open(filename, 'wb') as f:
            f.write(b"1970-01-01 00:00:00\n")


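# Pass in the names of the tables to monitor
def sync_main(*args):
    for table in args:
        # Fail fast if a table has no monitor_<table> function defined.
        if not callable(globals().get('monitor_' + table)):
            raise Exception('lack function monitor_{}'.format(table))
    # NOTE: storing extra keys in locals() works on CPython 2 and on CPython 3
    # up to 3.12. From 3.13 each locals() call inside a function returns a
    # fresh snapshot, so a plain dict is needed there.
    for table in args:
        filename = ''.join(['monitor_', table, '.txt'])
        locals()[table + 'path'] = os.path.join(os.path.dirname(__file__), filename)
        is_file_exists(locals()[table + 'path'])
        locals()[table + 'file'] = open(locals()[table + 'path'], 'rb+')
    try:
        print("begin")
        while True:
            count = 0
            for table in args:
                print('handling ' + table)
                last_time = tail(locals()[table + 'file'], 1)
                update_time = globals()['monitor_' + table](last_time)
                print(update_time)
                if update_time == last_time:
                    count += 1
                    continue
                # Move to the end before appending; some platforms also need a
                # seek when a read/write handle switches from reading to writing.
                locals()[table + 'file'].seek(0, 2)
                locals()[table + 'file'].write((update_time + '\n').encode('utf-8'))
                locals()[table + 'file'].flush()
            # Sleep only when no table had anything new in this pass.
            if count == len(args):
                time.sleep(CONST_SLEEP)
    except Exception as e:
        print(e)
        raise
    finally:
        for table in args:
            locals()[table + 'file'].close()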

########################################################################################################################
#
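# To monitor a table you must implement a function named monitor_<table_name>. For example, to monitor
#   table1 you must implement monitor_table1. It receives the time to start syncing from (initially
#   1970-01-01 00:00:00) and returns the latest update time it has synced up to.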
#
########################################################################################################################
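def monitor_table1(last_time):
    # Placeholder: sync the rows of table1 updated after last_time here.
    return last_time


def monitor_table2(last_time):
    # Placeholder: sync the rows of table2 updated after last_time here.
    return last_time


def trans_date_time(dt):
    # Parse a "YYYY-MM-DD HH:MM:SS" string into a datetime object.
    return datetime.datetime.strptime(dt, "%Y-%m-%d %H:%M:%S")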


sync_main('table1', 'table2')
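The monitor functions in the listing are left as placeholders. The sketch below shows one way such a function could honor the contract of taking a timestamp string and returning the newest timestamp it has handled. It rests on assumptions: `make_monitor`, `fetch_rows` and `index_doc` are hypothetical names, the row fields are invented, and the real database query and elasticsearch call are replaced by in-memory fakes.

```python
import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def trans_date_time(dt):
    """Parse a "YYYY-MM-DD HH:MM:SS" string into a datetime object."""
    return datetime.datetime.strptime(dt, TIME_FORMAT)


def make_monitor(fetch_rows, index_doc):
    """Build a monitor function from two hypothetical callables.

    fetch_rows(since) yields dicts with 'id' and 'update_time' (a datetime);
    index_doc(doc_id, body) stores one document in the search index.
    """
    def monitor(last_time):
        since = trans_date_time(last_time)
        newest = since
        for row in fetch_rows(since):
            # Index every field except the bookkeeping columns.
            body = {k: v for k, v in row.items() if k not in ("id", "update_time")}
            index_doc(row["id"], body)
            newest = max(newest, row["update_time"])
        # Return a string so it can be compared with, and stored like, last_time.
        return newest.strftime(TIME_FORMAT)
    return monitor


# In-memory fakes standing in for the database and for elasticsearch.
FAKE_ROWS = [
    {"id": 1, "title": "a", "update_time": datetime.datetime(2024, 1, 1, 8, 0, 0)},
    {"id": 2, "title": "b", "update_time": datetime.datetime(2024, 1, 2, 9, 0, 0)},
]
INDEXED = {}


def fake_fetch(since):
    return [row for row in FAKE_ROWS if row["update_time"] > since]


def fake_index(doc_id, body):
    INDEXED[doc_id] = body
```

When no row is newer than `last_time`, the function returns the same string it was given, which is the condition the main loop uses to decide that a table is idle.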

 




