当前位置: 首页 > news >正文

做菠菜网站多少钱手机wap浏览器

做菠菜网站多少钱,手机wap浏览器,学历提升中心,wordpress怎么装修网页目录一、观察者Observer创建过程二、被观察者Observable创建过程三、subscribe订阅过程四、map操作符五、线程切换原理简单示例1&#xff1a; private Disposable mDisposable; Observable.create(new ObservableOnSubscribe<String>() {Overridepublic void subscribe(…

目录

  • 一、观察者Observer创建过程
  • 二、被观察者Observable创建过程
  • 三、subscribe订阅过程
  • 四、map操作符
  • 五、线程切换原理

简单示例1:

private Disposable mDisposable;
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}}).subscribe(new Observer<String>() { @Overridepublic void onSubscribe(Disposable d) {mDisposable = d;}@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});@Overrideprotected void onDestroy() {super.onDestroy();if (mDisposable != null) {if (!mDisposable.isDisposed()) {mDisposable.dispose();}}}

特别注意:上面示例代码中的mDisposable最后必须要释放掉,不然会出现内存泄漏

一、观察者Observer创建过程

  首先对观察者Observer源码开始进行简单分析下:
Observer.java

public interface Observer<T> {//表示一执行subscribe订阅就会执行该函数,这个函数一定执行在主线程中void onSubscribe(@NonNull Disposable d);// 表示拿到上一个流程的数据void onNext(@NonNull T t);// 表示拿到上一个流程的错误数据void onError(@NonNull Throwable e);// 表示事件流程结束void onComplete();
}

  具体的对象创建是在上面示例代码1中的new Observer<String>()操作,这个称这个为自定义观察者

二、被观察者Observable创建过程

  分析完观察者Observer的创建,现在来分析下被观察者Observable的创建流程,

Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}})

  将new ObservableOnSubscribe()过程可以理解为是自定义source的过程。

new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}}

执行Observable.create()代码流程
Observable.java

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null"); //校验是否为nullreturn RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}

其中,RxJavaPlugins.onAssembly()采用了hook技术,如果没有重写RxJavaPlugins.setOnObservableAssembly()方法,这个可以不要考虑。
ObservableCreate.java

public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source; // 自定义sourcepublic ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}}@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (!isDisposed()) {try {observer.onError(t);} finally {dispose();}return true;}return false;}@Overridepublic void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}@Overridepublic void setDisposable(Disposable d) {DisposableHelper.set(this, d);}@Overridepublic void setCancellable(Cancellable c) {setDisposable(new CancellableDisposable(c));}@Overridepublic ObservableEmitter<T> serialize() {return new SerializedEmitter<T>(this);}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}}/*** Serializes calls to onNext, onError and onComplete.** @param <T> the value type*/static final class SerializedEmitter<T>extends AtomicIntegerimplements ObservableEmitter<T> {private static final long serialVersionUID = 4883307006032401862L;final ObservableEmitter<T> emitter;final AtomicThrowable error;final SpscLinkedArrayQueue<T> queue;volatile boolean done;SerializedEmitter(ObservableEmitter<T> emitter) {this.emitter = emitter;this.error = new AtomicThrowable();this.queue = new SpscLinkedArrayQueue<T>(16);}@Overridepublic void onNext(T t) {if (emitter.isDisposed() || done) {return;}if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (get() == 0 && compareAndSet(0, 1)) {emitter.onNext(t);if (decrementAndGet() == 0) {return;}} else {SimpleQueue<T> q = queue;synchronized (q) {q.offer(t);}if (getAndIncrement() != 0) {return;}}drainLoop();}@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {if (emitter.isDisposed() || done) {return false;}if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (error.addThrowable(t)) {done = true;drain();return true;}return false;}@Overridepublic void onComplete() {if (emitter.isDisposed() || done) {return;}done = true;drain();}void drain() {if (getAndIncrement() == 0) {drainLoop();}}void drainLoop() {ObservableEmitter<T> e = emitter;SpscLinkedArrayQueue<T> q = queue;AtomicThrowable error = this.error;int missed = 1;for (;;) {for (;;) {if (e.isDisposed()) {q.clear();return;}if (error.get() != null) {q.clear();e.onError(error.terminate());return;}boolean d = done;T v = q.poll();boolean empty = v == null;if (d && empty) {e.onComplete();return;}if (empty) {break;}e.onNext(v);}missed = addAndGet(-missed);if (missed == 0) {break;}}}@Overridepublic void setDisposable(Disposable s) {emitter.setDisposable(s);}@Overridepublic void setCancellable(Cancellable c) {emitter.setCancellable(c);}@Overridepublic boolean isDisposed() {return emitter.isDisposed();}@Overridepublic ObservableEmitter<T> serialize() {return this;}}}

这里将ObservableCreate的源码全部放在这,作为一个埋点

  其实,Observable.create()方法主要功能就是创建了一个ObservableCreate对象,并将自定义的source传给ObservableCreate。该方法最终返回的是ObserverableCreate对象。

三、subscribe订阅过程

  分析执行subscribe()订阅流程,并将自定义观察者作为参数传入。
Observable.java

@Overridepublic final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null"); // 功能校验,判定observer是否为nulltry {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");subscribeActual(observer); } catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}

  首先会执行一些功能校验,最后执行到subscribeActual()方法中。
Observable.java

 protected abstract void subscribeActual(Observer<? super T> observer);

  subscribeActual()是一个抽象类,从而最终调用的是ObservableCreate的subscribeActual()方法中。

ObservableCreate.java

@Overrideprotected void subscribeActual(Observer<? super T> observer) {  // observer为自定义观察者// 自定义一个CreateEmitter发射器CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 执行该方法就会执行自定义观察者的onSubscribe()方法中observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}

subscribeActual()方法里面会执行如下三个操作:
1)CreateEmitter<T> parent = new CreateEmitter<T>(observer); --> 首先会创建一个CreateEmitter发射器,并将自定义观察者传入该发射器中
2)observer.onSubscribe(parent);–> 执行自定义观察者的onSubscribe()方法,所以该方法也是最先执行调用,并且一定在主线程中
3)source.subscribe(parent); -->执行自定义source的subscribe()订阅操作,从而跳转到示例代码1中ObservableOnSubscribe的subscribe()方法,并将CreateEmitter发射器作为参数传入进去

new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}}

  执行e.onNext("test")就会跳转到CreateEmitter发射器中的onNext()方法
ObservableCreate.java

static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);  //执行该流程,observer为自定义观察者}}...}

  该observer为上面流程中自定义的CreateEmitter发射器CreateEmitter<T> parent = new CreateEmitter<T>(observer);传入进来的自定义观察者对象,执行observer.onNext(t)该语句就调到示例代码1中的

@Override
public void onNext(String s) {}

Observable与Observer订阅的过程时序图如下:
在这里插入图片描述

  在标准的观察者设计模式中,是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知后,所以的“观察者”才能观察到
  在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要 起点(被观察者) 和 终点(观察者) 在“订阅”一次后,才发出改变通知,终点(观察者)才能观察到

图1:RxJava简单订阅过程:
在这里插入图片描述

四、map操作符

加入map操作符之后的简单示例代码2:

private Disposable mDisposable;// 创建ObserverCreate
Observable.create(new ObservableOnSubscribe<String>() { //自定义source@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("test");}})// ObservableCreate.map.map(new Function<String, String>() {@Overridepublic String apply(String s) throws Exception {return s;}})// ObservableMap.subscribe.subscribe(new Observer<String>() { //自定义观察者@Overridepublic void onSubscribe(Disposable d) {mDisposable = d;}@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});@Overrideprotected void onDestroy() {super.onDestroy();if (mDisposable != null) {if (!mDisposable.isDisposed()) {mDisposable.dispose();}}}

这个示例代码2写法采用装饰模型

图2加入map操作符之后的流程:
在这里插入图片描述

从①~⑥流程简称为封包裹,⑦ ~⑨流程简称为拆包裹

  其实图1与图2的区别不大,主要就是多了一个ObservableMap封包裹的流程,其他流程都类似。针对这个区别进行代码流程阐述下:
  从示例代码2中执行map()操作进行分析:
Observable.java

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {ObjectHelper.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}

进行创建ObservableMap对象

ObservableMap.java

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {final Function<? super T, ? extends U> function;public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {super(source); //source指ObservableCreatethis.function = function; // 自定义的Function方法}@Overridepublic void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function)); //这里面的t为下一层包裹即图2中的自定义观察者,source指上一层ObservableCreate}
...
}

  这里需要注意,在ObservableMap()构造函数中,参数source指从上一层传过来的ObservableCreate对象,参数function指示例代码2中的new Function()方法。

 .map(new Function<String, String>() 

  执行示例代码2中的.subscribe()其实就是执行到了ObservableMap类的subscribeActual()方法,在这个方法中会对MapObserver进行封装一层包裹,并将下一层的包裹即自定义观察者也就是参数t传入。

MapObserver为ObservableMap的内部类。

ObservableMap.java

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {super(actual); // actual为自定义观察者this.mapper = mapper;}...
}

  在执行图2的第⑧步流程时,就会调用执行包裹1的onNext()方法,即MapObserver类的onNext();
ObservableMap.java

@Override
public void onNext(T t) {if (done) {return;}if (sourceMode != NONE) {actual.onNext(null);return;}U v;try {// 代码1v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}// 代码2actual.onNext(v);
}

1:代码1
  执行mapper.apply(t)流程的时候,其实就是调用了示例代码2中的apply()方法。
Function.java

public interface Function<T, R> {R apply(@NonNull T t) throws Exception;
}
@Override
public String apply(String s) throws Exception {return s;
}

2:代码2
   actual.onNext(v);中的actual是在ObservableMap构造函数传过来的,actual对应图2中的自定义观察者对象,也就是对应图2中的第9步流程。

五、线程切换原理

subscribeOn:给上面代码分配线程
observeOn:给下面代码分配线程

Scheduler分类:

调度器类型效果
Schedulers.computation()用于计算任务,如事件循环或回调处理,不要用于IO操作(IO操作使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor)使用指定的Executor作为调度器
Schedulers.immediate()在当前线程立即开始执行任务
Schedulers.io()用于IO密集型任务
Schedulers.newThread()为每个任务创建一个新任务
Schedulers.trampoline()当其他排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread()用于Android的UI更新操作
http://www.yayakq.cn/news/953457/

相关文章:

  • 铁道部建设管理司网站电子商务发展现状
  • 企业网站手机版模板网站建设夹夹虫公司
  • 网站正能量破解版下载大全安装37游戏官网中心
  • 苏州企业网站制作百度网站排名哪家好
  • 网站 制作水印seo案例分析及解析
  • 创建自己网站网络营销推广部做什么
  • 常德网站建设 天维网站如何做才会有流量
  • 学校网站建设方案下载应用
  • 成都科技网站建设找工作组赴河南协助
  • 电脑做网站主机wordpress没有备案
  • 网站需要多大空间IT男做网站
  • 常平做网站公司官网源码下载
  • 优秀的网站设计图片seo优化网站建设公司
  • php网站开发招聘百度域名的书写
  • 模块网站怎么做百度分析工具
  • 做原型的素材网站wordpress多媒体大小
  • 西安地产网站建设济南专业的网站建设公司
  • 如何运营网站dnf制裁做任务网站
  • 手机免费建设网站制作九冶建设有限公司官方网站
  • 网站运营学习aspnet网站开发例题
  • 实验室网站建设意义得到app怎么样
  • 中国建设银行网站 纪念币预约seo推广营销网站
  • 属于网站建设过程规划和准备阶段的是php如何做局域网的网站
  • 网站准备建设的内容WordPress文字黑条
  • 建站公司联系电话网站改版收录减少
  • 网站建设公司一月赚多少梵克雅宝官网中文官网
  • 全面的网站制作做外国订单有什么网站
  • 做网站的职员称呼什么池州专业网站建设公司
  • 网站信息建设总结凡科网之前做的网站在哪看
  • 网站建设 千佳网络生鲜网站开发背景