From fb4b42db325a35fda38f262184ae408d21b6a28e Mon Sep 17 00:00:00 2001 From: ganfra Date: Mon, 2 Dec 2019 19:08:46 +0100 Subject: [PATCH] Start investigate on perfs [WIP] --- .../session/room/timeline/ChunkEntityTest.kt | 39 ++-- .../database/helper/ChunkEntityHelper.kt | 47 ++--- .../database/helper/RoomEntityHelper.kt | 4 +- .../internal/extensions/RealmExtensions.kt | 3 +- .../room/membership/LoadRoomMembersTask.kt | 3 - .../session/room/prune/PruneEventTask.kt | 6 - .../room/timeline/TokenChunkEventPersistor.kt | 3 +- .../session/sync/ChunkEntityFactory.kt | 114 ++++++++++++ .../session/sync/RoomEntityFactory.kt | 173 ++++++++++++++++++ .../internal/session/sync/RoomSyncHandler.kt | 125 +------------ .../internal/session/sync/job/SyncService.kt | 2 + 11 files changed, 339 insertions(+), 180 deletions(-) create mode 100644 matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/ChunkEntityFactory.kt create mode 100644 matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomEntityFactory.kt diff --git a/matrix-sdk-android/src/androidTest/java/im/vector/matrix/android/session/room/timeline/ChunkEntityTest.kt b/matrix-sdk-android/src/androidTest/java/im/vector/matrix/android/session/room/timeline/ChunkEntityTest.kt index abb990c979..b7282826cb 100644 --- a/matrix-sdk-android/src/androidTest/java/im/vector/matrix/android/session/room/timeline/ChunkEntityTest.kt +++ b/matrix-sdk-android/src/androidTest/java/im/vector/matrix/android/session/room/timeline/ChunkEntityTest.kt @@ -28,7 +28,6 @@ import im.vector.matrix.android.session.room.timeline.RoomDataHelper.createFakeR import io.realm.Realm import io.realm.RealmConfiguration import io.realm.kotlin.createObject -import org.amshove.kluent.shouldBeFalse import org.amshove.kluent.shouldBeTrue import org.amshove.kluent.shouldEqual import org.junit.Before @@ -52,7 +51,7 @@ internal class ChunkEntityTest : InstrumentedTest { monarchy.runTransactionSync { realm -> val chunk: ChunkEntity = realm.createObject() val fakeEvent = createFakeMessageEvent() - chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS) + chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS) chunk.timelineEvents.size shouldEqual 1 } } @@ -62,8 +61,8 @@ internal class ChunkEntityTest : InstrumentedTest { monarchy.runTransactionSync { realm -> val chunk: ChunkEntity = realm.createObject() val fakeEvent = createFakeMessageEvent() - chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS) - chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS) + chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS) + chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS) chunk.timelineEvents.size shouldEqual 1 } } @@ -73,7 +72,7 @@ internal class ChunkEntityTest : InstrumentedTest { monarchy.runTransactionSync { realm -> val chunk: ChunkEntity = realm.createObject() val fakeEvent = createFakeRoomMemberEvent() - chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS) + chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS) chunk.lastStateIndex(PaginationDirection.FORWARDS) shouldEqual 1 } } @@ -83,7 +82,7 @@ internal class ChunkEntityTest : InstrumentedTest { monarchy.runTransactionSync { realm -> val chunk: ChunkEntity = realm.createObject() val fakeEvent = createFakeMessageEvent() - chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS) + chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS) chunk.lastStateIndex(PaginationDirection.FORWARDS) shouldEqual 0 } } @@ -94,7 +93,9 @@ internal class ChunkEntityTest : InstrumentedTest { val chunk: ChunkEntity = realm.createObject() val fakeEvents = createFakeListOfEvents(30) val numberOfStateEvents = fakeEvents.filter { it.isStateEvent() }.size - chunk.addAll("roomId", fakeEvents, PaginationDirection.FORWARDS) + fakeEvents.forEach { + chunk.add(realm, "roomId", it, PaginationDirection.FORWARDS) + } chunk.lastStateIndex(PaginationDirection.FORWARDS) shouldEqual numberOfStateEvents } } @@ -107,7 +108,9 @@ internal class ChunkEntityTest : InstrumentedTest { val numberOfStateEvents = fakeEvents.filter { it.isStateEvent() }.size val lastIsState = fakeEvents.last().isStateEvent() val expectedStateIndex = if (lastIsState) -numberOfStateEvents + 1 else -numberOfStateEvents - chunk.addAll("roomId", fakeEvents, PaginationDirection.BACKWARDS) + fakeEvents.forEach { + chunk.add(realm, "roomId", it, PaginationDirection.BACKWARDS) + } chunk.lastStateIndex(PaginationDirection.BACKWARDS) shouldEqual expectedStateIndex } } @@ -117,8 +120,12 @@ internal class ChunkEntityTest : InstrumentedTest { monarchy.runTransactionSync { realm -> val chunk1: ChunkEntity = realm.createObject() val chunk2: ChunkEntity = realm.createObject() - chunk1.addAll("roomId", createFakeListOfEvents(30), PaginationDirection.BACKWARDS) - chunk2.addAll("roomId", createFakeListOfEvents(30), PaginationDirection.BACKWARDS) + createFakeListOfEvents(30).forEach { + chunk1.add(realm, "roomId", it, PaginationDirection.BACKWARDS) + } + createFakeListOfEvents(30).forEach { + chunk2.add(realm, "roomId", it, PaginationDirection.BACKWARDS) + } chunk1.merge("roomId", chunk2, PaginationDirection.BACKWARDS) chunk1.timelineEvents.size shouldEqual 60 } @@ -133,14 +140,20 @@ internal class ChunkEntityTest : InstrumentedTest { val eventsForChunk2 = eventsForChunk1 + createFakeListOfEvents(10) chunk1.isLastForward = true chunk2.isLastForward = false - chunk1.addAll("roomId", eventsForChunk1, PaginationDirection.FORWARDS) - chunk2.addAll("roomId", eventsForChunk2, PaginationDirection.BACKWARDS) + eventsForChunk1.forEach { + chunk1.add(realm, "roomId", it, PaginationDirection.FORWARDS) + } + eventsForChunk2.forEach { + chunk2.add(realm, "roomId", it, PaginationDirection.BACKWARDS) + } chunk1.merge("roomId", chunk2, PaginationDirection.BACKWARDS) chunk1.timelineEvents.size shouldEqual 40 chunk1.isLastForward.shouldBeTrue() } } + /* + @Test fun merge_shouldEventsBeLinked_whenMergingLinkedWithUnlinked() { monarchy.runTransactionSync { realm -> @@ -192,4 +205,6 @@ internal class ChunkEntityTest : InstrumentedTest { chunk1.nextToken shouldEqual nextToken } } + + */ } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/helper/ChunkEntityHelper.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/helper/ChunkEntityHelper.kt index e9ffa140c9..01c17daa31 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/helper/ChunkEntityHelper.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/helper/ChunkEntityHelper.kt @@ -33,6 +33,7 @@ import im.vector.matrix.android.internal.database.query.getOrCreate import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.extensions.assertIsManaged import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection +import io.realm.Realm import io.realm.Sort // By default if a chunk is empty we consider it unlinked @@ -74,45 +75,20 @@ internal fun ChunkEntity.merge(roomId: String, val events = eventsToMerge.mapNotNull { it.root?.asDomain() } val eventIds = ArrayList() events.forEach { event -> - add(roomId, event, direction, isUnlinked = isUnlinked) + add(realm, roomId, event, direction, isUnlinked = isUnlinked) if (event.eventId != null) { eventIds.add(event.eventId) } } - updateSenderDataFor(eventIds) } -internal fun ChunkEntity.addAll(roomId: String, - events: List, - direction: PaginationDirection, - stateIndexOffset: Int = 0, - // Set to true for Event retrieved from a Permalink (i.e. not linked to live Chunk) - isUnlinked: Boolean = false) { - assertIsManaged() - val eventIds = ArrayList() - events.forEach { event -> - add(roomId, event, direction, stateIndexOffset, isUnlinked) - if (event.eventId != null) { - eventIds.add(event.eventId) - } - } - updateSenderDataFor(eventIds) -} - -internal fun ChunkEntity.updateSenderDataFor(eventIds: List) { - for (eventId in eventIds) { - val timelineEventEntity = timelineEvents.find(eventId) ?: continue - timelineEventEntity.updateSenderData() - } -} - -internal fun ChunkEntity.add(roomId: String, +internal fun ChunkEntity.add(localRealm: Realm, + roomId: String, event: Event, direction: PaginationDirection, stateIndexOffset: Int = 0, isUnlinked: Boolean = false) { - assertIsManaged() - if (event.eventId != null && timelineEvents.find(event.eventId) != null) { + if (event.eventId != null) { return } var currentDisplayIndex = lastDisplayIndex(direction, 0) @@ -134,22 +110,21 @@ internal fun ChunkEntity.add(roomId: String, backwardsStateIndex = currentStateIndex } } - - val localId = TimelineEventEntity.nextId(realm) + val localId = TimelineEventEntity.nextId(localRealm) val eventId = event.eventId ?: "" val senderId = event.senderId ?: "" - val readReceiptsSummaryEntity = ReadReceiptsSummaryEntity.where(realm, eventId).findFirst() + val readReceiptsSummaryEntity = ReadReceiptsSummaryEntity.where(localRealm, eventId).findFirst() ?: ReadReceiptsSummaryEntity(eventId, roomId) // Update RR for the sender of a new message with a dummy one if (event.originServerTs != null) { val timestampOfEvent = event.originServerTs.toDouble() - val readReceiptOfSender = ReadReceiptEntity.getOrCreate(realm, roomId = roomId, userId = senderId) + val readReceiptOfSender = ReadReceiptEntity.getOrCreate(localRealm, roomId = roomId, userId = senderId) // If the synced RR is older, update if (timestampOfEvent > readReceiptOfSender.originServerTs) { - val previousReceiptsSummary = ReadReceiptsSummaryEntity.where(realm, eventId = readReceiptOfSender.eventId).findFirst() + val previousReceiptsSummary = ReadReceiptsSummaryEntity.where(localRealm, eventId = readReceiptOfSender.eventId).findFirst() readReceiptOfSender.eventId = eventId readReceiptOfSender.originServerTs = timestampOfEvent previousReceiptsSummary?.readReceipts?.remove(readReceiptOfSender) @@ -166,9 +141,9 @@ internal fun ChunkEntity.add(roomId: String, } it.eventId = eventId it.roomId = roomId - it.annotations = EventAnnotationsSummaryEntity.where(realm, eventId).findFirst() + it.annotations = EventAnnotationsSummaryEntity.where(localRealm, eventId).findFirst() it.readReceipts = readReceiptsSummaryEntity - it.readMarker = ReadMarkerEntity.where(realm, roomId = roomId, eventId = eventId).findFirst() + it.readMarker = ReadMarkerEntity.where(localRealm, roomId = roomId, eventId = eventId).findFirst() } val position = if (direction == PaginationDirection.FORWARDS) 0 else this.timelineEvents.size timelineEvents.add(position, eventEntity) diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/helper/RoomEntityHelper.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/helper/RoomEntityHelper.kt index 948af2af96..e19114cd0b 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/helper/RoomEntityHelper.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/database/helper/RoomEntityHelper.kt @@ -41,8 +41,7 @@ internal fun RoomEntity.addStateEvent(stateEvent: Event, stateIndex: Int = Int.MIN_VALUE, filterDuplicates: Boolean = false, isUnlinked: Boolean = false) { - assertIsManaged() - if (stateEvent.eventId == null || (filterDuplicates && fastContains(stateEvent.eventId))) { + if (stateEvent.eventId == null || (filterDuplicates && untimelinedStateEvents.fastContains(stateEvent.eventId))) { return } else { val entity = stateEvent.toEntity(roomId).apply { @@ -53,6 +52,7 @@ internal fun RoomEntity.addStateEvent(stateEvent: Event, untimelinedStateEvents.add(entity) } } + internal fun RoomEntity.addSendingEvent(event: Event) { assertIsManaged() val senderId = event.senderId ?: return diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/extensions/RealmExtensions.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/extensions/RealmExtensions.kt index 8934bdb0b6..b6195378b1 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/extensions/RealmExtensions.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/extensions/RealmExtensions.kt @@ -18,6 +18,7 @@ package im.vector.matrix.android.internal.extensions import io.realm.RealmObject -internal fun RealmObject.assertIsManaged() { +internal fun RealmObject.assertIsManaged(): Boolean { check(isManaged) { "${javaClass.simpleName} entity should be managed to use this function" } + return true } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/LoadRoomMembersTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/LoadRoomMembersTask.kt index 7d9332ee84..a943e46596 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/LoadRoomMembersTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/membership/LoadRoomMembersTask.kt @@ -70,9 +70,6 @@ internal class DefaultLoadRoomMembersTask @Inject constructor(private val roomAP realm.insertOrUpdate(it) } } - roomEntity.chunks.flatMap { it.timelineEvents }.forEach { - it.updateSenderData() - } roomEntity.areAllMembersLoaded = true roomSummaryUpdater.update(realm, roomId, updateMembers = true) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/prune/PruneEventTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/prune/PruneEventTask.kt index c303e1c215..0d14e719a5 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/prune/PruneEventTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/prune/PruneEventTask.kt @@ -94,12 +94,6 @@ internal class DefaultPruneEventTask @Inject constructor(private val monarchy: M // } } } - if (eventToPrune.type == EventType.STATE_ROOM_MEMBER) { - val timelineEventsToUpdate = TimelineEventEntity.findWithSenderMembershipEvent(realm, eventToPrune.eventId) - for (timelineEvent in timelineEventsToUpdate) { - timelineEvent.updateSenderData() - } - } } private fun computeAllowedKeys(type: String): List { diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/TokenChunkEventPersistor.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/TokenChunkEventPersistor.kt index 0d9fb4e9e6..a53d6f691d 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/TokenChunkEventPersistor.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/room/timeline/TokenChunkEventPersistor.kt @@ -149,7 +149,7 @@ internal class TokenChunkEventPersistor @Inject constructor(private val monarchy val eventIds = ArrayList(receivedChunk.events.size) for (event in receivedChunk.events) { event.eventId?.also { eventIds.add(it) } - currentChunk.add(roomId, event, direction, isUnlinked = currentChunk.isUnlinked()) + currentChunk.add(realm, roomId, event, direction, isUnlinked = currentChunk.isUnlinked()) UserEntityFactory.createOrNull(event)?.also { realm.insertOrUpdate(it) } @@ -175,7 +175,6 @@ internal class TokenChunkEventPersistor @Inject constructor(private val monarchy realm.insertOrUpdate(it) } } - currentChunk.updateSenderDataFor(eventIds) } } return if (receivedChunk.events.isEmpty()) { diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/ChunkEntityFactory.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/ChunkEntityFactory.kt new file mode 100644 index 0000000000..7c9aba524c --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/ChunkEntityFactory.kt @@ -0,0 +1,114 @@ +/* + * Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package im.vector.matrix.android.internal.session.sync + +import im.vector.matrix.android.api.session.events.model.Event +import im.vector.matrix.android.internal.crypto.DefaultCryptoService +import im.vector.matrix.android.internal.database.helper.add +import im.vector.matrix.android.internal.database.helper.lastStateIndex +import im.vector.matrix.android.internal.database.model.ChunkEntity +import im.vector.matrix.android.internal.database.model.RoomEntity +import im.vector.matrix.android.internal.database.query.find +import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom +import im.vector.matrix.android.internal.database.query.where +import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection +import im.vector.matrix.android.internal.session.user.UserEntityFactory +import io.realm.Realm +import timber.log.Timber +import javax.inject.Inject + +internal class ChunkEntityFactory @Inject constructor(private val cryptoService: DefaultCryptoService) { + + fun create(realm: Realm, + roomId: String, + eventList: List, + prevToken: String? = null, + isLimited: Boolean = true, + isInitialSync: Boolean): ChunkEntity { + return if (isInitialSync) { + initialSyncStrategy(realm, roomId, eventList, prevToken) + } else { + incrementalSyncStrategy(realm, roomId, eventList, prevToken, isLimited) + } + } + + + private fun initialSyncStrategy(realm: Realm, + roomId: String, + eventList: List, + prevToken: String?): ChunkEntity { + val chunkEntity = ChunkEntity().apply { + this.prevToken = prevToken + this.isLastForward = true + } + val eventIds = ArrayList(eventList.size) + for (event in eventList) { + event.eventId?.also { eventIds.add(it) } + chunkEntity.add(realm, roomId, event, PaginationDirection.FORWARDS) + // Give info to crypto module + cryptoService.onLiveEvent(roomId, event) + UserEntityFactory.createOrNull(event)?.also { + realm.insertOrUpdate(it) + } + } + return chunkEntity + } + + private fun incrementalSyncStrategy(realm: Realm, + roomId: String, + eventList: List, + prevToken: String? = null, + isLimited: Boolean = true): ChunkEntity { + val lastChunk = ChunkEntity.findLastLiveChunkFromRoom(realm, roomId) + ?: throw IllegalStateException("You should already have a live chunk at this point") + lastChunk.isLastForward = false + var stateIndexOffset = 0 + val chunkEntity = if (isLimited) { + stateIndexOffset = lastChunk.lastStateIndex(PaginationDirection.FORWARDS) + ChunkEntity().apply { + this.prevToken = prevToken + this.isLastForward = true + } + } else { + lastChunk + } + val eventIds = ArrayList(eventList.size) + for (event in eventList) { + event.eventId?.also { eventIds.add(it) } + chunkEntity.add(realm, roomId, event, PaginationDirection.FORWARDS, stateIndexOffset) + // Give info to crypto module + cryptoService.onLiveEvent(roomId, event) + // Try to remove local echo + event.unsignedData?.transactionId?.also { + val roomEntity = RoomEntity.where(realm, roomId).findFirst() + val sendingEventEntity = roomEntity?.sendingTimelineEvents?.find(it) + if (sendingEventEntity != null) { + Timber.v("Remove local echo for tx:$it") + roomEntity.sendingTimelineEvents.remove(sendingEventEntity) + } else { + Timber.v("Can't find corresponding local echo for tx:$it") + } + } + UserEntityFactory.createOrNull(event)?.also { + realm.insertOrUpdate(it) + } + } + return chunkEntity + } + +} diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomEntityFactory.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomEntityFactory.kt new file mode 100644 index 0000000000..9176e0f8e9 --- /dev/null +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomEntityFactory.kt @@ -0,0 +1,173 @@ +/* + * Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package im.vector.matrix.android.internal.session.sync + +import im.vector.matrix.android.api.session.events.model.EventType +import im.vector.matrix.android.api.session.events.model.toModel +import im.vector.matrix.android.api.session.room.model.Membership +import im.vector.matrix.android.api.session.room.model.tag.RoomTagContent +import im.vector.matrix.android.internal.crypto.DefaultCryptoService +import im.vector.matrix.android.internal.database.helper.addOrUpdate +import im.vector.matrix.android.internal.database.helper.addStateEvent +import im.vector.matrix.android.internal.database.model.EventEntityFields +import im.vector.matrix.android.internal.database.model.RoomEntity +import im.vector.matrix.android.internal.database.query.where +import im.vector.matrix.android.internal.session.room.RoomSummaryUpdater +import im.vector.matrix.android.internal.session.room.read.FullyReadContent +import im.vector.matrix.android.internal.session.sync.model.RoomSync +import im.vector.matrix.android.internal.session.sync.model.RoomSyncAccountData +import im.vector.matrix.android.internal.session.sync.model.RoomSyncEphemeral +import im.vector.matrix.android.internal.session.user.UserEntityFactory +import io.realm.Realm +import javax.inject.Inject + +internal class RoomEntityFactory @Inject constructor(private val cryptoService: DefaultCryptoService, + private val readReceiptHandler: ReadReceiptHandler, + private val roomTagHandler: RoomTagHandler, + private val roomFullyReadHandler: RoomFullyReadHandler, + private val chunkEntityFactory: ChunkEntityFactory, + private val roomSummaryUpdater: RoomSummaryUpdater) { + + fun create(realm: Realm, + roomId: String, + roomSync: RoomSync, + membership: Membership, + isInitialSync: Boolean): RoomEntity { + return if (isInitialSync) { + initialSyncStrategy(realm, roomId, roomSync, membership) + } else { + incrementalSyncStrategy(realm, roomId, roomSync, membership) + } + } + + private fun initialSyncStrategy(realm: Realm, roomId: String, roomSync: RoomSync, membership: Membership): RoomEntity { + if (roomSync.ephemeral != null && roomSync.ephemeral.events.isNotEmpty()) { + handleEphemeral(realm, roomId, roomSync.ephemeral, isInitialSync = true) + } + if (roomSync.accountData != null && roomSync.accountData.events.isNullOrEmpty().not()) { + handleRoomAccountDataEvents(realm, roomId, roomSync.accountData) + } + val roomEntity = RoomEntity(roomId) + roomEntity.membership = membership + + // State events + if (roomSync.state != null && roomSync.state.events.isNotEmpty()) { + roomSync.state.events.forEach { event -> + roomEntity.addStateEvent(event, filterDuplicates = false, stateIndex = Int.MIN_VALUE) + // Give info to crypto module + cryptoService.onStateEvent(roomId, event) + UserEntityFactory.createOrNull(event)?.also { + realm.insertOrUpdate(it) + } + } + } + // Timeline events + if (roomSync.timeline != null && roomSync.timeline.events.isNotEmpty()) { + val chunkEntity = chunkEntityFactory.create( + realm, + roomId, + roomSync.timeline.events, + roomSync.timeline.prevToken, + roomSync.timeline.limited, + isInitialSync = true + ) + roomEntity.chunks.add(chunkEntity) + } + roomSummaryUpdater.update(realm, roomId, Membership.JOIN, roomSync.summary, roomSync.unreadNotifications, updateMembers = true) + return roomEntity + } + + private fun incrementalSyncStrategy(realm: Realm, roomId: String, roomSync: RoomSync, membership: Membership): RoomEntity { + if (roomSync.ephemeral != null && roomSync.ephemeral.events.isNotEmpty()) { + handleEphemeral(realm, roomId, roomSync.ephemeral, false) + } + + if (roomSync.accountData != null && roomSync.accountData.events.isNullOrEmpty().not()) { + handleRoomAccountDataEvents(realm, roomId, roomSync.accountData) + } + + val roomEntity = RoomEntity.where(realm, roomId).findFirst() + ?: throw IllegalStateException("You should have a room at this point") + + if (roomEntity.membership == Membership.INVITE) { + roomEntity.chunks.deleteAllFromRealm() + } + roomEntity.membership = membership + // State event + + if (roomSync.state != null && roomSync.state.events.isNotEmpty()) { + val minStateIndex = roomEntity.untimelinedStateEvents.where().min(EventEntityFields.STATE_INDEX)?.toInt() + ?: Int.MIN_VALUE + val untimelinedStateIndex = minStateIndex + 1 + roomSync.state.events.forEach { event -> + roomEntity.addStateEvent(event, filterDuplicates = true, stateIndex = untimelinedStateIndex) + // Give info to crypto module + cryptoService.onStateEvent(roomId, event) + UserEntityFactory.createOrNull(event)?.also { + realm.insertOrUpdate(it) + } + } + } + if (roomSync.timeline != null && roomSync.timeline.events.isNotEmpty()) { + val chunkEntity = chunkEntityFactory.create( + realm, + roomId, + roomSync.timeline.events, + roomSync.timeline.prevToken, + roomSync.timeline.limited, + false + ) + roomEntity.addOrUpdate(chunkEntity) + } + val hasRoomMember = roomSync.state?.events?.firstOrNull { + it.type == EventType.STATE_ROOM_MEMBER + } != null || roomSync.timeline?.events?.firstOrNull { + it.type == EventType.STATE_ROOM_MEMBER + } != null + + roomSummaryUpdater.update(realm, roomId, Membership.JOIN, roomSync.summary, roomSync.unreadNotifications, updateMembers = hasRoomMember) + return roomEntity + } + + @Suppress("UNCHECKED_CAST") + private fun handleEphemeral(realm: Realm, + roomId: String, + ephemeral: RoomSyncEphemeral, + isInitialSync: Boolean) { + for (event in ephemeral.events) { + if (event.type != EventType.RECEIPT) continue + val readReceiptContent = event.content as? ReadReceiptContent ?: continue + readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync) + } + } + + private fun handleRoomAccountDataEvents(realm: Realm, roomId: String, accountData: RoomSyncAccountData) { + for (event in accountData.events) { + val eventType = event.getClearType() + if (eventType == EventType.TAG) { + val content = event.getClearContent().toModel() + roomTagHandler.handle(realm, roomId, content) + } else if (eventType == EventType.FULLY_READ) { + val content = event.getClearContent().toModel() + roomFullyReadHandler.handle(realm, roomId, content) + } + } + } + + +} diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt index 4a003eb7d9..6374ab2433 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/RoomSyncHandler.kt @@ -55,6 +55,8 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch private val roomTagHandler: RoomTagHandler, private val roomFullyReadHandler: RoomFullyReadHandler, private val cryptoService: DefaultCryptoService, + private val roomEntityFactory: RoomEntityFactory, + private val chunkEntityFactory: ChunkEntityFactory, private val tokenStore: SyncTokenStore, private val pushRuleService: DefaultPushRuleService, private val processForPushTask: ProcessEventForPushTask, @@ -100,7 +102,7 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch } is HandlingStrategy.INVITED -> handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_invited_rooms, 0.1f) { - handleInvitedRoom(realm, it.key, it.value) + handleInvitedRoom(realm, it.key, it.value, isInitialSync) } is HandlingStrategy.LEFT -> { @@ -117,65 +119,18 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch roomSync: RoomSync, isInitialSync: Boolean): RoomEntity { Timber.v("Handle join sync for room $roomId") - - if (roomSync.ephemeral != null && roomSync.ephemeral.events.isNotEmpty()) { - handleEphemeral(realm, roomId, roomSync.ephemeral, isInitialSync) - } - - if (roomSync.accountData != null && roomSync.accountData.events.isNullOrEmpty().not()) { - handleRoomAccountDataEvents(realm, roomId, roomSync.accountData) - } - - val roomEntity = RoomEntity.where(realm, roomId).findFirst() ?: realm.createObject(roomId) - - if (roomEntity.membership == Membership.INVITE) { - roomEntity.chunks.deleteAllFromRealm() - } - roomEntity.membership = Membership.JOIN - - // State event - - if (roomSync.state != null && roomSync.state.events.isNotEmpty()) { - val minStateIndex = roomEntity.untimelinedStateEvents.where().min(EventEntityFields.STATE_INDEX)?.toInt() - ?: Int.MIN_VALUE - val untimelinedStateIndex = minStateIndex + 1 - roomSync.state.events.forEach { event -> - roomEntity.addStateEvent(event, filterDuplicates = true, stateIndex = untimelinedStateIndex) - // Give info to crypto module - cryptoService.onStateEvent(roomId, event) - UserEntityFactory.createOrNull(event)?.also { - realm.insertOrUpdate(it) - } - } - } - if (roomSync.timeline != null && roomSync.timeline.events.isNotEmpty()) { - val chunkEntity = handleTimelineEvents( - realm, - roomEntity, - roomSync.timeline.events, - roomSync.timeline.prevToken, - roomSync.timeline.limited - ) - roomEntity.addOrUpdate(chunkEntity) - } - val hasRoomMember = roomSync.state?.events?.firstOrNull { - it.type == EventType.STATE_ROOM_MEMBER - } != null || roomSync.timeline?.events?.firstOrNull { - it.type == EventType.STATE_ROOM_MEMBER - } != null - - roomSummaryUpdater.update(realm, roomId, Membership.JOIN, roomSync.summary, roomSync.unreadNotifications, updateMembers = hasRoomMember) - return roomEntity + return roomEntityFactory.create(realm, roomId, roomSync, Membership.JOIN, isInitialSync) } private fun handleInvitedRoom(realm: Realm, roomId: String, - roomSync: InvitedRoomSync): RoomEntity { + roomSync: InvitedRoomSync, + isInitialSync: Boolean): RoomEntity { Timber.v("Handle invited sync for room $roomId") val roomEntity = RoomEntity.where(realm, roomId).findFirst() ?: realm.createObject(roomId) roomEntity.membership = Membership.INVITE if (roomSync.inviteState != null && roomSync.inviteState.events.isNotEmpty()) { - val chunkEntity = handleTimelineEvents(realm, roomEntity, roomSync.inviteState.events) + val chunkEntity = chunkEntityFactory.create(realm, roomId, roomSync.inviteState.events, isInitialSync = isInitialSync) roomEntity.addOrUpdate(chunkEntity) } val hasRoomMember = roomSync.inviteState?.events?.firstOrNull { @@ -196,70 +151,4 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch return roomEntity } - private fun handleTimelineEvents(realm: Realm, - roomEntity: RoomEntity, - eventList: List, - prevToken: String? = null, - isLimited: Boolean = true): ChunkEntity { - val lastChunk = ChunkEntity.findLastLiveChunkFromRoom(realm, roomEntity.roomId) - var stateIndexOffset = 0 - val chunkEntity = if (!isLimited && lastChunk != null) { - lastChunk - } else { - realm.createObject().apply { this.prevToken = prevToken } - } - if (isLimited && lastChunk != null) { - stateIndexOffset = lastChunk.lastStateIndex(PaginationDirection.FORWARDS) - } - lastChunk?.isLastForward = false - chunkEntity.isLastForward = true - - val eventIds = ArrayList(eventList.size) - for (event in eventList) { - event.eventId?.also { eventIds.add(it) } - chunkEntity.add(roomEntity.roomId, event, PaginationDirection.FORWARDS, stateIndexOffset) - // Give info to crypto module - cryptoService.onLiveEvent(roomEntity.roomId, event) - // Try to remove local echo - event.unsignedData?.transactionId?.also { - val sendingEventEntity = roomEntity.sendingTimelineEvents.find(it) - if (sendingEventEntity != null) { - Timber.v("Remove local echo for tx:$it") - roomEntity.sendingTimelineEvents.remove(sendingEventEntity) - } else { - Timber.v("Can't find corresponding local echo for tx:$it") - } - } - UserEntityFactory.createOrNull(event)?.also { - realm.insertOrUpdate(it) - } - } - chunkEntity.updateSenderDataFor(eventIds) - return chunkEntity - } - - @Suppress("UNCHECKED_CAST") - private fun handleEphemeral(realm: Realm, - roomId: String, - ephemeral: RoomSyncEphemeral, - isInitialSync: Boolean) { - for (event in ephemeral.events) { - if (event.type != EventType.RECEIPT) continue - val readReceiptContent = event.content as? ReadReceiptContent ?: continue - readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync) - } - } - - private fun handleRoomAccountDataEvents(realm: Realm, roomId: String, accountData: RoomSyncAccountData) { - for (event in accountData.events) { - val eventType = event.getClearType() - if (eventType == EventType.TAG) { - val content = event.getClearContent().toModel() - roomTagHandler.handle(realm, roomId, content) - } else if (eventType == EventType.FULLY_READ) { - val content = event.getClearContent().toModel() - roomFullyReadHandler.handle(realm, roomId, content) - } - } - } } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt index 4e57aa5be1..80ffa2ff02 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt @@ -26,6 +26,7 @@ import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.session.sync.SyncTask +import im.vector.matrix.android.internal.task.TaskConstraints import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.configureWith @@ -108,6 +109,7 @@ open class SyncService : Service() { .configureWith(params) { callbackThread = TaskThread.SYNC executionThread = TaskThread.SYNC + constraints = TaskConstraints(connectedToNetwork = true) callback = object : MatrixCallback { override fun onSuccess(data: Unit) { cancelableTask = null