Sync/pagination: make it possible to have multiple chunks in the db

This commit is contained in:
ganfra 2019-12-05 19:24:32 +01:00
parent 1c4cef9115
commit 1aa65dad7f
17 changed files with 153 additions and 259 deletions

View File

@ -19,37 +19,40 @@ package im.vector.matrix.android.internal.database.helper
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.mapper.toEntity
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.EventAnnotationsSummaryEntity
import im.vector.matrix.android.internal.database.model.ReadReceiptEntity
import im.vector.matrix.android.internal.database.model.ReadReceiptsSummaryEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntityFields
import im.vector.matrix.android.internal.database.query.fastContains
import im.vector.matrix.android.internal.database.query.find
import im.vector.matrix.android.internal.database.query.getOrCreate
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.extensions.assertIsManaged
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
import io.realm.Realm
import io.realm.Sort
// By default if a chunk is empty we consider it unlinked
internal fun ChunkEntity.isUnlinked(): Boolean {
assertIsManaged()
return timelineEvents.where()
.equalTo(TimelineEventEntityFields.ROOT.IS_UNLINKED, false)
.findAll()
.isEmpty()
}
internal fun ChunkEntity.deleteOnCascade() {
assertIsManaged()
this.stateEvents.deleteAllFromRealm()
this.timelineEvents.deleteAllFromRealm()
this.deleteFromRealm()
}
internal fun ChunkEntity.addStateEvent(stateEvent: Event) {
if (stateEvent.eventId == null || stateEvents.fastContains(stateEvent.eventId)) {
return
} else {
val entity = stateEvent.toEntity(roomId).apply {
this.stateIndex = Int.MIN_VALUE
this.isUnlinked = true
this.sendState = SendState.SYNCED
}
stateEvents.add(entity)
}
}
internal fun ChunkEntity.add(localRealm: Realm,
roomId: String,
@ -57,7 +60,7 @@ internal fun ChunkEntity.add(localRealm: Realm,
direction: PaginationDirection,
stateIndexOffset: Int = 0,
isUnlinked: Boolean = false) {
if (event.eventId == null) {
if (event.eventId == null || timelineEvents.fastContains(event.eventId)) {
return
}
var currentDisplayIndex = lastDisplayIndex(direction, 0)
@ -84,7 +87,7 @@ internal fun ChunkEntity.add(localRealm: Realm,
val senderId = event.senderId ?: ""
val readReceiptsSummaryEntity = ReadReceiptsSummaryEntity.where(localRealm, eventId).findFirst()
?: ReadReceiptsSummaryEntity(eventId, roomId)
?: ReadReceiptsSummaryEntity(eventId, roomId)
// Update RR for the sender of a new message with a dummy one

View File

@ -33,15 +33,13 @@ internal fun RoomEntity.addOrUpdate(chunkEntity: ChunkEntity) {
}
internal fun RoomEntity.addStateEvent(stateEvent: Event,
stateIndex: Int = Int.MIN_VALUE,
filterDuplicates: Boolean = false,
isUnlinked: Boolean = false) {
if (stateEvent.eventId == null || (filterDuplicates && untimelinedStateEvents.fastContains(stateEvent.eventId))) {
stateIndex: Int = Int.MIN_VALUE) {
if (stateEvent.eventId == null || untimelinedStateEvents.fastContains(stateEvent.eventId)) {
return
} else {
val entity = stateEvent.toEntity(roomId).apply {
this.stateIndex = stateIndex
this.isUnlinked = isUnlinked
this.isUnlinked = false
this.sendState = SendState.SYNCED
}
untimelinedStateEvents.add(entity)

View File

@ -32,30 +32,36 @@ internal fun TimelineEventEntity.updateSenderData(realm: Realm, chunkEntity: Chu
val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst() ?: return
val stateIndex = root?.stateIndex ?: return
val senderId = root?.sender ?: return
val isUnlinked = chunkEntity.isUnlinked()
var senderMembershipEvent: EventEntity?
var senderRoomMemberContent: String?
var senderRoomMemberPrevContent: String?
when {
stateIndex <= 0 -> {
senderMembershipEvent = chunkEntity.timelineEvents.buildQuery(senderId, isUnlinked).next(from = stateIndex)?.root
senderMembershipEvent = chunkEntity.timelineEvents.buildTimelineEventQuery(senderId).next(from = stateIndex)?.root
senderRoomMemberContent = senderMembershipEvent?.prevContent
senderRoomMemberPrevContent = senderMembershipEvent?.content
}
else -> {
senderMembershipEvent = chunkEntity.timelineEvents.buildQuery(senderId, isUnlinked).prev(since = stateIndex)?.root
senderMembershipEvent = chunkEntity.timelineEvents.buildTimelineEventQuery(senderId).prev(since = stateIndex)?.root
senderRoomMemberContent = senderMembershipEvent?.content
senderRoomMemberPrevContent = senderMembershipEvent?.prevContent
}
}
// We fallback to untimelinedStateEvents if we can't find membership events in timeline
// We fallback to chunk stateEvents if we can't find membership events in timeline
if (senderMembershipEvent == null) {
senderMembershipEvent = chunkEntity.stateEvents
.buildStateEventQuery(senderId)
.prev()
senderRoomMemberContent = senderMembershipEvent?.content
senderRoomMemberPrevContent = senderMembershipEvent?.prevContent
}
// We fallback to room stateEvents if we can't find membership events in timeline and chunk
if (senderMembershipEvent == null) {
senderMembershipEvent = roomEntity.untimelinedStateEvents
.where()
.equalTo(EventEntityFields.STATE_KEY, senderId)
.equalTo(EventEntityFields.TYPE, EventType.STATE_ROOM_MEMBER)
.prev(since = stateIndex)
.buildStateEventQuery(senderId)
.prev()
senderRoomMemberContent = senderMembershipEvent?.content
senderRoomMemberPrevContent = senderMembershipEvent?.prevContent
}
@ -88,9 +94,15 @@ internal fun TimelineEventEntity.Companion.nextId(realm: Realm): Long {
}
}
private fun RealmList<TimelineEventEntity>.buildQuery(sender: String, isUnlinked: Boolean): RealmQuery<TimelineEventEntity> {
private fun RealmList<EventEntity>.buildStateEventQuery(sender: String): RealmQuery<EventEntity> {
return where()
.equalTo(EventEntityFields.STATE_KEY, sender)
.equalTo(EventEntityFields.TYPE, EventType.STATE_ROOM_MEMBER)
}
private fun RealmList<TimelineEventEntity>.buildTimelineEventQuery(sender: String): RealmQuery<TimelineEventEntity> {
return where()
.equalTo(TimelineEventEntityFields.ROOT.STATE_KEY, sender)
.equalTo(TimelineEventEntityFields.ROOT.TYPE, EventType.STATE_ROOM_MEMBER)
.equalTo(TimelineEventEntityFields.ROOT.IS_UNLINKED, isUnlinked)
}

View File

@ -26,6 +26,8 @@ internal open class ChunkEntity(@Index var prevToken: String? = null,
@Index var nextToken: String? = null,
@Index var roomId: String = "",
var timelineEvents: RealmList<TimelineEventEntity> = RealmList(),
// These are state events for chunks other than the live one (isLastForward=true)
var stateEvents: RealmList<EventEntity> = RealmList(),
@Index var isLastForward: Boolean = false,
@Index var isLastBackward: Boolean = false,
var backwardsDisplayIndex: Int? = null,

View File

@ -23,6 +23,7 @@ import io.realm.annotations.PrimaryKey
internal open class RoomEntity(@PrimaryKey var roomId: String = "",
var chunks: RealmList<ChunkEntity> = RealmList(),
// These are live state events coming from sync only
var untimelinedStateEvents: RealmList<EventEntity> = RealmList(),
var sendingTimelineEvents: RealmList<TimelineEventEntity> = RealmList(),
var areAllMembersLoaded: Boolean = false

View File

@ -1,38 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.database.model
import im.vector.matrix.android.api.session.room.model.Membership
import io.realm.RealmObject
import io.realm.annotations.PrimaryKey
internal open class RoomStateMemberEntity(@PrimaryKey var primaryKey: String = "",
var displayName: String? = null,
var avatarUrl: String? = null
) : RealmObject() {
private var membershipStr: String = Membership.NONE.name
var membership: Membership
get() {
return Membership.valueOf(membershipStr)
}
set(value) {
membershipStr = value.name
}
companion object
}

View File

@ -115,6 +115,10 @@ internal fun RealmList<TimelineEventEntity>.find(eventId: String): TimelineEvent
.findFirst()
}
internal fun RealmList<TimelineEventEntity>.fastContains(eventId: String): Boolean {
return find(eventId) != null
}
internal fun TimelineEventEntity.Companion.findAllInRoomWithSendStates(realm: Realm,
roomId: String,
sendStates: List<SendState>)

View File

@ -122,9 +122,6 @@ internal abstract class RoomModule {
@Binds
abstract fun bindGetContextOfEventTask(getContextOfEventTask: DefaultGetContextOfEventTask): GetContextOfEventTask
@Binds
abstract fun bindClearUnlinkedEventsTask(clearUnlinkedEventsTask: DefaultClearUnlinkedEventsTask): ClearUnlinkedEventsTask
@Binds
abstract fun bindPaginationTask(paginationTask: DefaultPaginationTask): PaginationTask

View File

@ -19,7 +19,6 @@ package im.vector.matrix.android.internal.session.room.membership
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.database.helper.addStateEvent
import im.vector.matrix.android.internal.database.helper.updateSenderData
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.network.executeRequest

View File

@ -1,48 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.state
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.RoomAvatarContent
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.getOrCreate
import io.realm.Realm
/**
* This class is responsible for mutating the "room state" entities, so
*/
internal class RoomStateMutator(private val realm: Realm, private val roomId: String) {
fun mutate(event: Event): Boolean {
return when (event.type) {
EventType.STATE_ROOM_AVATAR -> mutateRoomAvatar(event.content?.toModel())
else -> false
}
}
private fun mutateRoomAvatar(model: RoomAvatarContent?): Boolean {
if (model == null) {
return false
}
val roomSummaryEntity = RoomSummaryEntity.getOrCreate(realm, roomId)
roomSummaryEntity.avatarUrl = model.avatarUrl
return true
}
}

View File

@ -1,52 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.timeline
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.database.helper.deleteOnCascade
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventEntityFields
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.awaitTransaction
import javax.inject.Inject
internal interface ClearUnlinkedEventsTask : Task<ClearUnlinkedEventsTask.Params, Unit> {
data class Params(val roomId: String)
}
internal class DefaultClearUnlinkedEventsTask @Inject constructor(private val monarchy: Monarchy) : ClearUnlinkedEventsTask {
override suspend fun execute(params: ClearUnlinkedEventsTask.Params) {
monarchy.awaitTransaction { localRealm ->
val unlinkedChunks = ChunkEntity
.where(localRealm, roomId = params.roomId)
.equalTo("${ChunkEntityFields.TIMELINE_EVENTS.ROOT}.${EventEntityFields.IS_UNLINKED}", true)
.findAll()
unlinkedChunks.forEach {
it.deleteOnCascade()
}
val roomEntity = RoomEntity.where(localRealm, roomId = params.roomId).findFirst()
?: return@awaitTransaction
roomEntity.untimelinedStateEvents.where().equalTo(EventEntityFields.IS_UNLINKED, true).findAll().deleteAllFromRealm()
}
}
}

View File

@ -41,6 +41,6 @@ internal class DefaultGetContextOfEventTask @Inject constructor(private val room
val response = executeRequest<EventContextResponse> {
apiCall = roomAPI.getContextOfEvent(params.roomId, params.eventId, params.limit, filter)
}
return tokenChunkEventPersistor.insertInDb(response, params.roomId, PaginationDirection.FORWARDS)
return tokenChunkEventPersistor.insertInDb(response, params.roomId, PaginationDirection.BACKWARDS)
}
}

View File

@ -69,7 +69,6 @@ internal class DefaultTimeline(
private val realmConfiguration: RealmConfiguration,
private val taskExecutor: TaskExecutor,
private val contextOfEventTask: GetContextOfEventTask,
private val clearUnlinkedEventsTask: ClearUnlinkedEventsTask,
private val paginationTask: PaginationTask,
private val cryptoService: CryptoService,
private val timelineEventMapper: TimelineEventMapper,
@ -217,9 +216,6 @@ internal class DefaultTimeline(
}
eventDecryptor.destroy()
}
clearUnlinkedEventsTask
.configureWith(ClearUnlinkedEventsTask.Params(roomId))
.executeBy(taskExecutor)
}
}
@ -335,7 +331,7 @@ internal class DefaultTimeline(
val lastBuiltEvent = builtEvents.lastOrNull()
val firstCacheEvent = results.firstOrNull()
val firstBuiltEvent = builtEvents.firstOrNull()
val chunkEntity = getLiveChunk()
val chunkEntity = getCurrentChunk()
updateState(Timeline.Direction.FORWARDS) {
it.copy(
@ -371,7 +367,6 @@ internal class DefaultTimeline(
} else {
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
}
return !shouldFetchMore
}
@ -488,15 +483,15 @@ internal class DefaultTimeline(
* This has to be called on TimelineThread as it access realm live results
*/
private fun executePaginationTask(direction: Timeline.Direction, limit: Int) {
val token = getTokenLive(direction)
val token = getCurrentToken(direction)
if (token == null) {
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
return
}
val params = PaginationTask.Params(roomId = roomId,
from = token,
direction = direction.toPaginationDirection(),
limit = limit)
from = token,
direction = direction.toPaginationDirection(),
limit = limit)
Timber.v("Should fetch $limit items $direction")
cancelableBag += paginationTask
@ -510,6 +505,7 @@ internal class DefaultTimeline(
Timber.v("Success fetching $limit items $direction from pagination request")
}
TokenChunkEventPersistor.Result.REACHED_END -> {
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
postSnapshot()
}
TokenChunkEventPersistor.Result.SHOULD_FETCH_MORE ->
@ -532,15 +528,15 @@ internal class DefaultTimeline(
* This has to be called on TimelineThread as it access realm live results
*/
private fun getTokenLive(direction: Timeline.Direction): String? {
val chunkEntity = getLiveChunk() ?: return null
private fun getCurrentToken(direction: Timeline.Direction): String? {
val chunkEntity = getCurrentChunk() ?: return null
return if (direction == Timeline.Direction.BACKWARDS) chunkEntity.prevToken else chunkEntity.nextToken
}
/**
* This has to be called on TimelineThread as it access realm live results
*/
private fun getLiveChunk(): ChunkEntity? {
private fun getCurrentChunk(): ChunkEntity? {
return filteredEvents.firstOrNull()?.chunk?.firstOrNull()
}
@ -571,7 +567,7 @@ internal class DefaultTimeline(
val timelineEvent = buildTimelineEvent(eventEntity)
if (timelineEvent.isEncrypted()
&& timelineEvent.root.mxDecryptionResult == null) {
&& timelineEvent.root.mxDecryptionResult == null) {
timelineEvent.root.eventId?.let { eventDecryptor.requestDecryption(it) }
}
@ -623,7 +619,7 @@ internal class DefaultTimeline(
private fun buildEventQuery(realm: Realm): RealmQuery<TimelineEventEntity> {
return if (initialEventId == null) {
TimelineEventEntity
.where(realm, roomId = roomId, linkFilterMode = EventEntity.LinkFilterMode.LINKED_ONLY)
.where(realm, roomId = roomId)
.equalTo("${TimelineEventEntityFields.CHUNK}.${ChunkEntityFields.IS_LAST_FORWARD}", true)
} else {
TimelineEventEntity

View File

@ -42,8 +42,7 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
private val cryptoService: CryptoService,
private val paginationTask: PaginationTask,
private val timelineEventMapper: TimelineEventMapper,
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper,
private val clearUnlinkedEventsTask: ClearUnlinkedEventsTask
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper
) : TimelineService {
@AssistedInject.Factory
@ -57,7 +56,6 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
monarchy.realmConfiguration,
taskExecutor,
contextOfEventTask,
clearUnlinkedEventsTask,
paginationTask,
cryptoService,
timelineEventMapper,
@ -69,10 +67,10 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
override fun getTimeLineEvent(eventId: String): TimelineEvent? {
return monarchy
.fetchCopyMap({
TimelineEventEntity.where(it, roomId = roomId, eventId = eventId).findFirst()
}, { entity, _ ->
timelineEventMapper.map(entity)
})
TimelineEventEntity.where(it, roomId = roomId, eventId = eventId).findFirst()
}, { entity, _ ->
timelineEventMapper.map(entity)
})
}
override fun getTimeLineEventLive(eventId: String): LiveData<Optional<TimelineEvent>> {

View File

@ -23,14 +23,15 @@ import im.vector.matrix.android.api.session.events.model.Event
@JsonClass(generateAdapter = true)
data class EventContextResponse(
@Json(name = "event") val event: Event,
@Json(name = "start") override val start: String? = null,
// Reversed start and end on purpose
@Json(name = "start") override val end: String? = null,
@Json(name = "end") override val start: String? = null,
@Json(name = "events_before") val eventsBefore: List<Event> = emptyList(),
@Json(name = "events_after") val eventsAfter: List<Event> = emptyList(),
@Json(name = "end") override val end: String? = null,
@Json(name = "state") override val stateEvents: List<Event> = emptyList()
) : TokenChunkEvent {
override val events: List<Event> by lazy {
eventsBefore.reversed() + listOf(event) + eventsAfter
eventsAfter.reversed() + listOf(event) + eventsBefore
}
}

View File

@ -17,17 +17,20 @@
package im.vector.matrix.android.internal.session.room.timeline
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.database.helper.*
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.helper.add
import im.vector.matrix.android.internal.database.helper.addOrUpdate
import im.vector.matrix.android.internal.database.helper.addStateEvent
import im.vector.matrix.android.internal.database.helper.deleteOnCascade
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.create
import im.vector.matrix.android.internal.database.query.find
import im.vector.matrix.android.internal.database.query.findAllIncludingEvents
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.room.RoomSummaryUpdater
import im.vector.matrix.android.internal.session.user.UserEntityFactory
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm
import io.realm.kotlin.createObject
import timber.log.Timber
import javax.inject.Inject
@ -110,63 +113,9 @@ internal class TokenChunkEventPersistor @Inject constructor(private val monarchy
suspend fun insertInDb(receivedChunk: TokenChunkEvent,
roomId: String,
direction: PaginationDirection): Result {
monarchy
.awaitTransaction { realm ->
Timber.v("Start persisting ${receivedChunk.events.size} events in $roomId towards $direction")
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
?: realm.createObject(roomId)
val nextToken: String?
val prevToken: String?
if (direction == PaginationDirection.FORWARDS) {
nextToken = receivedChunk.end
prevToken = receivedChunk.start
} else {
nextToken = receivedChunk.start
prevToken = receivedChunk.end
}
val prevChunk = ChunkEntity.find(realm, roomId, nextToken = prevToken)
val nextChunk = ChunkEntity.find(realm, roomId, prevToken = nextToken)
// We try to look for a chunk next to the token,
// otherwise we create a whole new one
val currentChunk = if (direction == PaginationDirection.FORWARDS) {
prevChunk?.apply { this.nextToken = nextToken }
} else {
nextChunk?.apply { this.prevToken = prevToken }
}
?: realm.createObject<ChunkEntity>().apply {
this.roomId = roomId
this.prevToken = prevToken
this.nextToken = nextToken
}
if (receivedChunk.events.isEmpty() && receivedChunk.end == receivedChunk.start) {
if (direction == PaginationDirection.FORWARDS) {
Timber.v("Reach live state of $roomId")
// We make sure we only have one live chunk
ChunkEntity.findLastLiveChunkFromRoom(realm, roomId)?.deleteOnCascade()
currentChunk.isLastForward = true
currentChunk.nextToken = null
currentChunk.timelineEvents.forEach {
it.root?.isUnlinked = false
}
roomSummaryUpdater.update(realm, roomId, updateMembers = false)
} else {
Timber.v("Reach end of $roomId")
currentChunk.isLastBackward = true
}
} else {
Timber.v("Add ${receivedChunk.events.size} events in chunk(${currentChunk.nextToken} | ${currentChunk.prevToken}")
for (event in receivedChunk.events) {
currentChunk.add(realm, roomId, event, direction, isUnlinked = !currentChunk.isLastForward)
}
}
roomEntity.addOrUpdate(currentChunk)
}
monarchy.awaitTransaction { realm ->
handleChunk(realm, receivedChunk, roomId, direction)
}
return if (receivedChunk.events.isEmpty()) {
if (receivedChunk.start != receivedChunk.end) {
Result.SHOULD_FETCH_MORE
@ -177,4 +126,76 @@ internal class TokenChunkEventPersistor @Inject constructor(private val monarchy
Result.SUCCESS
}
}
private fun handleChunk(realm: Realm, receivedChunk: TokenChunkEvent, roomId: String, direction: PaginationDirection) {
Timber.v("Start persisting ${receivedChunk.events.size} events in $roomId towards $direction")
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
?: realm.createObject(roomId)
val nextToken: String?
val prevToken: String?
if (direction == PaginationDirection.FORWARDS) {
nextToken = receivedChunk.end
prevToken = receivedChunk.start
} else {
nextToken = receivedChunk.start
prevToken = receivedChunk.end
}
val prevChunk = ChunkEntity.find(realm, roomId, nextToken = prevToken)
val nextChunk = ChunkEntity.find(realm, roomId, prevToken = nextToken)
// We try to look for a chunk next to the token,
// otherwise we create a whole new one
val currentChunk = if (direction == PaginationDirection.FORWARDS) {
prevChunk?.apply { this.nextToken = nextToken }
} else {
nextChunk?.apply { this.prevToken = prevToken }
}
?: realm.createObject<ChunkEntity>().apply {
this.roomId = roomId
this.prevToken = prevToken
this.nextToken = nextToken
}
// We are saving the state events in the chunk, it will allow us to keep multiple chunks alive
receivedChunk.stateEvents.forEach { stateEvent ->
currentChunk.addStateEvent(stateEvent)
}
val eventIds = receivedChunk.events.mapNotNull { it.eventId }
// If we are overlapping with a chunk other than the live one we remove it to avoid the merging processes
ChunkEntity.findAllIncludingEvents(realm, eventIds)
.asSequence()
.filterNot { it.isLastForward }
.forEach {
it.deleteOnCascade()
}
if (receivedChunk.events.isEmpty() && receivedChunk.end == receivedChunk.start) {
if (direction == PaginationDirection.FORWARDS) {
Timber.v("Reach live state of $roomId")
// We make sure we only have one live chunk
ChunkEntity.findLastLiveChunkFromRoom(realm, roomId)?.deleteOnCascade()
currentChunk.isLastForward = true
currentChunk.nextToken = null
currentChunk.timelineEvents.forEach {
it.root?.isUnlinked = false
}
} else {
Timber.v("Reach end of $roomId")
currentChunk.isLastBackward = true
}
} else {
Timber.v("Add ${receivedChunk.events.size} events in chunk(${currentChunk.nextToken} | ${currentChunk.prevToken}")
for (event in receivedChunk.events) {
currentChunk.add(realm, roomId, event, direction, isUnlinked = !currentChunk.isLastForward)
}
}
roomEntity.addOrUpdate(currentChunk)
roomSummaryUpdater.update(realm, roomId, updateMembers = false)
}
}

View File

@ -69,7 +69,7 @@ internal class RoomEntityFactory @Inject constructor(private val cryptoService:
// State events
if (roomSync.state != null && roomSync.state.events.isNotEmpty()) {
roomSync.state.events.forEach { event ->
roomEntity.addStateEvent(event, filterDuplicates = false, stateIndex = Int.MIN_VALUE)
roomEntity.addStateEvent(event)
// Give info to crypto module
cryptoService.onStateEvent(roomId, event)
UserEntityFactory.createOrNull(event)?.also {
@ -116,7 +116,7 @@ internal class RoomEntityFactory @Inject constructor(private val cryptoService:
?: Int.MIN_VALUE
val untimelinedStateIndex = minStateIndex + 1
roomSync.state.events.forEach { event ->
roomEntity.addStateEvent(event, filterDuplicates = true, stateIndex = untimelinedStateIndex)
roomEntity.addStateEvent(event, stateIndex = untimelinedStateIndex)
// Give info to crypto module
cryptoService.onStateEvent(roomId, event)
UserEntityFactory.createOrNull(event)?.also {