任务、控制流与事件循环(一)
发布于 - 修改于 - 大约需要 5 分钟 - 2172 字简介
我这次准备了系列文章,预计四篇,每一篇都非常简短地介绍一个主题。这主要是为了分享我最近的一些学习和思考。
这个系列的主要编程语言将会是 Racket。因为我觉得 Racket 的安装非常方便,文档十分完善,可以说是天下第二好学的语言,仅次于 MoonBit (声明:利益相关)。
这个系列主要会分别介绍任务的抽象、控制流的操作、以及事件循环的实现,最后第四篇则会是一个大杂烩,争取把几个主题攥在一起。
这句话大家其实可以看到,里面包含了阅读四篇文章的任务,并且存在着依赖顺序,即最后一篇文章应当在阅读了前三篇文章之后阅读,而前三篇文章 之间则没有阅读的先后顺序(当然最好先读第一章不然这个简介就看不到了)。那么我们不妨从任务这个话题聊起。
任务
任务有几种定义。一种定义被写作 IO[T]
,在函数式编程中较为常见。不过这描述的是一个计算值 T
的任务,并且我们仅仅是
定义了这个任务,没有开始执行,一般要等到一个 run
的时候才开始进行。
大家在日常编写中见到较多的可能还是 Promise[T]
或者 Future[T]
。
Promise[T]
或者 Future[T]
在多数编程语言中象征着一个将会在未来被计算出的值,类型为 T
。
那么显而易见,其背后对应着一个计算这个值的任务,可能正在执行中。
当我们向这个值挂上回调函数的时候,我们就是在声明,有另一个任务应当在这个任务结束后再进行。
而仅仅提供值意味着我们并没有对这个任务的控制权,无法做到类似于取消任务的操作。因此我们可能需要一个内容更丰富的抽象。
在 Java 中,
这个类型 FutureTask<V>
是对于 Future<V>
的一个实现。在 Future
的一系列方法以外还提供了 cancel
等操作。
那我们今天就来简单看看 Promise
或者 Future
的实现。我个人喜欢叫 Future
,因此下文都写作 Future
。内容主要参考
CS3110。好书,推荐大家一读。唯一的遗憾可能是不是用 Racket 写的(逃
Future
Future 有三种状态:等待结果、已完成、已失败。其中,已完成和已失败包含一个对应的值,而等待结果则包含一些回调函数,这些回调函数会以状态为参数。
#lang racket
(struct state [])
(struct pending state [callbacks])
(struct success state [value])
(struct failure state [value])
(struct future [ state ] #:mutable)
那么对于一个 Future
,如果我们想要改变它的状态,改为成功或失败时,我们可以模式匹配,修改其中的值,并且调用所有的回调函数。这里我们用柯里化定义来简化后续的实现:
(define ((resolve future) value)
(match (future-state future)
;; 如果状态为计算中
[(pending callbacks)
(begin
;; 更新状态
(set-future-state! future (success value))
;; 调用所有回调函数
(for ([callback callbacks])
(callback (success value))))]
[_ (void)]))
接下来我们添加成功时的回调。参数是一个针对正确时值的回调函数:
(define (on-success future f)
(match (future-state future)
;; 我们进行模式匹配,如果这个值已经计算出,那么我们立刻计算
[(success value) (f value)]
[(pending callbacks)
;; 否则,我们把这个回调进行包裹,并添加进状态中
(set-future-state! future
(pending (cons (match-λ
((success value) (f value))
(_ (void)))
callbacks)))]
[_ (void)]))
我们这里省略处理错误的部分,因为只要把上面的两段中的所有 success
替换为 failure
即可。
最后则是创建 Future
,它会返回 Future
以及用来兑现其中值的 resolve
和 reject
,
类似于 JavaScript 最新的 Promise.withResolvers
,
或者 Scala 的 Promise
,
Java 的 CompletableFuture
:
(define (make)
(let ([future (future (pending '()))])
(values future (resolve future) (reject future))))
上述代码应当能通过以下测例:
(module+ test
(require rackunit)
(test-case
"resolve value should trigger on-success"
(let-values ([(box) (box #f)]
[(future resolve reject) (make)])
(begin
(on-success future (λ (v) (set-box! box v)))
(on-failure future (λ (v) (set-box! box (v . + . 1))))
(resolve 5)
(check-eq? (unbox box) 5))))
(test-case
"on-success should execute upon a resolved future"
(let-values ([(box) (box #f)]
[(future resolve reject) (make)])
(begin
(resolve 5)
(on-success future (λ (v) (set-box! box v)))
(check-eq? (unbox box) 5)))))
到这里为止,我们就已经实现了简单的 Future
。
依赖关系
简单的单个任务之间的顺序依赖和条件依赖我们其实已经实现了。on_success
其实就是定义了一个任务需要在另一个任务完成后才执行,并且条件是任务执行成功;而 on_reject
则相反;无论如何都要执行的 on_complete
或者说 finally
也不难实现,只需要不对运行结果进行模式匹配即可。
而多个任务之间的依赖,一般有 all
any
等。通过回调函数以及闭包等功能,我们可以简单地实现一下:
(define (any-of . futures)
(let-values ([(future resolve reject) (make)]
[(pending-count) (box 0)] ;; 当前尚未计算出的值
[(failures) (box '())]) ;; 当前已经统计的失败
(for ([f futures])
(set-box! pending-count (+ (unbox pending-count) 1))
(on-success f resolve)
(on-failure f (λ (reason)
(let ([v (unbox pending-count)]
[reasons (unbox failures)])
(if (equal? v 1)
(reject (cons reason reasons))
(begin (set-box! pending-count (- v 1))
(set-box! failures (cons reason reasons))))))))
future))
这一部分应当通过以下测例:
(module+ test
(require rackunit)
(test-case
"any should resolve to the first success"
(let-values ([(box) (box #f)]
[(future1 resolve1 reject1) (make)]
[(future2 resolve2 reject2) (make)]
[(future3 resolve3 reject3) (make)])
(let ([future (any-of future1 future2 future3)])
(begin
(on-success future (λ (v) (set-box! box v)))
(on-failure future (λ (v) (set-box! box v)))
(resolve1 1)
(resolve2 2)
(resolve3 3)
(check-equal? (unbox box) 1)))))
(test-case
"any should resolve to reasons on total failure"
(let-values ([(box) (box #f)]
[(future1 resolve1 reject1) (make)]
[(future2 resolve2 reject2) (make)]
[(future3 resolve3 reject3) (make)])
(let ([future (any-of future1 future2 future3)])
(begin
(on-success future (λ (v) (set-box! box v)))
(on-failure future (λ (v) (set-box! box v)))
(reject1 1)
(reject2 2)
(reject3 3)
(check-equal? (unbox box) '(3 2 1))))))
(test-case
"any should resolve to success if any succeed"
(let-values ([(box) (box #f)]
[(future1 resolve1 reject1) (make)]
[(future2 resolve2 reject2) (make)]
[(future3 resolve3 reject3) (make)])
(let ([future (any-of future1 future2 future3)])
(begin
(on-success future (λ (v) (set-box! box v)))
(on-failure future (λ (v) (set-box! box v)))
(reject1 1)
(resolve2 2)
(reject3 3)
(check-equal? (unbox box) 2))))))
all
与 any
是对称的,因此这里省略。另外更好的实现应当考虑错误与输入一一对应,可以使用 vector
实现,这里我就先偷懒了。
总结
这一章中我们展示了 Future
的简单实现。事实上我们可以注意到,虽然我们平时遇到 Future
都是在异步的场景下遇到,但实际上 Future
与异步并不直接强绑定。Future
,如我们开头所说,只是对一个计算逻辑的抽象,表示了一个正在计算的值,包含了对这个值后续计算的定义。后续计算,说到这里,可能有的人会想起我以前介绍过的,continuation 的概念。下一章不妨让我们接着这个话题,聊聊控制流变换。
同时我们还注意到,在 any
的实现中,当有任意一个值计算完成后,我们会去设置最终的目标值,但此时其他任务事实上已经没有用了,而依然在计算,但 Future
的抽象并不包含取消的概念,因为它是对计算结果的抽象,无法操作计算过程。因此,在这种场合,我们或许需要用到 Task
的概念。同样的,由于 Future
只是对计算结果的抽象,因此在构造时返回 resolve
和 reject
似乎有悖这个概念。这或许也是为什么 Scala 选择拆分为 Promise
和 Future
吧。