[TOC]
一、操作方法
1、retry:
Flowable retry() 默认构造方法是 integer 最多次数
Flowable retry(BiPredicate<? super Integer, ? super Throwable> predicate) 这个出错的时候可以打印一下
这两个方法没用过,回头看看,标志上先
Flowable retry(long count)
Flowable retry(long times, Predicate<? super Throwable> predicate)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Flowable<ResponseBody> data = RetrofitHelper.getService().getData1(); final Disposable subscribe = data.subscribeOn(Schedulers.io()) .retry(new BiPredicate<Integer, Throwable>() { @Override public boolean test(Integer integer, Throwable throwable) throws Exception { System.out.println(integer); return integer < 3000; } }) .subscribe(new Consumer<ResponseBody>() { @Override public void accept(ResponseBody body) throws Exception { System.out.println(body.string()); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { System.out.println(throwable); } });
|
2、retryWhen:
网络请求重试次数,可以通过 flatMap -> timer 来进行延迟重试
1 2
| Flowable<T> retryWhen( final Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler)
|
注意:
1 2
| RxJava的retryWhen操作符会影响上游,而不会影响下游,比如下游发生了异常,不会重试,而上游发生了异常,会重试retryWhen指定的次数 思考一下就能知道原因,因为订阅是从下到上订阅,而数据流是从上到下流动,所以retryWhen返回的Observable发现上流的数据有异常的时候会重试,没有异常了就流给下游,所以retryWhen只能控制它所在的上游
|
1 2 3 4 5 6 7 8 9 10 11 12
| public static void loadRetryWhen2() { Flowable<ResponseBody> data = RetrofitHelper.getService().getData1(); Disposable subscribe = data.subscribeOn(Schedulers.io()) .retryWhen(new RetryFlowable()) .subscribe(new Consumer<ResponseBody>() { @Override public void accept(ResponseBody body) throws Exception { System.out.println(body.string()); } }); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| static class RetryFlowable implements Function<Flowable<Throwable>, Publisher<?>> { private long tryCount; private long count; private long delay; private TimeUnit unit;
public RetryFlowable() { this(3, 3, TimeUnit.SECONDS); }
public RetryFlowable(int tryCount, long delay, TimeUnit unit) { this.tryCount = tryCount; this.delay = delay; this.unit = unit; }
@Override public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception { return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() { @Override public Publisher<?> apply(Throwable throwable) throws Exception { if (count < tryCount) { count++; System.out.println("get error, it will try after " + " millisecond, retry count " + count); return Flowable.timer(delay, unit); }
return Flowable.error(throwable); } }); } }
|
3、maybe
没有onNext(Object o)方法,有个onSuccess(Object o)方法
may be 只接受一次成功的值
成功的后面再掉成功也不会发射了
1 2 3 4 5 6 7 8 9 10 11 12 13
| Maybe.create(new MaybeOnSubscribe<String>() { @Override public void subscribe(MaybeEmitter<String> emitter) throws Exception {
emitter.onSuccess("fsafadsf"); emitter.onSuccess("aaaa"); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });
|
4、do操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Observable.just("hello") .doOnNext(s -> System.out.println("doOnNext: " + s)) .doAfterNext(s -> System.out.println("doAfterNext: " + s)) .doOnComplete(() -> System.out.println("doOnComplete: ")) .doOnSubscribe(disposable -> System.out.println("doOnSubscribe: " + disposable)) .doAfterTerminate(() -> System.out.println("doAfterTerminate: ")) .doFinally(() -> System.out.println("doFinally: ")) .doOnEach((notify) -> System.out.println("doOnEach: " + (notify.isOnNext() ? "onNext" :notify.isOnComplete() ? "onComplete":"onEror"))) .doOnLifecycle(disposable -> System.out.println("doOnLifecycle: " + disposable.isDisposed()) ,() -> System.out.println("doOnLifecycle run")) .subscribe(s -> System.out.println("收到消息: " + s));
|
5、Single
single只发送一次,没有完成方法,跟Maybe有点像
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Single.create(new SingleOnSubscribe<String>() { @Override public void subscribe(SingleEmitter<String> emitter) throws Exception { emitter.onSuccess("safdsa"); emitter.onSuccess("fffff"); } }).subscribe(new SingleObserver<String>() { @Override public void onSubscribe(Disposable d) {
}
@Override public void onSuccess(String s) { System.out.println(s); }
@Override public void onError(Throwable e) {
} });
|
6、debounce
debounce操作符 优化app搜索功能
1 2
| implementation 'com.jakewharton.rxbinding3:rxbinding:3.0.0-alpha2'
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| RxTextView.textChanges(searchEdit) .debounce(1, TimeUnit.SECONDS) .flatMap(new Function<CharSequence, ObservableSource<String>>() { @Override public ObservableSource<String> apply(final CharSequence charSequence) throws Exception { return new ObservableSource<String>() { @Override public void subscribe(Observer<? super String> observer) { observer.onNext(charSequence.toString()); } }; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });
|
如果当用户刚刚在1s中的时候,又开始输入,而过了1s后请求还没回来,但是1s又开始了请求,这样就会乱数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| RxTextView.textChanges(searchEdit) .debounce(1, TimeUnit.SECONDS) .switchMap(new Function<CharSequence, ObservableSource<String>>() { @Override public ObservableSource<String> apply(final CharSequence charSequence) throws Exception { return new ObservableSource<String>() { @Override public void subscribe(Observer<? super String> observer) { observer.onNext(charSequence.toString()); } }; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });
|
switchMap操作符
只会发射第二次(最近一次)请求的Observable
7、concat
concat操作处理多个数据源
可以处理三级缓存,后面可以进行真实数据来进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Flowable<Integer> just = Flowable.just(1);
Flowable<Integer> just2 = Flowable.just(2);
Flowable<Integer> just3 = Flowable.just(3);
Flowable.concat(just, just2, just3).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { if (integer == 1) { } System.out.println(integer); } });
|
concat 在第一个发送完成之后,才会去发送第二个,如果第一个没有complete,后面的就不会再发送了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| Observable<Integer> stringObservable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(11); System.out.println("Integer"); emitter.onNext(22); System.out.println("Integer"); emitter.onNext(33); System.out.println("Integer"); emitter.onComplete(); } });
Observable<String> stringObservable1 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("44"); System.out.println("String"); emitter.onNext("55"); System.out.println("String"); emitter.onNext("66"); System.out.println("String"); emitter.onComplete(); } });
Observable.concat(stringObservable, stringObservable1).subscribe(new Consumer<Object>() { @Override public void accept(Object s) throws Exception { if (s instanceof String) { System.out.println(s); } } });
|
8、zip
与contact不同的是,contact是同一类型的,一个一个发送的。
zip是把不同类型同时发送过来,其实也是一个一个的发过来,只不过,发送过来的时候中途两个数据可以进行计算,在计算过程中,以最少的数量来进行计算,多出来的都不会进行计算了
zip会以最后一个为标准,最后一个不满足就,观察者就不会收到,在就和 CombineLatest区别,CombineLatest每次都进行回调不管哪个变化都会进行比较,然后通知观察者,Rxbing中的表单提交就是用zip之后再最后的那个有效就有效,不然就不通知观察者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| Observable.zip(getCreateData1(), getCreateData2(), new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return s + integer; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } }); public static Observable<Integer> getCreateData1() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2);
} }); }
public static Observable<String> getCreateData2() { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("我是数据1"); emitter.onNext("我是数据2"); emitter.onNext("我是数据3"); } }); }
|
9、merge
merge和concat差不多,也是一个一个的执行,只是不用调用 onComplete后面的被观察者也进行了执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("我是数据1"); emitter.onNext("我是数据2");
emitter.onNext("我是数据3"); } });
Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("我是数据4"); emitter.onNext("我是数据5"); emitter.onNext("我是数据6"); } });
Observable.merge(observable, stringObservable).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });
|
10、startWith
把startWith的可观察者对象数据 会放在前面执行,然后执行再执行调用startWith的可观察者执行
1 2 3 4 5 6 7 8 9 10 11 12 13
| Observable<String> just = Observable.just("1", "2", "3");
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("111"); } }).startWith(just).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });
|
11、flatMap 、concatMap和switchMap
flatMap是不会按照顺序发送过来的
concatMap会按照顺序发送,中间如果有耗时也会发送耗时完后的数据,不会跳着发送
switchMap发送最近的一次observable的数据,前一次observable,会没有效果,所以去重不错
12、timer,interval和intervalRange
timer,任务延迟请求一次,执行完就调用了onComplete的方法
正常使用,默认会在一个线程中的 RxComputationThreadPool 的线程中
1 2 3
| public static Observable<Long> timer(long delay, TimeUnit unit) { return timer(delay, unit, Schedulers.computation()); }
|
interval是默认无限发送的,可以通过take来截取一部分
timer相似,如果不额外指定的话,则会采取默认订阅的线程
与timer相似,无法指定起始值,也为默认的long类型的0
支持指定延时发送,所以不会自动发送onComplete
intervalRange:
可以指定时间范围完成,可以完成倒计时
这样就可以实现验证码的逻辑了
timer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| Observable.just(1,2,3,4).take(3).timer(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println(d); }
@Override public void onNext(Long aLong) { System.out.println("onNext: " + aLong); System.out.println(Thread.currentThread().getName()); }
@Override public void onError(Throwable e) { System.out.println("onError"); }
@Override public void onComplete() { System.out.println("onComplete"); } });
|
interval
1 2 3 4 5 6 7
| Observable.interval(1, 3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println(aLong); } });
|
intervalRange
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| static int i = 10; Observable.intervalRange(0,10, 0, 1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { i--; System.out.println(i); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { System.out.println("倒计时出现错误"); } }, new Action() { @Override public void run() throws Exception { System.out.println("完成倒计时"); } });
|
13、repeat
这个是不需要条件,就触发重试,和 retry类似,retry需要 exception 来进行触发重试
repeat是无条件的重试
14、switchIfEmpty
做二级缓存的时候使用, 如果都不发射数据,直接完成了,就会走switchIfEmpty里面的数据,然后发送个 观察者
onNext传null,或者调用了 onError 不会走switchIfEmpty里面的可观察者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception {
emitter.onComplete();; } }, BackpressureStrategy.DROP).switchIfEmpty(new Publisher<String>() { @Override public void subscribe(Subscriber<? super String> s) { s.onNext("我是switchIfEmpty来的数据"); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });
|
二、自定义操作符
1、
细节:
Rxjava中前面不管写了多少,只有设置观察者的时候才起效,所以要做单步调试的时候,需要设置观察者,才会执行这个单步的操作,判断是否有问题
1 2 3 4 5
| public final void subscribe(Observer<? super T> observer) { subscribeActual(observer); }
|
Flowable.的使用,如果用subscribe
1 2 3 4 5
| @Override public void onSubscribe(Subscription s) { s.request(1); System.out.println(s); }
|
okio的使用
无声音乐、前台服务、双进程守护、像素保活、jobs
兄弟萌,你们怎么查,内存泄漏
ViewDataBinding binding = DataBindingUtil.inflate(LayoutInflater.from(parent.getContext()), mLayoutId, parent, false);
CommonHolder myHolder = new CommonHolder(binding.getRoot());
myHolder.setBinding(binding);
上传图片,可以看到正在上传第几张图片这个应该就不能进行retrofit去弄了,封装的太好,可能这个没上传一次成功了,没有回调,只有全部上传了才有回调
可能这样,一个一个上传到服务后,然后服务器给了每一个图片的链接,保存,然后整体上传
doOnNext
实战: 可以通过retryWhen重新去获取token,然后又继续请求,这里可能需要判断网络是否有什么问题,或者限制一次请求重新获取次数或者每次的时间,不然,万一是服务器宕机了,或者恶意返回token是失败验证,就会进入死循环,我们自己的程序也宕机了。
优雅的全局加载 Gloading
Gradle命令使用。
产考:
RxJava系列文章
友好 RxJava2.x 源码解析(三)zip 源码分析