rxjava

[TOC]

一、操作方法

1、retry:

Flowable retry() 默认构造方法是 integer 最多次数

Flowable retry(BiPredicate<? super Integer, ? super Throwable> predicate) 这个出错的时候可以打印一下

这两个方法没用过,回头看看,标志上先

Flowable retry(long count)
Flowable retry(long times, Predicate<? super Throwable> predicate)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// retry好几种写法
Flowable<ResponseBody> data = RetrofitHelper.getService().getData1();
final Disposable subscribe = data.subscribeOn(Schedulers.io())
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
System.out.println(integer);
return integer < 3000;
}
})
.subscribe(new Consumer<ResponseBody>() {
@Override
public void accept(ResponseBody body) throws Exception {
System.out.println(body.string());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println(throwable);
}
});
2、retryWhen:

网络请求重试次数,可以通过 flatMap -> timer 来进行延迟重试

1
2
Flowable<T> retryWhen(
final Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler)

注意:

1
2
RxJava的retryWhen操作符会影响上游,而不会影响下游,比如下游发生了异常,不会重试,而上游发生了异常,会重试retryWhen指定的次数
思考一下就能知道原因,因为订阅是从下到上订阅,而数据流是从上到下流动,所以retryWhen返回的Observable发现上流的数据有异常的时候会重试,没有异常了就流给下游,所以retryWhen只能控制它所在的上游
1
2
3
4
5
6
7
8
9
10
11
12
//通过retryWhen可重试,当请求失败的时候
public static void loadRetryWhen2() {
Flowable<ResponseBody> data = RetrofitHelper.getService().getData1();
Disposable subscribe = data.subscribeOn(Schedulers.io())
.retryWhen(new RetryFlowable())
.subscribe(new Consumer<ResponseBody>() {
@Override
public void accept(ResponseBody body) throws Exception {
System.out.println(body.string());
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//重试的一个类
static class RetryFlowable implements Function<Flowable<Throwable>, Publisher<?>> {
private long tryCount;
private long count;
private long delay;
private TimeUnit unit;

public RetryFlowable() {
this(3, 3, TimeUnit.SECONDS);
}

public RetryFlowable(int tryCount, long delay, TimeUnit unit) {
this.tryCount = tryCount;
this.delay = delay;
this.unit = unit;
}

@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Throwable throwable) throws Exception {
if (count < tryCount) {
count++;
System.out.println("get error, it will try after "
+ " millisecond, retry count " + count);
return Flowable.timer(delay, unit);
}

return Flowable.error(throwable);
}
});
}
}
3、maybe

没有onNext(Object o)方法,有个onSuccess(Object o)方法

may be 只接受一次成功的值

成功的后面再掉成功也不会发射了

1
2
3
4
5
6
7
8
9
10
11
12
13
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> emitter) throws Exception {
// emitter.onComplete();//调取完成,后面的值也没有了
emitter.onSuccess("fsafadsf");
emitter.onSuccess("aaaa"); //在成功的后面再掉成功也不会发射了
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
4、do操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.just("hello")
.doOnNext(s -> System.out.println("doOnNext: " + s))
.doAfterNext(s -> System.out.println("doAfterNext: " + s))
.doOnComplete(() -> System.out.println("doOnComplete: "))
.doOnSubscribe(disposable -> System.out.println("doOnSubscribe: " + disposable))
.doAfterTerminate(() -> System.out.println("doAfterTerminate: "))
.doFinally(() -> System.out.println("doFinally: "))
.doOnEach((notify) -> System.out.println("doOnEach: "
+ (notify.isOnNext()
? "onNext" :notify.isOnComplete()
? "onComplete":"onEror")))
.doOnLifecycle(disposable -> System.out.println("doOnLifecycle: " + disposable.isDisposed())
,() -> System.out.println("doOnLifecycle run"))
.subscribe(s -> System.out.println("收到消息: " + s));
5、Single

single只发送一次,没有完成方法,跟Maybe有点像

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
emitter.onSuccess("safdsa");
emitter.onSuccess("fffff");
}
}).subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onSuccess(String s) {
System.out.println(s);
}

@Override
public void onError(Throwable e) {

}
});
6、debounce

debounce操作符 优化app搜索功能

1
2
//导入rxbinding
implementation 'com.jakewharton.rxbinding3:rxbinding:3.0.0-alpha2'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RxTextView.textChanges(searchEdit)
//变化后的1s钟,然后再进行网络请求
.debounce(1, TimeUnit.SECONDS)
.flatMap(new Function<CharSequence, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final CharSequence charSequence) throws Exception {
return new ObservableSource<String>() {
@Override
public void subscribe(Observer<? super String> observer) {
observer.onNext(charSequence.toString());
}
};
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});

如果当用户刚刚在1s中的时候,又开始输入,而过了1s后请求还没回来,但是1s又开始了请求,这样就会乱数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
RxTextView.textChanges(searchEdit)
.debounce(1, TimeUnit.SECONDS)
.switchMap(new Function<CharSequence, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(final CharSequence charSequence) throws Exception {
return new ObservableSource<String>() {
@Override
public void subscribe(Observer<? super String> observer) {
observer.onNext(charSequence.toString());
}
};
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});

switchMap操作符只会发射第二次(最近一次)请求的Observable

7、concat

concat操作处理多个数据源

可以处理三级缓存,后面可以进行真实数据来进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//内存缓存
Flowable<Integer> just = Flowable.just(1);
//磁盘缓存
Flowable<Integer> just2 = Flowable.just(2);
//网络缓存
Flowable<Integer> just3 = Flowable.just(3);


Flowable.concat(just, just2, just3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
if (integer == 1) { //内存缓存,假设没有的时候是2,那就不通过,所以就往下走了

}
System.out.println(integer);
}
});

concat 在第一个发送完成之后,才会去发送第二个,如果第一个没有complete,后面的就不会再发送了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Observable<Integer> stringObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(11);
System.out.println("Integer");
emitter.onNext(22);
System.out.println("Integer");
emitter.onNext(33);
System.out.println("Integer");
emitter.onComplete(); //需要写个完成,不写完成,后面就不会被发射
}
});

Observable<String> stringObservable1 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("44");
System.out.println("String");
emitter.onNext("55");
System.out.println("String");
emitter.onNext("66");
System.out.println("String");
emitter.onComplete();
}
});

Observable.concat(stringObservable, stringObservable1).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
if (s instanceof String) {
System.out.println(s);
}
}
});
8、zip

与contact不同的是,contact是同一类型的,一个一个发送的。

zip是把不同类型同时发送过来,其实也是一个一个的发过来,只不过,发送过来的时候中途两个数据可以进行计算,在计算过程中,以最少的数量来进行计算,多出来的都不会进行计算了

zip会以最后一个为标准,最后一个不满足就,观察者就不会收到,在就和 CombineLatest区别,CombineLatest每次都进行回调不管哪个变化都会进行比较,然后通知观察者,Rxbing中的表单提交就是用zip之后再最后的那个有效就有效,不然就不通知观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Observable.zip(getCreateData1(), getCreateData2(), new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return s + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});

public static Observable<Integer> getCreateData1() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
//中断了所有的结果都不会打印了
// emitter.onError(new NullPointerException());
// emitter.onNext(3); //注释了,就长度为2了

}
});
}

public static Observable<String> getCreateData2() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("我是数据1");
emitter.onNext("我是数据2");
emitter.onNext("我是数据3");
}
});
}
9、merge

merge和concat差不多,也是一个一个的执行,只是不用调用 onComplete后面的被观察者也进行了执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("我是数据1");
emitter.onNext("我是数据2");
// Thread.sleep(2000);
// 如果事件中断了,后面的也不执行了
// emitter.onError(new NullPointerException());
emitter.onNext("我是数据3");
}
});

Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("我是数据4");
emitter.onNext("我是数据5");
emitter.onNext("我是数据6");
}
});

Observable.merge(observable, stringObservable).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
10、startWith

把startWith的可观察者对象数据 会放在前面执行,然后执行再执行调用startWith的可观察者执行

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable<String> just = Observable.just("1", "2", "3");

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("111");
}
}).startWith(just).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
11、flatMap 、concatMap和switchMap

flatMap是不会按照顺序发送过来的

concatMap会按照顺序发送,中间如果有耗时也会发送耗时完后的数据,不会跳着发送

switchMap发送最近的一次observable的数据,前一次observable,会没有效果,所以去重不错

12、timer,interval和intervalRange

timer,任务延迟请求一次,执行完就调用了onComplete的方法

​ 正常使用,默认会在一个线程中的 RxComputationThreadPool 的线程中

1
2
3
public static Observable<Long> timer(long delay, TimeUnit unit) {
return timer(delay, unit, Schedulers.computation());
}

interval是默认无限发送的,可以通过take来截取一部分

timer相似,如果不额外指定的话,则会采取默认订阅的线程

与timer相似,无法指定起始值,也为默认的long类型的0

支持指定延时发送,所以不会自动发送onComplete

intervalRange:

可以指定时间范围完成,可以完成倒计时

这样就可以实现验证码的逻辑了

timer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 //take对timer来说是无效的,有没有take都是延迟3s,然后执行一次
Observable.just(1,2,3,4).take(3).timer(3, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}

@Override
public void onNext(Long aLong) {
System.out.println("onNext: " + aLong);
System.out.println(Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});

interval

1
2
3
4
5
6
7
//第一个参数是延迟多少秒,第二个参数是,没多少秒后执行一次
Observable.interval(1, 3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});

intervalRange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//实现倒计时10s钟
static int i = 10;
Observable.intervalRange(0,10, 0, 1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
i--;
System.out.println(i);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("倒计时出现错误");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("完成倒计时");
}
});
13、repeat

这个是不需要条件,就触发重试,和 retry类似,retry需要 exception 来进行触发重试

repeat是无条件的重试

14、switchIfEmpty

做二级缓存的时候使用, 如果都不发射数据,直接完成了,就会走switchIfEmpty里面的数据,然后发送个 观察者

onNext传null,或者调用了 onError 不会走switchIfEmpty里面的可观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// emitter.onNext("发射数据1");
// emitter.onNext("发射数据2");
//onNext传null,或者调用了 onError 不会走switchIfEmpty里面的可观察者
// emitter.onNext(null);
// emitter.onError(new NullPointerException());
emitter.onComplete();;
}
}, BackpressureStrategy.DROP).switchIfEmpty(new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> s) {
s.onNext("我是switchIfEmpty来的数据");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});

二、自定义操作符

1、

细节:

Rxjava中前面不管写了多少,只有设置观察者的时候才起效,所以要做单步调试的时候,需要设置观察者,才会执行这个单步的操作,判断是否有问题

1
2
3
4
5
public final void subscribe(Observer<? super T> observer) {
//..省略代码
subscribeActual(observer); //执行了这个前面所有的代码才会去走
//..省略代码
}

Flowable.的使用,如果用subscribe

1
2
3
4
5
@Override
public void onSubscribe(Subscription s) {
s.request(1); //要加上这个,就可以进行请求,当时忘记加这个一直不能进行成功
System.out.println(s);
}

okio的使用

无声音乐、前台服务、双进程守护、像素保活、jobs

兄弟萌,你们怎么查,内存泄漏

ViewDataBinding binding = DataBindingUtil.inflate(LayoutInflater.from(parent.getContext()), mLayoutId, parent, false);
CommonHolder myHolder = new CommonHolder(binding.getRoot());
myHolder.setBinding(binding);

上传图片,可以看到正在上传第几张图片这个应该就不能进行retrofit去弄了,封装的太好,可能这个没上传一次成功了,没有回调,只有全部上传了才有回调

可能这样,一个一个上传到服务后,然后服务器给了每一个图片的链接,保存,然后整体上传

doOnNext

实战: 可以通过retryWhen重新去获取token,然后又继续请求,这里可能需要判断网络是否有什么问题,或者限制一次请求重新获取次数或者每次的时间,不然,万一是服务器宕机了,或者恶意返回token是失败验证,就会进入死循环,我们自己的程序也宕机了。

优雅的全局加载 Gloading

Gradle命令使用。

产考:

RxJava系列文章

友好 RxJava2.x 源码解析(三)zip 源码分析