基于python的MySQL和redis数据同步实现(redis做缓存)

Posted 楊木木8023

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于python的MySQL和redis数据同步实现(redis做缓存)相关的知识,希望对你有一定的参考价值。

一、背景原理

1、mysql数据库

MySQL是一种关系型数据库,主要用于存放持久化数据,将数据存储在硬盘中,读取速度较慢。每次请求访问数据库时,都存在着I/O操作,如果反复频繁的访问数据库:会在反复链接数据库上花费大量时间,从而导致运行效率过慢;反复的访问数据库也会导致数据库的负载过高。所以,针对MySQL的缺点,衍生出了缓存的概念。

2、redis数据库

redis是一款非关系型数据库,是一种缓存数据库,数据存放在内存中,用于存储使用频繁的数据,这样减少访问数据库的次数,提高运行效率。所以redis数据库读取速度比较快,运行效率高。

3、二者区别与联系

(1) 类型:MySQL是关系型数据库,redis是缓存数据库;

(2) 作用:MySQL用于持久化的存储数据到硬盘,功能强大,速度较慢,基于磁盘,读写redis快,但是不受空间容量限制,性价比高;redis用于存储使用较为频繁的数据到缓存中,读取速度快,基于内存,读写速度快,也可做持久化,但是内存空间有限,当数据量超过内存空间时,需扩充内存,但内存价格贵;

(3) 需求:mysql和redis因为需求的不同,一般都是配合使用。需要高性能的地方使用Redis,不需要高性能的地方使用MySQL。存储数据在MySQL和Redis之间做同步。所以一般情形下,使用MySQL作为持久化存储数据库存储数据,使用redis作为缓存提升读取速度。

二、数据同步实现方案

本文实验,设计一个学生信息表,持久化数据存储在MySQL数据库中,然后利用redis作为缓存数据库,实现数据的快速读取。这样就需要保持redis和MySQL数据库的数据一致性,接下来,主要讲解查询和数据更新过程的数据库一致性实现。

1、查询一致性

查询数据时,由于redis作为缓存实现快速读取数据,所以首先查询redis中是否存在数据,若存在则返回查询结果,若不存在,则向MySQL数据库请求查询数据,然后由MySQL数据库返回结果。查询流程如下如所示。而且,由于本文中redis作为缓存使用,所以需要添加过期时间,也就是为redis的每条数据记录添加过期时间,若过期时间数据没有被查询则清除,若此时间内,数据被查询,则过期时间重置,这样可以定时清除查询不频繁的数据存在redis中,增加数据读取速度。

 2、数据更新一致性

更新数据时,如果先更新MySQL数据库,在尚未更新redis时,如果此时有查询进行,则redis返回尚未更新的数据,返回结果有误。所以为了避免这种情况,采用先更新redis,再更新MySQL的方案。也就是,首先查询redis是否存在要更新的数据,存在则清除redis的这条数据,进行重新添加更新,然后再更新MySQL数据库数据,当MySQL更新成功后,再次更新redis,防止MySQL更新失败,而redis更新成功的情况发生;若不存在,则更新MySQL数据库数据,MySQL更新成功后,然后再更新redis。

三、数据库设计

1、MySQL设计

设计一张学生信息表,存储学生学号、姓名、出生日期、电话号码等信息,主键为学生学号。SQL语句如下。

CREATE DATABASE python_mysql_test01;
CREATE TABLE tb_student(
	stu_id INT PRIMARY KEY NOT NULL,
	stu_name VARCHAR(20) NOT NULL,
	stu_birth DATE,
	stu_phone VARCHAR(100)
);

 2、redis设计

这里redis表的设计采用hash类型的数据,这样可以存在多个key-value对,以用户ID作为hash表的名称,stu_name、stu_birth等作为hash表的键值对,即一个记录:

hmset stu_id:1001 stu_name 'Alice' stu_birth '1990-12-15' stu_phone '15522222141'

同时设置过期时间:expire stu_id:1001 600,即过期时间为10分钟。

四、程序编写

1、查询数据

查询数据,先去缓存redis中查找,如存在数据则返回结果,若不存在,则去MySQL中查找。

# 查询数据
    def get_data(self, stu_id):
        # redis hash表名称
        find_info = 'stu_id:' + str(stu_id)

        # 先查询redis数据库是否存在数据,如果存在数据则返回输出,若不存在则去MySQL中查询,然后再将结果更新到redis中
        result = self.r0.hgetall(find_info)
        # 长度>0 即redis存在查询的信息,直接输出信息,否则redis中不存在,需要查询MySQL
        if len(result) > 0:
            """
            每次在redis中更新或者写入数据都需要设置过期时间10分钟,然后每查询到一次就重置过期时间10分钟,
            若10分钟没有查询到这个数据,就会被清除。这样设置过期时间主要防止redis缓存数据过多,清除不常用缓存数据"""
            self.r0.expire(find_info, 600)
            print(result)
            return result
        else:
            with self.conn.cursor() as cursor:
                try:
                    # 执行MySQL的查询操作
                    cursor.execute('select stu_name, stu_birth, stu_phone from tb_student '
                                   'where stu_id=%s', (stu_id,))
                    result_sql = cursor.fetchall()
                    print(result_sql)

                    # 将查询结果更新写入redis数据库中
                    stu_name, stu_birth, stu_phone = result_sql[0][0], result_sql[0][1], result_sql[0][2]
                    data_info = 'stu_name': stu_name,
                                 'stu_birth': str(stu_birth),
                                 'stu_phone': stu_phone
                    self.r0.hmset(find_info, data_info)
                    self.r0.expire(find_info, 600)  # 设置过期时间

                    return result_sql
                except Exception as error:
                    print(error)
                finally:
                    self.conn.close()

2、更新数据

更新数据,这里主要是以插入数据为例。

 """
    更新数据的操作,为了避免更新MySQL后,redis没更新的这一段空挡时间的查询,所以先更新redis,
    再更新MySQL,然后MySQL成功提交后,再次对redis进行重新更新
    """
    def post_data(self):
        # 插入数据
        stu_id, stu_name, stu_birth, stu_phone = 1004, 'Tom', '1993-07-04', '19909092332'
        # redis hash表名称
        find_info = 'stu_id:' + str(stu_id)

        # 先查询redis数据库是否存在数据,如果存在数据则更新redis,再更新MySQL,若不存在则去MySQL中更新,提交成功再次更新redis
        result = self.r0.hgetall(find_info)
        # reids存在数据,则需要对数据进行更新,即先清除再写入; 写入redis后,再将数据写入MySQL
        if len(result) > 0:
            # 清除数据
            all_keys = self.r0.hkeys(find_info)
            self.r0.hdel(find_info, *all_keys)
            data_info = 'stu_name': stu_name,
                         'stu_birth': stu_birth,
                         'stu_phone': stu_phone
            self.r0.hmset(find_info, data_info)
            self.r0.expire(find_info, 600)  # 设置过期时间

            with self.conn.cursor() as cursor:
                try:
                    # 插入SQL语句,result为返回的结果
                    res_info = cursor.execute(
                        'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
                    )
                    # 成功插入后需要提交才能同步在数据库中
                    if isinstance(res_info, int):
                        print('数据更新成功')
                        self.conn.commit()
                        all_keys = self.r0.hkeys(find_info)
                        # 再次更新redis
                        self.r0.hdel(find_info, *all_keys)
                        self.r0.hmset(find_info, data_info)
                        self.r0.expire(find_info, 600)  # 设置过期时间
                except MySQLError as error:
                    # 如果MySQL提交不成功,清除redis数据
                    all_keys = self.r0.hkeys(find_info)
                    self.r0.hdel(find_info, *all_keys)
                    print(error)
                    self.conn.rollback()
                finally:
                    # 操作执行完成后,需要关闭连接
                    self.conn.close()
        else:
            with self.conn.cursor() as cursor:
                try:
                    # 插入SQL语句,result为返回的结果
                    res_info = cursor.execute(
                        'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
                    )
                    # 成功插入后需要提交才能同步在数据库中
                    if isinstance(res_info, int):
                        print('数据更新成功')
                        self.conn.commit()
                except MySQLError as error:
                    print(error)
                    self.conn.rollback()
                finally:
                    # 操作执行完成后,需要关闭连接
                    self.conn.close()

附录代码:

import pymysql
import redis
from pymysql import MySQLError
import time, datetime


class DatabaseSync:
    def __init__(self):
        # 连接MySQL数据库
        try:
            self.conn = pymysql.connect(host='1.1.1.1', port=3306,
                                        user='root', password='111111',
                                        database='python_mysql_test01', charset='utf8')
        except Exception as error:
            print('连接MySQL出现问题!')
            print('失败原因:', error)
            exit()

        try:
            # 建立redis连接池
            self.conn_pool = redis.ConnectionPool(host='1.1.1.1', port=6379, db=0, decode_responses=True,
                                                  password='111111')
            # 客户端0连接数据库
            self.r0 = redis.StrictRedis(connection_pool=self.conn_pool)
        except Exception as error:
            print('连接redis出现问题!')
            print('失败原因:', error)
            exit()

    # 查询数据
    def get_data(self, stu_id):
        # redis hash表名称
        find_info = 'stu_id:' + str(stu_id)

        # 先查询redis数据库是否存在数据,如果存在数据则返回输出,若不存在则去MySQL中查询,然后再将结果更新到redis中
        result = self.r0.hgetall(find_info)
        # 长度>0 即redis存在查询的信息,直接输出信息,否则redis中不存在,需要查询MySQL
        if len(result) > 0:
            """
            每次在redis中更新或者写入数据都需要设置过期时间10分钟,然后每查询到一次就重置过期时间10分钟,
            若10分钟没有查询到这个数据,就会被清除。这样设置过期时间主要防止redis缓存数据过多,清除不常用缓存数据"""
            self.r0.expire(find_info, 600)
            print(result)
            return result
        else:
            with self.conn.cursor() as cursor:
                try:
                    # 执行MySQL的查询操作
                    cursor.execute('select stu_name, stu_birth, stu_phone from tb_student '
                                   'where stu_id=%s', (stu_id,))
                    result_sql = cursor.fetchall()
                    print(result_sql)

                    # 将查询结果更新写入redis数据库中
                    stu_name, stu_birth, stu_phone = result_sql[0][0], result_sql[0][1], result_sql[0][2]
                    data_info = 'stu_name': stu_name,
                                 'stu_birth': str(stu_birth),
                                 'stu_phone': stu_phone
                    self.r0.hmset(find_info, data_info)
                    self.r0.expire(find_info, 600)  # 设置过期时间

                    return result_sql
                except Exception as error:
                    print(error)
                finally:
                    self.conn.close()

    """
    更新数据的操作,为了避免更新MySQL后,redis没更新的这一段空挡时间的查询,所以先更新redis,
    再更新MySQL,然后MySQL成功提交后,再次对redis进行重新更新
    """
    def post_data(self):
        # 插入数据
        stu_id, stu_name, stu_birth, stu_phone = 1004, 'Tom', '1993-07-04', '19909092332'
        # redis hash表名称
        find_info = 'stu_id:' + str(stu_id)

        # 先查询redis数据库是否存在数据,如果存在数据则更新redis,再更新MySQL,若不存在则去MySQL中更新,提交成功再次更新redis
        result = self.r0.hgetall(find_info)
        # reids存在数据,则需要对数据进行更新,即先清除再写入; 写入redis后,再将数据写入MySQL
        if len(result) > 0:
            # 清除数据
            all_keys = self.r0.hkeys(find_info)
            self.r0.hdel(find_info, *all_keys)
            data_info = 'stu_name': stu_name,
                         'stu_birth': stu_birth,
                         'stu_phone': stu_phone
            self.r0.hmset(find_info, data_info)
            self.r0.expire(find_info, 600)  # 设置过期时间

            with self.conn.cursor() as cursor:
                try:
                    # 插入SQL语句,result为返回的结果
                    res_info = cursor.execute(
                        'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
                    )
                    # 成功插入后需要提交才能同步在数据库中
                    if isinstance(res_info, int):
                        print('数据更新成功')
                        self.conn.commit()
                        all_keys = self.r0.hkeys(find_info)
                        # 再次更新redis
                        self.r0.hdel(find_info, *all_keys)
                        self.r0.hmset(find_info, data_info)
                        self.r0.expire(find_info, 600)  # 设置过期时间
                except MySQLError as error:
                    # 如果MySQL提交不成功,清除redis数据
                    all_keys = self.r0.hkeys(find_info)
                    self.r0.hdel(find_info, *all_keys)
                    print(error)
                    self.conn.rollback()
                finally:
                    # 操作执行完成后,需要关闭连接
                    self.conn.close()
        else:
            with self.conn.cursor() as cursor:
                try:
                    # 插入SQL语句,result为返回的结果
                    res_info = cursor.execute(
                        'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
                    )
                    # 成功插入后需要提交才能同步在数据库中
                    if isinstance(res_info, int):
                        print('数据更新成功')
                        self.conn.commit()
                except MySQLError as error:
                    print(error)
                    self.conn.rollback()
                finally:
                    # 操作执行完成后,需要关闭连接
                    self.conn.close()


if __name__ == '__main__':
    dbs = DatabaseSync()
    # dbs.get_data(1003)

    dbs.post_data()

以上是关于基于python的MySQL和redis数据同步实现(redis做缓存)的主要内容,如果未能解决你的问题,请参考以下文章

基于python的MySQL和redis数据同步实现(redis做缓存)

使用canal解决Mysql和Redis数据同步(TCP)

使用canal解决Mysql和Redis数据同步(TCP)

基于Canal的Mysql&Redis数据同步实现

实现mysql和redis之间的触发数据同步——mysql 触发器+gearman+php.worker

使用python同步mysql到redis?由于数据较多,一条一条读出来写到redis太慢,有没有可以批量操作的。