Kotlin Flows
本文包含的内容:
- Flow是什么, 基本概念和用法.
- Flow的不同类型, StateFlow和SharedFlow比较.
- Flow在Android中的使用安全收集.
- 操作符
stateIn
,
shareIn
的用法和区别.
本文被收录在集合中: https://github.com/mengdd/KotlinTutorials
Coroutines Flow Basics
Flow是什么
Flow可以按顺序发送多个值, 概念上是一个数据流, 发射的值必须是同一个类型.
Flow使用suspend方法来生产/消费值, 数据流可以做异步计算.
几个基本知识点:
- 创建flow: 通过flow builders
- Flow数据流通过
emit()
来发射元素.
- 可以通过各种操作符对flow的数据进行处理. 注意中间的操作符都不会触发flow的数据发送.
- Flow默认是cold flow, 即需要通过被观察才能激活, 最常用的操作符是
collect()
.
- Flow的
CoroutineContext
, 不指定的情况下是
collect()
的
CoroutineContext
, 如果想要更改, 用flowOn
改之前的.
关于Flow的基本用法, 19年底写的这篇coroutines flow in Android可以温故知新.
Flow的操作符
一个Flow操作符的可视化小网站: FlowMarbles.
Flow的不同类型
SharedFlow and StateFlow
应用程序里比较常用的类型是SharedFlow和StateFlow.
Android官方有一篇专门的文档来介绍二者: StateFlow and SharedFlow
StateFlow继承于SharedFlow, SharedFlow继承于Flow.
基本关系如下:
-
Flow
基类. Cold.
Flow的两大特性: Context preservation; Exception transparency. -
SharedFlow
继承Flow, 是一种hot flow, 所有collectors共享它的值, 永不终止, 是一种广播的方式.
一个shared flow上的活跃collector被叫作subscriber.
在sharedFlow上的collect call永远不会正常complete, 还有Flow.launchIn.
可以配置replay and buffer overflow strategy.
如果subscriber56csuspend了, sharedflow会suspend这个stream, buffer这个要发射的元素, 等待subscriber resume.
Because onBufferOverflow is set with
BufferOverflow.SUSPEND
, the flow will suspend until it can deliver the event to all subscribers.
默认参数:
public fun <T> MutableSharedFlow(replay: Int = 0,extraBufferCapacity: Int = 0,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND)
total buffer是:
replay + extraBufferCapacity
.
如果total buffer是0, 那么onBufferOverflow只能是
onBufferOverflow = BufferOverflow.SUSPEND
.
关于reply和buffer, 这个文章
有详细的解释, 并且配有动图.
- StateFlow
继承SharedFlow, hot flow, 和是否有collector收集无关, 永不complete.
可以通过
value
属性访问当前值.
有conflated特性, 会跳过太快的更新, 永远返回最新值.
Strong equality-based conflation: 会通过
equals()
来判断值是否发生改变, 如果没56c有改变, 则不会通知collector.
因为conflated的特性, StateFlow赋值的时候要注意使用不可变的值.
cold vs hot
cold stream 可以重复收集, 每次收集, 会对每一个收集者单独开启一次.
hot stream 永远发射不同的值, 和是否有人收集无关, 永远不会终止.
- sharedIn
可以把cold flow转成hot的SharedFlow. - stateIn
可以把cold flow转成hot的StateFlow.
StateFlow vs SharedFlow
共性:
-
StateFlow
和
SharedFlow
永远都不会停止. 不能指望它们的
onCompletionCallback
.
不同点:
-
StateFlow
可以通过
value
属性读到最新的值, 但
SharedFlow
却不行.
-
StateFlow
是conflated: 如果新的值和旧的值一样, 不会传播.
-
SharedFlow
需要合理设置buffer和replay策略.
互相转换:
SharedFlow
用了
distinctUntilChanged
2b58以后变成
StateFlow
.
// MutableStateFlow(initialValue) is a shared flow with the following parameters:val shared = MutableSharedFlow(replay = 1,onBufferOverflow = BufferOverflow.DROP_OLDEST)shared.tryEmit(initialValue) // emit the initial valueval state = shared.distinctUntilChanged() // get StateFlow-like behavior
RxJava的等价替代:
-
PublishSubject
->
SharedFlow
.
-
BehaviorSubject
->
StateFlow
.
Use Flow in Android
发送事件(Event或Effects): SharedFlow
因为SharedFlow没有conflated特性, 所以适合发送事件, 即便值变化得快也是每个都发送.
private val _sharedViewEffects = MutableSharedFlow<SharedViewEffects>() // 1val sharedViewEffects = _sharedViewEffects.asSharedFlow() // 2
这里用了
asSharedFlow
来创建一个
ReadonlySharedFlow
.
SharedFlow发射元素有两个方法:
-
emit
: suspend方法.
-
tryEmit
: 非suspend方法.
因为
tryEmit
是非suspend的, 适用于有buffer的情况.
保存暴露UI状态: StateFlow
StateFlow
是一个state-holder, 可以通过
value
读到当前状态值.
一般会有一个
MutableStateFlow
类型的Backing property.
StateFlow
是hot的, collect并不会触发producer code.
当有新的consumer时, 新的consumer会接到上次的状态和后续的状态.
使用StateFlow时, 发射新元素只需要赋值:
mutableState.value = newState
注意这里新值和旧的值要
equals
判断不相等才能发射出去.
StateFlow vs LiveData
StateFlow
和
LiveData
很像.
StateFlow
和
LiveData
的相同点:
- 永远有一个值.
- 只有一个值.
- 支持多个观察者.
- 在订阅的瞬间, replay最新的值.
有一点点不同:
-
StateFlow
需要一个初始值.
-
LiveData
会自动解绑, flow要达到相同效果, collect要在
Lifecycle.repeatOnLifecycle
里.
Flow的安全收集
关于收集Flow的方法, 主要还是关注一下生命周期的问题, 因为SharedFlow和StateFlow都是hot的.
在这个文章里有详细的讨论: A safer way to collect flows from Android UIs
在UI层收集的时候注意要用
repeatOnLifecycle
:
class LatestNewsActivity : AppCompatActivity() {private val latestNewsViewModel = // getViewModel()override fun onCreate(savedInstanceState: Bundle?) {//...// Start a coroutine in the lifecycle scopelifecycleScope.launch {// repeatOnLifecycle launches the block in a new coroutine every time the// lifecycle is in the STARTED state (or above) and cancels it when it\'s STOPPED.repeatOnLifecycle(Lifecycle.State.STARTED) {// Trigger the flow and start listening for values.// Note that this happens when lifecycle is STARTED and stops// collecting when the lifecycle is STOPPEDlatestNewsViewModel.uiState.collect { uiState ->// New value receivedwhen (uiState) {is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)is LatestNewsUiState.Error -> showError(uiState.exception)}}}}}}
这个文章里有个扩展方法也挺好的:
class FlowObserver<T> (lifecycleOwner: LifecycleOwner,private val flow: Flow<T>,private val collector: suspend (T) -> Unit) {private var job: Job? = nullinit {lifecycleOwner.lifecycle.addObserver(LifecycleEventObserver {source: LifecycleOwner, event: Lifecycle.Event ->when (event) {Lifecycle.Event.ON_START -> {job = source.lifecycleScope.launch {flow.collect { collector(it) }}}Lifecycle.Event.ON_STOP -> {job?.cancel()job = null}else -> { }}})}}inline fun <reified T> Flow<T>.observeOnLifecycle(lifecycleOwner: LifecycleOwner,noinline collector: suspend (T) -> Unit) = FlowObserver(lifecycleOwner, this, collector)inline fun <reified T> Flow<T>.observeInLifecycle(lifecycleOwner: LifecycleOwner) = FlowObserver(lifecycleOwner, this, {})
看了一下官方的
repeatOnLifecycle
其实大概也是这个意思:
public suspend fun Lifecycle.repeatOnLifecycle(state: Lifecycle.State,block: suspend CoroutineScope.() -> Unit) {require(state !== Lifecycle.State.INITIALIZED) {\"repeatOnLifecycle cannot start work with the INITIALIZED lifecycle state.\"}if (currentState === Lifecycle.State.DESTROYED) {return}// This scope is required to preserve context before we move to Dispatchers.MaincoroutineScope {withContext(Dispatchers.Main.immediate) {// Check the current state of the lifecycle as the previous check is not guaranteed// to be done on the main thread.if (currentState === Lifecycle.State.DESTROYED) return@withContext// Instance of the running repeating coroutinevar launchedJob: Job? = null// Registered observervar observer: LifecycleEventObserver? = nulltry {// Suspend the coroutine until the lifecycle is destroyed or// the coroutine is cancelledsuspendCancellableCoroutine<Unit> { cont ->// Lifecycle observers that executes `block` when the lifecycle reaches certain state, and// cancels when it falls below that state.val startWorkEvent = Lifecycle.Event.upTo(state)val cancelWorkEvent = Lifecycle.Event.downFrom(state)val mutex = Mutex()observer = LifecycleEventObserver { _, event ->if (event == startWorkEvent) {// Launch the repeating work preserving the calling contextlaunchedJob = [email protected] {// Mutex makes invocations run serially,// coroutineScope ensures all child coroutines finishmutex.withLock {coroutineScope {block()}}}return@LifecycleEventObserver}if (event == cancelWorkEvent) {launchedJob?.cancel()launchedJob = null}if (event == Lifecycle.Event.ON_DESTROY) {cont.resume(Unit)}}[email protected](observer as LifecycleEventObserver)}} finally {launchedJob?.cancel()observer?.let {[email protected](it)}}}}}
既然官方已经推出了, 我们就用官方的
repeatOnLifecycle
方法吧.
shareIn
和
stateIn
前面提过这两个操作符是用来做flow转换的:
- sharedIn
可以把cold flow转成hot的SharedFlow. - stateIn
可以把cold flow转成hot的StateFlow.
shareIn
可以保证只有一个数据源被创造, 并且被所有collectors收集.
比如:
class LocationRepository(private val locationDataSource: LocationDataSource,private val externalScope: CoroutineScope) {val locations: Flow<Location> =locationDataSource.locationsSource.shareIn(externalScope, WhileSubscribed())}
WhileSubscribed
这个策略是说, 当无人观测时, 上游的flow就被取消.
实际使用时可以用
WhileSubscribed(5000)
, 让上游的flow即便在无人观测的情况下, 也能继续保持5秒.
这样可以在某些情况(比如旋转屏幕)时避免重建上游资源, 适用于上游资源创建起来很expensive的情况.
如果我们的需求是, 永远保持一个最新的cache值.
class LocationRepository(private val locationDataSource: LocationDataSource,private val externalScope: CoroutineScope) {val locations: Flow<Location> =locationDataSource.locationsSource.stateIn(externalScope, WhileSubscribed(), EmptyLocation)}
Flow.stateIn
将会缓存最后一个值, 并且有新的collector时, 将这个最新值传给它.
shareIn
,
stateIn
使用注意事项
永远不要在方法里面调用
shareIn
和
stateIn
, 因为方法每次被调用, 它们都会创建新的流.
这些流没有被复用, 会存在内存里面, 直到scope被取消或者没有引用时被GC.
推荐的使用方式是在property上用:
class UserRepository(private val userLocalDataSource: UserLocalDataSource,private val externalScope: Coroutine1044Scope) {// DO NOT USE shareIn or stateIn in a function like this.// It creates a new SharedFlow/StateFlow per invocation which is not reused!fun getUser(): Flow<User> =userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())// DO USE shareIn or stateIn in a propertyval user: Flow<User> =userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())}
StateFlow使用总结
从ViewModel暴露数据到UI, 用
StateFlow
的两种方式:
- 暴露一个StateFlow属性, 用
WhileSubscribed
加上一个timeout.
class MyViewModel(...) : ViewModel() {val result = userId.mapLatest { newUserId ->repository.observeItem(newUserId)}.stateIn(scope = viewModelScope,started = WhileSubscribed(5000),initialValue = Result.Loading)}
- 用
repeatOnLifecycle
收集.
onCreateView(...) {viewLifecycleOwner.lifecycleScope.launch {viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {myViewModel.myUiState.collect { ... }}}}
其他的组合都会保持上游的活跃, 浪费资源:
- 用
WhileSubscribed
暴露属性, 在
lifecycleScope.launch/launchWhenX
里收集.
- 通过
Lazily/Eagerly
暴露, 用
repeatOnLifecycle
收集.
References
- Kotlin flows on Android
- StateFlow and SharedFlow
- A safer way to collect flows from Android UIs
- Things to know about Flow’s shareIn and stateIn operators
- Shared flows, broadcast channels
- Kotlin SharedFlow or: How I learned to stop using RxJava and love the Flow
- Migrating from LiveData to Kotlin’s Flow
- Substituting Android’s LiveData: StateFlow or SharedFlow?
- Learning State & Shared Flows with Unit Tests
- Reactive Streams on Kotlin: SharedFlow and StateFlow
- Reading Coroutine official guide thoroughly — Part 0