进击的Python第九章:paramiko模块线程与进程各种线程锁queue队列生产者消费者模型

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了进击的Python第九章:paramiko模块线程与进程各种线程锁queue队列生产者消费者模型相关的知识,希望对你有一定的参考价值。

一、paramiko模块

他是什么东西?

  paramiko模块是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。

先来个实例:

 1 import paramiko
 2 # 创建SSH对象
 3 ssh = paramiko.SSHClient()
 4 
 5 # 允许连接不在know_hosts文件中的主机
 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 7 # 连接服务器
 8 ssh.connect(hostname=10.0.0.31, port=52113, username=root, password=123456)
 9 # 执行命令
10 stdin, stdout, stderr = ssh.exec_command(df)
11 # 获取命令结果
12 res,err = stdout.read(),stderr.read()
13 result = res if res else err
14 
15 print(result.decode())
16 
17 # 关闭连接
18 ssh.close()

有些机智的少年会突然想到,如果我想做信任呢??我的秘钥怎么用在里面呢?

同样机智的我想到了下面的方法:

 1 import paramiko
 2 
 3 #这里写我们的密钥文件
 4 private_key = paramiko.RSAKey.from_private_key_file(id_rsa.txt)
 5 
 6 # 创建SSH对象
 7 ssh = paramiko.SSHClient()
 8 # 允许连接不在know_hosts文件中的主机
 9 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
10 # 连接服务器,这里我们用pkey参数设置为私钥登陆
11 ssh.connect(hostname=10.0.0.41, port=22, username=xiaoqiang, pkey=private_key)
12 
13 # 执行命令
14 stdin, stdout, stderr = ssh.exec_command(df)
15 result = stdout.read()
16 print(result.decode())
17 stdin, stdout2, stderr = ssh.exec_command(ifconfig)
18 # 获取命令结果
19 result2 = stdout2.read()
20 print(result2.decode())
21 
22 # 关闭连接
23 ssh.close()

 

二、进程与线程

什么是进程??

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。

有了进程为什么还要线程?

进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:

  • 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。

  • 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

例如,我们在使用qq聊天, qq做为一个独立进程如果同一时间只能干一件事,那他如何实现在同一时刻 即能监听键盘输入、又能监听其它人给你发的消息、同时还能把别人发的消息显示在屏幕上呢?你会说,操作系统不是有分时么?但我的亲,分时是指在不同进程间的分时呀, 即操作系统处理一会你的qq任务,又切换到word文档任务上了,每个cpu时间片分给你的qq程序时,你的qq还是只能同时干一件事呀。

再直白一点, 一个操作系统就像是一个工厂,工厂里面有很多个生产车间,不同的车间生产不同的产品,每个车间就相当于一个进程,且你的工厂又穷,供电不足,同一时间只能给一个车间供电,为了能让所有车间都能同时生产,你的工厂的电工只能给不同的车间分时供电,但是轮到你的qq车间时,发现只有一个干活的工人,结果生产效率极低,为了解决这个问题,应该怎么办呢?。。。。没错,你肯定想到了,就是多加几个工人,让几个人工人并行工作,这每个工人,就是线程!

 

什么是线程?

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you‘re reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she‘s using the same technique, she can take the book while you‘re not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it‘s doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU‘s registers.

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

 

三、线程与进程的区别

简而言之,一个程序至少有一个进程,一个进程至少有一个线程。
线程的划分尺度小于进程,使得多线程程序的并发性高。
另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
从逻辑角度来看,多线程的意义在于一个应用程序中,有多个执行部分可以同时执行。但操作系统并没有将多个线程看做多个独立的应用,来实现进程的调度和管理以及资源分配。这就是进程和线程的重要区别。

进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位.
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.

进程和线程的主要差别在于它们是不同的操作系统资源管理方式。进程有独立的地址空间,一个进程崩溃后,在保护模式下不会对其它进程产生影响,而线程只是一个进程中的不同执行路径。线程有自己的堆栈和局部变量,但线程之间没有单独的地址空间,一个线程死掉就等于整个进程死掉,所以多进程的程序要比多线程的程序健壮,但在进程切换时,耗费资源较大,效率要差一些。但对于一些要求同时进行并且又要共享某些变量的并发操作,只能用线程,不能用进程。

 

看了上面这些概念应该有个大概的印象啦,那我们来个具体栗子,通过代码理解下:

多线程的调用有两种方法:

直接调用:

 1 import threading
 2 
 3 def run(n):
 4     print("task", n)
 5 
 6 t1 = threading.Thread(target=run, args=("t1",))
 7 t2 = threading.Thread(target=run, args=("t2",))
 8 
 9 t1.start()
10 t2.start()

上面是多线程的一个简单用法,大家运行后会觉得,这和单线程好像并没有什么区别。。。。。好吧,确实,那我们修改下

import threading

def run(n):
    print("task", n)
    time.sleep(2) #我们让他睡两秒

t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))

t1.start()
t2.start()

我们用上面的代码和单线程的比较后就发现区别了,多线程的是同时输出结果,然后等待两秒,一共就两秒,而单线程的是输出一个等两秒,在出一个再等两秒一共四秒。

继承式调用:

 1 import threading
 2 import time
 3  
 4  
 5 class MyThread(threading.Thread):
 6     def __init__(self,num):
 7         threading.Thread.__init__(self)
 8         self.num = num
 9  
10     def run(self):#定义每个线程要运行的函数
11  
12         print("running on number:%s" %self.num)
13  
14         time.sleep(3)
15  
16 if __name__ == __main__:
17  
18     t1 = MyThread(1)
19     t2 = MyThread(2)
20     t1.start()
21     t2.start()

四、线程的join和daemon用法

先给一段代码感受下输出:

 1 import threading
 2 import time
 3 
 4 def run(n):
 5     print("task ",n )
 6     time.sleep(2)
 7     print("task done",n)
 8 
 9 start_time = time.time()
10 t_objs = [] #存线程实例
11 for i in range(50):
12     t = threading.Thread(target=run,args=("t-%s" %i ,))
13     t.start()
14     t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
15 
16 print("----------all threads has finished...")
17 print("cost:",time.time() - start_time)

运行后有没有发现。卧槽,为什么sleep的两秒没算进去啊,WTF~

这里我们就用到了join,我们在重点解释下,敲黑板!!一个程序,最少有一个线程,他叫主线程,其他线程都是由主线程创建,创建完主线程和子线程就没关系了。

在上面的程序中,主线程在创建完子线程后,并没有等子线程运行完,就执行了下面的代码,导致了最后输出时间没有包含子线程的两秒。

既然我们知道了原因,我们就修改下代码:

 1 import threading
 2 import time
 3 
 4 def run(n):
 5     print("task ",n )
 6     time.sleep(2)
 7     print("task done",n)
 8 
 9 start_time = time.time()
10 t_objs = [] #存线程实例
11 for i in range(50):
12     t = threading.Thread(target=run,args=("t-%s" %i ,))
13     t.start()
14     t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
15 
16 for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
17     t.join()
18 
19 
20 print("----------all threads has finished...")
21 print("cost:",time.time() - start_time)

这里join的作用就是主线程等待其他线程执行完毕后在往下走。

daemon的作用有点和join相反的意思,主程序会等待非守护线程执行完毕后退出,守护线程相当于主线程的仆人,主线程结束,守护线程也结束。

举个栗子:

 1 import threading
 2 import time
 3 
 4 def run(n):
 5     print("task ",n )
 6     time.sleep(2)
 7     print("task done",n,threading.current_thread())
 8 
 9 start_time = time.time()
10 t_objs = [] #存线程实例
11 for i in range(50):
12     t = threading.Thread(target=run,args=("t-%s" %i ,))
13     t.setDaemon(True) #把当前线程设置为守护线程
14     t.start()
15     t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
16 
17 time.sleep(2)
18 print("----------all threads has finished...",threading.current_thread(),threading.active_count())
19 print("cost:",time.time() - start_time)

运行后,发现程序不等守护线程的sleep直接结束了。

 

五、Python GIL(Global Interpreter Lock)

  GIL顾名思义叫全局锁,他为了使每一个数据只能由一个线程使用,防止造成上下文混乱。听到这。。。我擦,那这还叫什么多线程呀,其实GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。

GIL的流程图:

技术分享

线程锁(互斥锁mutex)

 当一个程序有多个线程共享一个父进程的内存,意味着可以修改同一份数据,当两个进程同时要修改一份数据是,我们就要用到锁

举个栗子:

 1 import threading
 2 import time
 3 
 4 def run(n):
 5     lock.acquire()
 6     global  num
 7     num +=1
 8     time.sleep(1)
 9     lock.release()
10 
11 
12 lock = threading.Lock()
13 num = 0
14 t_objs = [] #存线程实例
15 for i in range(50):
16     t = threading.Thread(target=run,args=("t-%s" %i ,))
17     t.start()
18     t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
19 
20 for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
21     t.join()
22 
23 print("----------all threads has finished...",threading.current_thread(),threading.active_count())
24 
25 print("num:",num)

RLock(递归锁)

 锁套锁的时候,需要用到递归锁,不然外面的锁和里面的锁会混淆

 1 import threading,time
 2  
 3 def run1():
 4     print("grab the first part data")
 5     lock.acquire()
 6     global num
 7     num +=1
 8     lock.release()
 9     return num
10 def run2():
11     print("grab the second part data")
12     lock.acquire()
13     global  num2
14     num2+=1
15     lock.release()
16     return num2
17 def run3():
18     lock.acquire()
19     res = run1()
20     print(--------between run1 and run2-----)
21     res2 = run2()
22     lock.release()
23     print(res,res2)
24  
25  
26 if __name__ == __main__:
27  
28     num,num2 = 0,0
29     lock = threading.RLock()
30     for i in range(10):
31         t = threading.Thread(target=run3)
32         t.start()
33  
34 while threading.active_count() != 1:
35     print(threading.active_count())
36 else:
37     print(----all threads done---)
38     print(num,num2)

Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去

 1 import threading,time
 2  
 3 def run(n):
 4     semaphore.acquire()
 5     time.sleep(1)
 6     print("run the thread: %s\\n" %n)
 7     semaphore.release()
 8  
 9 if __name__ == __main__:
10  
11     num= 0
12     semaphore  = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
13     for i in range(20):
14         t = threading.Thread(target=run,args=(i,))
15         t.start()
16  
17 while threading.active_count() != 1:
18     pass #print threading.active_count()
19 else:
20     print(----all threads done---)
21     print(num)

Event事件

An event is a simple synchronization object;

the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

 1 import threading,time
 2 import random
 3 def light():
 4     if not event.isSet():
 5         event.set() #wait就不阻塞 #绿灯状态
 6     count = 0
 7     while True:
 8         if count < 10:
 9             print(\\033[42;1m--green light on---\\033[0m)
10         elif count <13:
11             print(\\033[43;1m--yellow light on---\\033[0m)
12         elif count <20:
13             if event.isSet():
14                 event.clear()
15             print(\\033[41;1m--red light on---\\033[0m)
16         else:
17             count = 0
18             event.set() #打开绿灯
19         time.sleep(1)
20         count +=1
21 def car(n):
22     while 1:
23         time.sleep(random.randrange(10))
24         if  event.isSet(): #绿灯
25             print("car [%s] is running.." % n)
26         else:
27             print("car [%s] is waiting for the red light.." %n)
28 if __name__ == __main__:
29     event = threading.Event()
30     Light = threading.Thread(target=light)
31     Light.start()
32     for i in range(3):
33         t = threading.Thread(target=car,args=(i,))
34         t.start()

queue队列 

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out 
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty  
Queue.full() # return True if full 
Queue.put(itemblock=Truetimeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get(block=Truetimeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消费完毕

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

 下面是一个基本的生产者消费者模型

 1 import time,random
 2 import queue,threading
 3 q = queue.Queue()
 4 def Producer(name):
 5   count = 0
 6   while count <20:
 7     time.sleep(random.randrange(3))
 8     q.put(count)
 9     print(Producer %s has produced %s baozi.. %(name, count))
10     count +=1
11 def Consumer(name):
12   count = 0
13   while count <20:
14     time.sleep(random.randrange(4))
15     if not q.empty():
16         data = q.get()
17         print(data)
18         print(\\033[32;1mConsumer %s has eat %s baozi...\\033[0m %(name, data))
19     else:
20         print("-----no baozi anymore----")
21     count +=1
22 p1 = threading.Thread(target=Producer, args=(A,))
23 c1 = threading.Thread(target=Consumer, args=(B,))
24 p1.start()
25 c1.start()

 




















以上是关于进击的Python第九章:paramiko模块线程与进程各种线程锁queue队列生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

Python第九章模块和包

Python 编程快速上手 第九章 组织文件

Python web 开发第九章开发总结

第九章 常用模块

第九章包

Python基础 - 第九天 - paramiko模块进程线程