如何在 Python 中进行跨进程跨脚本同步

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在 Python 中进行跨进程跨脚本同步相关的知识,希望对你有一定的参考价值。

既然是两个脚本,想必就是两个 python 进程了
IPC 的话基本没有简单的解决方案= =
基本上简单的就是文件和端口了吧= =
跨平台的话,像你说的 Windows 有 mutex 的接口,但是 Linux 还有 mmap 呢,不过这两个都不能算跨平台了
参考技术A 这里有现成的同步方法。很多。建议你试试rpy,简单足够用。 不过如果你用multiprocessing可以直接用queue。 以前还用过共享内存。SOCKET,文件,管道等。本回答被提问者采纳

深入解读Redis分布式锁

之前码甲哥写了两篇有关线程安全的文章:

  • 你管这叫线程安全?
  • .NET八股文:线程同步技术解读

分布式锁是"线程同步"的延续

最近首度应用"分布式锁",现在想想,分布式锁不是孤立的技能点,这其实就是跨主机的线程同步

进程内 跨进程 跨主机
Lock/Monitor、SemaphoreSlim Metux、Semaphore 分布式锁
用户态线程安全 内核态线程安全

单机服务器可以通过共享某堆内存来标记上锁/解锁,线程同步说到底是建立在单机操作系统的用户态/内核态对共享内存的访问控制。

而分布式服务器不是在同一台机器上:跨主机,因此需要将内存标记存储在所有机器进程都能看到的地方。

在开发很多业务场景会使用到锁,例如库存控制,抽奖等。
例如库存只剩1个商品,有三个用户同时打算购买,谁先购买库存立即清零,不能让其他二人也购买成功。

解读分布式锁

我们常说的线程安全、线程同步方案,包括此次的分布式锁都是基于“多线程/多进程对特定资源有更新操作”。

基本考量:

  1. 分布式系统,一个锁在同一时间只能被一个服务器获取 (这是分布式锁的基础)
  2. 具备锁失效机制,防止死锁 (防止某些意外,锁没有得到释放,那别人也无法得到锁)

Redis SET resource-name anystring NX EX max-lock-time 是一种最简单的分布式锁实现方案。

SET 命令支持多个参数:

  • EX seconds-- 设置过期时间(s)
  • NX -- 如果key不存在,则设置
    ......
    因为SET命令参数可以替代SETNX,SETEX,GETSET,这些命令在未来可能被废弃。

上面的命令返回OK(或经过重试),客户端就获取到这个锁;
使用DEL命令解锁;
到达超时时间会自动释放锁。

在解锁时,增加一些设计,让系统更加健壮:

  1. 不要使用固定的String值,而是使用一个不易被猜中的随机值, 业内称为token
  2. 不使用DEL命令释放锁,而是发送script去移除key

第3、4点是为了解决 :“锁提前过期,客户端A还没有执行完,然后客户端B获取了锁,这时客户端A执行完了,会不会再删锁的时候把B的锁给删掉” -- 4是3技术上的推荐实现。

脚本如下:

if redis.call("get",KEYS1] ==ARGV[1])
then
   return  redis.call("DEL",KEYS[1])
else
  return 0
end

下面使用StackExchange.Redis 写了基于以上考量的代码示例:


        /// <summary>
        /// Acquires the lock.
        /// </summary>
        /// <param name="key"></param>
        /// <param name="token">随机值</param>
        /// <param name="expireSecond"></param>
        /// <param name="waitLockSeconds">非阻塞锁</param>
        static bool Lock(string key, string token,int expireSecond=10, double waitLockSeconds = 0)
        {
            var waitIntervalMs = 50;
            bool isLock;
            
            DateTime begin = DateTime.Now;
            do
            {
                isLock = Connection.GetDatabase().StringSet(key, token, TimeSpan.FromSeconds(expireSecond), When.NotExists);
                if (isLock)
                    return true;

                //不等待锁则返回
                if (waitLockSeconds == 0) break;
                //超过等待时间,则不再等待
                if ((DateTime.Now - begin).TotalSeconds >= waitLockSeconds) break;

                Thread.Sleep(waitIntervalMs);
            } while (!isLock);
            return false;
        }

        /// <summary>  
        /// Releases the lock.  
        /// </summary>  
        /// <returns><c>true</c>, if lock was released, <c>false</c> otherwise.</returns>  
        /// <param name="key">Key.</param>  
        /// <param name="value">value</param>  
        static bool UnLock(string key, string value)
        {
            string lua_script = @"  
                if (redis.call(\'GET\', KEYS[1]) == ARGV[1]) then  
                    redis.call(\'DEL\', KEYS[1])  
                    return true  
                else  
                    return false  
                end  
                ";

            try
            {
                var res = Connection.GetDatabase().ScriptEvaluate(lua_script,
                                                           new RedisKey[] { key },
                                                           new RedisValue[] { value });
                return (bool)res;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"ReleaseLock lock fail...{ex.Message}");
                return false;
            }
        }
        
        private static Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(() =>
        {
            ConfigurationOptions configuration = new ConfigurationOptions
            {
                AbortOnConnectFail = false,
                ConnectTimeout = 5000,
            };

            configuration.EndPoints.Add("10.100.219.9", 6379);

            return ConnectionMultiplexer.Connect(configuration.ToString());
        }); 
        public static ConnectionMultiplexer Connection => lazyConnection.Value;

以上代码新增了第五点考量:

  1. 为避免无限制抢锁,增加了非阻塞锁: 轮询_s等待锁,未等到则不再抢锁

使用方式:

下面并行开启三个任务,减少库存:

  static void Main(string[] args)
        {
            // 尝试并行执行3个任务
            Parallel.For(0, 3, x =>
            {
                string token = $"loki:{x}";
                bool isLocked = Lock("loki", token, 5, 10);
            
                if (isLocked)
                {
                    Console.WriteLine($"{token} begin reduce stocks (with lock) at {DateTime.Now}.");
                    Thread.Sleep(1000);
                    Console.WriteLine($"{token} release lock {UnLock("loki", token)} at {DateTime.Now}. ");
                }
                else
                {
                    Console.WriteLine($"{token} begin reduce stocks at {DateTime.Now}.");
                }
            });
        }

输出总结

本文从基础的线程安全,八卦文线程同步,延伸到跨主机的资源线程/进程安全, 其中演示了利用RedisSET命令做分布式锁的设计方案,虽然是面试八股文,我们依旧需要仔细揣摩Redis Lock的细节考量。

以上是关于如何在 Python 中进行跨进程跨脚本同步的主要内容,如果未能解决你的问题,请参考以下文章

面试系列——爱奇艺Andromeda 跨进程通信组件分析

在托管进程中使用 EventWaitHandle 和在非托管进程中使用 WaitForSingleObject 进行跨进程同步

是共享库/dll中的全局变量,跨进程共享

如何在不传递引用的情况下在 Python 中使用 SyncManager 跨进程共享列表

跨进程通信之Messenger

如何在android中实现跨进程锁?