VoiceBroadcastPlayer - Fetch playlist in dedicated use case and improve player
This commit is contained in:
parent
174ba4f4cc
commit
3fcac097d3
@ -23,53 +23,42 @@ import im.vector.app.core.di.ActiveSessionHolder
|
|||||||
import im.vector.app.features.home.room.detail.timeline.helper.AudioMessagePlaybackTracker
|
import im.vector.app.features.home.room.detail.timeline.helper.AudioMessagePlaybackTracker
|
||||||
import im.vector.app.features.voice.VoiceFailure
|
import im.vector.app.features.voice.VoiceFailure
|
||||||
import im.vector.app.features.voicebroadcast.getVoiceBroadcastChunk
|
import im.vector.app.features.voicebroadcast.getVoiceBroadcastChunk
|
||||||
import im.vector.app.features.voicebroadcast.getVoiceBroadcastEventId
|
|
||||||
import im.vector.app.features.voicebroadcast.isVoiceBroadcast
|
|
||||||
import im.vector.app.features.voicebroadcast.listening.VoiceBroadcastPlayer.Listener
|
import im.vector.app.features.voicebroadcast.listening.VoiceBroadcastPlayer.Listener
|
||||||
import im.vector.app.features.voicebroadcast.listening.VoiceBroadcastPlayer.State
|
import im.vector.app.features.voicebroadcast.listening.VoiceBroadcastPlayer.State
|
||||||
|
import im.vector.app.features.voicebroadcast.listening.usecase.GetLiveVoiceBroadcastChunksUseCase
|
||||||
import im.vector.app.features.voicebroadcast.model.VoiceBroadcastState
|
import im.vector.app.features.voicebroadcast.model.VoiceBroadcastState
|
||||||
import im.vector.app.features.voicebroadcast.model.asVoiceBroadcastEvent
|
|
||||||
import im.vector.app.features.voicebroadcast.sequence
|
import im.vector.app.features.voicebroadcast.sequence
|
||||||
import im.vector.app.features.voicebroadcast.usecase.GetVoiceBroadcastUseCase
|
import im.vector.app.features.voicebroadcast.usecase.GetVoiceBroadcastUseCase
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.Job
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.SupervisorJob
|
import kotlinx.coroutines.SupervisorJob
|
||||||
|
import kotlinx.coroutines.flow.launchIn
|
||||||
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import org.matrix.android.sdk.api.session.events.model.RelationType
|
|
||||||
import org.matrix.android.sdk.api.session.getRoom
|
|
||||||
import org.matrix.android.sdk.api.session.room.Room
|
|
||||||
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioContent
|
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioContent
|
||||||
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioEvent
|
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioEvent
|
||||||
import org.matrix.android.sdk.api.session.room.model.message.asMessageAudioEvent
|
|
||||||
import org.matrix.android.sdk.api.session.room.timeline.Timeline
|
|
||||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
|
|
||||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
|
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
import java.util.concurrent.CopyOnWriteArrayList
|
import java.util.concurrent.CopyOnWriteArrayList
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
import javax.inject.Singleton
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
class VoiceBroadcastPlayerImpl @Inject constructor(
|
class VoiceBroadcastPlayerImpl @Inject constructor(
|
||||||
private val sessionHolder: ActiveSessionHolder,
|
private val sessionHolder: ActiveSessionHolder,
|
||||||
private val playbackTracker: AudioMessagePlaybackTracker,
|
private val playbackTracker: AudioMessagePlaybackTracker,
|
||||||
private val getVoiceBroadcastUseCase: GetVoiceBroadcastUseCase,
|
private val getVoiceBroadcastUseCase: GetVoiceBroadcastUseCase,
|
||||||
|
private val getLiveVoiceBroadcastChunksUseCase: GetLiveVoiceBroadcastChunksUseCase
|
||||||
) : VoiceBroadcastPlayer {
|
) : VoiceBroadcastPlayer {
|
||||||
|
|
||||||
private val session
|
private val session
|
||||||
get() = sessionHolder.getActiveSession()
|
get() = sessionHolder.getActiveSession()
|
||||||
|
|
||||||
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
|
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
|
||||||
private var voiceBroadcastStateJob: Job? = null
|
private var voiceBroadcastStateJob: Job? = null
|
||||||
private var currentTimeline: Timeline? = null
|
|
||||||
set(value) {
|
|
||||||
field?.removeAllListeners()
|
|
||||||
field?.dispose()
|
|
||||||
field = value
|
|
||||||
}
|
|
||||||
|
|
||||||
private val mediaPlayerListener = MediaPlayerListener()
|
private val mediaPlayerListener = MediaPlayerListener()
|
||||||
private var timelineListener: TimelineListener? = null
|
|
||||||
|
|
||||||
private var currentMediaPlayer: MediaPlayer? = null
|
private var currentMediaPlayer: MediaPlayer? = null
|
||||||
private var nextMediaPlayer: MediaPlayer? = null
|
private var nextMediaPlayer: MediaPlayer? = null
|
||||||
@ -79,7 +68,10 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
}
|
}
|
||||||
private var currentSequence: Int? = null
|
private var currentSequence: Int? = null
|
||||||
|
|
||||||
|
private var fetchPlaylistJob: Job? = null
|
||||||
private var playlist = emptyList<MessageAudioEvent>()
|
private var playlist = emptyList<MessageAudioEvent>()
|
||||||
|
private var isLive: Boolean = false
|
||||||
|
|
||||||
override var currentVoiceBroadcastId: String? = null
|
override var currentVoiceBroadcastId: String? = null
|
||||||
|
|
||||||
override var playingState = State.IDLE
|
override var playingState = State.IDLE
|
||||||
@ -118,6 +110,7 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
// Stop playback
|
// Stop playback
|
||||||
currentMediaPlayer?.stop()
|
currentMediaPlayer?.stop()
|
||||||
currentVoiceBroadcastId?.let { playbackTracker.stopPlayback(it) }
|
currentVoiceBroadcastId?.let { playbackTracker.stopPlayback(it) }
|
||||||
|
isLive = false
|
||||||
|
|
||||||
// Release current player
|
// Release current player
|
||||||
release(currentMediaPlayer)
|
release(currentMediaPlayer)
|
||||||
@ -131,9 +124,9 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
voiceBroadcastStateJob?.cancel()
|
voiceBroadcastStateJob?.cancel()
|
||||||
voiceBroadcastStateJob = null
|
voiceBroadcastStateJob = null
|
||||||
|
|
||||||
// In case of live broadcast, stop observing new chunks
|
// Do not fetch the playlist anymore
|
||||||
currentTimeline = null
|
fetchPlaylistJob?.cancel()
|
||||||
timelineListener = null
|
fetchPlaylistJob = null
|
||||||
|
|
||||||
// Update state
|
// Update state
|
||||||
playingState = State.IDLE
|
playingState = State.IDLE
|
||||||
@ -141,13 +134,11 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
// Clear playlist
|
// Clear playlist
|
||||||
playlist = emptyList()
|
playlist = emptyList()
|
||||||
currentSequence = null
|
currentSequence = null
|
||||||
|
|
||||||
currentRoomId = null
|
currentRoomId = null
|
||||||
currentVoiceBroadcastId = null
|
currentVoiceBroadcastId = null
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Add a [Listener] to the given voice broadcast id.
|
|
||||||
*/
|
|
||||||
override fun addListener(voiceBroadcastId: String, listener: Listener) {
|
override fun addListener(voiceBroadcastId: String, listener: Listener) {
|
||||||
listeners[voiceBroadcastId]?.add(listener) ?: run {
|
listeners[voiceBroadcastId]?.add(listener) ?: run {
|
||||||
listeners[voiceBroadcastId] = CopyOnWriteArrayList<Listener>().apply { add(listener) }
|
listeners[voiceBroadcastId] = CopyOnWriteArrayList<Listener>().apply { add(listener) }
|
||||||
@ -155,15 +146,11 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
if (voiceBroadcastId == currentVoiceBroadcastId) listener.onStateChanged(playingState) else listener.onStateChanged(State.IDLE)
|
if (voiceBroadcastId == currentVoiceBroadcastId) listener.onStateChanged(playingState) else listener.onStateChanged(State.IDLE)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Remove a [Listener] from the given voice broadcast id.
|
|
||||||
*/
|
|
||||||
override fun removeListener(voiceBroadcastId: String, listener: Listener) {
|
override fun removeListener(voiceBroadcastId: String, listener: Listener) {
|
||||||
listeners[voiceBroadcastId]?.remove(listener)
|
listeners[voiceBroadcastId]?.remove(listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun startPlayback(roomId: String, eventId: String) {
|
private fun startPlayback(roomId: String, eventId: String) {
|
||||||
val room = session.getRoom(roomId) ?: error("Unknown roomId: $roomId")
|
|
||||||
// Stop listening previous voice broadcast if any
|
// Stop listening previous voice broadcast if any
|
||||||
if (playingState != State.IDLE) stop()
|
if (playingState != State.IDLE) stop()
|
||||||
|
|
||||||
@ -173,16 +160,11 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
playingState = State.BUFFERING
|
playingState = State.BUFFERING
|
||||||
|
|
||||||
val voiceBroadcastState = getVoiceBroadcastUseCase.execute(roomId, eventId)?.content?.voiceBroadcastState
|
val voiceBroadcastState = getVoiceBroadcastUseCase.execute(roomId, eventId)?.content?.voiceBroadcastState
|
||||||
if (voiceBroadcastState == VoiceBroadcastState.STOPPED) {
|
isLive = voiceBroadcastState != null && voiceBroadcastState != VoiceBroadcastState.STOPPED
|
||||||
// Get static playlist
|
observeIncomingEvents(roomId, eventId)
|
||||||
updatePlaylist(getExistingChunks(room, eventId))
|
|
||||||
startPlayback(false)
|
|
||||||
} else {
|
|
||||||
playLiveVoiceBroadcast(room, eventId)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun startPlayback(isLive: Boolean) {
|
private fun startPlayback() {
|
||||||
val event = if (isLive) playlist.lastOrNull() else playlist.firstOrNull()
|
val event = if (isLive) playlist.lastOrNull() else playlist.firstOrNull()
|
||||||
val content = event?.content ?: run { Timber.w("## VoiceBroadcastPlayer: No content to play"); return }
|
val content = event?.content ?: run { Timber.w("## VoiceBroadcastPlayer: No content to play"); return }
|
||||||
val sequence = event.getVoiceBroadcastChunk()?.sequence
|
val sequence = event.getVoiceBroadcastChunk()?.sequence
|
||||||
@ -201,24 +183,10 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun playLiveVoiceBroadcast(room: Room, eventId: String) {
|
private fun observeIncomingEvents(roomId: String, voiceBroadcastId: String) {
|
||||||
room.timelineService().getTimelineEvent(eventId)?.root?.asVoiceBroadcastEvent() ?: error("Cannot retrieve voice broadcast $eventId")
|
fetchPlaylistJob = getLiveVoiceBroadcastChunksUseCase.execute(roomId, voiceBroadcastId)
|
||||||
updatePlaylist(getExistingChunks(room, eventId))
|
.onEach(this::updatePlaylist)
|
||||||
startPlayback(true)
|
.launchIn(coroutineScope)
|
||||||
observeIncomingEvents(room, eventId)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun getExistingChunks(room: Room, eventId: String): List<MessageAudioEvent> {
|
|
||||||
return room.timelineService().getTimelineEventsRelatedTo(RelationType.REFERENCE, eventId)
|
|
||||||
.mapNotNull { it.root.asMessageAudioEvent() }
|
|
||||||
.filter { it.isVoiceBroadcast() }
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun observeIncomingEvents(room: Room, eventId: String) {
|
|
||||||
currentTimeline = room.timelineService().createTimeline(null, TimelineSettings(5)).also { timeline ->
|
|
||||||
timelineListener = TimelineListener(eventId).also { timeline.addListener(it) }
|
|
||||||
timeline.start()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun resumePlayback() {
|
private fun resumePlayback() {
|
||||||
@ -229,11 +197,32 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
|
|
||||||
private fun updatePlaylist(playlist: List<MessageAudioEvent>) {
|
private fun updatePlaylist(playlist: List<MessageAudioEvent>) {
|
||||||
this.playlist = playlist.sortedBy { it.getVoiceBroadcastChunk()?.sequence?.toLong() ?: it.root.originServerTs }
|
this.playlist = playlist.sortedBy { it.getVoiceBroadcastChunk()?.sequence?.toLong() ?: it.root.originServerTs }
|
||||||
|
onPlaylistUpdated()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun onPlaylistUpdated() {
|
||||||
|
when (playingState) {
|
||||||
|
State.PLAYING -> {
|
||||||
|
if (nextMediaPlayer == null) {
|
||||||
|
coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
State.PAUSED -> {
|
||||||
|
if (nextMediaPlayer == null) {
|
||||||
|
coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
State.BUFFERING -> {
|
||||||
|
val newMediaContent = getNextAudioContent()
|
||||||
|
if (newMediaContent != null) startPlayback()
|
||||||
|
}
|
||||||
|
State.IDLE -> startPlayback()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getNextAudioContent(): MessageAudioContent? {
|
private fun getNextAudioContent(): MessageAudioContent? {
|
||||||
val nextSequence = currentSequence?.plus(1)
|
val nextSequence = currentSequence?.plus(1)
|
||||||
?: timelineListener?.let { playlist.lastOrNull()?.sequence }
|
?: playlist.lastOrNull()?.sequence
|
||||||
?: 1
|
?: 1
|
||||||
return playlist.find { it.getVoiceBroadcastChunk()?.sequence == nextSequence }?.content
|
return playlist.find { it.getVoiceBroadcastChunk()?.sequence == nextSequence }?.content
|
||||||
}
|
}
|
||||||
@ -279,37 +268,6 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private inner class TimelineListener(private val voiceBroadcastId: String) : Timeline.Listener {
|
|
||||||
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
|
|
||||||
val currentSequences = playlist.map { it.sequence }
|
|
||||||
val newChunks = snapshot
|
|
||||||
.mapNotNull { timelineEvent ->
|
|
||||||
timelineEvent.root.asMessageAudioEvent()
|
|
||||||
?.takeIf { it.isVoiceBroadcast() && it.getVoiceBroadcastEventId() == voiceBroadcastId && it.sequence !in currentSequences }
|
|
||||||
}
|
|
||||||
if (newChunks.isEmpty()) return
|
|
||||||
updatePlaylist(playlist + newChunks)
|
|
||||||
|
|
||||||
when (playingState) {
|
|
||||||
State.PLAYING -> {
|
|
||||||
if (nextMediaPlayer == null) {
|
|
||||||
coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
State.PAUSED -> {
|
|
||||||
if (nextMediaPlayer == null) {
|
|
||||||
coroutineScope.launch { nextMediaPlayer = prepareNextMediaPlayer() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
State.BUFFERING -> {
|
|
||||||
val newMediaContent = getNextAudioContent()
|
|
||||||
if (newMediaContent != null) startPlayback(true)
|
|
||||||
}
|
|
||||||
State.IDLE -> startPlayback(true)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private inner class MediaPlayerListener : MediaPlayer.OnInfoListener, MediaPlayer.OnCompletionListener, MediaPlayer.OnErrorListener {
|
private inner class MediaPlayerListener : MediaPlayer.OnInfoListener, MediaPlayer.OnCompletionListener, MediaPlayer.OnErrorListener {
|
||||||
|
|
||||||
override fun onInfo(mp: MediaPlayer, what: Int, extra: Int): Boolean {
|
override fun onInfo(mp: MediaPlayer, what: Int, extra: Int): Boolean {
|
||||||
@ -329,7 +287,7 @@ class VoiceBroadcastPlayerImpl @Inject constructor(
|
|||||||
val roomId = currentRoomId ?: return
|
val roomId = currentRoomId ?: return
|
||||||
val voiceBroadcastId = currentVoiceBroadcastId ?: return
|
val voiceBroadcastId = currentVoiceBroadcastId ?: return
|
||||||
val voiceBroadcastEventContent = getVoiceBroadcastUseCase.execute(roomId, voiceBroadcastId)?.content ?: return
|
val voiceBroadcastEventContent = getVoiceBroadcastUseCase.execute(roomId, voiceBroadcastId)?.content ?: return
|
||||||
val isLive = voiceBroadcastEventContent.voiceBroadcastState != null && voiceBroadcastEventContent.voiceBroadcastState != VoiceBroadcastState.STOPPED
|
isLive = voiceBroadcastEventContent.voiceBroadcastState != null && voiceBroadcastEventContent.voiceBroadcastState != VoiceBroadcastState.STOPPED
|
||||||
|
|
||||||
if (!isLive && voiceBroadcastEventContent.lastChunkSequence == currentSequence) {
|
if (!isLive && voiceBroadcastEventContent.lastChunkSequence == currentSequence) {
|
||||||
// We'll not receive new chunks anymore so we can stop the live listening
|
// We'll not receive new chunks anymore so we can stop the live listening
|
||||||
|
@ -0,0 +1,130 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 New Vector Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package im.vector.app.features.voicebroadcast.listening.usecase
|
||||||
|
|
||||||
|
import im.vector.app.core.di.ActiveSessionHolder
|
||||||
|
import im.vector.app.features.voicebroadcast.getVoiceBroadcastEventId
|
||||||
|
import im.vector.app.features.voicebroadcast.isVoiceBroadcast
|
||||||
|
import im.vector.app.features.voicebroadcast.model.VoiceBroadcastEvent
|
||||||
|
import im.vector.app.features.voicebroadcast.model.VoiceBroadcastState
|
||||||
|
import im.vector.app.features.voicebroadcast.model.asVoiceBroadcastEvent
|
||||||
|
import im.vector.app.features.voicebroadcast.sequence
|
||||||
|
import im.vector.app.features.voicebroadcast.usecase.GetVoiceBroadcastUseCase
|
||||||
|
import kotlinx.coroutines.channels.awaitClose
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.callbackFlow
|
||||||
|
import kotlinx.coroutines.flow.emptyFlow
|
||||||
|
import kotlinx.coroutines.flow.flowOf
|
||||||
|
import kotlinx.coroutines.flow.runningReduce
|
||||||
|
import org.matrix.android.sdk.api.session.events.model.RelationType
|
||||||
|
import org.matrix.android.sdk.api.session.room.model.message.MessageAudioEvent
|
||||||
|
import org.matrix.android.sdk.api.session.room.model.message.asMessageAudioEvent
|
||||||
|
import org.matrix.android.sdk.api.session.room.timeline.Timeline
|
||||||
|
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
|
||||||
|
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
|
||||||
|
import javax.inject.Inject
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a [Flow] of [MessageAudioEvent]s related to the given voice broadcast.
|
||||||
|
*/
|
||||||
|
class GetLiveVoiceBroadcastChunksUseCase @Inject constructor(
|
||||||
|
private val activeSessionHolder: ActiveSessionHolder,
|
||||||
|
private val getVoiceBroadcastUseCase: GetVoiceBroadcastUseCase,
|
||||||
|
) {
|
||||||
|
|
||||||
|
fun execute(roomId: String, voiceBroadcastId: String): Flow<List<MessageAudioEvent>> {
|
||||||
|
val session = activeSessionHolder.getSafeActiveSession() ?: return emptyFlow()
|
||||||
|
val room = session.roomService().getRoom(roomId) ?: return emptyFlow()
|
||||||
|
val timeline = room.timelineService().createTimeline(null, TimelineSettings(5))
|
||||||
|
|
||||||
|
// Get initial chunks
|
||||||
|
val existingChunks = room.timelineService().getTimelineEventsRelatedTo(RelationType.REFERENCE, voiceBroadcastId)
|
||||||
|
.mapNotNull { timelineEvent -> timelineEvent.root.asMessageAudioEvent().takeIf { it.isVoiceBroadcast() } }
|
||||||
|
|
||||||
|
val voiceBroadcastEvent = getVoiceBroadcastUseCase.execute(roomId, voiceBroadcastId)
|
||||||
|
val voiceBroadcastState = voiceBroadcastEvent?.content?.voiceBroadcastState
|
||||||
|
|
||||||
|
return if (voiceBroadcastState == null || voiceBroadcastState == VoiceBroadcastState.STOPPED) {
|
||||||
|
// Just send the existing chunks if voice broadcast is stopped
|
||||||
|
flowOf(existingChunks)
|
||||||
|
} else {
|
||||||
|
// Observe new timeline events if voice broadcast is ongoing
|
||||||
|
callbackFlow {
|
||||||
|
// Init with existing chunks
|
||||||
|
send(existingChunks)
|
||||||
|
|
||||||
|
// Observe new timeline events
|
||||||
|
val listener = object : Timeline.Listener {
|
||||||
|
private var lastEventId: String? = null
|
||||||
|
private var lastSequence: Int? = null
|
||||||
|
|
||||||
|
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
|
||||||
|
val newEvents = lastEventId?.let { eventId -> snapshot.subList(0, snapshot.indexOfFirst { it.eventId == eventId }) } ?: snapshot
|
||||||
|
|
||||||
|
// Detect a potential stopped voice broadcast state event
|
||||||
|
val stopEvent = newEvents.findStopEvent()
|
||||||
|
if (stopEvent != null) {
|
||||||
|
lastSequence = stopEvent.content?.lastChunkSequence
|
||||||
|
}
|
||||||
|
|
||||||
|
val newChunks = newEvents.mapToChunkEvents(voiceBroadcastId, voiceBroadcastEvent.root.senderId)
|
||||||
|
|
||||||
|
// Notify about new chunks
|
||||||
|
if (newChunks.isNotEmpty()) {
|
||||||
|
trySend(newChunks)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Automatically stop observing the timeline if the last chunk has been received
|
||||||
|
if (lastSequence != null && newChunks.any { it.sequence == lastSequence }) {
|
||||||
|
timeline.removeListener(this)
|
||||||
|
timeline.dispose()
|
||||||
|
}
|
||||||
|
|
||||||
|
lastEventId = snapshot.firstOrNull()?.eventId
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
timeline.addListener(listener)
|
||||||
|
timeline.start()
|
||||||
|
awaitClose {
|
||||||
|
timeline.removeListener(listener)
|
||||||
|
timeline.dispose()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.runningReduce { accumulator: List<MessageAudioEvent>, value: List<MessageAudioEvent> -> accumulator.plus(value) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find a [VoiceBroadcastEvent] with a [VoiceBroadcastState.STOPPED] state.
|
||||||
|
*/
|
||||||
|
private fun List<TimelineEvent>.findStopEvent(): VoiceBroadcastEvent? =
|
||||||
|
this.mapNotNull { it.root.asVoiceBroadcastEvent() }
|
||||||
|
.find { it.content?.voiceBroadcastState == VoiceBroadcastState.STOPPED }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transform the list of [TimelineEvent] to a mapped list of [MessageAudioEvent] related to a given voice broadcast.
|
||||||
|
*/
|
||||||
|
private fun List<TimelineEvent>.mapToChunkEvents(voiceBroadcastId: String, senderId: String?): List<MessageAudioEvent> =
|
||||||
|
this.mapNotNull { timelineEvent ->
|
||||||
|
timelineEvent.root.asMessageAudioEvent()
|
||||||
|
?.takeIf {
|
||||||
|
it.isVoiceBroadcast() && it.getVoiceBroadcastEventId() == voiceBroadcastId &&
|
||||||
|
it.root.senderId == senderId
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user