python 3.5+ 协程自我挂起及其应用

Posted on Thu 26 October 2017 in 遗迹

前言

有段时间没有写文章了。说来惭愧,没写文章的一个主要原因是这个博客还不支持传图。

近来又颇懒惰,于是就长久的没有更新了……

P.S. 此文章实际写于《esp8266 系列芯片入手指南》之前

背景

网络编程中我们经常面对双方建立连接,两边同时可以收/发消息的情况。

例如说我们连接到某服务器,最简单的状态下,我们向服务器发送一条指令,然后直接等待服务器返回并读取出数据。 也就是这种情况:

import time
import serial

s = serial.Serial('COM10')
s.write(b'AT')
time.sleep(0.5)
assert s.read_all().strip() == b'OK'

为了保证返回结果能够正确,这个过程有两个问题:

  1. 永远必须等待一条指令执行完成返回后,才能执行下一条指令
  2. 服务器基本没有机会主动向我们发送数据

实际上这是一种极大的浪费,更为流行的做法是一边发送一边接收,有的时候协议或框架自己实现了信息配对,直接在发送时传个 callback 函数就可以了。

而有些时候,则是全局共用同一个 callback。

一个典型场景是这样的:

class Connection:
    def on_message(self, text):
        msg = json.loads(text)
        head, data = msg
        if head == 'on_init':
            pass
        elif head == 'on_task_new':
            self.on_task_new_done(task_id = data['id'], data=data['data'])

    def task_new(self):
        self.send(json.dumps(['task_new']))

我相信大家应该对这一幕很熟悉,或多或少写过类似的代码。

我是在写 UART 串口程序的时候遇到这一需求的,此外 websocket 也类似,不过这一技巧对客户端的作用远大于服务端,而且基本上不会有 python websocket client的场景。

这例代码中的主要问题是将逻辑上写在一个函数中的比较方便的过程,强行分割成了两个部分。

或许有人会觉得,一应一答两相分开,清清楚楚没毛病呀?

其实不然,复杂度上升以后,这种模式会成为开发人员的噩梦。

期望

我认为对开发人员来说理想的模式是这样的:

def task_new(self):
    self.send(json.dumps(['task_new']))

    ret = many_caozuo() # 挂起自己,等待被唤醒的代码

    self.on_task_done(task_id = ret['id'], data=ret['data'])

我们知道有协程这种东西的存在,希望函数在执行到某一时间挂起自己,然后暴露一个什么对象出来。等到另一边接收到消息之后,再利用这个暴露出来的对象,将挂起的函数唤醒。

这一想法是符合逻辑的,借助 python 3.5 版本之后的特性,asyncio.Future 对象能够让我们实现挂起与等待唤醒,而 async/await 能够让整个过程变得优雅起来。

我们来举个例子,这样一个场景: 程序被切成了至少四段。我们预计改造之后会产生这样的效果:

下面我们将讨论如何做这件事情。

实现

核心问题:异步函数(协程)挂起自身,由其他函数唤醒

直接上代码:

import asyncio

future = None
loop = asyncio.get_event_loop()


async def func():
    global future
    print('func begin')
    future = loop.create_future()
    ret = await future
    print('func end with %s' % ret)


async def func2():
    print('func2 begin')
    await asyncio.sleep(2)
    future.set_result('greetings from func2')
    print('func2 end')


asyncio.ensure_future(func())
asyncio.ensure_future(func2())

loop.run_forever()
loop.close()

这段代码的意思是并行执行 func 和 func2,其中 func 直接挂起自己,而 func2 在等待两秒后将 func 唤醒然后结束。

这里 Future 对象起了关键作用,这里利用了Future的两个性质:

  1. 可以用 await 关键字等待
  2. 当 Future 实例被 set_result() 之后,该 Future 的状态会被标记为完成并且结束对实例的等待

而取出 set_result 的值,有两种方式:

  1. 在 future 完成之后,通过 ret = future.result() 取出
  2. 通过 ret = await future 等待完成并取出

我们这里使用了 await 等待的方式,而在更古老的时候,需要通过注册 callback 来完成收尾工作。具体见 Future.add_done_callback(),这里不就展开讨论了。

应用

有了工具之后,我们对协议接口稍微做一些改动,以使得演示代码简单明了。

原始格式:发 [command, data] 收 ['on_' + command, data]

修改格式:发 [random_id, command, data] 收 [random_id, 'on_' + command, data]

前面各增加一个 id 来做标识,简单生成如下:

import os
get_random_id = lambda: '%x' % int.from_bytes(os.urandom(8), 'little')

改造后的 Connection 如下:

class Connection:
    def __init__(self):
        self._wait_table = {}

    def on_message(self, text):
        msg = json.loads(text)
        rid, head, data = msg

        if rid in self._wait_table:
            fut = self._wait_table[rid]
            del self._wait_table[rid]
            fut.set_result([head, data])

    def _send_command(data: list):
        rid = get_random_id()
        future = loop.create_future()
        self._wait_table[rid] = future
        self.send(json.dumps([rid] + data))
        return future

    async def task_new(self):
        ret = await self.send_command(['task_new'])
        return ret[1]  # data, 即 task 的详细信息

    async def task_get(self, id):
        ret = await self.send_command(['task_get', {'id': id}])
        return ret[1]

这样一来,不仅逻辑能够写在一块,用户也可以直接 await conn.task_new() 来一步到位获取结果了。

这种模式在一个行为对应多条指令的情况下十分优雅,能够完爆应答分离模式(当然这主要归功于async/await语法,不然一串回调也好看不到哪儿去),举例:

    async def task_new_and_setup(self, name, timeout):
        task = await self.send_command(['task_new'])[1]
        await self.send_command(['task_set_name', task['id'], name])
        await self.send_command(['task_set_timeout', task['id'], timeout])
        await self.send_command(['task_start', task['id'])
        return task

现在还有一个问题,那就是如果一直拿不到返回结果,_send_command 会无限制的等待下去,这可不行:

    async def _send_command(data: list, timeout=10):
        rid = get_random_id()
        future = loop.create_future()
        self._wait_table[rid] = future
        self.send(json.dumps([rid] + data))
        return await asyncio.wait_for(future, timeout)

这样的话超时情况下会抛出 asyncio.TimeoutError 异常。

除此之外,如果没有信息回调机制的话,也可以使用轮询拉取信息来同样使用这个技巧

class Connection:
    def __init__(self):
        self._wait_table = {}

        def func():
            self._recv()
            loop.call_later(1, func)

        self._recv()
        loop.call_later(1, func)

    def _recv(self):
        msgs = self.check_messages()
        if msgs:
            for i in msgs:
                self.on_message(i)

结束

说实话这个技巧更有用的地点可能是前端,按照相同的思路很容易就能写一个JS版本。

在浏览器控制台中做测试:

let a = null;

let b = new Promise((resolve, reject) => {
    a = resolve;
}).then(() => {
    console.log(1111);
});

随后运行:

a()

得到 1111 的输出。那么在实际应用中 await 该 promise 即可达成目的。

此场景有奇效

  'task_create'
=> 'task_create_ret'

   'task_set_param1', id
=> 'task_set_param1_ret'

   'task_set_param2', id
=> 'task_set_param2_ret'

   'task_start', id
=> 'task_start_ret'

可改写为

let ret = await command('task_create')
ret = await command('task_set_param1', ret.id, 111)
ret = await command('task_set_param2', ret.id, 222)
ret = await command('task_start', ret.id)

update 1: 加入了 JS 相关内容

update 2: 加了两张说明图