RxJava操作符
官网地址:http://reactivex.io/documentation/operators.html
按照操作符的类型可分为:创建操作符,变换操作符,过滤操作符,组合操作符,错误操作符,辅助操作符,条件和布尔操作符,算术和聚合操作符 及 连接操作符 等。
RxJava3 中 Action
被弃用了,改为Consumer
创建操作符
1 2 3 4 5 6 7 8
| Observable.create(new Observable.OnSubscribe<T>(){ @Overrode public void call(Subscriber<? super T>) sub){ sub.onNext("0"); sub.onNext("1"); sub.onCompleted(); } })
|
1 2
| String list = {"0","1","2"} Observable.from(list);
|
1
| Observable.just("0","1","2")
|
- interval 创建一个 按固定时间间隔发射整数序列 的Observable。
1 2 3 4 5 6 7
| Observable<Long> observable = Observable.interval(3, TimeUnit.SECONDS, Schedulers.trampoline());
observable.subscribe(new Consumer<Long>() { public void accept(Long aLong) throws Throwable { } });
|
- range 创建 发射指定范围的整数序列 的Observable。
1
| Observable<Integer> observable = Observable.range(0,5);
|
1 2
| Observable<Integer> observable = Observable.range(0,5) .repeat(2)
|
变换操作符
- map 将一个对象转变为另一个对象; —-偷梁换柱
1 2 3 4 5 6
| Observable<Integer> observable = Observable.range(0,5); Observable<String> map = observable.map(new Function<Integer, String>() { public String apply(Integer integer) throws Throwable { return integer.toString(); } });
|
- flatMap和cast
flatMap
将一个对象转为Observable
集合; cast
转为集合后电脑不知道你的Observable
的泛型类型,需要用cast
转换。 —–偷天换日
1 2 3 4 5 6
| final Observable<Integer> observable = Observable.range(0,5); observable.flatMap(new Function<Integer, ObservableSource<?>>() { public ObservableSource<?> apply(Integer integer) throws Throwable { return Observable.just("0","1","haha"); } }).cast(String.class);
|
和FlatMap
类似。区别在于:flatMap
不保证顺序,当上一个操作未执行时,下一个操作可能已经发射。
concatMap
保证了顺序,必须等到上一个操作发射完,才会发射下一个。
concatMap
和flatMap
的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并ObservablesflatMap
采用的merge,而concatMap
采用的是连接(concat)。总之一句一话,他们的区别在于:concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。 —出自这里
- **flatMapIteravle ** 把每一个元素转换成
Iterable
1 2 3 4
| Observable .interval(1, TimeUnit.SECONDS, Schedulers.trampoline()) .flatMapIterable( aLong -> Arrays.asList("a", "integer") ) .subscribe(s -> System.out.println(s));
|
- buffer 将Observable转为一个新的Observable,这个新Observable每次发射一组列表,而不是一个个发射。
1 2 3
| Observable .interval(1, TimeUnit.SECONDS, Schedulers.trampoline()) .buffer(3)
|
- groupBy 将Observable转为一个新的Observable,然后可以分组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Person p1 = new Person("hh",13); Person p2 = new Person("ff",13); Person p3 = new Person("gg",23); Person p4 = new Person("aa",13); Person p5 = new Person("bb",33); @NonNull Observable<GroupedObservable<Integer, Person>> group = Observable.just(p1, p2, p3, p4, p5) .groupBy(p -> p.age); Observable.concat(group).subscribe(new Consumer<Person>() { @Override public void accept(Person p) throws Throwable { System.out.println(">>>>>>"+p.name+","+p.age); } });
>>>>>>hh,13 >>>>>>ff,13 >>>>>>aa,13 >>>>>>gg,23 >>>>>>bb,33
|
过滤操作符
1 2
| Observable.just(1, 2, 3, 4,7, 5) .filter(i -> i>2)
|
elementAt 返回指定位置的数据
distinct 去重
skip 和 take skip过滤掉前n项,take只取前n项目。
ignoreElements 忽略所有元素,只保留onComplete和onError通知
throttleFirst 定期发射这个时间段里源Observable发送的第一条数据。默认调度器computation
。
1 2 3 4 5 6
| Observable.interval(1000,TimeUnit.MILLISECONDS, Schedulers.trampoline()) .throttleFirst(2000,TimeUnit.MILLISECONDS) .subscribe(l -> System.out.println(l));
|
通过时间来限流。源Observable每次发射出来一个数据后都会进行计时,如果再设定好的时间结束前Observable有新的数据发射出来,这个数据就会被丢弃,同时开始重新计时。
组合操作
1
| Observable.just(3,4,5).startWith(1,2)
|
- merge 合并
Observable
,可能会数据交错。
1
| Observable.merge(obs1,obs2)
|
- concat 合并
Observable
,严格按照顺序发射。
1
| Observable.concat(obs1,obs2)
|
1 2 3 4 5 6 7
| @NonNull Observable<Long> obs1 = Observable.just(1L,2L,3L); @NonNull Observable<String> obs2 = Observable.just("1L","2L","3L"); Observable.zip(obs1,obs2,(l1,l2) -> l1+","+l2).subscribe(l -> System.out.println(l)); ---------------------- 1,1L 2,2L 3,3L
|
- **combineLasetst ** 用前者最后一条发射的数据进行合并。
1 2 3 4 5 6 7
| @NonNull Observable<Long> obs1 = Observable.just(1L,2L,3L); @NonNull Observable<String> obs2 = Observable.just("1L","2L","3L"); Observable.combineLatest(obs1,obs2,(l1,l2) -> l1+","+l2).subscribe(l -> System.out.println(l)); ------------------------ 3,1L 3,2L 3,3L
|
辅助操作符
delay 延迟操作
Do doXXX
,do后面加上某个生命周期,即对这个生命周期加了个回调。
subscribeOn、observeOn 指定线程;在哪个线程发布
,在哪个线程观察
;
timeOut 指定一段时间,超时则onError
;
错误处理操作符
- catch onErrorReturn、onErrorResume和onExceptionResumeNext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| RxJavaPlugins.setErrorHandler(e -> { }); Observable.create((ObservableOnSubscribe<Integer>) emitter -> { for (int i = 0; i < 20; i++) { if (i>2{ emitter.onError(new Throwable("ERROR")); } emitter.onNext(i); } emitter.onComplete(); })onErrorReturn(t -> 100) 0 1 2 100 complete
|
布尔操作符
- all 对源
Observable
发射的所有数据进行判断,全满足条件则返回true
,否则返回false
。
1 2 3 4 5 6 7 8 9 10 11
| Observable.just(1,2,3,4,5) .all(integer -> { System.out.println(integer); return integer<3; }) .subscribe(aBoolean -> System.out.println(aBoolean));
1 2 3 false
|
- contains 是否包含这个数据;isEmpty 是否为空;
条件操作符
- amb 多个Observable中,只发射第一个发射数据的的Observable。
- defaultEmpty 没发射数据则返回一个默认数据。
转换操作符
将Observable转换成另一个对象或数据结构。比map更简介。
1 2 3
| Observable.just(1,2,3,4,5) .toList() .subscribe(list -> {System.out.println(list.size())});
|
- toSortedList 和toList一样,不过多了个排序功能,所以发射对象要Comparable。
1 2 3 4 5 6 7 8 9 10 11
| Observable.just(1,2,3,4,5) .toMap(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Throwable { return integer.toString(); } }) .subscribe(map -> System.out.println(map.toString()));
|
未完待续。。。
ps: 其实还有很多没写,太多了,学不动了😵