Python3 & C# Concurrent Programming ~ Processes
Posted by 逸鹏说道
Last time we went over a lot of Linux process fundamentals, so they won't be repeated here. This time let's look at concurrent programming in Python; if you spot any mistakes, corrections are welcome.
If anything is unclear, see the previous article: https://www.cnblogs.com/dotnetcrazy/p/9363810.html
Official docs: https://docs.python.org/3/library/concurrency.html
Online preview: http://github.lesschina.com/python/base/concurrency/2.并发编程-进程篇.html
1. Processes
Official docs: https://docs.python.org/3/library/multiprocessing.html
Code: https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/PythonProcess
1.1. Process
Creating a process in Python is very straightforward. Here's an example (this approach is portable; fork only works on Linux-like systems):
import os
# Note it's Process, not process (class names start with a capital letter)
from multiprocessing import Process

def test(name):
    print("[子进程-%s]PID:%d,PPID:%d" % (name, os.getpid(), os.getppid()))

def main():
    print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = Process(target=test, args=("萌萌哒", ))  # don't forget the (x,) form for a single-element tuple
    p.start()
    p.join()  # the parent reaps the child's resources (internally calls the wait family)

if __name__ == '__main__':
    main()
Output:
[父进程]PID:25729,PPID:23434
[子进程-萌萌哒]PID:25730,PPID:25729
To create a child process, pass in the function to run and its arguments, then call start() to launch it.
join() wraps the parent reaping the child (mainly to clean up zombie children, which the previous article covered).
For the remaining parameters see the source or the docs; here is the __init__ signature from the source:
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
Extras:
name: an alias for the process instance
p.is_alive(): whether process instance p is still running
p.terminate(): terminates the process (sends a SIGTERM signal)
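To see these in action, here is a minimal sketch (the worker function and the timings are made up for illustration):
import time
from multiprocessing import Process

def worker():
    time.sleep(10)

def main():
    p = Process(target=worker, name="demo")  # name: an alias for this process instance
    p.start()
    print(p.name, p.pid, p.is_alive())  # is_alive() is True while the child is running
    p.terminate()                       # sends SIGTERM to the child
    p.join()                            # reap it so it doesn't linger as a zombie
    print(p.is_alive(), p.exitcode)     # False, and exitcode is -15 (negative SIGTERM number) on Linux

if __name__ == '__main__':
    main()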
Here is the same example written in OOP style (if no target is given, the run() method is called by default):
import os
from multiprocessing import Process

class My_Process(Process):
    # override the Process class's __init__ method
    def __init__(self, name):
        self.__name = name
        Process.__init__(self)  # call the parent's __init__

    # override the Process class's run() method
    def run(self):
        print("[子进程-%s]PID:%d,PPID:%d" % (self.__name, os.getpid(),
                                           os.getppid()))

def main():
    print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = My_Process("萌萌哒")  # run() is called by default since no target was given
    p.start()
    p.join()  # the parent reaps the child's resources (internally calls the wait family)

if __name__ == '__main__':
    main()
PS: multiprocessing.Process handles dead children for you, so unlike os.fork you don't have to write and install your own signal handlers.
1.1. A Look at the Source
Now for some of the internals (skip this if you only care about usage).
Newer versions wrap things in many layers; when that gets confusing, look at the Python 3.3.x series (an early Python 3 line where much of the code is still out in the open and easier to follow).
multiprocessing.process.py
# Since 3.4.x, Process is built on top of a BaseProcess
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/process.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/process.py
def join(self, timeout=None):
    '''Wait until the child process terminates'''
    self._check_closed()
    # assert: if the condition is False an AssertionError is raised with the message that follows
    # handy during development; for deployment `python3 -O xxx` strips all assertions
    assert self._parent_pid == os.getpid(), 'can only join a child process'
    assert self._popen is not None, 'can only join a started process'
    res = self._popen.wait(timeout)  # essentially the wait family we covered last time
    if res is not None:
        _children.discard(self)  # forget the finished child
multiprocessing.popen_fork.py
# Since 3.4.x this lives in popen_fork.py (it used to be multiprocessing/forking.py)
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/popen_fork.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/popen_fork.py
def wait(self, timeout=None):
    if self.returncode is None:
        # handle the timeout, if one was given
        if timeout is not None:
            from multiprocessing.connection import wait
            if not wait([self.sentinel], timeout):
                return None
        # the core operation
        return self.poll(os.WNOHANG if timeout == 0.0 else 0)
    return self.returncode
# Recall from last time: os.WNOHANG - waitpid() does not block if no child has exited
def poll(self, flag=os.WNOHANG):
    if self.returncode is None:
        try:
            # internally this calls waitpid
            pid, sts = os.waitpid(self.pid, flag)
        except OSError as e:
            # Child process not yet created
            # e.errno == errno.ECHILD == 10
            return None
        if pid == self.pid:
            if os.WIFSIGNALED(sts):
                self.returncode = -os.WTERMSIG(sts)
            else:
                assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
                self.returncode = os.WEXITSTATUS(sts)
    return self.returncode
A quick note on assert (don't overuse it): if the condition is true it does nothing; otherwise it raises an AssertionError with an optional error message.
def test(a, b):
    assert b != 0, "哥哥,分母不能为0啊"
    return a / b

def main():
    test(1, 0)

if __name__ == '__main__':
    main()
Result:
Traceback (most recent call last):
File "0.assert.py", line 11, in <module>
main()
File "0.assert.py", line 7, in main
test(1, 0)
File "0.assert.py", line 2, in test
assert b != 0, "哥哥,分母不能为0啊"
AssertionError: 哥哥,分母不能为0啊
At run time you can pass the -O flag to skip assert statements, e.g.:
python3 -O 0.assert.py
Traceback (most recent call last):
File "0.assert.py", line 11, in <module>
main()
File "0.assert.py", line 7, in main
test(1, 0)
File "0.assert.py", line 3, in test
return a / b
ZeroDivisionError: division by zero
Further reading:
https://docs.python.org/3/library/unittest.html
https://www.cnblogs.com/shangren/p/8038935.html
1.2. Process Pools
With multiple processes you don't have to manage them by hand; Pool does it for you. First, an example:
import os
import time
from multiprocessing import Pool  # capital P

def test(name):
    print("[子进程-%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)

def main():
    print("[父进程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool(5)  # at most 5 worker processes (defaults to the number of CPU cores if omitted)
    for i in range(10):
        # run asynchronously
        p.apply_async(test, args=(i, ))  # the synchronous version is apply() (not recommended unless necessary)
    p.close()  # close the pool: no new tasks may be added
    p.join()   # wait for all children to finish and reclaim their resources (join accepts a timeout, e.g. `p.join(1)`)
    print("over")

if __name__ == '__main__':
    main()
Diagram: (join can take a timeout, e.g. p.join(1))
You must call close() before calling join(), and once close() has been called no new tasks can be submitted to the pool (the sketch below and the source in the next section show why).
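A tiny sketch of that ordering rule (for illustration only): submitting work after close() is rejected with a ValueError, which matches the state check in apply_async shown in the next section.
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(2)
    pool.apply_async(square, (1, ))
    pool.close()                         # the pool stops accepting new tasks
    try:
        pool.apply_async(square, (2, ))  # submitting after close() is rejected
    except ValueError as e:
        print("rejected:", e)            # "Pool not running"
    pool.join()                          # join() is only legal after close()/terminate()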
1.3. A Look at the Source
Let's verify that the default Pool size is the number of CPU cores by checking the source:
multiprocessing.pool.py
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/pool.py
class Pool(object):
    def __init__(self, processes=None, ...):  # processes: the number of worker processes
        if processes is None:
            processes = os.cpu_count() or 1  # os.cpu_count() ~ number of CPU cores
In the source, apply_async accepts callback functions (callback / error_callback):
def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
    if self._state != RUN:
        raise ValueError("Pool not running")
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result
Let's see an example (it feels a lot like jQuery callbacks):
import os
import time
from multiprocessing import Pool  # capital P

def test(name):
    print("[子进程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)
    return name

def error_test(name):
    print("[子进程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    raise Exception("[子进程%s]啊,我挂了~" % name)

def callback(result):
    """Callback invoked on success"""
    print("[子进程%s]执行完毕" % result)  # result is None if the task returned nothing

def error_callback(msg):
    """Callback invoked on error"""
    print(msg)

def main():
    print("[父进程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool()  # defaults to the number of CPU cores
    for i in range(5):
        # make two of them fail on purpose
        if i > 2:
            p.apply_async(
                error_test,
                args=(i, ),
                callback=callback,
                error_callback=error_callback)  # run asynchronously
        else:
            # run asynchronously; callback fires after success (a bit like jQuery)
            p.apply_async(test, args=(i, ), callback=callback)
    p.close()  # close the pool: no new tasks may be added
    p.join()   # wait for all children to finish and reclaim their resources
    print("over")

if __name__ == '__main__':
    main()
Output:
[父进程]PID=12348,PPID=10999
[子进程0]PID=12349,PPID=12348
[子进程2]PID=12351,PPID=12348
[子进程1]PID=12350,PPID=12348
[子进程3]PID=12352,PPID=12348
[子进程4]PID=12352,PPID=12348
[子进程3]啊,我挂了~
[子进程4]啊,我挂了~
[子进程0]执行完毕
[子进程2]执行完毕
[子进程1]执行完毕
over
Continuing from the above, let's look at getting a function's return value. Above we used the success callback to get it; now let's use the built-in way:
import time
from multiprocessing import Pool, TimeoutError

def test(x):
    """square x"""
    time.sleep(1)
    return x * x

def main():
    pool = Pool()
    task = pool.apply_async(test, (10, ))
    print(task)
    try:
        print(task.get(timeout=1))
    except TimeoutError as ex:
        print("超时了~", ex)

if __name__ == '__main__':
    main()
Output: (apply_async returns an ApplyResult object, whose get method retrieves the return value)
<multiprocessing.pool.ApplyResult object at 0x7fbc354f50b8>
超时了~
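Besides get(), the ApplyResult (a.k.a. AsyncResult) object also exposes ready(), wait() and successful(); a quick sketch:
import time
from multiprocessing import Pool

def test(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    with Pool() as pool:
        task = pool.apply_async(test, (10, ))
        print(task.ready())       # False: the result isn't in yet
        task.wait()               # block until the task finishes (also accepts a timeout)
        print(task.ready())       # True
        print(task.successful())  # True: the task did not raise
        print(task.get())         # 100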
One more example, this time also covering Pool's map and imap methods (again, jQuery-like):
import time
from multiprocessing import Pool

def test(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        task = pool.apply_async(test, (10, ))
        print(task.get(timeout=1))

        obj_list = pool.map(test, range(10))
        print(obj_list)

        # returns an iterable object
        obj_iter = pool.imap(test, range(10))
        print(obj_iter)
        next(obj_iter)  # consumes the first item (0), so the loop below starts at 1
        for i in obj_iter:
            print(i, end=" ")
Output:
100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
<multiprocessing.pool.IMapIterator object at 0x7ff7f9734198>
1 4 9 16 25 36 49 64 81
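For comparison, Pool also provides imap_unordered, which yields results as soon as they are ready instead of preserving input order; a minimal sketch:
from multiprocessing import Pool

def test(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        # imap preserves input order; imap_unordered yields whatever finishes first
        for v in pool.imap_unordered(test, range(5)):
            print(v, end=" ")  # the order may differ from 0 1 4 9 16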
A quick glance at the source (see the earlier basics article if you need a refresher on iterators):
class IMapIterator(object):
    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self  # return an iterator

    # implements the next method
    def next(self, timeout=None):
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value
    ......
Further reading: a discussion on killing child processes gracefully https://segmentfault.com/q/1010000005077517
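One common pattern for that (a sketch of the idea, not code taken from the linked thread): ask the child to exit via a shared Event, and only fall back to terminate() if it doesn't comply.
import time
from multiprocessing import Process, Event

def worker(stop_event):
    while not stop_event.is_set():  # the child polls a shared Event flag
        time.sleep(0.1)
    print("child exiting cleanly")

if __name__ == '__main__':
    stop = Event()
    p = Process(target=worker, args=(stop, ))
    p.start()
    time.sleep(1)
    stop.set()         # ask the child to finish its loop
    p.join(timeout=2)
    if p.is_alive():   # only force-kill if it didn't comply
        p.terminate()
        p.join()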
1.4. Bonus: subprocess
Official docs: https://docs.python.org/3/library/subprocess.html
Remember the "stand-in" execlxxx family from before?
Well, subprocess is a wrapper around all that, and a much more powerful one. First an example (using the earlier os.execlp example as the starting point):
import subprocess

def main():
    # os.execlp("ls", "ls", "-al")  # runs any command that the PATH environment variable can resolve
    result = subprocess.run(["ls", "-al"])
    print(result)

if __name__ == '__main__':
    main()
Output:
总用量 44
drwxrwxr-x 2 dnt dnt 4096 8月 7 17:32 .
drwxrwxr-x 4 dnt dnt 4096 8月 6 08:01 ..
-rw-rw-r-- 1 dnt dnt 151 8月 3 10:49 0.assert.py
-rw-rw-r-- 1 dnt dnt 723 8月 5 18:00 1.process2.py
-rw-rw-r-- 1 dnt dnt 501 8月 3 10:20 1.process.py
-rw-rw-r-- 1 dnt dnt 1286 8月 6 08:16 2.pool1.py
-rw-rw-r-- 1 dnt dnt 340 8月 7 16:38 2.pool2.py
-rw-rw-r-- 1 dnt dnt 481 8月 7 16:50 2.pool3.py
-rw-rw-r-- 1 dnt dnt 652 8月 5 17:01 2.pool.py
-rw-rw-r-- 1 dnt dnt 191 8月 7 17:33 3.subprocess.py
CompletedProcess(args=['ls', '-al'], returncode=0)
The docs
Now let's read the official module docstring to understand it better:
r"""
具有可访问I / O流的子进程
Subprocesses with accessible I/O streams
此模块允许您生成进程,连接到它们输入/输出/错误管道,并获取其返回代码。
This module allows you to spawn processes, connect to their
input/output/error pipes, and obtain their return codes.
完整文档可以查看:https://docs.python.org/3/library/subprocess.html
For a complete description of this module see the Python documentation.
Main API
========
run(...): 运行命令,等待它完成,然后返回`CompletedProcess`实例。
Runs a command, waits for it to complete,
then returns a CompletedProcess instance.
Popen(...): 用于在新进程中灵活执行命令的类
A class for flexibly executing a command in a new process
Constants(常量)
---------
DEVNULL: 特殊值,表示应该使用`os.devnull`
Special value that indicates that os.devnull should be used
PIPE: 表示应创建`PIPE`管道的特殊值
Special value that indicates a pipe should be created
STDOUT: 特殊值,表示`stderr`应该转到`stdout`
Special value that indicates that stderr should go to stdout
Older API(尽量不用,说不定以后就淘汰了)
=========
call(...): 运行命令,等待它完成,然后返回返回码。
Runs a command, waits for it to complete, then returns the return code.
check_call(...): Same as call() but raises CalledProcessError()
if return code is not 0(返回值不是0就引发异常)
check_output(...): 与check_call()相同,但返回`stdout`的内容,而不是返回代码
Same as check_call but returns the contents of stdout instead of a return code
getoutput(...): 在shell中运行命令,等待它完成,然后返回输出
Runs a command in the shell, waits for it to complete,then returns the output
getstatusoutput(...): 在shell中运行命令,等待它完成,然后返回一个(exitcode,output)元组
Runs a command in the shell, waits for it to complete,
then returns a (exitcode, output) tuple
"""
The source itself is quite interesting: internally run() just drives a Popen object (we touched on os.popen back in the process-intro article when talking about daemons):
def run(*popenargs, input=None, capture_output=False,
        timeout=None, check=False, **kwargs):
    if input is not None:
        if 'stdin' in kwargs:
            raise ValueError('stdin and input arguments may not both be used.')
        kwargs['stdin'] = PIPE

    if capture_output:
        if ('stdout' in kwargs) or ('stderr' in kwargs):
            raise ValueError('stdout and stderr arguments may not be used '
                             'with capture_output.')
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE

    with Popen(*popenargs, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except TimeoutExpired:
            process.kill()
            stdout, stderr = process.communicate()
            raise TimeoutExpired(
                process.args, timeout, output=stdout, stderr=stderr)
        except:  # Including KeyboardInterrupt, communicate handled that.
            process.kill()
            # We don't call process.wait() as .__exit__ does that for us.
            raise
        retcode = process.poll()
        if check and retcode:
            raise CalledProcessError(
                retcode, process.args, output=stdout, stderr=stderr)
    return CompletedProcess(process.args, retcode, stdout, stderr)
The return type: CompletedProcess
# https://github.com/lotapp/cpython3/blob/master/Lib/subprocess.py
class CompletedProcess(object):
    def __init__(self, args, returncode, stdout=None, stderr=None):
        self.args = args
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr

    def __repr__(self):
        """Display the object in the specified format"""
        args = [
            'args={!r}'.format(self.args),
            'returncode={!r}'.format(self.returncode)
        ]
        if self.stdout is not None:
            args.append('stdout={!r}'.format(self.stdout))
        if self.stderr is not None:
            args.append('stderr={!r}'.format(self.stderr))
        return "{}({})".format(type(self).__name__, ', '.join(args))

    def check_returncode(self):
        """Raise CalledProcessError if the exit code is non-zero"""
        if self.returncode:
            raise CalledProcessError(self.returncode, self.args, self.stdout,
                                     self.stderr)
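A short sketch of check_returncode() in practice (the failing ls call is only there to produce a non-zero exit code):
import subprocess

result = subprocess.run(["ls", "nonexistent-file"], capture_output=True)
print(result.returncode)       # non-zero, because ls failed
print(result.stderr)           # the captured error message (bytes)
try:
    result.check_returncode()  # raises CalledProcessError for non-zero exit codes
except subprocess.CalledProcessError as e:
    print("failed:", e)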
A simple demo
Another example to get a feel for how convenient it is:
import subprocess

def main():
    result = subprocess.run(["ping", "www.baidu.com"])
    # stdout is None here: run() did not capture it, so the output went straight to the terminal
    print(result.stdout)

if __name__ == '__main__':
    main()
Screenshot:
An interactive demo
Now a more powerful example (any interactive program works, e.g. ftp, nslookup, etc.): Popen.communicate
import subprocess

def main():
    process = subprocess.Popen(
        ["ipython3"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    try:
        # interact with ipython3
        out, err = process.communicate(input=b'print("hello")', timeout=3)
        print("Out:%s\nErr:%s" % (out.decode(), err.decode()))
    except subprocess.TimeoutExpired:  # communicate raises TimeoutExpired, not TimeoutError
        # if the timeout expires the child is not killed; we have to handle that ourselves
        process.kill()
        out, err = process.communicate()
        print("Out:%s\nErr:%s" % (out.decode(), err.decode()))

if __name__ == '__main__':
    main()
Output:
IPython 6.4.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: hello
In [2]: Do you really want to exit ([y]/n)?
Err:
Note: if the timeout expires, the child process is not killed; you have to handle that yourself (as the official docs warn).
A piping demo
We'll come back to this when we cover inter-process communication, so just a quick example here, using the usual ps aux | grep bash:
import subprocess

def main():
    # ps aux | grep bash
    # process 1 produces the full process list
    p1 = subprocess.Popen(["ps", "-aux"], stdout=subprocess.PIPE)
    # process 2 filters process 1's output
    p2 = subprocess.Popen(["grep", "bash"], stdin=p1.stdout, stdout=subprocess.PIPE)
    p1.stdout.close()  # let p1 get SIGPIPE if p2 exits first
    output = p2.communicate()[0]
    print(output.decode())

if __name__ == '__main__':
    main()