Python 分布式进程Worker

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 分布式进程Worker相关的知识,希望对你有一定的参考价值。

#!/usr/bin/env python

# -*- coding:utf-8 -*-

# author: Changhua Gong


import random, time, queue

from multiprocessing.managers import BaseManager

from queue import Queue


‘‘‘

worker进程:执行任务,反馈结果

这部分内容与官网教程,有些出入

‘‘‘



# 从BaseManager继承QueueManager

class QueueManager(BaseManager):

    pass


# 从网络上获取对应的queue

QueueManager.register("get_queue_t")

QueueManager.register("get_queue_rs")

# 连接到服务器,也就是运行task_master.py的机器:

server_ip = "127.0.0.1"

print("Connect to server...%s" % server_ip)

manager = QueueManager(address=(server_ip, 5000), authkey=b"love8013")  # 保证端口和密匙一致

# 从网络连接

manager.connect()

# 获得通过网络访问的Queue对象

t = manager.get_queue_t()

rs = manager.get_queue_rs()

print("1")

# 从task队列中读取任务,并将结果写会result队列

for i in range(10):

    try:

        n = t.get(timeout=1)

        print(‘run task %d * %d...‘ % (n, n))

        r = ‘%d * %d = %d‘ % (n, n, n*n)

        time.sleep(1)

        rs.put(r)

    except Queue.Empty:

        print(‘task queue is empty.‘)

print(‘worker exit.‘)


以上是关于Python 分布式进程Worker的主要内容,如果未能解决你的问题,请参考以下文章

Dask Worker 进程内存不断增长

python celery多worker多队列定时任务

Celery分布式队列学习

Storm分布式实时计算模式

Storm分布式实时计算模式

python中,用Redis构建分布式锁