同步与阻塞的关系
状态
在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。
(1)就绪(Ready)状态 当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。 (2)执行/运行(Running)状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。 (3)阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。
同步与异步
所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列其实就是一个程序结束才执行另外一个程序,串行的,不一定两个程序就有依赖关系。
所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。
举例
比如我们去楼下的老家肉饼吃饭,饭点好了,取餐的时候发生了一些同步异步的事情。同步:我们都站在队里等着取餐,前面有个人点了一份肉饼,后厨做了很久,但是由于同步机制,我们 还是要站在队里等着前面那个人的肉饼做好取走,我们才往前走一步。
异步:我们点完餐之后,点餐员给了我们一个取餐号码,跟你说,你不用在这里排队等着,去找个地方坐着玩手机去吧,等饭做好了,我叫你。这种机制(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中等着取餐的你)往往注册一个回调机制,在所等待的事件被触发时由触发机制(点餐员)通过某种机制(喊号,‘250号你的包子好了‘)找到等待该事件的人。
阻塞和非阻塞
阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的
阻塞和非阻塞举例
继续上面的那个例子,不论是排队还是使用号码等待通知,如果在这个等待的过程中,等待者除了等待消息通知之外不能做其它的事情,那么该机制就是阻塞的,表现在程序中,也就是该程序一直阻塞在该函数调用处不能继续往下执行。相反,有的人喜欢在等待取餐的时候一边打游戏一边等待,这样的状态就是非阻塞的,因为他(等待者)没有阻塞在这个消息通知上,而是一边做自己的事情一边等待。阻塞的方法:input、time.sleep,socket中的recv、accept等等。
比较
同步阻塞形式 效率最低。拿上面的例子来说,就是你专心排队,什么别的事都不做。
异步阻塞形式
如果在排队取餐的人采用的是异步的方式去等待消息被触发(通知)
,也就是领了一张小纸条,假如在这段时间里他不能做其它的事情,就在那坐着等着,不能玩游戏等,那么很显然,这个人被阻塞在了这个等待的操作上面;
异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。
同步非阻塞形式
实际上是效率低下的。
想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换
,效率可想而知是低下的。
异步非阻塞形式
效率更高,
因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换
。
比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉点餐员说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步+非阻塞的方式了。
很多人会把同步和阻塞混淆,是因为很多时候同步操作会以阻塞的形式表现出来
,同样的,很多人也会把异步和非阻塞混淆,因为异步操作一般都不会在真正的IO操作处被阻塞
。
进程
进程创建的两种方式
方式1(推荐)
from multiprocessing import Process
def func():
print(12345)
if __name__ == '__main__':
p = Process(target=func,)
p.start()
print('*' * 10)
方式2(了解)
class MyProcess(Process):
def __init__(self,person):
super().__init__()
self.person=person
def run(self):
print(os.getpid())
print(self.pid)
print(self.pid)
print('%s 正在和女主播聊天' %self.person)
if __name__ == '__main__':
p1=MyProcess('Jedan')
p2=MyProcess('太白')
p3=MyProcess('alexDSB')
p1.start()
p2.start()
# p2.run()
p3.start()
子进程和主进程
我们通过主进程创建的子进程是异步执行的,那么我们就验证一下,并且看一下子进程和主进程(也就是父进程)的ID号,来看看是否是父子关系。
import time
import os
#os.getpid() 获取自己进程的ID号
#os.getppid() 获取自己进程的父进程的ID号
from multiprocessing import Process
def func():
print('aaaa')
time.sleep(1)
print('子进程>>',os.getpid())
print('该子进程的父进程>>',os.getppid())
print(12345)
if __name__ == '__main__':
#首先我运行当前这个文件,运行的这个文件的程序,那么就产生了主进程
p = Process(target=func,)
p.start()
print('*' * 10)
print('父进程>>',os.getpid())
print('父进程的父进程>>',os.getppid())
结果
#********** 首先打印出来了出进程的程序,然后打印的是子进程的,也就是子进程是异步执行的,相当于主进程和子进程同时运行着,如果是同步的话,我们先执行的是func(),然后再打印主进程最后的10个*号。
#父进程>> 3308
#父进程的父进程>> 5916 #我运行的test.py文件的父进程号,它是pycharm的进程号,看下面的截图
#aaaa
#子进程>> 4536
#该子进程的父进程>> 3308 #是我主进程的ID号,说明主进程为它的父进程
#12345
进程之间是空间隔离的
进程之间的数据是隔离的,也就是数据不共享
from multiprocessing import Process
n=100 #全局变量
def work():
global n
n=0
print('子进程内: ',n)
if __name__ == '__main__':
p=Process(target=work)
p.start()
p.join()
print('主进程内: ',n)
结果
#看结果:
# 子进程内: 0
# 主进程内: 100
进程对象的其他方法
name和pid的用法
import time
import random
from multiprocessing import Process
class Piao(Process):
def __init__(self,name):
#为我们开启的进程设置名字的做法
super().__init__()
self.name=name
def run(self):
print('%s is piaoing' %self.name)
time.sleep(random.randrange(1,3))
print('%s is piao end' %self.name)
p=Piao('egon')
p.start()
print('开始')
print(p.pid) #查看pid
结果
开始
934
egon is piaoing
egon is piao end
守护进程
前我们讲的子进程是不会随着主进程的结束而结束,子进程全部执行完之后,程序才结束,那么如果有一天我们的需求是我的主进程结束了,由我主进程创建的那些子进程必须跟着结束,怎么办?守护进程就来了!
主进程创建守护进程 其一:守护进程会在主进程代码执行结束后就终止 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
import os
import time
from multiprocessing import Process
class Myprocess(Process):
def __init__(self,person):
super().__init__()
self.person = person
def run(self):
print(os.getpid(),self.name)
print('%s正在和女主播聊天' %self.person)
time.sleep(3)
if __name__ == '__main__':
p=Myprocess('太白')
p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p.start()
# time.sleep(1) # 在sleep时linux下查看进程id对应的进程ps -ef|grep id
# p.join()
print('主')
进程同步
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。
import os
import time
import random
from multiprocessing import Process
def work(n):
print('%s: %s is running' %(n,os.getpid()))
time.sleep(random.random())
print('%s:%s is done' %(n,os.getpid()))
if __name__ == '__main__':
for i in range(5):
p=Process(target=work,args=(i,))
p.start()
结果
# 看结果:通过结果可以看出两个问题:问题一:每个进程中work函数的第一个打印就不是按照我们for循环的0-4的顺序来打印的
#问题二:我们发现,每个work进程中有两个打印,但是我们看到所有进程中第一个打印的顺序为0-2-1-4-3,但是第二个打印没有按照这个顺序,变成了2-1-0-3-4,说明我们一个进程中的程序的执行顺序都混乱了。
#问题的解决方法,第二个问题加锁来解决,第一个问题是没有办法解决的,因为进程开到了内核,有操作系统来决定进程的调度,我们自己控制不了
# 0: 9560 is running
# 2: 13824 is running
# 1: 7476 is running
# 4: 11296 is running
# 3: 14364 is running
# 2:13824 is done
# 1:7476 is done
# 0:9560 is done
# 3:14364 is done
# 4:11296 is done
解决办法,加锁
锁
上述示例加锁
#由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process,Lock
import os,time
def work(n,lock):
#加锁,保证每次只有一个进程在执行锁里面的程序,这一段程序对于所有写上这个锁的进程,大家都变成了串行
lock.acquire()
print('%s: %s is running' %(n,os.getpid()))
time.sleep(1)
print('%s:%s is done' %(n,os.getpid()))
#解锁,解锁之后其他进程才能去执行自己的程序
lock.release()
if __name__ == '__main__':
lock=Lock()
for i in range(5):
p=Process(target=work,args=(i,lock))
p.start()
进程通信
队列
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出。
队列的方法介绍
q = Queue([maxsize])
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。
队列的简单使用
from multiprocessing import Queue
q=Queue(3) #创建一个队列对象,队列长度为3
#put ,get ,put_nowait,get_nowait,full,empty
q.put(3) #往队列中添加数据
q.put(2)
q.qsize 查看当前队列中已使用的长度 -- 2
q.put(1)
# q.put(4) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
# 如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
q.put_nowait(4) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
print('队列已经满了')
# 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full()) #查看是否满了,满了返回True,不满返回False
print(q.get()) #取出数据
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
try:
q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
print('队列已经空了')
print(q.empty()) #空了
子/主进程通过队列进行通信
示例
#看下面的队列的时候,按照编号看注释
import time
from multiprocessing import Process, Queue
def f(q):
q.put('姑娘,多少钱~')
# print(q.qsize()) #查看队列中有多少条数据了
def f2(q):
print('》》》》》》》》')
print(q.get())
if __name__ == '__main__':
q = Queue()
q.put('小鬼')
p = Process(target=f, args=(q,))
p2 = Process(target=f2, args=(q,))
p.start()
p2.start()
time.sleep(1)
print(q.get()) #结果:姑娘,多少钱~
p.join()
输出:
》》》》》》》》
小鬼
姑娘,多少钱~
生产者和消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,并且我可以根据生产速度和消费速度来均衡一下多少个生产者可以为多少个消费者提供足够的服务,就可以开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。
通过队列实现一个生产者和消费者模型
import time,random,os
from multiprocessing import Process,Queue
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
if __name__ == '__main__':
q=Queue()
#生产者们:即厨师们
p1=Process(target=producer,args=(q,))
#消费者们:即吃货们
c1=Process(target=consumer,args=(q,))
#开始
p1.start()
c1.start()
print('主')
上述模型解释
#生产者消费者模型总结
#程序中有两类角色
一类负责生产数据(生产者)
一类负责处理数据(消费者)
#引入生产者消费者模型为了解决的问题是:
平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
#如何实现:
生产者<-->队列<——>消费者
#生产者消费者模型实现类程序的解耦和
缓冲和解耦