【Unity】UniRxで作成したストリームを停止する

UniRxで作り出したストリームは基本的にOnCompletedされない限り残り続けてしまいます。必要なくなったストリームを外部から止めるためにはSubscribeメソッドが呼ばれたときに返される、IDisposable型のインタフェースのDisposeメソッドを呼び必要があります。UnirxではこのDisposeメソッドを呼ぶことでストリームの停止を行うことができます。しかし実際に使用する場合、その呼び出し方は様々なので状況に応じて最適なものを選ぶと良さそうです。

IDisposable.Dispose()を呼びだす

Observable.Create<T>メソッドを使ってDisposeメソッドの確認をしてみます。Observable.Create<T>はIObserver<T>型の変数を引数にとり、IDisposableを返すdelegateを引数として受け取り、IObservable<T>を返すメソッドです。任意のIObservable<T>を作れるファクトリメソッドみたいな感じですね。

var observable = Observable.Create<string>(observer =>
{
    observer.OnNext("OnNext");

    return Disposable.Create(() =>
    {
        Debug.Log("Dipose");
    });
});
observable.Subscribe(x => Debug.Log(x), _ => Debug.Log("OnError"), () => { Debug.Log("OnCompleted"); });

実行結果

OnNext
自分でDisposeを呼び出す

上記の結果の通り、OnNextしか表示されていないので、このストリームは止まっていません。止めるためにはSubscribeした時の返り値(IDisposable)を受け取って停止したいタイミングでDisposeメソッドを呼び出す必要があります。

var observable = Observable.Create<string>(observer =>
{
    observer.OnNext("OnNext");

    return Disposable.Create(() =>
    {
        Debug.Log("Dipose");
    });
});
var disposable = observable.Subscribe(x => Debug.Log(x), _ => Debug.Log("OnError"), () => { Debug.Log("OnCompleted"); });
disposable.Dispose();

実行結果

OnNext
Dispose
usingを使って自動でDisposeを呼び出す

using文を使うとusingでくくったスコープを抜けるときに自動的にDisposeメソッドが呼ばれます。これはUnityではなくC#のusing文とIDiposableインターフェースの機構です。

using (observable.Subscribe(x => Debug.Log(x), _ => Debug.Log("OnError"), () => { Debug.Log("OnCompleted"); }))
{
    Debug.Log("↓usingのscopeが抜ける時に自動でDisposeが呼ばれる");
}

実行結果

OnNext
↓usingのscopeが抜ける時に自動でDisposeが呼ばれる
Dispose

AddToメソッド

UniRxのAddTo(GameObject)メソッドを使う

AddToメソッドを使うと、良きタイミングでDisposeを自動で呼ばれるように設定することができます。おそらく頻繁に使われる方法と思われるのがこちらの方法です。AddToメソッドはT型の拡張メソッドとして定義されており、Subscribeメソッドに続けて処理を書くことができます。AddToはGameObject型の引数を受け取り、そのGameObjectがnullになったとき(== null 判定がtrueになったとき)にDisposeメソッドを自動で呼び出してくれます。AddToメソッドの詳細は別の機会に追うとして、下記のような感じでDisposeが呼ばれることを確認できます。

var obj = new GameObject();
observable.Subscribe(x => Debug.Log(x), _ => Debug.Log("OnError"), () => { Debug.Log("OnCompleted"); }).AddTo(obj);
Destroy(obj);

実行結果

OnNext
Dispose

普段使いではこのような方法ではなくAddTo(this)のように、自分が消えるときにストリームも止めてしまうといったように使われそうな気がします。

observable.Subscribe(x => Debug.Log(x), _ => Debug.Log("OnError"), () => { Debug.Log("OnCompleted"); }).AddTo(compositeDisposable);
AddToとCompositeDisposable

IDisposableをまとめて管理するクラスとしてCompositeDisposableが用意されています。GameObjectを継承していないクラスでUniRxを使う場合や、自分が破棄されないうちにストリームを起動したり消したりする場合、このクラスを使うのがベストプラクティスとなりそうです。CompositeDisposable.Clearを呼べば、登録したIDisposable.Disposeをまとめて呼んでくれるので、AddToでこのクラスのインスタンスを渡せばストリームをまとめて管理できます。CompositeDisposable.Disposeを呼ぶと、それ以降Addされたストリームをすぐに破棄するのでClearと使い分ければ安全なストリーム管理ができます。

var compositeDisposable = new CompositeDisposable();
observable.Subscribe(x => Debug.Log(x), _ => Debug.Log("OnError"), () => { Debug.Log("OnCompleted"); }).AddTo(compositeDisposable);
observable.Subscribe(x => Debug.Log(x), _ => Debug.Log("OnError"), () => { Debug.Log("OnCompleted"); }).AddTo(compositeDisposable);
observable.Subscribe(x => Debug.Log(x), _ => Debug.Log("OnError"), () => { Debug.Log("OnCompleted"); }).AddTo(compositeDisposable);
compositeDisposable.Clear();

実行結果

OnNext
OnNext
OnNext
Dispose
Dispose
Dispose
その他

Disposeしたからといってすぐにそのオブジェクトnullになるわけではないです。そしてUniRxで返されるIDisposableインタフェースを実装しているクラス達は複数回Disposeが呼ばれても変なエラーが起こらないようになっているようです。

var disposable = observable.Subscribe(x => Debug.Log(x), _ => Debug.Log("OnError"), () => { Debug.Log("OnCompleted"); });
disposable.Dispose();
disposable.Dispose();
disposable.Dispose();

実行結果

OnNext
Dispose

基本的なUniRxのオペレータを使った場合、Subscribeメソッドを呼んだときにOperatorObservableBase.Subscribeメソッドが呼ばれます。この実装をみて見ると、IDisposableを保持する変数として、SingleAssignmentDisposableクラスのインスンタスが用いられています。

public IDisposable Subscribe(IObserver<T> observer)
{
    var subscription = new SingleAssignmentDisposable();

    // note:
    // does not make the safe observer, it breaks exception durability.
    // var safeObserver = Observer.CreateAutoDetachObserver<T>(observer, subscription);

    if (isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired)
    {
        Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription));
    }
    else
    {
        subscription.Disposable = SubscribeCore(observer, subscription);
    }

    return subscription;
}

つまり、Disposeを呼んだときには、このSingleAssignmentDisposableクラスのDisposeメソッドが呼ばれます。その実装を見て見るとすでにdipose済みかをチェックし、disposeされてない時のみdisposeすると言う感じの実装になっています。このおかげで複数回読んでも大丈夫っぽそうなんですね。ついでにlockで排他ロックを取ってからdipose済みかをチェックしているのでスレッドセーフですね。

public void Dispose()
{
    IDisposable old = null;

    lock (gate)
    {
        if (!disposed)
        {
            disposed = true;
            old = current;
            current = null;
        }
    }

    if (old != null) old.Dispose();
}

停止しないストリームがあるとパフォーマンス劣化の原因になるので、ちゃんと止める方法を覚えておく必要がありますね。