Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。
一、Flow的使用
1.Flow的创建
1.可以使用flow构建函数构建一个Flow类型返回值的函数
2.flow{}构建体中可以调用挂起函数,即上流
3.上流使用emit函数发射值
4.下流使用collect函数收集值
//上流函数
fun simpleFlow() = flow {for (i in 1..3) {delay(100)emit(i)}
}fun main() {runBlocking {//下流接收数据simpleFlow().collect { value ->println(value)}println("finished")}
}
结果:
1
2
3
finished
2.Flow是冷流,所以collect是挂起函数,不是子协程,并且只有执行collect函数时,上流的代码才会被执行,所以在一个协程中多次调用collect,它们会按顺序执行
fun simpleFlow() = flow {for (i in 1..3) {delay(100)emit(i)}
}fun main() {runBlocking {simpleFlow().collect { value ->println(value)}println("collect1 finished")simpleFlow().collect { value ->println(value)}println("collect2 finished")}
}
结果:
1
2
3
collect1 finished
1
2
3
collect2 finished
3.Flow的连续性
Flow也支持函数式编程,并且从上流到下流的每个过渡操作符都会处理发射值,最终流入下流
fun main() {runBlocking {flow {for (i in 1..5) {delay(100)emit(i)}}.filter {it % 2 == 0 //只取偶数}.map {"String $it"}.collect {println(it)}}
}
结果:
String 2
String 4
4.Flow构建器
除了使用flow函数外,还有两种方式
1.flowOf函数
2.使用.asFlow()扩展函数,可以将各种集合与序列转为流
fun main() {runBlocking {val startTime = System.currentTimeMillis()flowOf(3, 5, 7).onEach { delay(100) }.collect {println("${System.currentTimeMillis() - startTime}ms $it")}(3..6).asFlow().collect { println(it) }}
}
结果:
131ms 3
239ms 5
350ms 7
3
4
5
6
5.collect为挂起函数,但是Flow也提供了flowOn函数方便我们指定上流是否使用子协程执行
fun main() {runBlocking {flow {println("flow :${Thread.currentThread().name}")for (i in 1..5) {delay(100)emit(i)}}.flowOn(Dispatchers.Default).collect {println("collect:${Thread.currentThread().name} $it")}}
}
结果:
flow :DefaultDispatcher-worker-1
collect:main 1
collect:main 2
collect:main 3
collect:main 4
collect:main 5
下流还是会使用主协程的上下文
6.除了使用子协程执行上流外,我们还可以使用launchIn函数来让Flow使用全新的协程上下文
fun main() {runBlocking {flow {println("flow :${Thread.currentThread().name}")for (i in 1..5) {delay(100)emit(i)}}.flowOn(Dispatchers.Default).onEach { println("collect:${Thread.currentThread().name} $it") }.launchIn(CoroutineScope(Dispatchers.IO)).join()//主线程等待这个协程执行结束}
}
结果:
flow :DefaultDispatcher-worker-1
collect:DefaultDispatcher-worker-1 1
collect:DefaultDispatcher-worker-1 2
collect:DefaultDispatcher-worker-1 3
collect:DefaultDispatcher-worker-1 4
collect:DefaultDispatcher-worker-1 5
7.Flow的取消
Flow的取消和协程的取消相同,流的收集是CPU密集型的,但是如果收集时有挂起函数,那么挂起函数可以抛出取消异常来中断执行
使用了新协程的情况,可以使用cancel:
fun main() {runBlocking {val flow = flow {println("flow :${Thread.currentThread().name}")for (i in 1..5) {delay(100)emit(i)}}.flowOn(Dispatchers.Default).onEach { println("collect:${Thread.currentThread().name} $it") }.launchIn(CoroutineScope(Dispatchers.IO))delay(200)flow.cancel()flow.join()}
}
使用timeout:
fun main() {runBlocking {withTimeoutOrNull(300){flow {println("flow :${Thread.currentThread().name}")for (i in 1..5) {delay(100)emit(i)}}.flowOn(Dispatchers.Default).collect { println("collect:${Thread.currentThread().name} $it") }}println("finished")}
}
8.Flow的取消检测
之前我们调用子协程的取消时,CPU密集型代码并不能结束运行,在不使用挂起函数的情况下,我们在子协程体中通过ensureActive函数来检测该协程是否被取消了
1.而Flow为了方便,Flow构建器会对每个发射值(emit函数)执行ensureActive函数来进行取消
fun main() {runBlocking {flow {for (i in 1..5) {emit(i)}}.collect {println("$it")if (it > 2)cancel()}println("finished")}
}
结果:
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@2833cc44
2.出于性能考虑,大多数其他流操作不会执行检测,此时我们可以使用cancellable函数来指定该Flow是可以取消的
fun main() {runBlocking {val flow = flowOf(1, 2, 3, 5).cancellable()//不指定,那么将不执行取消检测.collect {println("$it")if (it > 2)cancel()}println("finished")}
}
结果:
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@7c29daf3
9.背压
上流每次发射耗时1s,下流接收耗时3s,那么它们总共会耗时多久
fun main() {runBlocking {val flow = flow {for (i in 1..3) {delay(1000)emit(i)}}val time = measureTimeMillis {flow.collect {delay(3000)println("$it")}}println("time : $time ms")}
}
结果:
1
2
3
time : 12073 ms
可以看出,一般情况下,上下流执行是同步的
1.使用buff,来让上流不等待下流接收,而是发射到缓存区
fun main() {runBlocking {val flow = flow {for (i in 1..3) {delay(1000)emit(i)}}val time = measureTimeMillis {flow.buffer(50)//指定缓存区大小为50个.collect {delay(3000)println("$it")}}println("time : $time ms")}
}
结果:
1
2
3
time : 10158 ms
时间是1s + 3s * 3
2.指定上流协程
fun main() {runBlocking {val flow = flow {for (i in 1..3) {delay(1000)emit(i)}}val time = measureTimeMillis {flow.flowOn(Dispatchers.IO).collect {delay(3000)println("$it")}}println("time : $time ms")}
}
结果和1.是一样的
3.有时我们不需要一个不漏的接收上流的元素时,可以使用conflate,下流来不及处理的会被丢弃掉
fun main() {runBlocking {val flow = flow {for (i in 1..3) {delay(1000)emit(i)}}val time = measureTimeMillis {flow.conflate().collect {delay(3000)println("$it")}}println("time : $time ms")}
}
结果:
1
3
time : 7124 ms
4.collectLast可以只接收上流发射的最后一个元素
fun main() {runBlocking {val flow = flow {for (i in 1..3) {delay(1000)emit(i)}}val time = measureTimeMillis {flow.collectLatest {delay(3000)println("$it")}}println("time : $time ms")}
}
3
time : 6144 ms
二、操作符
上面我们也提到了Flow支持函数式编程,用法和之前学习的差不多
1.转换操作符
1.map函数
fun main() {runBlocking {flow {for (i in 1..3) {emit(i)}}.map {"String $it"}.collect {println(it)}}
}
结果:
String 1
String 2
String 3
2.transform函数,还可以将上流的一个变为多个发射出去
fun main() {runBlocking {flow {for (i in 1..3) {emit(i)}}.transform {emit("String1 $it")emit("String2 $it")}.collect {println(it)}}
}
结果:
String1 1
String2 1
String1 2
String2 2
String1 3
String2 3
2.限长操作符
take函数
fun main() {runBlocking {flow {for (i in 1..3) {emit(i)}}.take(2).collect {println(it)}}
}
结果:
1
2
3.末端操作符
末端操作符是用于启动流的挂起函数,collect是最基础的末端操作符,除此以外还有其他的
1.转化为各种集合,如:toList或toSet
2.获取第一个元素(first)与确保流只发射一个元素(single)
3.flod与reduce将流整合到一个值
flod函数
fun main() {runBlocking {val value = flow {for (i in 1..3) {emit(i)}}.map {it * it}.fold(0) { acc, value ->acc + value}print(value)}
}
结果:
14
4.组合操作符
zip函数
fun main() {runBlocking {val flow1 = flow {for (i in 1..3) {emit(i)}}val flow2 = flowOf("one", "two", "three")flow1.zip(flow2) { a, b ->"$a -> $b"}.collect { value ->println(value)}}
}
结果:
1 -> one
2 -> two
3 -> three
5.展平操作符
类似于集合的集合,流里也有可能有流,那么这个时候我们就需要使用展平操作符了
1.flatMapConcat
fun main() {runBlocking {val startTime = System.currentTimeMillis()flow {for (i in 1..3) {emit(i)}}.flatMapConcat {flow {emit("first $it")delay(500)emit("second $it")}}.collect {println("${System.currentTimeMillis() - startTime}ms $it")}}
}
结果:
52ms first 1
570ms second 1
571ms first 2
1074ms second 2
1074ms first 3
1577ms second 3
2.flatMapMerge
和flatMapConcat不同,flatMapConcat是按流函数体中顺序执行,而flatMapMerge中遇到发射函数时,会一次性执行上流的所有发射
fun main() {runBlocking {val startTime = System.currentTimeMillis()flow {for (i in 1..3) {emit(i)}}.flatMapMerge {flow {emit("first $it")delay(500)emit("second $it")}}.collect {println("${System.currentTimeMillis() - startTime}ms $it")}}
}
结果:
130ms first 1
130ms first 2
131ms first 3
632ms second 1
632ms second 2
632ms second 3
3.flatMapLatest
flatMapLatest中遇到第二个发射函数时,只会发射上流最后一次的元素
fun main() {runBlocking {val startTime = System.currentTimeMillis()flow {for (i in 1..3) {emit(i)}}.flatMapLatest {flow {emit("first $it")delay(500)emit("second $it")}}.collect {println("${System.currentTimeMillis() - startTime}ms $it")}}
}
结果:
298ms first 1
300ms first 2
301ms first 3
806ms second 3
三、Flow的异常处理
当运算符中的发射器或代码抛出异常,可以有两种方式处理
1.try catch
2.catch函数
1.try catch适用于收集时发生的异常
fun main() {runBlocking {val flow = flow {for (i in 1..3) {emit(i)}}try {flow.collect {println(it)throw RuntimeException()}} catch (e: Exception) {print("caught: $e")}}
}
2.虽然上流也可以使用try catch,但是更推荐catch函数
fun main() {runBlocking {val flow = flow {for (i in 1..3) {emit(i)throw RuntimeException()}}.catch { e ->print("caught1: $e")}.collect {println(it)}}
}
四、Flow的完成
有时候我们需要在Flow完成时,做一些其他事情,可以使用下面的方式
1.finally块
fun main() {runBlocking {try{val flow = flow {for (i in 1..3) {emit(i)}}.collect {println(it)}}finally {println("done") }}
}
2.onCompletion函数
fun main() {runBlocking {val flow = flow {for (i in 1..3) {emit(i)}}.onCompletion {println("done")}.collect {println(it)}}
}