Description
I need to change the sampling rate dynamically. Why? In my case I need to send data with a period that I receive from a backend server. The data comes from a callbackFlow that wraps an Android sensor. How can I do this?
At first I thought about onEach with a delay, but that delays every element without throttling anything. Then I tried mapLatest, but that cannot be used either: with a delay of about 100 ms and values arriving every ~50 ms, each new value cancels the pending delay and it restarts in a loop.
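To make the two attempts concrete, here is a minimal sketch that reproduces them in isolation. The names `source`, `withOnEach`, and `withMapLatest` are made up for this illustration, and the timings (a value every 50 ms, a 100 ms delay) are the ones mentioned above, not measurements from the real sensor.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the sensor: 5 values, one every 50 ms.
fun source(): Flow<Int> = flow {
    repeat(5) {
        emit(it)
        delay(50)
    }
}

// onEach + delay: the producer is suspended while the delay runs,
// so the pipeline gets slower but no value is dropped.
suspend fun withOnEach(): List<Int> =
    source().onEach { delay(100) }.toList()

// mapLatest + delay: every new value cancels the block that is still
// waiting, so values are dropped for as long as the source is faster
// than the delay.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun withMapLatest(): List<Int> =
    source().mapLatest { value ->
        delay(100)
        value
    }.toList()

fun main() = runBlocking {
    println("onEach:    ${withOnEach()}")    // all five values, just later
    println("mapLatest: ${withMapLatest()}") // with these timings, only the final value survives
}
```

Neither variant throttles to a fixed period: the first keeps everything, the second keeps nothing until the source pauses or completes.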
Finally I wrote the solution below, but it still does not fully work as expected (I also realize that this code is very bad). In addition, I would need to create a separate atomic value for each of my data source flows. Is it possible to optimize my code or write it more correctly?
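import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Time at which the last value reached the collector.
    val timeOfPrevEmit = AtomicLong(System.currentTimeMillis())
    // Sampling period from the backend: 200 ms at first, 2000 ms after 3 s.
    val delayFlow = flow {
        emit(200L)
        delay(3000)
        emit(2000L)
    }
    // Stand-in for the sensor: one value every 100 ms.
    val generatingFlow = flow {
        repeat(50) { currentNumber ->
            emit(currentNumber)
            delay(100)
        }
    }
    launch {
        val t: Flow<Int> = generatingFlow.combine(delayFlow) { generator, period ->
            generator to period
        }.transformLatest { (generator, period) ->
            // Wait out the rest of the period; a newer value cancels this block.
            val awaitTime = period - (System.currentTimeMillis() - timeOfPrevEmit.get())
            delay(awaitTime)
            println("delay: $period")
            emit(generator)
        }
        t.onEach {
            timeOfPrevEmit.set(System.currentTimeMillis())
        }.collect {
            println("value: $it")
        }
    }
    delay(10000)
}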
result:
delay: 200
value: 1
delay: 200
value: 3
delay: 200
delay: 200
value: 4
value: 5
delay: 200
value: 6
delay: 200
value: 8
delay: 200
value: 10
delay: 200
value: 12
delay: 200
value: 14
delay: 200
value: 16
delay: 200
delay: 200
value: 17
value: 18
delay: 200
value: 19
delay: 200
value: 21
delay: 200
value: 23
delay: 200
value: 25
delay: 200
value: 27
delay: 2000
value: 45
delay: 2000
value: 49
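For comparison, one possible direction that avoids a shared AtomicLong per data source is a small custom operator that keeps only the latest upstream value and emits it on a timer driven by the period flow. This is only a sketch under assumptions: `sampleWithPeriod` is not an existing kotlinx.coroutines operator, and `demoValues` and `demoPeriods` are hypothetical stand-ins for the sensor and the backend. It also assumes a recent kotlinx.coroutines version and that restarting the timer whenever the period changes is acceptable.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.onSuccess
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical operator: emits the most recent upstream value once per
// period, where the period (in ms) is taken from the `periods` flow.
fun <T> Flow<T>.sampleWithPeriod(periods: Flow<Long>): Flow<T> = channelFlow {
    // Holds only the newest upstream value; older ones are overwritten.
    val latest = Channel<T>(Channel.CONFLATED)

    val sampler = launch {
        // A new period cancels the running loop and restarts the timer.
        // After `periods` completes, the last period stays in effect.
        periods.collectLatest { period ->
            while (true) {
                delay(period)
                // Emit only if a new value arrived since the last tick.
                latest.tryReceive().onSuccess { send(it) }
            }
        }
    }

    // Collect the upstream exactly once, so a callbackFlow is not re-subscribed.
    this@sampleWithPeriod.collect { latest.send(it) }
    // Upstream finished: stop the timer. A value that arrived after the
    // last tick is dropped.
    sampler.cancel()
}

// Hypothetical stand-in for the sensor: one value every 100 ms.
fun demoValues(): Flow<Int> = flow {
    repeat(30) {
        emit(it)
        delay(100)
    }
}

// Hypothetical stand-in for the backend: 200 ms, then 500 ms after 1 s.
fun demoPeriods(): Flow<Long> = flow {
    emit(200L)
    delay(1000)
    emit(500L)
}

fun main() = runBlocking {
    val sampled = demoValues().sampleWithPeriod(demoPeriods()).toList()
    println("sampled: $sampled")
}
```

Because the state lives inside the operator, each data source flow can call `sampleWithPeriod` independently without any external atomic value. Nothing is emitted until the first period arrives.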