UniRx 详解

如何在Unity中优雅地编写异步程序

Posted by SWZ on January 12, 2020

前言

我们都知道,游戏很多的系统都是在时间上异步的,所以 Unity 开发者所需要实现的异步逻辑是非常多的。这也是为什么 Unity官方在引擎层实现了Coroutine(协程)这样的概念。

在游戏中,像动画的播放、声音的播放、网络请求、资源加载/卸载、Tween动画、场景过渡等都是在时间上异步的逻辑。甚至是游戏循环(Every Update、OnCollisionEnter 等)、传感器数据(Kinect、Leap Motion、VR Input 等)都是时间上异步的逻辑(事件)。

如果你有发现你的代码在使用协程时为了满足业务逻辑不得不出现强耦合的代码块,亦或是处理异步问题时代码编写过于复杂晦涩,或者是异步处理时会出现卡顿、卡死的情况。

那么此次的分享会对你一定有帮助!


初识UniRx

Rx是Reactive Extension的简单写法,翻译过来就是响应式扩展,而UniRx是Rx For Unity的意思,它是一个 Unity3D 的编程框架,专注于解决时间上异步的逻辑,使得异步逻辑的实现更加简洁和优雅。

Rx 只是⼀套标准,在其他语言也有实现,如果在 Unity 中熟悉了这套标准,在其他语言上也是可以很 快上手的。比如 RxJava、Rx.cpp、SwiftRx 等等。

Reactive Extensions 以擅长处理时间上异步的逻辑、以及极简的 API 风格而闻名

【响应式】一种面向数据流和变化传播的编程范式。可以很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

【扩展】对Linq的一种扩展,其关键的特性在于他不是基于IEnumerable而是基于IObservable的,这个接口用于迭代Observable集合。

设计理念

(f * g)(x) = f(g(x))

希望可以通过类似二元运算中结合律的性质来实现事件与事件的组合

在具体的代码中我们会采用链式编程方式来实现结合律,所谓的链式编程就是指可以通过”点”语法,将需要执行的代码块连续的书写下去,使得代码简单易读,书写方便。

三大核心

  1. Observable

    最为关键的是IObservable和IObserver两个基类

  2. Linq

    组织、变换、过滤、合并

  3. Scheduler

    调度器,来规定在什么时间什么地点执行什么事情

诞生由来

这里先抛出个概念[二元性],所谓的二元性就是说事物之间是相对的,有黑即有白,有阴即有阳,有一种事物的存在,必然有另一种事物与之相对。

IEnumerator和IObserver即是这样的关系,IEnumerator是基于,而IObserver是基于

IEnumerator(迭代器)通过从数据源拉数据来完成读取,当一个请求过来时任务会被阻塞,直到这个任务完成后再返回给前端。 IObserver(观察者)首先注册用户需求所需的回调,当满足条件时推送消息。

了解了以上概念后我们来看下以下两个接口,这两个接口看似毫无联系,但是却有着密不可分的联系。 Current Vs void OnNext(T value)

Current是用于获取当前数据的,直接从远端拉,然后返回的一个数据。 OnNext则是远端推一个数据过来的时候进行的相应的处理。


基本语法

Observable.xx().Subscribe()即是UniRx的基本语法特征,由 事件源+操作符+注册关键字 所组成。

常用操作符

Common

  • Select 在查询式写法中,编译器要求必须要在最后 select 一下,链式写法中省略了。子句中的变量作为返回
  • SelectMany UniRx中主要是完成 Coroutine 的顺序执行功能
  • Where
  • Distinct

Paging

  • Take 从序列的开头返回指定数量的相邻元素
  • Skip
  • Buffer 一种分组,可以按timespan/count
  • First

Concatenate

  • Concat 连接两个序列(与Merge区别:Concat是顺序的,Merge是非顺序的)
  • Merge
  • WhenAll

Time

  • Interval
  • Timer
  • Delay
  • Throttle
  • Timeout

Unity独占

  • FromCoroutine
  • ToYieldInstruction
  • EveryUpdate
  • ObserveOnMainThread
  • Get/Post
  • LoadFromCacheOrDownload
  • UpdateAsObservable
  • OnEnableAsObservable
  • OnDestroyAsObservable
  • OnClickAsObservable
  • OnValueChangedAsObservalbe
  • SubscribeToText
  • OnCollisionEnterAsObservable
  • OnPointerEnterAsObservable

生命周期

在Rx中IDisposable占到了非常重要的一个角色,那就是卸载事件,AddTo就是一种卸载接口,其实底层也是调用了IDisposable的接口。与迭代器不同,没下一个就完事儿了,观察者模式永远也不知道有没有下一个,所以主动地卸载事件是非常重要的一个事情。


应用示例

示例一:

 Observable.EveryUpdate()
              .Where(_ => Input.GetMouseButtonUp(0))
              .First()
              .Subscribe(_ => { /* do something*/ })
	      .AddTo(this);

如果在 Unity 中,使用传统的方式实现如上功能,首先要创建一个成员变量来记录点击次数/是否点击过,然后在脚本中创建一个 Update 方法来监听鼠标抬起的事件。如果在 Update 方法中,除了实现鼠标事件监听这个功能之外,还要实现其他的功能。那么 Update 里就会充斥着大量的状态判断等逻辑。代码非常不容易阅读。

而 UniRx 提供了一种编程思维,使得平时一些比较难以实现的异步逻辑(比如以上这种),使用 UniRx 轻松搞定,并且不失代码的可读性。

示例二:

List<IObservable<Unit>> list = new List<IObservable<Unit>>();
list.Add(Observable.FromCoroutine(_ => AsyncA()));
list.Add(Observable.FromCoroutine<Unit>(_ => AsyncB("")));
var stream = Observable.WhenAll(list.ToArray());
stream.ObserveOnMainThread()
    .Subscribe(
    _ => Debug.Log (OnNext"),
    e => Debug.Log ("Error"),
    () => Debug.Log ("OnComplete")
    );

ObserveOnMainThread,意思是把 WhellAll 结果转到主线程上,这样 Subscribe 里的回调就可以使用 Unity 的 API 了(Unity 的很多 API 不可以在其他线程中使用 )。


深入剖析

在深入研究UniRx之前先提一个小问题。

Q :如果我不进行注册,事件源还会派送事件么? 例:Observable.Range(1,10) //从1开始,每次以步长为1递增,共执行10次

在讲解源码之前,这里先得讲解一种设计模式,这有助于大家对UniRx底层设计的理解。该模式就是装饰者模式。下面附上该模式的UML图。

装饰者模式定义:动态的将责任附加到对象上。若要扩展功能,装饰者提供了比继承更有弹性的替代方案。

装饰者模式有点套娃的感觉,它利用继承来达到类型匹配,而不是利用继承获取行为。

优点

  • 装饰者模式与继承关系的目的都是要扩展对象的功能,但是装饰者模式可以提供比继承更多的灵活性

  • 通过使用不同的具体装饰类以及这些装饰类的排列组合,可以创造出很多不同行为的组合。

缺点

  • 这种比继承更加灵活机动的特性,也同时意味着更加多的复杂性。
  • 装饰模式会导致设计中出现许多小类,会使程序变得很复杂。

那接下来我们再来看下UniRx的UML图:

核心源码展示

Observable.EveryUpdate().First().Subscribe(_ => {});
public static IDisposible Subscribe<T>(this IObservable<T> source, Action<T> onNext)
{
    return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, Stubs.Nop));
}

这里注意,First().Subscribe()这里的Subscribe并不是成员函数,这里是一个扩展函数,相当于Subscribe(First(), _ => {}),真正的Subscribe成员函数在下面,请看下面的代码展示。

public IDisposable Subscribe(IObserver<T> observer)
{
    var subscription = new SingleAssignmentDisposable();
    if(isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired)
    {
        Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription));
    }
    else
    {
        subscription.Disposable = SubscribeCore(observer, subscription);
    }
    
    return subscripion;
}
protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
{
    if(predicate == null)
    {
        return source.Subscribe(new First(this, observer, cancel));
    }
    else
    {
        return source.Subscribe(new First_(this, observer, cancel));
    }
}

可以看到成员函数Subscribe主要还是负责封装一些注册信息,而注册的核心代码则在SubscribeCore里面,具体的操作符筛选逻辑在对应的操作符.cs文件里,例如上面的First.cs

public override void OnNext(T value)
{
    if(notPublished)
    {
        notPublished = false;
        observer.OnNext(vaule);
        try { observer.OnComplete(); }
        finally { Dispose(); }
        return;
    }
}

OnNext函数则是可以让整个流程串起来的关键。

运作流程

首先Observable.事件源()先生成一个事件源,这样有了事件源之后,我们就可以用前面提到的操作符来对其进行操作了,调用了操作符之后该类型的操作符会返回一个对应的IObservable对象,最后调用Subscribe注册回调事件。

那么我们注册的回调事件到底是怎么被触发的呢?答案如下图所示:

Scheduler

最后来介绍下这个Scheduler,虽然在代码里的存在感比较低,但是也是非常重要的一个角色。老规矩,先上它的UML图。

Now代表的是当前时间 Schedule(Action action)对传入的方法进行调度 Schedule(TimeSpan dueTime, Action action)对传入时间对行为进行调度

在Rx中,使用Scheduler来控制并发。而对于Scheduler我们可以理解为程序调度(调度器),通过Scheduler来规定在什么时间什么地点执行什么事情。比如按固定时间间隔发射、每帧发射、只发射n次等等。调度器实际上也只是将行为放入队列,然后在Thread中进行wait,并没有使用协程。

UniRx提供了以下几种Scheduler:

  • CurrentThreadScheduler:在当前线程执行

  • ImmediateScheduler:在当前线程立即执行

  • ThreadPoolScheduler:即在线程池中执行

  • MainThreadScheduler:在主线程中执行

答案揭晓

在分析了大部分核心源码之后大家对本小节开头的问题有答案了么。

答案当然是不会触发了~只有进行了Subscribe操作后,才会生成IObserver对象,然后才能驱动这一整条操作链。

你有没有答对呢。


推荐资料