Compare commits

...

2 Commits

Author SHA1 Message Date
ganfra
c74a56517b Change after PR review 2022-06-13 11:11:26 +02:00
ganfra
3f55f279d7 Fix EventInsertLiveObserver gets blocked by reverting and adding lock instead 2022-06-10 12:58:41 +02:00
2 changed files with 47 additions and 54 deletions

1
changelog.d/6278.bugfix Normal file
View File

@ -0,0 +1 @@
Fix regression on EventInsertLiveObserver getting blocked so there is no event being processed anymore.

View File

@ -19,10 +19,9 @@ package org.matrix.android.sdk.internal.database
import com.zhuinden.monarchy.Monarchy
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.matrix.android.sdk.internal.database.mapper.asDomain
import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.model.EventInsertEntity
@ -39,26 +38,18 @@ internal class EventInsertLiveObserver @Inject constructor(
) :
RealmLiveEntityObserver<EventInsertEntity>(realmConfiguration) {
private val lock = Mutex()
override val query = Monarchy.Query {
it.where(EventInsertEntity::class.java).equalTo(EventInsertEntityFields.CAN_BE_PROCESSED, true)
}
private val onResultsChangedFlow = MutableSharedFlow<RealmResults<EventInsertEntity>>()
init {
onResultsChangedFlow
.onEach { handleChange(it) }
.launchIn(observerScope)
}
override fun onChange(results: RealmResults<EventInsertEntity>) {
observerScope.launch {
lock.withLock {
if (!results.isLoaded || results.isEmpty()) {
return
return@withLock
}
observerScope.launch { onResultsChangedFlow.emit(results) }
}
private suspend fun handleChange(results: RealmResults<EventInsertEntity>) {
val idsToDeleteAfterProcess = ArrayList<String>()
val filteredEvents = ArrayList<EventInsertEntity>(results.size)
Timber.v("EventInsertEntity updated with ${results.size} results in db")
@ -75,7 +66,6 @@ internal class EventInsertLiveObserver @Inject constructor(
}
idsToDeleteAfterProcess.add(it.eventId)
}
awaitTransaction(realmConfiguration) { realm ->
Timber.v("##Transaction: There are ${filteredEvents.size} events to process ")
filteredEvents.forEach { eventInsert ->
@ -99,6 +89,8 @@ internal class EventInsertLiveObserver @Inject constructor(
}
processors.forEach { it.onPostProcess() }
}
}
}
private fun shouldProcess(eventInsertEntity: EventInsertEntity): Boolean {
return processors.any {