Fix EventInsertLiveObserver gets blocked by reverting and adding lock instead
This commit is contained in:
parent
65bb1a7ddc
commit
c384a3de8d
1
changelog.d/6278.bugfix
Normal file
1
changelog.d/6278.bugfix
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix regression on EventInsertLiveObserver getting blocked so there is no event being processed anymore.
|
@ -19,10 +19,9 @@ package org.matrix.android.sdk.internal.database
|
|||||||
import com.zhuinden.monarchy.Monarchy
|
import com.zhuinden.monarchy.Monarchy
|
||||||
import io.realm.RealmConfiguration
|
import io.realm.RealmConfiguration
|
||||||
import io.realm.RealmResults
|
import io.realm.RealmResults
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
|
||||||
import kotlinx.coroutines.flow.launchIn
|
|
||||||
import kotlinx.coroutines.flow.onEach
|
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
import org.matrix.android.sdk.internal.database.mapper.asDomain
|
import org.matrix.android.sdk.internal.database.mapper.asDomain
|
||||||
import org.matrix.android.sdk.internal.database.model.EventEntity
|
import org.matrix.android.sdk.internal.database.model.EventEntity
|
||||||
import org.matrix.android.sdk.internal.database.model.EventInsertEntity
|
import org.matrix.android.sdk.internal.database.model.EventInsertEntity
|
||||||
@ -37,65 +36,58 @@ internal class EventInsertLiveObserver @Inject constructor(@SessionDatabase real
|
|||||||
private val processors: Set<@JvmSuppressWildcards EventInsertLiveProcessor>) :
|
private val processors: Set<@JvmSuppressWildcards EventInsertLiveProcessor>) :
|
||||||
RealmLiveEntityObserver<EventInsertEntity>(realmConfiguration) {
|
RealmLiveEntityObserver<EventInsertEntity>(realmConfiguration) {
|
||||||
|
|
||||||
|
private val lock = Mutex()
|
||||||
|
|
||||||
override val query = Monarchy.Query {
|
override val query = Monarchy.Query {
|
||||||
it.where(EventInsertEntity::class.java).equalTo(EventInsertEntityFields.CAN_BE_PROCESSED, true)
|
it.where(EventInsertEntity::class.java).equalTo(EventInsertEntityFields.CAN_BE_PROCESSED, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val onResultsChangedFlow = MutableSharedFlow<RealmResults<EventInsertEntity>>()
|
|
||||||
|
|
||||||
init {
|
|
||||||
onResultsChangedFlow
|
|
||||||
.onEach { handleChange(it) }
|
|
||||||
.launchIn(observerScope)
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun onChange(results: RealmResults<EventInsertEntity>) {
|
override fun onChange(results: RealmResults<EventInsertEntity>) {
|
||||||
if (!results.isLoaded || results.isEmpty()) {
|
observerScope.launch {
|
||||||
return
|
lock.withLock {
|
||||||
}
|
if (!results.isLoaded || results.isEmpty()) {
|
||||||
observerScope.launch { onResultsChangedFlow.emit(results) }
|
return@launch
|
||||||
}
|
|
||||||
|
|
||||||
private suspend fun handleChange(results: RealmResults<EventInsertEntity>) {
|
|
||||||
val idsToDeleteAfterProcess = ArrayList<String>()
|
|
||||||
val filteredEvents = ArrayList<EventInsertEntity>(results.size)
|
|
||||||
Timber.v("EventInsertEntity updated with ${results.size} results in db")
|
|
||||||
results.forEach {
|
|
||||||
if (shouldProcess(it)) {
|
|
||||||
// don't use copy from realm over there
|
|
||||||
val copiedEvent = EventInsertEntity(
|
|
||||||
eventId = it.eventId,
|
|
||||||
eventType = it.eventType
|
|
||||||
).apply {
|
|
||||||
insertType = it.insertType
|
|
||||||
}
|
}
|
||||||
filteredEvents.add(copiedEvent)
|
val idsToDeleteAfterProcess = ArrayList<String>()
|
||||||
|
val filteredEvents = ArrayList<EventInsertEntity>(results.size)
|
||||||
|
Timber.v("EventInsertEntity updated with ${results.size} results in db")
|
||||||
|
results.forEach {
|
||||||
|
if (shouldProcess(it)) {
|
||||||
|
// don't use copy from realm over there
|
||||||
|
val copiedEvent = EventInsertEntity(
|
||||||
|
eventId = it.eventId,
|
||||||
|
eventType = it.eventType
|
||||||
|
).apply {
|
||||||
|
insertType = it.insertType
|
||||||
|
}
|
||||||
|
filteredEvents.add(copiedEvent)
|
||||||
|
}
|
||||||
|
idsToDeleteAfterProcess.add(it.eventId)
|
||||||
|
}
|
||||||
|
awaitTransaction(realmConfiguration) { realm ->
|
||||||
|
Timber.v("##Transaction: There are ${filteredEvents.size} events to process ")
|
||||||
|
filteredEvents.forEach { eventInsert ->
|
||||||
|
val eventId = eventInsert.eventId
|
||||||
|
val event = EventEntity.where(realm, eventId).findFirst()
|
||||||
|
if (event == null) {
|
||||||
|
Timber.v("Event $eventId not found")
|
||||||
|
return@forEach
|
||||||
|
}
|
||||||
|
val domainEvent = event.asDomain()
|
||||||
|
processors.filter {
|
||||||
|
it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType)
|
||||||
|
}.forEach {
|
||||||
|
it.process(realm, domainEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
realm.where(EventInsertEntity::class.java)
|
||||||
|
.`in`(EventInsertEntityFields.EVENT_ID, idsToDeleteAfterProcess.toTypedArray())
|
||||||
|
.findAll()
|
||||||
|
.deleteAllFromRealm()
|
||||||
|
}
|
||||||
|
processors.forEach { it.onPostProcess() }
|
||||||
}
|
}
|
||||||
idsToDeleteAfterProcess.add(it.eventId)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
awaitTransaction(realmConfiguration) { realm ->
|
|
||||||
Timber.v("##Transaction: There are ${filteredEvents.size} events to process ")
|
|
||||||
filteredEvents.forEach { eventInsert ->
|
|
||||||
val eventId = eventInsert.eventId
|
|
||||||
val event = EventEntity.where(realm, eventId).findFirst()
|
|
||||||
if (event == null) {
|
|
||||||
Timber.v("Event $eventId not found")
|
|
||||||
return@forEach
|
|
||||||
}
|
|
||||||
val domainEvent = event.asDomain()
|
|
||||||
processors.filter {
|
|
||||||
it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType)
|
|
||||||
}.forEach {
|
|
||||||
it.process(realm, domainEvent)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
realm.where(EventInsertEntity::class.java)
|
|
||||||
.`in`(EventInsertEntityFields.EVENT_ID, idsToDeleteAfterProcess.toTypedArray())
|
|
||||||
.findAll()
|
|
||||||
.deleteAllFromRealm()
|
|
||||||
}
|
|
||||||
processors.forEach { it.onPostProcess() }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun shouldProcess(eventInsertEntity: EventInsertEntity): Boolean {
|
private fun shouldProcess(eventInsertEntity: EventInsertEntity): Boolean {
|
||||||
|
Loading…
Reference in New Issue
Block a user