您当前的位置:首页 > 电脑百科 > 程序开发 > 移动端 > Android

架构师之RX响应式编程——RxJava2.0操作符原理

时间:2022-07-26 14:22:10  来源:  作者:Android秃老师

RxJAVAAndroid开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器。随手记作为一个大型项目,引入三方框架一直比较慎重,但也从今年初开始,正式引入了RxJava2.0,并配合Retrofit对项目的网络框架和繁琐的异步逻辑进行重构。

RxJava的作用:

就是异步RxJava的使用,可以使“逻辑复杂的代码”保持极强的阅读性。

RxAndorid的作用:

Android中RxAndorid与RxJava配合使用; RxAndorid 封装了
AndroidSchedulers.mAInThread(),Android开发者使用过程中,可以轻松的将任务post Andorid主线程中,执行页面更新操作。

RxJava的优点

简单来讲RxJava是一个简化异步调用的库,但其实它更是一种优雅的编程方式和编程思想,当你熟悉RxJava的使用方式之后,会很容易爱上它。 我总结它的优点主要有两个方面:

  • 简洁,免除传统异步代码逻辑中的callback hell
  • 增加业务逻辑代码的可读性

Rx的操作符有哪些

刚接触Rx的人面对一堆各式各样的操作符会觉得不知如何去学习记忆,其实你只需要从整体上了解Rx操作符的类别和掌握一些使用频率较高的操作符就足够了,至于其他的操作符,你只需要知道它的使用场景和掌握如何快速理解一个操作符的方法,就可以在需要的时候快速拿来用了。 下图是我根据官方文档总结的Rx操作符的分类及每个类别下的代表性操作符

 

 

 

Rx操作符的原理

要了解操作符的原理,肯定要从源码入手喽。所以我们先来简单撸一遍Rx的最基本的Create操作符的源码。 Rx的源码目录结构是比较清晰的,我们先从Observable.create方法来分析

Observable.create(new ObservableOnSubscribe<String>() {
  @Override
  public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
      e.onNext("s");
  }
}).subscribe(new Observer<String>() {
  @Override
  public void onSubscribe(@NonNull Disposable d) {
    // 创建的Observer中多了一个回调方法onSubscribe,传递参数为Disposable ,Disposable相当于RxJava1.x中的Subscription,用于解除订阅。
  }

  @Override
  public void onNext(@NonNull String s) {

  }

  @Override
  public void onError(@NonNull Throwable e) {

  }

  @Override
  public void onComplete() {

  }
});复制代码

create方法如下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}复制代码

代码很简单,第一行判空不用管,第二行调用RxJavaPlugins的方法是为了实现Rx的hook功能,我们暂时也无需关注,在一般情况下,第二行代码会直接返回它的入参即ObservableCreate对象,ObservableCreate是Observable的子类,实现了Observable的一些抽象方法比如subscribeActual。事实上Rx的每个操作符都对应Observable的一个子类。 这里create方法接受的是一个ObservableOnSubscribe的接口实现类:

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}复制代码

通过注释可以知道这个接口的作用是通过一个subscribe方法接受一个ObservableEmitter类型的实例,俗称发射器。 Observable.create方法执行时,我们传入的就是一个ObservableOnSubscribe类型的匿名内部类,并实现了它的subscribe方法,然后它又被传入create方法的返回对象ObservableCreate,最终成为ObservableCreate的成员source

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ...复制代码

接着我们来看Observable的subscribe方法,它的入参是一个Observer(即观察者,也就是事件接收者)

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

       subscribeActual(observer);
   } catch (NullPointerException e) { // NOPMD
       throw e;
   } catch (Throwable e) {
       Exceptions.throwIfFatal(e);
       // can't call onError because no way to know if a Disposable has been set or not
       // can't call onSubscribe because the call might have set a Subscription already
       RxJavaPlugins.onError(e);

       NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
       npe.initCause(e);
       throw npe;
   }
}复制代码

最终它会调用它的子类ObservableCreate的subscribeActual方法:

@Override
protected void subscribeActual(Observer<? super T> observer) {
   CreateEmitter<T> parent = new CreateEmitter<T>(observer);
   observer.onSubscribe(parent);

   try {
       source.subscribe(parent);
   } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
   }
}复制代码

在subscribeActual里首先创建了用于发射事件的CreateEmitter对象parent,CreateEmitter实现了接口Emitter和Disposable,并持有observer。 这段代码的关键语句是source.subscribe(parent),这行代码执行后,就会触发事件源进行发射事件,即e.onNext("s")会被调用。细心的同学也会注意到这行代码之前,parent先被传入了observer的onSubscribe()方法,而在上面我们说过,observer的onSubscribe()方法接受一个Disposable类型的参数,可以用于解除订阅,之所以能够解除订阅,正是因为在触发事件发射之前调用了observer的onSubscribe(),给了我们调用CreateEmitter的解除订阅的方法dispose()的机会。 继续来看CreateEmitter的onNext()方法,它最终是通过调用observer的onNext()方法将事件发射出去的

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


   private static final long serialVersionUID = -3434801548987643227L;

   final Observer<? super T> observer;

   CreateEmitter(Observer<? super T> observer) {
       this.observer = observer;
   }

   @Override
   public void onNext(T t) {
       if (t == null) {
           onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
           return;
       }
       // 在真正发射之前,会先判断该CreateEmitter是否已经解除订阅
       if (!isDisposed()) {
           observer.onNext(t);
       }
   }
   ...
}复制代码

至此,Rx事件源的创建和订阅的流程就走通了。

下面我们从map操作符来入手看一下Rx操作符的原理,map方法如下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mApper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}复制代码

map方法接受一个Function类型的参数mapper,返回了一个ObservableMap对象,它也是继承自Observable,而mapper被传给了ObservableMap的成员function,同时当前的源Observable被传给ObservableMap的成员source,进入ObservableMap类

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}复制代码

可以看到这里用到了装饰者模式,ObservableMap持有来自它上游的事件源source,MapObserver持有来自它下游的事件接收者和我们实现的转换方法function,在subscribeActual()方法中完成ObservableMap对source的订阅,触发MapObserver的onNext()方法,继而将来自source的原始数据经过函数mapper转换后再发射给下游的事件接收者,从而实现map这一功能。

现在我们终于能够来总结一下包含多个操作符时的订阅流程了,以下面这段代码为例

Observable.
        create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("holen");
            }
        })
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(@NonNull String s) throws Exception {
                return s.length();
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });复制代码

执行代码时,自上而下每一步操作符都会创建一个新的Observable(均为Observable的子类,对应不同的操作符),当执行create时,创建并返回了ObservableCreate,当执行map时,创建并返回了ObservableMap,并且每一个新的Observable都持有它上游的源Observable(即source)及当前涉及到的操作函数function。当最后一步执行订阅方法subscribe时会触发ObservableMap的subscribeActual()方法,并将最下游的Observer包装成MapObserver,同时该方法又会继续调用它所持有ObservableCreate的订阅方法(即执行source.subscribe),由此也会触发ObservableCreate的subscribeActual()方法,此时我们的发射器CreateEmitter才会调用它的onNext()方法发射事件,再依次调用MapObserver的操作函数mapper和onNext()方法,最终将事件传递给了最下游的Observer的onNext()方法。

我简单的将这段逻辑用下面这幅图来表示

 

 

 

操作符举例

1. map

 

 

map

可以看到,这幅图表达的意思是一个源先后发射了1、2、3的三个item,而经过操作符一转换,就变成了一个发射了10、20、30三个item的新的。描述操作符的长方框中也清楚的说明了该操作符进行了何种具体的转换操作(图中的10*x只是一个例子,这个具体的转换函数是可以自定义的)。于是,我们就很快速地理解了操作符的含义和用法,简单来讲,它就是通过一个函数将一个发射的item逐个进行某种转换。示例代码:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(@NonNull Integer integer) throws Exception {
        return integer * 10;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer result) throws Exception {
        Log.i(TAG, "accept : " + result +"n" );
    }
});复制代码

输出结果:

 

 

 

2. zip

 

 

根据的宝石图,可以知道zip操作符的作用是把多个源发射的item通过特定函数组合在一起,然后发射组合后的item。从图中还可以看到一个重要的信息是,最终发射的item是对上面的两个源发射的item按照发射顺序逐个组合的结果,而且最终发射的等item的发射时间是由组合它的和等item中发射时间较晚的那个item决定的,也正是如此,操作符经常可以用在需要同时组合处理多个网络请求的结果的业务场景中。示例代码:

Observable.zip(Observable.just(1, 2, 3),
        Observable.just("A", "B", "C"),
        new BiFunction<Integer, String, String>() {
            @Override
            public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
                return integer + s;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG, "zip : accept : " + s + "n");
            }
        });复制代码

输出结果:

 

3. concat

 

 

从宝石图可以看出,操作符的作用就是将两个源发射的item连接在一起发射出来。这里的连接指的是整体连接,被操作后产生的会先发射第一个源的所有item,然后紧接着再发射第二个源的所有的item。示例代码:

Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.i(TAG, "concat : " + integer + "n");
            }
        });复制代码

输出结果:

 

4.flatMap

  @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
     ...
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));

看看ObservableFlatMap代码

 public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

是不是和MAP超级像,我们这几看MergeObserver onNext做了什么

@Override
        public void onNext(T t) {
             ...
               p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");

            ...

            subscribeInner(p);
        }
         @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Callable) {
                  
                } else {
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    addInner(inner);
                    p.subscribe(inner);
                    break;
                }
            }
        }

省略了很多代码,我们看主要逻辑,获取到flatMap生成的observableSource,然后 p.subscribe(inner);注意这里的P不是observable 看innerObserver的onNext做了什么

    //这里的onNext事件由 p.subscribe(inner)触发
  @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }
        
        void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

在这里我们终于看到我们定义的observer接收到了onNext事件

Rx操作符的应用场景

说了这么多,其实我们最关心的还是Rx操作符的应用场景。其实只要存在异步的地方,都可以优雅地使用Rx操作符。比如很多流行的Rx周边开源项目

 



Tags:RxJava   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
架构师之RX响应式编程——RxJava2.0操作符原理
RxJava在Android开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器。随手记作为一个大型项目,引入三方框架一直比较...【详细内容】
2022-07-26  Search: RxJava  点击:(404)  评论:(0)  加入收藏
Android实现Rxjava2+Retrofit完美封装
去年的时候学习了Rxjava和Retrofit的基本用法,但一直没有在实际项目中运用。今年开做新项目,果断在新项目中引入了RxJava和Retrofit。本篇文章将介绍笔者在项目中对Retrofit的...【详细内容】
2019-08-05  Search: RxJava  点击:(1013)  评论:(0)  加入收藏
▌简易百科推荐
Android Emulator黑屏怎么办 Android模拟器黑屏解决方法
Android Emulator黑屏问题困扰了非常多的玩家,Android Emulator作为一款安卓模拟器,可以让你在电脑上运行和浏览安卓应用程序,但是程序本身不是很稳定,很容易会出现黑屏,启动不了...【详细内容】
2024-03-04  18183游戏网    Tags:Android Emulator   点击:(46)  评论:(0)  加入收藏
Android开发中常见的Hook技术有哪些?
Hook技术介绍Hook技术是一种在软件开发中常见的技术,它允许开发者在特定的事件发生时插入自定义的代码逻辑。常见的应用场景包括在函数调用前后执行特定的操作,或者在特定的事...【详细内容】
2023-12-25  沐雨花飞蝶  微信公众号  Tags:Android   点击:(92)  评论:(0)  加入收藏
在Android应用开发中使用NFC功能
NFC介绍NFC是指“近场通讯”(Near Field Communication),它是一种短距离无线通信技术,允许设备在非接触或极短距离内进行通信。NFC通常用于移动支付、门禁系统、智能标签和其他...【详细内容】
2023-12-22  沐雨花飞蝶  微信公众号  Tags:Android   点击:(104)  评论:(0)  加入收藏
关于Android图像Bitmap类,你要知道的一切
Bitmap介绍Bitmap是一种图像文件格式,它由像素阵列组成,每个像素都有自己的颜色信息。在计算机图形学中,Bitmap图像可以被描述为一个二维的矩阵,其中每个元素代表一个像素的颜色...【详细内容】
2023-12-19  沐雨花飞蝶  微信公众号  Tags:Android   点击:(102)  评论:(0)  加入收藏
Android开发中如何进行单元测试?
单元测试介绍单元测试是软件开发中的一种测试方法,用于验证代码中的最小可测试单元(通常是函数或方法)是否按预期工作。单元测试通常由开发人员编写,旨在隔离和测试代码的特定部...【详细内容】
2023-12-11  沐雨花飞蝶  微信公众号  Tags:Android   点击:(170)  评论:(0)  加入收藏
一篇聊聊Jetpack Room实现数据存储持久性
Room介绍Room 是 Android Jetpack 组件库中的一部分,它是用于在 Android 应用中进行本地数据库访问和管理的库。Room 提供了一个抽象层,使开发者能够更轻松地访问 SQLite 数据...【详细内容】
2023-12-08  沐雨花飞蝶  微信公众号  Tags:Jetpack   点击:(149)  评论:(0)  加入收藏
了解Android系统架构中的HAL硬件抽象层
在Android系统中,HAL的存在使得不同厂商的硬件可以统一被上层的应用程序调用,从而提高了系统的兼容性和可移植性。HAL还可以帮助开发者更方便地开发应用程序,因为他们不需要为...【详细内容】
2023-12-06  沐雨花飞蝶  微信公众号  Tags:Android   点击:(214)  评论:(0)  加入收藏
我们一起聊聊 IntentService 与 Service 的区别?
Service介绍Service组件是Android应用开发中的四大组件之一,用于在后台执行长时间运行的操作或处理远程请求。它可以在没有用户界面的情况下执行任务,并且可以与其他应用组件...【详细内容】
2023-12-06  沐雨花飞蝶  微信公众号  Tags:IntentService   点击:(179)  评论:(0)  加入收藏
Android数据对象序列化原理与应用
序列化与反序列化「序列化」是将对象转换为可以存储或传输的格式的过程。在计算机科学中,对象通常是指内存中的数据结构,如数组、列表、字典等。通过序列化,可以将这些对象转换...【详细内容】
2023-11-14  沐雨花飞蝶  微信公众号  Tags:Android   点击:(280)  评论:(0)  加入收藏
你了解Android中的SELinux吗?
SELinux介绍SELinux(Security-Enhanced Linux)是一种安全增强的Linux操作系统,它通过强制访问控制(MAC)机制来提供更高级别的系统安全保护。相比于传统的Linux访问控制机制(DAC),SEL...【详细内容】
2023-11-09  沐雨花飞蝶  微信公众号  Tags:Android   点击:(272)  评论:(0)  加入收藏
站内最新
站内热门
站内头条