05文件线程进程协程

Posted yaopeiyun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了05文件线程进程协程相关的知识,希望对你有一定的参考价值。

一. 文件

1.    操作文件的函数/方法

1) open打开文件

  • open默认只读模式

   当要修改,调用write方法时,可以传入访问方式

  f=open(“文件名”,“访问方式”)

 技术图片

 

2) read将文件内容读取到内存

第一次读取之后,文件指针到了文件末尾,再次调用不会读取到任何的内容

3) write将指定内容写入文件

4) close 关闭文件

 技术图片

 

5) readLine方法:一次读取一行

方法执行后,会把 文件指针 移动到下一行,准备再次读取

 技术图片

 

2.    其他常用操作

在 Python 中,如果希望通过程序实现创建、重命名、删除、改变路径、查看目录内容、……

功能,需要导入 os 模块。

 技术图片

 

二. eval函数

1)概念将字符串 当成 有效的表达式 来求值 并 返回计算结果

2)在开发时千万不要使用 eval 直接转换 input 的结果

 技术图片

 

三. 线程

1.    并发并行

1)并发:指的是任务数多余cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已)

2)并行:指的是任务数小于等于cpu核数,即任务真的是一起执行的

2. 多线程执行

import threading

import time

 

def saySorry():

    print("亲爱的,我错了,我能吃饭了吗?")

    time.sleep(1)

 

if __name__ == "__main__":

    for i in range(5):

        t = threading.Thread(target=saySorry)

        t.start() #启动线程,即让线程开始执行

 

封装后:

通过使用threading模块能完成多任务的程序开发,为了让每个线程的封装性更完美,所以使用threading模块时,往往会定义一个新的子类class,只要继承threading.Thread就可以了,然后重写run方法

import threading
import time
 
class MyThread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "I‘m "+self.name+ @ +str(i) #name属性中保存的是当前线程的名字
            print(msg)
 
 
if __name__ == __main__:
    t = MyThread()
    t.start()

 

1)每个线程默认有一个名字,尽管上面的例子中没有指定线程对象的name,但是python会自动为线程指定一个名字。

2)当线程的run()方法结束时该线程完成。

3)无法控制线程调度程序,但可以通过别的方式来影响线程调度的方式。

3. 共享全局变量

1)在一个进程内的所有线程共享全局变量,很方便在多个线程间共享数据

2)缺点就是,线程是对全局变量随意遂改可能造成多线程之间对全局变量的混乱(即线程非安全)

3)如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确

四.互斥锁

1. 基本概念

1) 当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制

2) 线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。

3) 互斥锁为资源引入一个状态:锁定/非锁定

4) 某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

2. 实现

threading模块中定义了Lock类,可以方便的处理锁定:

 

# 创建锁

mutex = threading.Lock()

# 锁定

mutex.acquire()

# 释放

mutex.release()

注意:

如果这个锁之前是没有上锁的,那么acquire不会堵塞

如果在调用acquire对这个锁上锁之前 它已经被 其他线程上了锁,那么此时acquire会堵塞,直到这个锁被解锁为止

3.    总结

1)锁的好处:确保了某段关键代码只能由一个线程从头到尾完整地执行

2)锁的坏处:阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了;由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁

4. 死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。

五.  进程

1.    概念

一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。不仅可以通过线程完成多任务,进程也是可以的。

2.    创建

传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动

from multiprocessing import Process
import os
import time
 
def run_proc():
    """子进程要执行的代码"""
    print(子进程运行中,pid=%d... % os.getpid())  # os.getpid获取当前进程的进程号
    print(子进程将要结束...)
 
if __name__ == __main__:
    print(父进程pid: %d % os.getpid())  # os.getpid获取当前进程的进程号
    p = Process(target=run_proc)
    p.start()

 

3. Process属性和方法

1) target:如果传递了函数的引用,可以任务这个子进程就执行这里的代码

args:给target指定的函数传递的参数,以元组的方式传递

kwargs:给target指定的函数传递命名参数

name:给进程设定一个名字,可以不设定

group:指定进程组,大多数情况下用不到

 

2) Process创建的实例对象的常用方法:

start():启动子进程实例(创建子进程)

is_alive():判断进程子进程是否还在活着

join([timeout]):是否等待子进程执行结束,或等待多少秒

terminate():不管任务是否完成,立即终止子进程

 

3) Process创建的实例对象的常用属性:

name:当前进程的别名,默认为Process-N,N为从1开始递增的整数

pid:当前进程的pid(进程号)

4.    进程间不共享全局变量

5.    进程线程对比

1)线程执行开销小,但不利于资源的管理和保护;而进程正相反

2)一个程序至少有一个进程,一个进程至少有一个线程.

3)进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率

4)线程不能够独立执行,必须依存在进程中

六.进程间通信

1. Queue使用

1)可以使用multiprocessing模块的Queue实现多进程之间的数据传递

2)初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);

2. Queue方法

1)Queue.qsize():返回当前队列包含的消息数量;

2)Queue.empty():如果队列为空,返回True,反之False ;

3)Queue.full():如果队列满了,返回True,反之False;

4) Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;

  • 如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;
  • 如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;
  • Queue.get_nowait():相当Queue.get(False);

5)Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;

  • 如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;
  • 如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;
  • Queue.put_nowait(item):相当Queue.put(item, False);
from multiprocessing import Process, Queue

import os, time, random

 

# 写数据进程执行的代码:

def write(q):

    for value in [A, B, C]:

        print(Put %s to queue... % value)

        q.put(value)

        time.sleep(random.random())

 

# 读数据进程执行的代码:

def read(q):

    while True:

        if not q.empty():

            value = q.get(True)

            print(Get %s from queue. % value)

            time.sleep(random.random())

        else:

            break

 

if __name__==__main__:

    # 父进程创建Queue,并传给各个子进程:

    q = Queue()

    pw = Process(target=write, args=(q,))

    pr = Process(target=read, args=(q,))

    # 启动子进程pw,写入:

    pw.start()   

    # 等待pw结束:

    pw.join()

    # 启动子进程pr,读取:

    pr.start()

    pr.join()

    # pr进程里是死循环,无法等待其结束,只能强行终止:

    print(‘‘)

    print(所有数据都写入并且读完)

 

七.进程池

1. 创建

1)当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

2)初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务

2. 常用函数

1)apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;

2)close():关闭Pool,使其不再接受新的任务;

3)terminate():不管任务是否完成,立即终止;

4)join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

3. 实现

from multiprocessing import Pool
import os, time, random
 
def worker(msg):
    t_start = time.time()
    print("%s开始执行,进程号为%d" % (msg,os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random()*2) 
    t_stop = time.time()
    print(msg,"执行完毕,耗时%0.2f" % (t_stop-t_start))
 
po = Pool(3)  # 定义一个进程池,最大进程数3
for i in range(0,10):
    # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
    # 每次循环将会用空闲出来的子进程去调用目标
    po.apply_async(worker,(i,))
 
print("----start----")
po.close()  # 关闭进程池,关闭后po不再接收新的请求
po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
print("-----end-----")

 

4.进程池中的Queue

使用multiprocessing.Manager()中的Queue()

from multiprocessing import Manager,Pool
import os,time,random
 
def reader(q):
    print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader从Queue获取到消息:%s" % q.get(True))
 
def writer(q):
    print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
    for i in "itcast":
        q.put(i)
 
if __name__=="__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue
    po = Pool()
    po.apply_async(writer, (q,))
 
    time.sleep(1)  # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据
 
    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())

 

八.迭代器

1. 判断一个对象是否可以迭代

使用 isinstance() 判断一个对象是否是 Iterable (可迭代)对象

       from collections import Iterable

print(isinstance([], Iterable))

Out: True

 

一个具备了__iter__方法的对象,就是一个可迭代对象

 

 技术图片

 

2.    iter()函数与next()函数

list、tuple等都是可迭代对象,我们可以通过iter()函数获取这些可迭代对象的迭代器。然后我们可以对获取到的迭代器不断使用next()函数来获取下一条数据。iter()函数实际上就是调用了可迭代对象的__iter__方法。

li = [11, 22, 33, 44, 55]

li_iter = iter(li)

next(li_iter)

注意,当我们已经迭代完最后一个数据之后,再次调用next()函数会抛出StopIteration的异常,来告诉我们所有数据都已迭代完成,不用再执行next()函数了。

3.    判断一个对象是否是迭代器

用 isinstance() 

 技术图片

 

4.    定义迭代器

1)一个实现了__iter__方法和__next__方法的对象,就是迭代器。

2)_iter__方法要返回一个迭代器,迭代器自身正是一个迭代器,所以迭代器的__iter__方法返回自身即可

class MyList(object):
    """自定义的一个可迭代对象"""
    def __init__(self):
        self.items = []
 
    def add(self, val):
        self.items.append(val)
 
    def __iter__(self):
        myiterator = MyIterator(self)
        return myiterator
 
 
class MyIterator(object):
    """自定义的供上面可迭代对象使用的一个迭代器"""
    def __init__(self, mylist):
        self.mylist = mylist
        # current用来记录当前访问到的位置
        self.current = 0
 
    def __next__(self):
        if self.current < len(self.mylist.items):
            item = self.mylist.items[self.current]
            self.current += 1
            return item
        else:
            raise StopIteration
 
    def __iter__(self):
        return self
 
 
if __name__ == __main__:
    mylist = MyList()
    mylist.add(1)
    mylist.add(2)
    mylist.add(3)
    mylist.add(4)
    mylist.add(5)
    for num in mylist:
        print(num)

 

也可以只定义一个类,__iter__返回本身,迭代时调用本身的__next__方法

 技术图片

 

5.    优点

可以节省大量的存储(内存)空间

实现斐波那契数列

 技术图片

 

九.生成器

1. 概念

生成器是一种特殊的迭代器

2.    生成生成器

 技术图片

 

将每次迭代返回数值的return换成了yield

3.    总结

1)使用了yield关键字的函数不再是函数,而是生成器。(使用了yield的函数就是生成器)

yield关键字有两点作用:

2)保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起

3)将yield关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用

4)可以使用next()函数让生成器从断点处继续执行,即唤醒生成器(函数)

5)Python3中的生成器可以使用return返回最终运行的返回值,而Python2中的生成器不允许使用return返回一个返回值(即可以使用return从生成器中退出,但return后不能有任何表达式)。

4. 使用send唤醒

使用send()函数的一个好处是可以在唤醒的同时向断点处传入一个附加数据。

 技术图片

 

十.协程

协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。

1.    简单实现

import time

 

def work1():

    while True:

        print("----work1---")

        yield

        time.sleep(0.5)

 

def work2():

    while True:

        print("----work2---")

        yield

        time.sleep(0.5)

 

def main():

    w1 = work1()

    w2 = work2()

    while True:

        next(w1)

        next(w2)

 

if __name__ == "__main__":

    main()

 

2.    用greenlet实现

from greenlet import greenlet

import time

 

def test1():

    while True:

        print "---A--"

        gr2.switch()

        time.sleep(0.5)

 

def test2():

    while True:

        print "---B--"

        gr1.switch()

        time.sleep(0.5)

 

gr1 = greenlet(test1)

gr2 = greenlet(test2)

 

#切换到gr1中运行

gr1.switch()

 

 

原理:当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。

3.    gevent实现

import gevent

def f(n):

    for i in range(n):

        print(gevent.getcurrent(), i)

        #用来模拟一个耗时操作,注意不是time模块中的sleep

        gevent.sleep(1)

 

g1 = gevent.spawn(f, 5)

g2 = gevent.spawn(f, 5)

g3 = gevent.spawn(f, 5)

g1.join()

g2.join()

g3.join()

 

from gevent import monkey
import gevent
import random
import time
 
# 有耗时操作时需要
monkey.patch_all()  # 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块
 
def coroutine_work(coroutine_name):
    for i in range(10):
        print(coroutine_name, i)
        time.sleep(random.random())
 
gevent.joinall([
        gevent.spawn(coroutine_work, "work1"),
        gevent.spawn(coroutine_work, "work2")
])
 

 

以上是关于05文件线程进程协程的主要内容,如果未能解决你的问题,请参考以下文章

协程

进程线程协程

线程进程与协程

(VIP-朝夕教育)2021-05-27 .NET高级班 12-异步专题(发展史,多线程,多进程,协程,单线程非阻塞)

漫画:什么是协程?

多线程 多进程 协程 Queue(爬虫代码)