Python 协程的本质?原来也不过如此

发布时间:2022-08-19 11:31

都是单线程,为什么原来低效率的代码用了 async、await 加一些异步库就变得效率高了?

如果在做基于 Python 的网络或者 Web 开发时,对于这个问题曾感到疑惑,这篇文章尝试给一个答案。

0x00 开始之前

首先,本文不是带你浏览源代码,然后对照原始代码给你讲 Python 标准的实现。相反,我们会从实际问题出发,思考解决问题的方案,一步步体会解决方案的演进路径,最重要的,希望能在过程中获得知识系统性提升。
⚠️ 本文仅是提供了一个独立的思考方向,并未遵循历史和现有实际具体的实现细节。
其次,阅读这篇文章需要你对 Python 比较熟悉,至少了解 Python 中的生成器 generator 的概念。

0x01 IO 多路复用
这是性能的关键。但我们这里只解释概念,其实现细节不是重点,这对我们理解 Python 的协程已经足够了,如已足够了解,前进到 0x02。
首先,你要知道所有的网络服务程序都是一个巨大的死循环,你的业务逻辑都在这个循环的某个时刻被调用:

def handler(request):
    # 处理请求
    pass

# 你的 handler 运行在 while 循环中
while True:
    # 获取一个新请求
    request = accept()
    # 根据路由映射获取到用户写的业务逻辑函数
    handler = get_handler(request)
    # 运行用户的handler,处理请求
    handler(request)

设想你的 Web 服务的某个 handler,在接收到请求后需要一个 API 调用才能响应结果。
对于最传统的网络应用,你的 API 请求发出去后在等待响应,此时程序停止运行,甚至新的请求也得在响应结束后才进得来。如果你依赖的 API 请求网络丢包严重,响应特别慢呢?那应用的吞吐量将非常低。
很多传统 Web 服务器使用多线程技术解决这个问题:把 handler 的运行放到其他线程上,每个线程处理一个请求,本线程阻塞不影响新请求进入。这能一定程度上解决问题,但对于并发比较大的系统,过多线程调度会带来很大的性能开销。
IO 多路复用可以做到不使用线程解决问题,它是由操作系统内核提供的功能,可以说专门为这类场景而生。简单来讲,你的程序遇到网络IO时,告诉操作系统帮你盯着,同时操作系统提供给你一个方法,让你可以随时获取到有哪些 IO 操作已经完成。就像这样:

# 操作系统的IO复用示例伪代码
# 向操作系统IO注册自己关注的IO操作的id和类型
io_register(io_id, io_type)
io_register(io_id, io_type)

# 获取完成的IO操作
events = io_get_finished()

for (io_id, io_type) in events:
    if io_type == READ:
        data = read_data(io_id) 
    elif io_type == WRITE:
        write_data(io_id,data)

把 IO 复用逻辑融合到我们的服务器中,大概会像这样:

call_backs = {}

def handler(req):
    # do jobs here
    io_register(io_id, io_type)
    def call_back(result):
        # 使用返回的result完成剩余工作...
    call_backs[io_id] = call_back

# 新的循环
while True:
    # 获取已经完成的io事件
    events = io_get_finished()
    for (io_id, io_type) in events:
        if io_type == READ: # 读取
            data = read(io_id) 
            call_back = call_backs[io_id]
            call_back(data)
        else:
            # 其他类型io事件的处理
            pass

    # 获取一个新请求
    request = accept()
    # 根据路由映射获取到用户写的业务逻辑函数
    handler = get_handler(request)
    # 运行用户的handler,处理请求
    handler(request)

我们的 handler 对于 IO 操作,注册了回调就立刻返回,同时每次迭代都会对已完成的 IO 执行回调,网络请求不再阻塞整个服务器。
上面的伪代码仅便于理解,具体实现细节更复杂。而且就连接受新请求也是在从操作系统得到监听端口的 IO 事件后进行的。
我们如果把循环部分还有 call_backs 字典拆分到单独模块,就能得到一个 EventLoop,也就是 Python 标准库 asyncio 包中提供的 ioloop。

0x02 用生成器消除 callback
着重看下我们业务中经常写的 handler 函数,在有独立的 ioloop 后,它现在变成类似这样:

def handler(request):
    # 业务逻辑代码...
    # 需要执行一次 API 请求
    def call_back(result):
        # 使用 API 返回的result完成剩余工作
        print(result)
    # 没有io_call这个方法,这里只是示意,表示注册一个IO操作
    asyncio.get_event_loop().io_call(api, call_back)

到这里,性能问题已经解决了:我们不再需要多线程就能源源不断接受新请求,而且不用care依赖的 API 响应有多慢。

但是我们也引入了一个新问题,原来流畅的业务逻辑代码现在被拆成了两部分,请求 API 之前的代码还正常,请求 API 之后的代码只能写在回调函数里面了。

这里我们业务逻辑只有一个 API 调用,如果有多个 API ,再加上对 Redis 或者 MySQL 的调用(它们本质也是网络请求),整个逻辑会被拆分的更散,这对业务开发是一笔负担。

对于有匿名函数的一些语言(没错就是JavaScript),还可能会引发所谓的「回调地狱」。

接下来我们想办法解决这个问题。

我们很容易会想到:如果函数在运行到网络 IO 操作处后能够暂停,完成后又能在断点处唤醒就好了。
如果你对 Python 的「生成器」熟悉,你应该会发现,它恰好具有这个功能:

def example():
    value = yield 2
    print("get", value)
    return value

g = example()
# 启动生成器,我们会得到 2
got = g.send(None)
print(got)  # 2

try:
    # 再次启动 会显示 "get 4", 就是我们传入的值
    got = g.send(got*2)
except StopIteration as e:
    # 生成器运行完成,将会print(4),e.value 是生成器return的值
    print(e.value)

函数中有 yield 关键字,调用函数将会得到一个生成器,生成器一个关键的方法 send() 可以跟生成器交互。

g.send(None) 会运行生成器内代码直到遇到 yield,并返回其后的对象,也就是 2,生成器代码就停在这里了,直到我们再次执行 g.send(got2),会把 22 也就是 4 赋值给yield 前面的变量 value,然后继续运行生成器代码。

yield 在这里就像一扇门,可以把一件东西从这里送出去,也可以把另一件东西拿进来。

如果 send 让生成器运行到下一个 yield 前就结束了,send 调用会引发一个特殊的异常StopIteration,这个异常自带一个属性 value,为生成器 return 的值。

如果我们把我们的 handler 用 yield 关键字转换成一个生成器,运行它来把 IO 操作的具体内容返回,IO 完成后的回调函数中把 IO 结果放回并恢复生成器运行,那就解决了业务代码不流畅的问题了:

def handler(request):
    # 业务逻辑代码...
    # 需要执行一次 API 请求,直接把 IO 请求信息yield出去
    result = yield io_info
    # 使用 API 返回的result完成剩余工作
    print(result)

# 这个函数注册到ioloop中,用来当有新请求的时候回调
def on_request(request):
    # 根据路由映射获取到用户写的业务逻辑函数
    handler = get_handler(request)
    g = handler(request)
    # 首次启动获得io_info
    io_info = g.send(None)

    # io完成回调函数
    def call_back(result):
        # 重新启动生成器
        g.send(result)

    asyncio.get_event_loop().io_call(io_info, call_back)

上面的例子,用户写的 handler 代码已经不会被打散到 callback 中,on_request 函数使用 callback 和 ioloop 交互,但它会被实现在 Web 框架中,对用户不可见。
上面代码足以给我们提供用生成器消灭的 callback 的启发,但局限性有两点:
业务逻辑中仅发起一次网络 IO,但实际中往往更多
业务逻辑没有调用其他异步函数(协程),但实际中我们往往会调用其他协程

0x03 解决完整调用链

我们来看一个更复杂的例子:
其中 request 执行真正的 IO,func1、func2 仅调用。显然我们的代码只能写成这样:

def func1():
    ret = yield request("http://test.com/foo")
    ret = yield func2(ret)
    return ret

def func2(data):
    result = yield request("http://test.com/"+data)
    return result

def request(url):
    # 这里模拟返回一个io操作,包含io操作的所有信息,这里用字符串简化代替
    result = yield "iojob of %s" % url
    return result

对于 request,我们把 IO 操作通过 yield 暴露给框架。

对于 func1 和 func2,调用 request 显然也要加 yield 关键字,否则 request 调用返回一个生成器后不会暂停,继续执行后续逻辑显然会出错。
这基本就是我们在没有 yield from、aysnc、await 时代,在 tornado 框架中写异步代码的样子。

要运行整个调用栈,大概流程如下:

  • 调用 func1() 得到生成器
  • 调用 send(None) 启动它得到会得到 request("http://test.com/foo") 的结果,还是生成器对象
  • send(None) 启动由 request() 产生的生成器,会得到 IO 操作,由框架注册到 ioloop 并指定回调
  • IO 完成后的回调函数内唤醒 request 生成器,生成器会走到 return 语句结束
  • 捕获异常得到 request 生成器的返回值,将上一层 func1 唤醒,同时又得到 func2() 生成器
  • 继续执行...

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号