ひよっこゲームブログ

なにもかも初心者のひよっこがゆったりと何かする

リアクティブプログラミング7日目

今日もRxJavaのリファレンス読む

Continuations

和訳で「継続」


Dependent

和訳で「依存」

itemが利用可能になったとき、それに依存する計算をしたい
これをContinuationsと呼ぶ

最も一般的なパターンは
値を指定して別のサービスを呼び出し、結果を受け取って処理を継続するとか

サンプルコード

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

サンプルコード見た感じだとまぁ見たまんまの処理


flatMapの中にflatMapを入れるみたいな場合

サンプルコード

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

これもシンプルに見たまんまの処理


Non-dependent

和訳で「非依存」

原文読んでもピンとこなかったので読み解いてみる

サンプルコード(ダメなパターン)

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);
  • flatMapSingle
    Flowableの各要素をSingleSourceにしてマッピングする
    Single = 値かエラーが1度だけ流れる

今までと違うところは、observableの宣言をしてflatMap
そのあとにmapで整頓してる

1回ずつoperatorを使いたいって感じ

この場合だとoperatorの処理でマッピングが複数の値をもたらす可能性があるみたい
んー、なぜだ?

あー・・・?
値が入り次第、次のmapが動作しちゃうみたい

こっちの方法で書くのが正解

andThenを利用することで、前のobservableが正常完了してから次のobservableが動作する
なるほど

サンプルコード

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())


Deferred-dependent

和訳で「遅延依存」

前と後ろでデータ依存があり
何らかの理由で「通常のチャンネル」をストリームしていない場合

サンプルコード(ダメなパターン)

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

これは全て「0」が出力されてしまう
Single.just(count.get()) がデータフロー実行前に評価されてしまうのでアウト

よってメインのストリームが終わってから評価する必要がある
=遅延依存


サンプルコード

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

または

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);


あー、これは忘れたころにつまずきそうなパターンだ・・・


今日やった部分はデータフローの仕方がごちゃっとしてるなぁ(体感)
とりまここまで、おやすみなさーい

リアクティブプログラミング6日目

Dependent sub-flows

和訳で「依存サブフロー
依存サブフロー is 何

本文を読む

flatMapは強力なoperatorです
お、おう

サンプルコード

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

んー、コード見てもぱっと分からん

調べてみる

ReactiveX - FlatMap operator
説明を読んだ感じだと
map(itemを変換)とmarge(複数動作していたObservableをまとめる)が合体したみたいな

図を見る限り変換後の順番は保証してないっぽいね


次の項目がちょっと長いので、ちょっと短めでおしまい

今日はちょっと仕事もあったので精神的にもんにょり
リモートワーク始まってから休日に労働をせがまれるリスクを背負ってるなぁと

寝るーん

リアクティブプログラミング5日目

Concurrency within a flow

和訳で「フローの並行性」

サンプルコード

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

バックグラウンドで1~10を2乗していって、結果をメインスレッドで返す

普通

Parallel processing

並列処理

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

RxJavaの並列処理は 「独立したフローを実行してその結果を単一のフローにマージする」だそうな

なるほど。

flatMapは順序を保証するものではないそうです
なので代替のものがある

  • concatMap
    一度に1つのフローを実行する

  • concatMapEager
    すべてのフローを一度に実行するが、出力フローは処理された順番になる

concatMapEagerはちょっと行儀悪いね


こう書いてもいいって

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

ちょっと待て出てきてないものが多い
parallel() = ParallelFlowable
runOn
sequential()

parallel()で並列であることを明記して
runOn()でスケジューラを設定
sequential()は最初に明記した処理順に単一のフローへマージする(図が分かりやすい)


今日はずいぶんのんびりやってしまった

リアクティブプログラミング4日目

Javaエアプなのでちょくちょく調べないとあかん


サンプルコード

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish


fromCallable()って何
Callable (Java Platform SE 8)

Javaの関数型のインターフェースだそうだ、ほーん


書き直すとこうなるらしい

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

バックグランドで入出力処理して、そのあとフォアグラウンドで処理してる。うむ


Schedulers

Schedulers.io() とかは「スケジューラー」と呼ばれる。まんまだな
これでスレッド操作とかするみたい

使用感がCoroutineのDispatchersみたいな感じだと思う(適当)


Javaのメインスレッドが終了した場合、スケジューラーは死ぬそうです。ふむふむ

ぱっと中身を見てみる

  • Schedulers.computation()
    バックグラウンドの固定のスレッド。ほとんどの処理はこれがデフォでいいみたい

  • Schedulers.io()
    動的なスレッド。ブロックできると書いてあるので処理待ちができるんだろう、たぶん

  • Schedulers.single()
    単一のスレッドで、FIFO方式(最初に入れたデータを最初に取り出す)

  • Schedulers.trampoline()
    テスト用。これもFIFO方式だそうです。まぁテスト用だし使わんやろ!(


明日を乗り切れば休みだー。リモートになってから曜日感覚がない

リアクティブプログラミング3日目

この勉強方法向いている気がした

今日もRxJavaのリファレンスを上から和訳して読んでく


用語

Upstream, downstream
データフローの種類っぽい
Operatorから見てどの方向にデータがストリームしているかという意味でのUp, down?

実際に動作を確認しないとちょっとこれ分からないです・・・


Objects in motion
emission, emits, item, event, signal, data and message(原文)は同じ意味
データフローで動くオブジェクトって呼ぶ


Backpressure
FlowableとObservableの話でやったやつ
ストリームとして受け取ったデータが重すぎると上手く動かなくなるので、そのための制御フロー


Assembly time
データフローの準備はアセンブラの時間で行われる
コンパイル後に初めて行われるってことだよね・・・?


Subscription time
subscribeが動き始めたときの話だから気にしなくていいや


Runtime
フローがアイテムを放出、エラー、完了したとき
サンプルコード

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

うん、まぁ、そうだな

用語だけで終わってしまった
明日からはまた処理をごたごた見ていく

リアクティブプログラミング2日目

ひとまずRxJavaのリファレンスを読んでみる
いつもの

package rxjava.examples;

import io.reactivex.rxjava3.core.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

データを突っ込んで渡して出力、以上! Rxをコードに組み込むのが目的のコードなやつ


RxJavaのおおざっぱな流れとしてはこうらしい

  • Observableからデータが流れてくる(=ストリーム)
  • Operator(=map()やfilter())を使うとストリームなデータをいじれる(=flip)
  • subscribe()で処理を開始


onNextとか色々あった気がするけどひとまず後ででいいや


んで、 Flowable.just って何よ

Flowable : ストリームを作る(=Observable)
just : オブジェクトをOperatorに渡す


FlowableとObservableの何が違うんだ?

  • io.reactivex.rxjava3.core.Flowable: 0..N flows, supporting Reactive-Streams and backpressure
  • io.reactivex.rxjava3.core.Observable: 0..N flows, no backpressure

「Reactive-Streams and backpressure」とな

ストリームとして受け取ったデータが重すぎると上手く動かなくなるので、そのための制御フロー
処理が重くなるやつはこれ使っとけって感じなのかな

調べてみたらObservableによるデータ制御には限界があるらしい

backpressureによる管理は
「処理が1回終わる毎に、次のデータを1つもらう」
というのを確実に行うためのものみたい、ふむふむ


今日はここまで、昨日は遅かったので早く寝るぞ

リアクティブプログラミング1日目

Rxを勉強していく中で、どうせなら基礎の基礎から記事を書いていこうと思った
ちょっとずつでもいいから毎日がんばろう

リアクティブプログラミングはRxKotlinをちょっとかじった
あとはKotlinのObservableは使ったことある程度
(RxというよりはただのObserverパターンだけども)

RxKotlinは公式の記述がちょっと少なくて、リストのやつしか扱ったことがないんですよね
単体をObservableで管理したかったんだけどぱっと見サンプルが無かった
(というかそもそもRx自体がリスト操作がメインっぽい?)

RxKotlinをちまちまやってたんだけど
RxKotlinよりもRxJavaの方が資料が多いのでひとまずそっちでお勉強することにした
えいえいおー!


リアクティブプログラミングとは

色んなとこに書いてあるけど
データのストリーミングしてデータを受け取るたびにアクションを起こす

ようは変化があったときに、変化した順番に物事を処理する...

Androidだと、通信の結果を待ってから画面遷移とか
RoomDBにデータを書き込んでから処理とか

現状はUI絡みのものだとLiveDataとDataBindingで何とかして
通信とかはCoroutineのasync / awaitとかで対応できる
もうこれでよくね

まぁ非同期対策の武器が多いに越したことはないしひとまず使えるようになりたい


リアクティブプログラミングのメリット

従来の書き方よりシンプルになりやすい(らしい)
なんかこればかりは触ってみないと分からんな


ひとまず今日はここまで。遅いし寝よう
RxJavaのコードは明日から記事にする