学习 RxSwift,开始冲了个泊学会员看视频,发现看视频还不如直接看泊学文档。之后看到了这本书,果然看文档还不如看书。(泊学视频中的RxSwift 就是照搬了这本书的前两个 Section,例子也是照搬的)
着手 RxSwift
简介
RxSwift 本质上是通过对新数据的顺序处理,来简化异步程序的开发。
RxSwift 基础
RxSwift 由三部分组成:Observables,Operators 和 Schedulers。
Observables<T>
帮助一个程序产生一个带有数据的事件序列,允许一个或多个 Observers 响应事件并处理新的数据。Observables 能发送三种类型的事件:next 事件,completed 事件,error 事件。使用事件序列是最终的解耦方式,不再需要使用 delegate 或者闭包。
Operators 指的是有着异步的输入,然后产生没有副作用的输出的方法。它们能够组合在一起,实现复杂的逻辑。比如下图的 filter
和 map
的过程:
Schedulers 是 Rx 中的调度队列。我们可以通过 RxSwift 将同一个订阅在不同的调度队列中完成以达到最佳的性能:
App 架构
Rx 适合所有的架构。当然最匹配的还是 MVVM。MVVM 中有一层 ViewModel 能够暴露出各个 Observables 属性,你可以在 ViewController 中直接将其和 UI 绑定。
RxCocoa
RxSwift 提供了通用的 Rx API。开发的时候还需要 RxCocoa 配合 RxSwift 使用 UIKit 和 Cocoa。比如 RxCocoa 给 UISwitch 添加了一个 rx.isOn
属性,你可以订阅它来监听 UISwitch 的事件队列。
Observable
Observable 的生命周期
一个 Observable 不发出任何事件,直到有订阅发生。订阅者发送 error 或者 completed 事件能结束订阅。当订阅结束,也就不会再发出任何事件了。
事件 event 是个枚举类型:
1 | public enum Event<Element> { |
创建 Observables
.just
可以用来创建一个只有一个元素的 Observable:
1 | let observable = Observable<Int>.just(one) |
.of
接收多个元素形成一个 Observable:
1 | let observable2 = Observable.of(1, 2, 3) |
.from
接收一个数组,将数组元素生成一个 Observable:
1 | let observable3 = Observable.from([1, 2, 3]) |
.empty
提供 0 个元素的 Observable,只会触发 completed 事件。尽管 empty 不触发 next 事件,但是还需要明确泛型类型,所以使用 void 是最好的选择:
1 | let observable4 = Observable<Void>.empty() |
.never
创建的 Observable 不会触发 next,也不会触发 completed:
1 | let observable5 = Obnservable<Any>.never() |
.create
创建 Observable,需要传入一个入参为 observer,返回一个 Disposable 实例的闭包。在这个闭包里定义了所有将要发送的事件:
1 | Observable<String>.create { observer in |
注意前面生命周期中提及的,onCompleted
发生后,订阅结束。所以后面的 observer.onNext("?")
事件是无法接收到的。
订阅 Observables
通过 .subScribe
订阅 Observable,返回一个 Disposable
对象。
下面是传入一个入参为事件 event 闭包的例子:
1 | let disposable = observable.subScribe { event in |
.subScribe
还可以接受各种事件的回调闭包,比如 onNext
,onCompleted
,onError
:
1 | let disposable2 = observable.subscribe(onNext: { element in |
取消订阅 Observables
如果不取消订阅,将会将会产生内存泄漏,取消订阅可以通过 .dispose
直接取消:
1 | disposable.dispose() |
但是这种手动的取消非常的麻烦,RxSwift 提供了一个 DisposeBag
类型,用于自动回收,把订阅后返回的 disposable
对象直接加入 DisposeBag
:
1 | let disposeBag = DisposeBag() |
当 disposeBag
回收的时候,会自动取消订阅。
取消订阅有两种方式,一种是此处说的 disposebag,还有一种方式是订阅者调用 onCompleted 方法。比如线面的 Subjects 就可以直接调用取消:
subject.onCompleted()
Subjects
什么是 subjects?
subjects 既是 Observable 又是 Observer。有四种类型:
- PublishSubject:订阅后接受事件
- BehaviorSubject:有初始值,新的订阅者会获得订阅前最新的事件
- ReplySubject:有一个 buffer,新的订阅者会获得 buffer 中缓存的事件
- Variable:包装了的 BehaviorSubject。把当前值作为状态,值改变时触发事件。新的订阅者同样会获得订阅前最新的事件。
PublishSubjects
PublishSubject 只有在订阅后才能接收到事件。流程如图所示,在事件1时并没有进行订阅,所以并没有收到事件;在事件1和事件2之间进行订阅,能收到事件2和事件3;在事件2和事件3之间订阅,能收到事件3:
示例代码如下,要注意初始化的时候要设置好泛型类型:
1 | let disposeBag = DisposeBag() |
要注意一点,当一个 subject 取消订阅后,再订阅,会直接受到一个 completed 事件,之后再发送 next 事件是没有任何响应的。
BehaviorSubjects
BehaviorSubjects 在订阅之后可以接到最近一次的事件。并且接受一个初始值,当订阅前没有事件时,将初始值作为事件值发送。如果所示,事件1和事件2之间订阅的时候,还是能收到事件1;事件2和事件3之间订阅的时候,还是能收到事件2:
示例代码如下,由于有初始值的类型推断,就不需要设置泛型了:
1 | let disposeBag = DisposeBag() |
和 PublishSubject 类似的,当 subject 完成或者错误时,再订阅,就只会发出完成和错误事件,并且忽略后面的 next 事件。
ReplaySubjects
ReplaySubject 有一个 buffer 用来暂存先前的事件值,初始化的时候设置缓存的大小。当有订阅时,将缓存的值发送。如图所示,当在事件2和事件3之间产生订阅的时候,缓存的事件1和事件2都触发了:
示例代码如下,还是要注意设置泛型,并且初始化方法是 create(bufferSize:)
1 | let disposeBag = DisposeBag() |
注意,和前面都相同的是,由于之前 subject 已经发出错误事件,再次订阅的时候会立刻收到错误事件。不同的是,由于 ReplaySubject 有 buffer,所以仍然会先响应 buffer 中缓存的事件。
Variables
Variable 是 BehaviorSubject 的包装,将其值作为状态。当值变化时,触发事件,不再需要 onNext()
方法触发事件。Variable 不会出现 error,也不需要手动触发 completed 事件。
代码示例如下:
1 | let disposeBag = DisposeBag() |
Variable 需要使用 asObservable()
方法将其转换为 Observable,值是其 value
属性。另外,由于是 BehaviorSubject 的包装,因此,订阅的时候会打印最后一次的事件值。
Subject 与 Observer 和 Observable 的区别
这一节是我自己的理解,可能在理解上有偏差。如果看到这里可以自己也思考一下。
Subject 与 Observable 的不同
onNext
,onError
,onCompleted
都是 Observer 中的方法,但是本身并没有具体的实现。Observable 在订阅的时候传入了这几个方法的实现,所以在触发事件的时候,直接内部调用这些闭包响应事件即可。你可以把这个过程想象成 Observable 调用其内部的 Observer。所以 Observable 必须要能被订阅,订阅有什么用?用来传递事件处理方法。
Subject 既是 Observable 又是 Observer,我们订阅完,传入事件的实现后,就可以直接调用 observer.onNext()
。相当于,我们可以选择主动触发事件的时机,然后 Observer 响应事件。这很有用,比如处理 UI 交互,就必须要在任意时刻都能主动触发 observer.onNext()
方法。而单纯的 Observable 则不行,它是非常被动的,要么直接触发,要么延迟多久或者多少周期触发。
Subject 与 Observer 的不同
对于单纯的 Observer,由于没有这些事件方法的具体实现,所以我们也不能像 Subject 一样主动触发事件。但是有一些特殊的 Observer 本身在初始化的时候就提供了事件处理的方法,这种 Observer 不需要像 subscribe 一样订阅的时候传入事件处理方法,直接使用 bindTo()
绑定即可。
不过也正是因为 Observer 提供了默认的事件处理方法,所以我们不能用其处理 UI 交互。因为 UI 交互的事件处理方法是要根据具体情况而定的。比如点击一个按钮我可能要让计数加一,或者让计数减一。
总结
所以,在我的理解下,Observable 需要传递事件处理方法,所以可以定制,但不能主动触发;Observer 有默认事件处理方法,所以可以主动触发,但不能定制。
因为 Subject 既能主动选择触发事件的时机,又能个性化事件处理的方法,所以一般交互的 UI 控件中的属性都是 Subject 类型。
操作符的最佳实践
过滤操作符
Igoring operators
ignoreElements
ignoreElements 用来忽略所有的 .next
事件。所以用来指接受 completed 事件
示例如下:
1 | let strikes = PublishSubject<String> |
elementAt
elementAt 只会获取索引序号的事件,忽略其他的所有 .next
。索引序号从 0 开始:
代码如下:
1 | let strikes = PublishSubject<String>() |
filter
filter 接受一个断言闭包,入参为当前事件值。接受所有断言正确的事件。注意是接受而不是过滤掉:
代码如下:
1 | Observable.of(1, 2, 3, 4, 5) |
Skipping operators
上面一节是根据条件筛选;这一节是忽略到某个满足条件的,后面的全部接受。
skip
skip 用来略过一定数量的 .next
事件,然后开始接受。从 1 开始计数
代码如下:
1 | Observable.of(1, 2, 3, 4, 5) |
skipWhile
skipWhile 是略过直到某个满足条件的事件发生。skipWhile 还有一个兄弟方法 skipWhileWithIndex,除了接受事件值,还接受事件序号,index 从 0 开始:
代码如下:
1 | Observable.of(1, 2, 3, 4, 5) |
skipUntil
skipUntil 是略过直到某个事件发生。就是当前 Observable 和另外一个 Observable 相关联,当特定的 Observable 的 .next
发生的时候,才开始接受事件:
代码如下:
1 | let subject = PublishSubject<String>() |
Taking operators
本小节和上一小结正好相反,这一小节是接受某个事件之前的所有事件,之后的都不接受。
take
take 和 skip 正好相反。take 是直到一定数量的事件发生后才开始取,而不是取到一定数量的时候停
代码如下:
1 | Observable.of(1, 2, 3, 4, 5) |
takeWhile
takeWhile 和 skipWhile 正好相反。takeWhile 是取到某个不满足条件的事件。takeWhile 还有一个兄弟方法 takeWhileWithIndex,除了事件值 value,还接受事件的序号 index:
代码如下:
1 | Observable.of(1, 2, 3, 4, 5) |
takeUntil
takeUntil 和 skipUntil 相反。表示接受事件知道某个 Observable 的事件触发:
代码如下:
1 | let subject = PublishSubject<String>() |
Distinct operators
本小节的操作符可以防止重复事件
distinctUntilChanged
如果当前事件和前一个事件的事件值相同,那么忽略这个事件:
代码如下:
1 | Observable.of(1, 2, 2, 1) |
distinctUntilChanged 还接受一个闭包,闭包入参为相邻事件的事件值,闭包返回值为 true 则忽略当前事件。
代码如下,当前一个事件值为 1,当前事件值为 2 的时候忽略当前事件:
1 | Observable.of(1, 2, 2, 1) |
转换操作符
转换元素
toArray
将事件序列的元素转换成一个数组,然后将这个数组作为事件值,触发 .next
事件:
代码示例:
1 | Observable.of("A", "B", "C") |
map
map 和数组中的 map 无异:
代码示例:
1 | Observable.of(1, 2, 3) |
map 还有一个兄弟方法 mapWithIndex 带有一个索引:
代码示例:
1 | Observable.of(1, 2, 3) |
转换内部 Observables
flatMap
flatMap 主要就是将一个 Observable 中的每个元素都转换为一个 Observable,并且订阅。flatMap 需要一个闭包,传入当前 Observable 的事件值,返回一个新的 Observable。下面图示的例子中 Observable 的事件值类型为 Variable。传入一个 Variable,将其 value 属性扩大十倍:
代码示例:
1 | let outter = PublishSubject<Variable<Int>> |
flatMapLatest
flatMapLatest 是 flatMap 和 switchlatest 的合体。在使用上和 flatMap 一致,但是它值订阅最新的 Observable:
看出区别了么?当有新的订阅产生的时候,旧的 Observable 就取消订阅了,所以这里由于订阅了绿色的 Observable,所以蓝色变为 30,并不会触发订阅;由于订阅了橙色的 Observable,所以绿色变为 50,也不会触发订阅。而 flatMap 则是全部都触发了订阅的。
关联操作符
前缀与串联
startWith
startWith 接受一个事件值,将其插到当前事件序列的最前面,返回一个新的 Observable:
代码如下:
1 | let numbers = Observable.of(2, 3, 4) |
concat
concat 连接两个事件序列,生成一个新的事件序列:
代码如下,注意要用 []
将事件序列当成一个数组:
1 | let first = Observable.of(1, 2, 3) |
连个事件序列的泛型类型一定要相同,否则崩溃给你看😖
合并
merge
merge 将元素为事件序列的事件序列自动拆开,成为一个新的事件序列。其实你也可以分开来写,让它们分别订阅,merge 主要就是用来减少事件序列的订阅的:
代码如下:
1 | let left = PublishSubject<String>() |
注意,只有当内部的时间序列都 completed 后,merge 产生的事件序列才会 completed。
关联元素
combineLatest
当 combineLatest 中的子序列中的任意一个发出事件的时候,将会调用一个你提供的闭包。这个闭包将子序列的最近的事件值作为入参传入,得到的返回值作为事件值执行订阅的方法。主要用在同时监控多个源的状态。
上面图示中,当事件 1 触发的时候,由于另外一个序列没有事件发生过,所以不触发订阅,直到那一个序列发生了事件 4,才触发了订阅。
代码如下:
1 | let left = PublishSubject<String>() |
另外需要说明的就是只有两个子序列都 completed,外部序列才会 completed。如果其中一个子序列先结束了,当另外一个序列触发事件的时候,使用的是结束的那个子序列结束前最后一次事件的事件值。其实上面的图中也有展示,right 先结束了,此时left 触发了事件 3,所以最终是将 3,6 的值作为事件值的。
zip
和上面的 combineLatest 不同,zip 要求必须每个子序列都有新消息的时候,才触发事件:
可以看到,left 和 right 都必须有新的消息最终才能产生事件。由于 right 已经结束了,所以 sunny 永远不会接收到。
代码如下:
1 | let left = PublishSubject<String>() |
需要注意的是,zip 不需要所有内部序列都完成,只要有一个 completed,整个事件序列就结束了。
(确实这个挺像拉链的,名字起得很形象)
触发器
withLatestFrom
当一个 Observable 触发的时候,获取另一个 Observable 的最新的事件值。很常用,比如点击按钮的时候要获取 textfield 的最新值:
text 不管怎么修改,当 button 点击的时候,都获得最新的 text 的值。
代码如下:
1 | let button = PublishSubject<Void>() |
simple
触发某个 Observable 获取另一个 Observable 的最新值。但是和 withLatestFrom 不同的是,当再次触发这个 Observable 的时候,如果另一个 Observable 没有更新值,那么不会触发事件,类似于 distinctUntilChanged:
代码如下:
1 | let Observable = textField.sample(button) |
一定要注意这里啊,前面是 button.withLatestFrom(textField)
,这里是 textField.sample(button)
。
开关
amb
当两个 Observable 中的任意一个触发的时候,取消订阅另一个,以后只接受当前 Observable 的事件。如图所示,由于 right 先出法,所以就取消了 left 的订阅,以后就只能接收到 right 的事件了:
代码如下:
1 | let left = PublishSubject<String>() |
switchLatest
前面那个是被动的哪个 Observable 最先触发就一直订阅哪一个。这个是可以自己控制当前想要订阅那个 Observable:
如图所示,source 在选择 one 的时候,只接受 one 的事件,在选择 two 的时候,只接受 two 的事件。
1 | let one = PublishSubject<String>() |
还记得 flatMapLatest 吗?之前说过 flatMapLatest,就是 map + switchLatest
元素和序列的结合
reduce
Rx 中的 reduce 类似于 Swift 中的 reduce,将一个序列的所有事件值通过运算,得到一个最终的事件值,并触发事件:
代码如下:
1 | Observable.of(1, 2, 3) |
scan
scan 和 reduce 的不同在于,reduce 是一锤子买卖,scan 每次接收到事件值时都会触发一个事件:
代码如下:
1 | Observable.of(1, 2, 3) |
基于时间的操作符
缓存操作符
replay
这个操作符是针对一个 Observable,多个订阅者的。为 Observable 设置 replay ,当有新的订阅者订阅的时候,会立即触发最近的几个事件:
上面的是 Observable,每隔 1s 发出一次事件。下面是一个订阅者,在第 4s 的时候开始订阅。由于设置了 replay 的数量为 1,所以立刻重现之前的事件 3,再加上当前事件 4 的触发,所以再时刻 4,有两个事件一起触发了。
代码如下:
1 | let interval = Observable<Int>.interval(1, |
这种一个 Observable,多个订阅者的情况叫做可连接 Observable,一般的 Observable 类型为 Observable<E>
,这种类型为 ConnectableObservable<E>
。所以需要使用 .connect
方法来表示 Observable 开始运行。
如果要所有元素都重现,那么可以使用 .replayAll()
buffer
buffer 用于将事件缓存,在某个条件下,一并发出。buffer(timeSpan:count:scheduler:)
接受一个最大时间跨度 timeSpan,一个事件最大发生数量 count。处理逻辑在于,当最大时间跨度内事件数量没有到时,发送一个事件,其事件值为当前事件跨度内发出的时间的事件值组成的数组,重置时间跨度;当时间跨度内发生的事件超过了最大发生数量时,立即发送一个事件值为这些事件值所组成的数组的事件,重置时间跨度:
如图,时间跨度为4,最大事件值为2。最开始什么时间也没有发生,所以发送一个 0 个元素的数组。之后某个时刻发出了三个事件,所以将前两个事件值合成为一个数组发送,剩余一个事件,重置时间跨度。后面事件数量没有到最大值,但是时间跨度到了,所以也发送一个事件的数组,重置时间跨度。最后又没有事件发生,发送 0 个元素的数组的事件。
1 | let interval = Observable<String>.interval(1, scheduler: MainScheduler.instance) |
时间平移操作符
delaySubscription
延迟订阅,在正式订阅前发生的事件都会被忽略:
如图,在 1 时刻开始订阅,由于延迟了 1.5s,所以前两个事件被忽略了。
1 | Observable.of(1, 2, 3, 4, 5) |
delay
delay 则将序列中的所有事件延迟执行,所以并不会忽略掉事件:
差别就在于,上面的忽略了1,2,而这里则仍是从事件 1 开始。
1 | Observable.of(1, 2, 3, 4, 5) |
定时操作符
interval
Rx 中的定时不需要使用 NSTimer,也不需要使用 DispatchSource。interval 的使用非常简单,比如一个1s的定时器:
1 | Observable<Int>.interval(1, scheduler: MainScheduler.instance) |
事件值默认是从 0 开始发送,依次递增。如果你不想要从 0 开始,可以使用 map。不过一般我们不需要使用这个事件值。
timer
timer(_:period:scheduler:)
和 interval 的区别在于,可以设置一个重复次数 period。如果不设置,默认只执行一次:
1 | Observable<Int>.timer(3, period:3, scheduler: MainScheduler.instance) |