您的位置:

Kotlin Flow 详解

一、Kotlin Flow 简介

Kotlin Flow 是 Kotlin 团队推出的异步编程库,旨在让异步编程变得更加自然和优美。

与传统的 RxJava 操作符相比,Kotlin Flow 的 pipeline 会在每一个发射项(flow item)处执行,也就是遇到下一个 item 才会再次执行 pipe 中的代码,这种机制更适合逐条数据处理的场景。

Kotlin Flow 同时也支持背压(backpressure),意味着我们可以完美地处理快速发射数据源的场景。

二、Kotlin Flow 间隔收集

在 Kotlin Flow 中,通过编写一条流水线(pipe)来对数据进行处理。其中,间隔收集(每隔一定时间间隔打印一个 item)是常用的操作之一。

    fun main() = runBlocking {
        (1..3).asFlow() // 模拟发射流数据源
            .onEach { delay(100) } // 间隔100ms发射一次
            .collect { println(it) } // 打印发射出的数据
    }

  

上述代码中,我们通过 `asFlow()` 将一个集合转换为流数据源,然后通过 `onEach` 设置数据的发射间隔为 100 ms,最后通过 `collect` 收集并打印发射出的数据。

三、Kotlin Flow Retrofit

Kotlin Flow 在网络请求框架 Retrofit 中的使用也相当简单,只需要在 API 接口方法中返回 Flow 类型即可。

    interface ApiService {
        @GET("api/v1/courses")
        fun getList(): Flow // 返回 Flow 类型
    }
    
    fun main() = runBlocking {
        val retrofit = Retrofit.Builder()
            .baseUrl(BASE_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(FlowCallAdapterFactory.create()) // 使用 FlowCallAdapterFactory
            .build()

        val service = retrofit.create(ApiService::class.java)
        service.getList()
            .flowOn(Dispatchers.IO) // 在 IO 线程执行网络请求操作
            .collect {
                // 处理返回的数据
            }
    }

  

我们在 API 接口方法中返回 Flow 类型,并在构建 Retrofit 实例时添加 FlowCallAdapterFactory,然后就可以在 collect 之前使用 `flowOn` 进行线程切换和操作。

四、Kotlin Flow 手动刷新

当我们需要在用户手动刷新操作时请求网络并更新 UI 时,可以使用 Kotlin Flow 的 `channelFlow` 和 `debounce` 实现手动刷新功能。

    private var refreshChannel: ConflatedBroadcastChannel? = null

    fun refresh() {
        if (refreshChannel == null) {
            refreshChannel = ConflatedBroadcastChannel(Unit)
        }
        refreshChannel?.offer(Unit)
    }

    private fun observeRefreshChannel() = refreshChannel?.asFlow()
        ?.debounce(200)    // 设置间隔时间防止频繁请求网络
        ?.flatMapLatest {  // flatMapLatest 保证最近的一次请求是我们需要的
            fetchList()
        }
        ?.flowOn(Dispatchers.IO)
        ?.catch { // 捕获异常并重新发布 Channel 供下一次操作
            refreshChannel?.offer(Unit)
        }

    // 从网络获取列表数据
    private fun fetchList() = flow {
        // 发起网络请求获取数据
        emit(/* 网络请求返回的数据 */)
    }

  

在这段代码中,我们首先定义一个 BroadcastChannel 用于触发刷新操作,并且添加一个 `debounce` 操作符来限制发送刷新信号的频率。当 BroadcastChannel 发送刷新信号时,我们通过 `flatMapLatest` 操作符来获取最新的网络请求并进行数据处理。

五、Kotlin Flow 在协程中的应用

Kotlin Flow 与协程(coroutine)紧密结合,如果我们需要在非界面线程中发起异步操作并更新界面,就要用到 Flow 的流水线(pipe)机制。

    fun fetchData(): Flow = flow {
        emit(DataResult.Loading)  // 发布加载中状态

        val resultFromApi = /* 网络请求返回的结果 */
        if (resultFromApi.isSuccessful) {
            emit(DataResult.Success(resultFromApi.body()!!))  // 发布成功获取到的数据
        } else {
            emit(DataResult.Failure(resultFromApi.message()))  // 发布错误信息
        }

        emit(DataResult.Done)  // 发布完成状态
    }
    
    class MyViewModel : ViewModel() {
        fun loadData() {
            viewModelScope.launch {
                fetchData()
                    .flowOn(Dispatchers.IO)
                    .collect { result ->
                        // 处理返回的数据状态
                    }
            }
        }
    }

  

在这段代码中,我们在 `fetchData()` 方法中返回一个 Flow 类型的数据源,并在协程中通过 `collect` 来接收流的下一个 item。在 `collect` 内部,我们可以对 item 进行处理或者更新 UI。