协程
协程介绍
协程的目的是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态
cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它
协程本质上就是一个线程,以前线程任务的切换是由操作系统控制的,遇到I/O自动切换,现在我们用协程的目的就是较少操作系统切换的开销(开关线程,创建寄存器、堆栈等,在他们之间进行切换等),在我们自己的程序里面来控制任务的切换。
协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
协程就是告诉Cpython解释器,不是搞了个GIL锁吗,那好,我就自己搞成一个线程让你去执行,省去你切换线程的时间,我自己切换比你切换要快很多,避免了很多的开销,对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。
yield实现协程
其中上述的情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法。
任务切换+保存状态 = 并发
import time
def func1():
for i in range(11):
#yield
print('这是我第%s次打印啦' % i)
time.sleep(1)
def func2():
g = func1()
#next(g)
for k in range(10):
print('哈哈,我第%s次打印了' % k)
time.sleep(1)
#next(g)
#不写yield,下面两个任务是执行完func1里面所有的程序才会执行func2里面的程序,有了yield,我们实现了两个任务的切换+保存状态
func1()
func2()
总结协程特点:
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 附加:一个协程遇到IO操作自动切换到其它协程(gevent模块来实现)
greenlet实现协程
如果我们在单个线程内有20个任务,要想实现在多个任务之间切换,使用yield生成器的方式过于麻烦,而使用greenlet模块可以非常简单地实现这20个任务直接的切换
安装
pip3 install greenlet
简单使用
from greenlet import greenlet
def eat(name):
print('%s eat 1' %name)
g2.switch('taibai') #切换g2的任务
print('%s eat 2' %name)
g2.switch()
def play(name):
print('%s play 1' %name)
g1.switch()
print('%s play 2' %name)
g1=greenlet(eat) #提交greenlet任务
g2=greenlet(play)
g1.switch('taibai')#可以在第一次switch时传入参数,以后都不需要
虽然greenlet切换任务比yield方便一些,但是没有实现遇到IO自动切换任务的操作,还不够厉害,所以我们下面学一个厉害的模块。
gevent模块
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
安装
pip3 install gevent
常用方法
#用法
g1=gevent.spawn(func,1,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的,spawn是异步提交任务
g2=gevent.spawn(func2)
g1.join() #等待g1结束
g2.join() #等待g2结束 有人测试的时候会发现,不写第二个join也能执行g2,是的,协程帮你切换执行了,但是你会发现,如果g2里面的任务执行的时间长,但是不写join的话,就不会执行完等到g2剩下的任务了
#或者上述两步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
遇到IO阻塞时会自动切换任务
看示例:
import gevent
def eat(name):
print('%s eat 1' %name)
gevent.sleep(2)
print('%s eat 2' %name)
def play(name):
print('%s play 1' %name)
gevent.sleep(1)
print('%s play 2' %name)
g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('主')
结果 egon eat 1 egon play 1 egon play 2 egon eat 2 主
上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞, 而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前 或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
看示例
from gevent import monkey;monkey.patch_all() #必须写在最上面,这句话后面的所有阻塞全部能够识别了
import gevent #直接导入即可
import time
def eat():
#print()
print('eat food 1')
time.sleep(2) #加上mokey就能够识别到time模块的sleep了
print('eat food 2')
def play():
print('play 1')
time.sleep(1) #来回切换,直到一个I/O的时间结束,这里都是我们个gevent做得,不再是控制不了的操作系统了。
print('play 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('主')
gevent模块的应用示例
示例1:爬虫
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
print('GET: %s' %url)
response=requests.get(url)
if response.status_code == 200:
print('%d bytes received from %s' %(len(response.text),url))
start_time=time.time()
gevent.joinall([
gevent.spawn(get_page,'https://www.python.org/'),
gevent.spawn(get_page,'https://www.yahoo.com/'),
gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
示例2 通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)
一个网络请求里面经过多个时间延迟time
服务端代码
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr)
def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',8080)
客户端代码
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)