一、Kotlin Flow 简介
Kotlin Flow 是 Kotlin 团队推出的异步编程库,旨在让异步编程变得更加自然和优美。
与传统的 RxJava 操作符相比,Kotlin Flow 的 pipeline 会在每一个发射项(flow item)处执行,也就是遇到下一个 item 才会再次执行 pipe 中的代码,这种机制更适合逐条数据处理的场景。
Kotlin Flow 同时也支持背压(backpressure),意味着我们可以完美地处理快速发射数据源的场景。
二、Kotlin Flow 间隔收集
在 Kotlin Flow 中,通过编写一条流水线(pipe)来对数据进行处理。其中,间隔收集(每隔一定时间间隔打印一个 item)是常用的操作之一。
fun main() = runBlocking{ (1..3).asFlow() // 模拟发射流数据源 .onEach { delay(100) } // 间隔100ms发射一次 .collect { println(it) } // 打印发射出的数据 }
上述代码中,我们通过 `asFlow()` 将一个集合转换为流数据源,然后通过 `onEach` 设置数据的发射间隔为 100 ms,最后通过 `collect` 收集并打印发射出的数据。
三、Kotlin Flow Retrofit
Kotlin Flow 在网络请求框架 Retrofit 中的使用也相当简单,只需要在 API 接口方法中返回 Flow 类型即可。
interface ApiService { @GET("api/v1/courses") fun getList(): Flow// 返回 Flow 类型 } fun main() = runBlocking { val retrofit = Retrofit.Builder() .baseUrl(BASE_URL) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(FlowCallAdapterFactory.create()) // 使用 FlowCallAdapterFactory .build() val service = retrofit.create(ApiService::class.java) service.getList() .flowOn(Dispatchers.IO) // 在 IO 线程执行网络请求操作 .collect { // 处理返回的数据 } }
我们在 API 接口方法中返回 Flow 类型,并在构建 Retrofit 实例时添加 FlowCallAdapterFactory,然后就可以在 collect 之前使用 `flowOn` 进行线程切换和操作。
四、Kotlin Flow 手动刷新
当我们需要在用户手动刷新操作时请求网络并更新 UI 时,可以使用 Kotlin Flow 的 `channelFlow` 和 `debounce` 实现手动刷新功能。
private var refreshChannel: ConflatedBroadcastChannel? = null fun refresh() { if (refreshChannel == null) { refreshChannel = ConflatedBroadcastChannel(Unit) } refreshChannel?.offer(Unit) } private fun observeRefreshChannel() = refreshChannel?.asFlow() ?.debounce(200) // 设置间隔时间防止频繁请求网络 ?.flatMapLatest { // flatMapLatest 保证最近的一次请求是我们需要的 fetchList() } ?.flowOn(Dispatchers.IO) ?.catch { // 捕获异常并重新发布 Channel 供下一次操作 refreshChannel?.offer(Unit) } // 从网络获取列表数据 private fun fetchList() = flow { // 发起网络请求获取数据 emit(/* 网络请求返回的数据 */) }
在这段代码中,我们首先定义一个 BroadcastChannel 用于触发刷新操作,并且添加一个 `debounce` 操作符来限制发送刷新信号的频率。当 BroadcastChannel 发送刷新信号时,我们通过 `flatMapLatest` 操作符来获取最新的网络请求并进行数据处理。
五、Kotlin Flow 在协程中的应用
Kotlin Flow 与协程(coroutine)紧密结合,如果我们需要在非界面线程中发起异步操作并更新界面,就要用到 Flow 的流水线(pipe)机制。
fun fetchData(): Flow= flow { emit(DataResult.Loading) // 发布加载中状态 val resultFromApi = /* 网络请求返回的结果 */ if (resultFromApi.isSuccessful) { emit(DataResult.Success(resultFromApi.body()!!)) // 发布成功获取到的数据 } else { emit(DataResult.Failure(resultFromApi.message())) // 发布错误信息 } emit(DataResult.Done) // 发布完成状态 } class MyViewModel : ViewModel() { fun loadData() { viewModelScope.launch { fetchData() .flowOn(Dispatchers.IO) .collect { result -> // 处理返回的数据状态 } } } }
在这段代码中,我们在 `fetchData()` 方法中返回一个 Flow 类型的数据源,并在协程中通过 `collect` 来接收流的下一个 item。在 `collect` 内部,我们可以对 item 进行处理或者更新 UI。