您当前的位置:首页 > 电脑百科 > 程序开发 > 移动端 > Android

架构师之RX响应式编程——RxJava2.0操作符原理

时间:2022-07-26 14:22:10  来源:  作者:Android秃老师

RxJAVAAndroid开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器。随手记作为一个大型项目,引入三方框架一直比较慎重,但也从今年初开始,正式引入了RxJava2.0,并配合Retrofit对项目的网络框架和繁琐的异步逻辑进行重构。

RxJava的作用:

就是异步RxJava的使用,可以使“逻辑复杂的代码”保持极强的阅读性。

RxAndorid的作用:

Android中RxAndorid与RxJava配合使用; RxAndorid 封装了
AndroidSchedulers.mainThread(),Android开发者使用过程中,可以轻松的将任务post Andorid主线程中,执行页面更新操作。

RxJava的优点

简单来讲RxJava是一个简化异步调用的库,但其实它更是一种优雅的编程方式和编程思想,当你熟悉RxJava的使用方式之后,会很容易爱上它。 我总结它的优点主要有两个方面:

  • 简洁,免除传统异步代码逻辑中的callback hell
  • 增加业务逻辑代码的可读性

Rx的操作符有哪些

刚接触Rx的人面对一堆各式各样的操作符会觉得不知如何去学习记忆,其实你只需要从整体上了解Rx操作符的类别和掌握一些使用频率较高的操作符就足够了,至于其他的操作符,你只需要知道它的使用场景和掌握如何快速理解一个操作符的方法,就可以在需要的时候快速拿来用了。 下图是我根据官方文档总结的Rx操作符的分类及每个类别下的代表性操作符

 

 

 

Rx操作符的原理

要了解操作符的原理,肯定要从源码入手喽。所以我们先来简单撸一遍Rx的最基本的Create操作符的源码。 Rx的源码目录结构是比较清晰的,我们先从Observable.create方法来分析

Observable.create(new ObservableOnSubscribe<String>() {
  @Override
  public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
      e.onNext("s");
  }
}).subscribe(new Observer<String>() {
  @Override
  public void onSubscribe(@NonNull Disposable d) {
    // 创建的Observer中多了一个回调方法onSubscribe,传递参数为Disposable ,Disposable相当于RxJava1.x中的Subscription,用于解除订阅。
  }

  @Override
  public void onNext(@NonNull String s) {

  }

  @Override
  public void onError(@NonNull Throwable e) {

  }

  @Override
  public void onComplete() {

  }
});复制代码

create方法如下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}复制代码

代码很简单,第一行判空不用管,第二行调用RxJavaPlugins的方法是为了实现Rx的hook功能,我们暂时也无需关注,在一般情况下,第二行代码会直接返回它的入参即ObservableCreate对象,ObservableCreate是Observable的子类,实现了Observable的一些抽象方法比如subscribeActual。事实上Rx的每个操作符都对应Observable的一个子类。 这里create方法接受的是一个ObservableOnSubscribe的接口实现类:

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}复制代码

通过注释可以知道这个接口的作用是通过一个subscribe方法接受一个ObservableEmitter类型的实例,俗称发射器。 Observable.create方法执行时,我们传入的就是一个ObservableOnSubscribe类型的匿名内部类,并实现了它的subscribe方法,然后它又被传入create方法的返回对象ObservableCreate,最终成为ObservableCreate的成员source

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ...复制代码

接着我们来看Observable的subscribe方法,它的入参是一个Observer(即观察者,也就是事件接收者)

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

       subscribeActual(observer);
   } catch (NullPointerException e) { // NOPMD
       throw e;
   } catch (Throwable e) {
       Exceptions.throwIfFatal(e);
       // can't call onError because no way to know if a Disposable has been set or not
       // can't call onSubscribe because the call might have set a Subscription already
       RxJavaPlugins.onError(e);

       NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
       npe.initCause(e);
       throw npe;
   }
}复制代码

最终它会调用它的子类ObservableCreate的subscribeActual方法:

@Override
protected void subscribeActual(Observer<? super T> observer) {
   CreateEmitter<T> parent = new CreateEmitter<T>(observer);
   observer.onSubscribe(parent);

   try {
       source.subscribe(parent);
   } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
   }
}复制代码

在subscribeActual里首先创建了用于发射事件的CreateEmitter对象parent,CreateEmitter实现了接口Emitter和Disposable,并持有observer。 这段代码的关键语句是source.subscribe(parent),这行代码执行后,就会触发事件源进行发射事件,即e.onNext("s")会被调用。细心的同学也会注意到这行代码之前,parent先被传入了observer的onSubscribe()方法,而在上面我们说过,observer的onSubscribe()方法接受一个Disposable类型的参数,可以用于解除订阅,之所以能够解除订阅,正是因为在触发事件发射之前调用了observer的onSubscribe(),给了我们调用CreateEmitter的解除订阅的方法dispose()的机会。 继续来看CreateEmitter的onNext()方法,它最终是通过调用observer的onNext()方法将事件发射出去的

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


   private static final long serialVersionUID = -3434801548987643227L;

   final Observer<? super T> observer;

   CreateEmitter(Observer<? super T> observer) {
       this.observer = observer;
   }

   @Override
   public void onNext(T t) {
       if (t == null) {
           onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
           return;
       }
       // 在真正发射之前,会先判断该CreateEmitter是否已经解除订阅
       if (!isDisposed()) {
           observer.onNext(t);
       }
   }
   ...
}复制代码

至此,Rx事件源的创建和订阅的流程就走通了。

下面我们从map操作符来入手看一下Rx操作符的原理,map方法如下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mApper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}复制代码

map方法接受一个Function类型的参数mapper,返回了一个ObservableMap对象,它也是继承自Observable,而mapper被传给了ObservableMap的成员function,同时当前的源Observable被传给ObservableMap的成员source,进入ObservableMap类

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}复制代码

可以看到这里用到了装饰者模式,ObservableMap持有来自它上游的事件源source,MapObserver持有来自它下游的事件接收者和我们实现的转换方法function,在subscribeActual()方法中完成ObservableMap对source的订阅,触发MapObserver的onNext()方法,继而将来自source的原始数据经过函数mapper转换后再发射给下游的事件接收者,从而实现map这一功能。

现在我们终于能够来总结一下包含多个操作符时的订阅流程了,以下面这段代码为例

Observable.
        create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("holen");
            }
        })
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(@NonNull String s) throws Exception {
                return s.length();
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });复制代码

执行代码时,自上而下每一步操作符都会创建一个新的Observable(均为Observable的子类,对应不同的操作符),当执行create时,创建并返回了ObservableCreate,当执行map时,创建并返回了ObservableMap,并且每一个新的Observable都持有它上游的源Observable(即source)及当前涉及到的操作函数function。当最后一步执行订阅方法subscribe时会触发ObservableMap的subscribeActual()方法,并将最下游的Observer包装成MapObserver,同时该方法又会继续调用它所持有ObservableCreate的订阅方法(即执行source.subscribe),由此也会触发ObservableCreate的subscribeActual()方法,此时我们的发射器CreateEmitter才会调用它的onNext()方法发射事件,再依次调用MapObserver的操作函数mapper和onNext()方法,最终将事件传递给了最下游的Observer的onNext()方法。

我简单的将这段逻辑用下面这幅图来表示

 

 

 

操作符举例

1. map

 

 

map

可以看到,这幅图表达的意思是一个源先后发射了1、2、3的三个item,而经过操作符一转换,就变成了一个发射了10、20、30三个item的新的。描述操作符的长方框中也清楚的说明了该操作符进行了何种具体的转换操作(图中的10*x只是一个例子,这个具体的转换函数是可以自定义的)。于是,我们就很快速地理解了操作符的含义和用法,简单来讲,它就是通过一个函数将一个发射的item逐个进行某种转换。示例代码:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(@NonNull Integer integer) throws Exception {
        return integer * 10;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer result) throws Exception {
        Log.i(TAG, "accept : " + result +"n" );
    }
});复制代码

输出结果:

 

 

 

2. zip

 

 

根据的宝石图,可以知道zip操作符的作用是把多个源发射的item通过特定函数组合在一起,然后发射组合后的item。从图中还可以看到一个重要的信息是,最终发射的item是对上面的两个源发射的item按照发射顺序逐个组合的结果,而且最终发射的等item的发射时间是由组合它的和等item中发射时间较晚的那个item决定的,也正是如此,操作符经常可以用在需要同时组合处理多个网络请求的结果的业务场景中。示例代码:

Observable.zip(Observable.just(1, 2, 3),
        Observable.just("A", "B", "C"),
        new BiFunction<Integer, String, String>() {
            @Override
            public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
                return integer + s;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG, "zip : accept : " + s + "n");
            }
        });复制代码

输出结果:

 

3. concat

 

 

从宝石图可以看出,操作符的作用就是将两个源发射的item连接在一起发射出来。这里的连接指的是整体连接,被操作后产生的会先发射第一个源的所有item,然后紧接着再发射第二个源的所有的item。示例代码:

Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.i(TAG, "concat : " + integer + "n");
            }
        });复制代码

输出结果:

 

4.flatMap

  @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
     ...
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));

看看ObservableFlatMap代码

 public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

是不是和MAP超级像,我们这几看MergeObserver onNext做了什么

@Override
        public void onNext(T t) {
             ...
               p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");

            ...

            subscribeInner(p);
        }
         @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Callable) {
                  
                } else {
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    addInner(inner);
                    p.subscribe(inner);
                    break;
                }
            }
        }

省略了很多代码,我们看主要逻辑,获取到flatMap生成的observableSource,然后 p.subscribe(inner);注意这里的P不是observable 看innerObserver的onNext做了什么

    //这里的onNext事件由 p.subscribe(inner)触发
  @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }
        
        void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

在这里我们终于看到我们定义的observer接收到了onNext事件

Rx操作符的应用场景

说了这么多,其实我们最关心的还是Rx操作符的应用场景。其实只要存在异步的地方,都可以优雅地使用Rx操作符。比如很多流行的Rx周边开源项目

 



Tags:RxJava   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
RxJava在Android开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器。随手记作为一个大型项目,引入三方框架一直比较...【详细内容】
2022-07-26  Tags: RxJava  点击:(62)  评论:(0)  加入收藏
去年的时候学习了Rxjava和Retrofit的基本用法,但一直没有在实际项目中运用。今年开做新项目,果断在新项目中引入了RxJava和Retrofit。本篇文章将介绍笔者在项目中对Retrofit的...【详细内容】
2019-08-05  Tags: RxJava  点击:(427)  评论:(0)  加入收藏
▌简易百科推荐
前言更改用户交互中的文本字段颜色。预览 当选择一个文本字段并接受输入时,它被称为具有“焦点”通常,用户通过点击将焦点转移到文本字段,开发人员通过使用本菜谱中描述的工具...【详细内容】
2022-11-02  会煮咖啡的猫  稀土掘金  Tags:Flutter   点击:(2)  评论:(0)  加入收藏
简介Coil 是一个 Android 图片加载库,通过 Kotlin 协程的方式加载图片。特点如下:更快: Coil 在性能上有很多优化,包括内存缓存和磁盘缓存,把缩略图存保存在内存中,循环利用 bitm...【详细内容】
2022-10-18    CSDN  Tags:Coil使   点击:(27)  评论:(0)  加入收藏
Handler引发的泄露(通常发生在Activity、Fragment等容器)、crash是Android开发中常见的问题,也是面试时非常容易被问到的技术点。关于Handler为何会引起容器泄露,网上有很多的文...【详细内容】
2022-10-18  莫大叔杂谈    Tags:Android   点击:(12)  评论:(0)  加入收藏
前言我们都知道面试大厂主要就是考察程序员技术方向的专业技能,Java开发主要考察的就是Java方面的专业技能,而Android岗位的专业技能就是Android程序员面试的重要考察方向。大...【详细内容】
2022-10-09  Android不是安卓  今日头条  Tags:Android   点击:(23)  评论:(0)  加入收藏
Android 13 已于一个多月前发布,谷歌方面现如今则在积极地开发 Android 14 中。最新消息指出,新版本似乎将强制所有设备使用 AV1 编解码器。此前,设备只需要支持 VP8 和 VP9(AV1...【详细内容】
2022-10-03  微软NETCORE  今日头条  Tags:Android   点击:(41)  评论:(0)  加入收藏
Android项目开发完,多渠道打包是必不可少的环节。其原理在于,通过在Android安卓包中添加不同的标识,区分各个渠道下载来源,用于统计App在不同应用市场或渠道合作中的各项数据。...【详细内容】
2022-09-29  范二先森  搜狐号  Tags:Android   点击:(29)  评论:(0)  加入收藏
要理解RTMP推流,我们就要知道详细原理。本文将详细的来给大家介绍RTMP推流原理以及如何推送到服务器,首先我们了解一下推流的全过程: 我们将会分为几个小节来展开:一. 本文用到...【详细内容】
2022-09-27  音视频开发老舅  今日头条  Tags:ffmpeg   点击:(44)  评论:(0)  加入收藏
在Android中,简单的说可以使用MediaMuxer来封装编码后的视频流和音频流到mp4容器中,MediaMuxer从api18开始提供,可以封装编码后的视频流和音频流到视频文件中。目前MediaMuxer...【详细内容】
2022-09-22  音视频开发老舅  今日头条  Tags:MediaMuxer   点击:(84)  评论:(0)  加入收藏
前言现在市面上对APP的安全合规管控越来越严格了,也就要求了APP在上架之前一定要做合规检测和加固处理。对APP就是加固的好处,可以提高APP的安全性,提高APP被逆向分析破解的门...【详细内容】
2022-09-21  Android秃老师  今日头条  Tags:Android   点击:(130)  评论:(0)  加入收藏
本文作者:张涛-货拉拉TheRouter 是一个 Kotlin 编写,用于 Android 模块化开发的一整套解决方案框架。Github 项目地址与使用文档详见 https://github.com/HuolalaTech/hll-wp-...【详细内容】
2022-09-20  Android群英传  搜狐号  Tags:TheRouter   点击:(27)  评论:(0)  加入收藏
站内最新
站内热门
站内头条