Start investigate on perfs [WIP]
This commit is contained in:
parent
3f4f7457c7
commit
fb4b42db32
@ -28,7 +28,6 @@ import im.vector.matrix.android.session.room.timeline.RoomDataHelper.createFakeR
|
||||
import io.realm.Realm
|
||||
import io.realm.RealmConfiguration
|
||||
import io.realm.kotlin.createObject
|
||||
import org.amshove.kluent.shouldBeFalse
|
||||
import org.amshove.kluent.shouldBeTrue
|
||||
import org.amshove.kluent.shouldEqual
|
||||
import org.junit.Before
|
||||
@ -52,7 +51,7 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
monarchy.runTransactionSync { realm ->
|
||||
val chunk: ChunkEntity = realm.createObject()
|
||||
val fakeEvent = createFakeMessageEvent()
|
||||
chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.timelineEvents.size shouldEqual 1
|
||||
}
|
||||
}
|
||||
@ -62,8 +61,8 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
monarchy.runTransactionSync { realm ->
|
||||
val chunk: ChunkEntity = realm.createObject()
|
||||
val fakeEvent = createFakeMessageEvent()
|
||||
chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.timelineEvents.size shouldEqual 1
|
||||
}
|
||||
}
|
||||
@ -73,7 +72,7 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
monarchy.runTransactionSync { realm ->
|
||||
val chunk: ChunkEntity = realm.createObject()
|
||||
val fakeEvent = createFakeRoomMemberEvent()
|
||||
chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.lastStateIndex(PaginationDirection.FORWARDS) shouldEqual 1
|
||||
}
|
||||
}
|
||||
@ -83,7 +82,7 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
monarchy.runTransactionSync { realm ->
|
||||
val chunk: ChunkEntity = realm.createObject()
|
||||
val fakeEvent = createFakeMessageEvent()
|
||||
chunk.add("roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.add(realm, "roomId", fakeEvent, PaginationDirection.FORWARDS)
|
||||
chunk.lastStateIndex(PaginationDirection.FORWARDS) shouldEqual 0
|
||||
}
|
||||
}
|
||||
@ -94,7 +93,9 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
val chunk: ChunkEntity = realm.createObject()
|
||||
val fakeEvents = createFakeListOfEvents(30)
|
||||
val numberOfStateEvents = fakeEvents.filter { it.isStateEvent() }.size
|
||||
chunk.addAll("roomId", fakeEvents, PaginationDirection.FORWARDS)
|
||||
fakeEvents.forEach {
|
||||
chunk.add(realm, "roomId", it, PaginationDirection.FORWARDS)
|
||||
}
|
||||
chunk.lastStateIndex(PaginationDirection.FORWARDS) shouldEqual numberOfStateEvents
|
||||
}
|
||||
}
|
||||
@ -107,7 +108,9 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
val numberOfStateEvents = fakeEvents.filter { it.isStateEvent() }.size
|
||||
val lastIsState = fakeEvents.last().isStateEvent()
|
||||
val expectedStateIndex = if (lastIsState) -numberOfStateEvents + 1 else -numberOfStateEvents
|
||||
chunk.addAll("roomId", fakeEvents, PaginationDirection.BACKWARDS)
|
||||
fakeEvents.forEach {
|
||||
chunk.add(realm, "roomId", it, PaginationDirection.BACKWARDS)
|
||||
}
|
||||
chunk.lastStateIndex(PaginationDirection.BACKWARDS) shouldEqual expectedStateIndex
|
||||
}
|
||||
}
|
||||
@ -117,8 +120,12 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
monarchy.runTransactionSync { realm ->
|
||||
val chunk1: ChunkEntity = realm.createObject()
|
||||
val chunk2: ChunkEntity = realm.createObject()
|
||||
chunk1.addAll("roomId", createFakeListOfEvents(30), PaginationDirection.BACKWARDS)
|
||||
chunk2.addAll("roomId", createFakeListOfEvents(30), PaginationDirection.BACKWARDS)
|
||||
createFakeListOfEvents(30).forEach {
|
||||
chunk1.add(realm, "roomId", it, PaginationDirection.BACKWARDS)
|
||||
}
|
||||
createFakeListOfEvents(30).forEach {
|
||||
chunk2.add(realm, "roomId", it, PaginationDirection.BACKWARDS)
|
||||
}
|
||||
chunk1.merge("roomId", chunk2, PaginationDirection.BACKWARDS)
|
||||
chunk1.timelineEvents.size shouldEqual 60
|
||||
}
|
||||
@ -133,14 +140,20 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
val eventsForChunk2 = eventsForChunk1 + createFakeListOfEvents(10)
|
||||
chunk1.isLastForward = true
|
||||
chunk2.isLastForward = false
|
||||
chunk1.addAll("roomId", eventsForChunk1, PaginationDirection.FORWARDS)
|
||||
chunk2.addAll("roomId", eventsForChunk2, PaginationDirection.BACKWARDS)
|
||||
eventsForChunk1.forEach {
|
||||
chunk1.add(realm, "roomId", it, PaginationDirection.FORWARDS)
|
||||
}
|
||||
eventsForChunk2.forEach {
|
||||
chunk2.add(realm, "roomId", it, PaginationDirection.BACKWARDS)
|
||||
}
|
||||
chunk1.merge("roomId", chunk2, PaginationDirection.BACKWARDS)
|
||||
chunk1.timelineEvents.size shouldEqual 40
|
||||
chunk1.isLastForward.shouldBeTrue()
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@Test
|
||||
fun merge_shouldEventsBeLinked_whenMergingLinkedWithUnlinked() {
|
||||
monarchy.runTransactionSync { realm ->
|
||||
@ -192,4 +205,6 @@ internal class ChunkEntityTest : InstrumentedTest {
|
||||
chunk1.nextToken shouldEqual nextToken
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ import im.vector.matrix.android.internal.database.query.getOrCreate
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.extensions.assertIsManaged
|
||||
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
|
||||
import io.realm.Realm
|
||||
import io.realm.Sort
|
||||
|
||||
// By default if a chunk is empty we consider it unlinked
|
||||
@ -74,45 +75,20 @@ internal fun ChunkEntity.merge(roomId: String,
|
||||
val events = eventsToMerge.mapNotNull { it.root?.asDomain() }
|
||||
val eventIds = ArrayList<String>()
|
||||
events.forEach { event ->
|
||||
add(roomId, event, direction, isUnlinked = isUnlinked)
|
||||
add(realm, roomId, event, direction, isUnlinked = isUnlinked)
|
||||
if (event.eventId != null) {
|
||||
eventIds.add(event.eventId)
|
||||
}
|
||||
}
|
||||
updateSenderDataFor(eventIds)
|
||||
}
|
||||
|
||||
internal fun ChunkEntity.addAll(roomId: String,
|
||||
events: List<Event>,
|
||||
direction: PaginationDirection,
|
||||
stateIndexOffset: Int = 0,
|
||||
// Set to true for Event retrieved from a Permalink (i.e. not linked to live Chunk)
|
||||
isUnlinked: Boolean = false) {
|
||||
assertIsManaged()
|
||||
val eventIds = ArrayList<String>()
|
||||
events.forEach { event ->
|
||||
add(roomId, event, direction, stateIndexOffset, isUnlinked)
|
||||
if (event.eventId != null) {
|
||||
eventIds.add(event.eventId)
|
||||
}
|
||||
}
|
||||
updateSenderDataFor(eventIds)
|
||||
}
|
||||
|
||||
internal fun ChunkEntity.updateSenderDataFor(eventIds: List<String>) {
|
||||
for (eventId in eventIds) {
|
||||
val timelineEventEntity = timelineEvents.find(eventId) ?: continue
|
||||
timelineEventEntity.updateSenderData()
|
||||
}
|
||||
}
|
||||
|
||||
internal fun ChunkEntity.add(roomId: String,
|
||||
internal fun ChunkEntity.add(localRealm: Realm,
|
||||
roomId: String,
|
||||
event: Event,
|
||||
direction: PaginationDirection,
|
||||
stateIndexOffset: Int = 0,
|
||||
isUnlinked: Boolean = false) {
|
||||
assertIsManaged()
|
||||
if (event.eventId != null && timelineEvents.find(event.eventId) != null) {
|
||||
if (event.eventId != null) {
|
||||
return
|
||||
}
|
||||
var currentDisplayIndex = lastDisplayIndex(direction, 0)
|
||||
@ -134,22 +110,21 @@ internal fun ChunkEntity.add(roomId: String,
|
||||
backwardsStateIndex = currentStateIndex
|
||||
}
|
||||
}
|
||||
|
||||
val localId = TimelineEventEntity.nextId(realm)
|
||||
val localId = TimelineEventEntity.nextId(localRealm)
|
||||
val eventId = event.eventId ?: ""
|
||||
val senderId = event.senderId ?: ""
|
||||
|
||||
val readReceiptsSummaryEntity = ReadReceiptsSummaryEntity.where(realm, eventId).findFirst()
|
||||
val readReceiptsSummaryEntity = ReadReceiptsSummaryEntity.where(localRealm, eventId).findFirst()
|
||||
?: ReadReceiptsSummaryEntity(eventId, roomId)
|
||||
|
||||
// Update RR for the sender of a new message with a dummy one
|
||||
|
||||
if (event.originServerTs != null) {
|
||||
val timestampOfEvent = event.originServerTs.toDouble()
|
||||
val readReceiptOfSender = ReadReceiptEntity.getOrCreate(realm, roomId = roomId, userId = senderId)
|
||||
val readReceiptOfSender = ReadReceiptEntity.getOrCreate(localRealm, roomId = roomId, userId = senderId)
|
||||
// If the synced RR is older, update
|
||||
if (timestampOfEvent > readReceiptOfSender.originServerTs) {
|
||||
val previousReceiptsSummary = ReadReceiptsSummaryEntity.where(realm, eventId = readReceiptOfSender.eventId).findFirst()
|
||||
val previousReceiptsSummary = ReadReceiptsSummaryEntity.where(localRealm, eventId = readReceiptOfSender.eventId).findFirst()
|
||||
readReceiptOfSender.eventId = eventId
|
||||
readReceiptOfSender.originServerTs = timestampOfEvent
|
||||
previousReceiptsSummary?.readReceipts?.remove(readReceiptOfSender)
|
||||
@ -166,9 +141,9 @@ internal fun ChunkEntity.add(roomId: String,
|
||||
}
|
||||
it.eventId = eventId
|
||||
it.roomId = roomId
|
||||
it.annotations = EventAnnotationsSummaryEntity.where(realm, eventId).findFirst()
|
||||
it.annotations = EventAnnotationsSummaryEntity.where(localRealm, eventId).findFirst()
|
||||
it.readReceipts = readReceiptsSummaryEntity
|
||||
it.readMarker = ReadMarkerEntity.where(realm, roomId = roomId, eventId = eventId).findFirst()
|
||||
it.readMarker = ReadMarkerEntity.where(localRealm, roomId = roomId, eventId = eventId).findFirst()
|
||||
}
|
||||
val position = if (direction == PaginationDirection.FORWARDS) 0 else this.timelineEvents.size
|
||||
timelineEvents.add(position, eventEntity)
|
||||
|
@ -41,8 +41,7 @@ internal fun RoomEntity.addStateEvent(stateEvent: Event,
|
||||
stateIndex: Int = Int.MIN_VALUE,
|
||||
filterDuplicates: Boolean = false,
|
||||
isUnlinked: Boolean = false) {
|
||||
assertIsManaged()
|
||||
if (stateEvent.eventId == null || (filterDuplicates && fastContains(stateEvent.eventId))) {
|
||||
if (stateEvent.eventId == null || (filterDuplicates && untimelinedStateEvents.fastContains(stateEvent.eventId))) {
|
||||
return
|
||||
} else {
|
||||
val entity = stateEvent.toEntity(roomId).apply {
|
||||
@ -53,6 +52,7 @@ internal fun RoomEntity.addStateEvent(stateEvent: Event,
|
||||
untimelinedStateEvents.add(entity)
|
||||
}
|
||||
}
|
||||
|
||||
internal fun RoomEntity.addSendingEvent(event: Event) {
|
||||
assertIsManaged()
|
||||
val senderId = event.senderId ?: return
|
||||
|
@ -18,6 +18,7 @@ package im.vector.matrix.android.internal.extensions
|
||||
|
||||
import io.realm.RealmObject
|
||||
|
||||
internal fun RealmObject.assertIsManaged() {
|
||||
internal fun RealmObject.assertIsManaged(): Boolean {
|
||||
check(isManaged) { "${javaClass.simpleName} entity should be managed to use this function" }
|
||||
return true
|
||||
}
|
||||
|
@ -70,9 +70,6 @@ internal class DefaultLoadRoomMembersTask @Inject constructor(private val roomAP
|
||||
realm.insertOrUpdate(it)
|
||||
}
|
||||
}
|
||||
roomEntity.chunks.flatMap { it.timelineEvents }.forEach {
|
||||
it.updateSenderData()
|
||||
}
|
||||
roomEntity.areAllMembersLoaded = true
|
||||
roomSummaryUpdater.update(realm, roomId, updateMembers = true)
|
||||
}
|
||||
|
@ -94,12 +94,6 @@ internal class DefaultPruneEventTask @Inject constructor(private val monarchy: M
|
||||
// }
|
||||
}
|
||||
}
|
||||
if (eventToPrune.type == EventType.STATE_ROOM_MEMBER) {
|
||||
val timelineEventsToUpdate = TimelineEventEntity.findWithSenderMembershipEvent(realm, eventToPrune.eventId)
|
||||
for (timelineEvent in timelineEventsToUpdate) {
|
||||
timelineEvent.updateSenderData()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun computeAllowedKeys(type: String): List<String> {
|
||||
|
@ -149,7 +149,7 @@ internal class TokenChunkEventPersistor @Inject constructor(private val monarchy
|
||||
val eventIds = ArrayList<String>(receivedChunk.events.size)
|
||||
for (event in receivedChunk.events) {
|
||||
event.eventId?.also { eventIds.add(it) }
|
||||
currentChunk.add(roomId, event, direction, isUnlinked = currentChunk.isUnlinked())
|
||||
currentChunk.add(realm, roomId, event, direction, isUnlinked = currentChunk.isUnlinked())
|
||||
UserEntityFactory.createOrNull(event)?.also {
|
||||
realm.insertOrUpdate(it)
|
||||
}
|
||||
@ -175,7 +175,6 @@ internal class TokenChunkEventPersistor @Inject constructor(private val monarchy
|
||||
realm.insertOrUpdate(it)
|
||||
}
|
||||
}
|
||||
currentChunk.updateSenderDataFor(eventIds)
|
||||
}
|
||||
}
|
||||
return if (receivedChunk.events.isEmpty()) {
|
||||
|
@ -0,0 +1,114 @@
|
||||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.session.sync
|
||||
|
||||
import im.vector.matrix.android.api.session.events.model.Event
|
||||
import im.vector.matrix.android.internal.crypto.DefaultCryptoService
|
||||
import im.vector.matrix.android.internal.database.helper.add
|
||||
import im.vector.matrix.android.internal.database.helper.lastStateIndex
|
||||
import im.vector.matrix.android.internal.database.model.ChunkEntity
|
||||
import im.vector.matrix.android.internal.database.model.RoomEntity
|
||||
import im.vector.matrix.android.internal.database.query.find
|
||||
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
|
||||
import im.vector.matrix.android.internal.session.user.UserEntityFactory
|
||||
import io.realm.Realm
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class ChunkEntityFactory @Inject constructor(private val cryptoService: DefaultCryptoService) {
|
||||
|
||||
fun create(realm: Realm,
|
||||
roomId: String,
|
||||
eventList: List<Event>,
|
||||
prevToken: String? = null,
|
||||
isLimited: Boolean = true,
|
||||
isInitialSync: Boolean): ChunkEntity {
|
||||
return if (isInitialSync) {
|
||||
initialSyncStrategy(realm, roomId, eventList, prevToken)
|
||||
} else {
|
||||
incrementalSyncStrategy(realm, roomId, eventList, prevToken, isLimited)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private fun initialSyncStrategy(realm: Realm,
|
||||
roomId: String,
|
||||
eventList: List<Event>,
|
||||
prevToken: String?): ChunkEntity {
|
||||
val chunkEntity = ChunkEntity().apply {
|
||||
this.prevToken = prevToken
|
||||
this.isLastForward = true
|
||||
}
|
||||
val eventIds = ArrayList<String>(eventList.size)
|
||||
for (event in eventList) {
|
||||
event.eventId?.also { eventIds.add(it) }
|
||||
chunkEntity.add(realm, roomId, event, PaginationDirection.FORWARDS)
|
||||
// Give info to crypto module
|
||||
cryptoService.onLiveEvent(roomId, event)
|
||||
UserEntityFactory.createOrNull(event)?.also {
|
||||
realm.insertOrUpdate(it)
|
||||
}
|
||||
}
|
||||
return chunkEntity
|
||||
}
|
||||
|
||||
private fun incrementalSyncStrategy(realm: Realm,
|
||||
roomId: String,
|
||||
eventList: List<Event>,
|
||||
prevToken: String? = null,
|
||||
isLimited: Boolean = true): ChunkEntity {
|
||||
val lastChunk = ChunkEntity.findLastLiveChunkFromRoom(realm, roomId)
|
||||
?: throw IllegalStateException("You should already have a live chunk at this point")
|
||||
lastChunk.isLastForward = false
|
||||
var stateIndexOffset = 0
|
||||
val chunkEntity = if (isLimited) {
|
||||
stateIndexOffset = lastChunk.lastStateIndex(PaginationDirection.FORWARDS)
|
||||
ChunkEntity().apply {
|
||||
this.prevToken = prevToken
|
||||
this.isLastForward = true
|
||||
}
|
||||
} else {
|
||||
lastChunk
|
||||
}
|
||||
val eventIds = ArrayList<String>(eventList.size)
|
||||
for (event in eventList) {
|
||||
event.eventId?.also { eventIds.add(it) }
|
||||
chunkEntity.add(realm, roomId, event, PaginationDirection.FORWARDS, stateIndexOffset)
|
||||
// Give info to crypto module
|
||||
cryptoService.onLiveEvent(roomId, event)
|
||||
// Try to remove local echo
|
||||
event.unsignedData?.transactionId?.also {
|
||||
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
|
||||
val sendingEventEntity = roomEntity?.sendingTimelineEvents?.find(it)
|
||||
if (sendingEventEntity != null) {
|
||||
Timber.v("Remove local echo for tx:$it")
|
||||
roomEntity.sendingTimelineEvents.remove(sendingEventEntity)
|
||||
} else {
|
||||
Timber.v("Can't find corresponding local echo for tx:$it")
|
||||
}
|
||||
}
|
||||
UserEntityFactory.createOrNull(event)?.also {
|
||||
realm.insertOrUpdate(it)
|
||||
}
|
||||
}
|
||||
return chunkEntity
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,173 @@
|
||||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.session.sync
|
||||
|
||||
import im.vector.matrix.android.api.session.events.model.EventType
|
||||
import im.vector.matrix.android.api.session.events.model.toModel
|
||||
import im.vector.matrix.android.api.session.room.model.Membership
|
||||
import im.vector.matrix.android.api.session.room.model.tag.RoomTagContent
|
||||
import im.vector.matrix.android.internal.crypto.DefaultCryptoService
|
||||
import im.vector.matrix.android.internal.database.helper.addOrUpdate
|
||||
import im.vector.matrix.android.internal.database.helper.addStateEvent
|
||||
import im.vector.matrix.android.internal.database.model.EventEntityFields
|
||||
import im.vector.matrix.android.internal.database.model.RoomEntity
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.session.room.RoomSummaryUpdater
|
||||
import im.vector.matrix.android.internal.session.room.read.FullyReadContent
|
||||
import im.vector.matrix.android.internal.session.sync.model.RoomSync
|
||||
import im.vector.matrix.android.internal.session.sync.model.RoomSyncAccountData
|
||||
import im.vector.matrix.android.internal.session.sync.model.RoomSyncEphemeral
|
||||
import im.vector.matrix.android.internal.session.user.UserEntityFactory
|
||||
import io.realm.Realm
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class RoomEntityFactory @Inject constructor(private val cryptoService: DefaultCryptoService,
|
||||
private val readReceiptHandler: ReadReceiptHandler,
|
||||
private val roomTagHandler: RoomTagHandler,
|
||||
private val roomFullyReadHandler: RoomFullyReadHandler,
|
||||
private val chunkEntityFactory: ChunkEntityFactory,
|
||||
private val roomSummaryUpdater: RoomSummaryUpdater) {
|
||||
|
||||
fun create(realm: Realm,
|
||||
roomId: String,
|
||||
roomSync: RoomSync,
|
||||
membership: Membership,
|
||||
isInitialSync: Boolean): RoomEntity {
|
||||
return if (isInitialSync) {
|
||||
initialSyncStrategy(realm, roomId, roomSync, membership)
|
||||
} else {
|
||||
incrementalSyncStrategy(realm, roomId, roomSync, membership)
|
||||
}
|
||||
}
|
||||
|
||||
private fun initialSyncStrategy(realm: Realm, roomId: String, roomSync: RoomSync, membership: Membership): RoomEntity {
|
||||
if (roomSync.ephemeral != null && roomSync.ephemeral.events.isNotEmpty()) {
|
||||
handleEphemeral(realm, roomId, roomSync.ephemeral, isInitialSync = true)
|
||||
}
|
||||
if (roomSync.accountData != null && roomSync.accountData.events.isNullOrEmpty().not()) {
|
||||
handleRoomAccountDataEvents(realm, roomId, roomSync.accountData)
|
||||
}
|
||||
val roomEntity = RoomEntity(roomId)
|
||||
roomEntity.membership = membership
|
||||
|
||||
// State events
|
||||
if (roomSync.state != null && roomSync.state.events.isNotEmpty()) {
|
||||
roomSync.state.events.forEach { event ->
|
||||
roomEntity.addStateEvent(event, filterDuplicates = false, stateIndex = Int.MIN_VALUE)
|
||||
// Give info to crypto module
|
||||
cryptoService.onStateEvent(roomId, event)
|
||||
UserEntityFactory.createOrNull(event)?.also {
|
||||
realm.insertOrUpdate(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Timeline events
|
||||
if (roomSync.timeline != null && roomSync.timeline.events.isNotEmpty()) {
|
||||
val chunkEntity = chunkEntityFactory.create(
|
||||
realm,
|
||||
roomId,
|
||||
roomSync.timeline.events,
|
||||
roomSync.timeline.prevToken,
|
||||
roomSync.timeline.limited,
|
||||
isInitialSync = true
|
||||
)
|
||||
roomEntity.chunks.add(chunkEntity)
|
||||
}
|
||||
roomSummaryUpdater.update(realm, roomId, Membership.JOIN, roomSync.summary, roomSync.unreadNotifications, updateMembers = true)
|
||||
return roomEntity
|
||||
}
|
||||
|
||||
private fun incrementalSyncStrategy(realm: Realm, roomId: String, roomSync: RoomSync, membership: Membership): RoomEntity {
|
||||
if (roomSync.ephemeral != null && roomSync.ephemeral.events.isNotEmpty()) {
|
||||
handleEphemeral(realm, roomId, roomSync.ephemeral, false)
|
||||
}
|
||||
|
||||
if (roomSync.accountData != null && roomSync.accountData.events.isNullOrEmpty().not()) {
|
||||
handleRoomAccountDataEvents(realm, roomId, roomSync.accountData)
|
||||
}
|
||||
|
||||
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
|
||||
?: throw IllegalStateException("You should have a room at this point")
|
||||
|
||||
if (roomEntity.membership == Membership.INVITE) {
|
||||
roomEntity.chunks.deleteAllFromRealm()
|
||||
}
|
||||
roomEntity.membership = membership
|
||||
// State event
|
||||
|
||||
if (roomSync.state != null && roomSync.state.events.isNotEmpty()) {
|
||||
val minStateIndex = roomEntity.untimelinedStateEvents.where().min(EventEntityFields.STATE_INDEX)?.toInt()
|
||||
?: Int.MIN_VALUE
|
||||
val untimelinedStateIndex = minStateIndex + 1
|
||||
roomSync.state.events.forEach { event ->
|
||||
roomEntity.addStateEvent(event, filterDuplicates = true, stateIndex = untimelinedStateIndex)
|
||||
// Give info to crypto module
|
||||
cryptoService.onStateEvent(roomId, event)
|
||||
UserEntityFactory.createOrNull(event)?.also {
|
||||
realm.insertOrUpdate(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (roomSync.timeline != null && roomSync.timeline.events.isNotEmpty()) {
|
||||
val chunkEntity = chunkEntityFactory.create(
|
||||
realm,
|
||||
roomId,
|
||||
roomSync.timeline.events,
|
||||
roomSync.timeline.prevToken,
|
||||
roomSync.timeline.limited,
|
||||
false
|
||||
)
|
||||
roomEntity.addOrUpdate(chunkEntity)
|
||||
}
|
||||
val hasRoomMember = roomSync.state?.events?.firstOrNull {
|
||||
it.type == EventType.STATE_ROOM_MEMBER
|
||||
} != null || roomSync.timeline?.events?.firstOrNull {
|
||||
it.type == EventType.STATE_ROOM_MEMBER
|
||||
} != null
|
||||
|
||||
roomSummaryUpdater.update(realm, roomId, Membership.JOIN, roomSync.summary, roomSync.unreadNotifications, updateMembers = hasRoomMember)
|
||||
return roomEntity
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
private fun handleEphemeral(realm: Realm,
|
||||
roomId: String,
|
||||
ephemeral: RoomSyncEphemeral,
|
||||
isInitialSync: Boolean) {
|
||||
for (event in ephemeral.events) {
|
||||
if (event.type != EventType.RECEIPT) continue
|
||||
val readReceiptContent = event.content as? ReadReceiptContent ?: continue
|
||||
readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync)
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleRoomAccountDataEvents(realm: Realm, roomId: String, accountData: RoomSyncAccountData) {
|
||||
for (event in accountData.events) {
|
||||
val eventType = event.getClearType()
|
||||
if (eventType == EventType.TAG) {
|
||||
val content = event.getClearContent().toModel<RoomTagContent>()
|
||||
roomTagHandler.handle(realm, roomId, content)
|
||||
} else if (eventType == EventType.FULLY_READ) {
|
||||
val content = event.getClearContent().toModel<FullyReadContent>()
|
||||
roomFullyReadHandler.handle(realm, roomId, content)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -55,6 +55,8 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
||||
private val roomTagHandler: RoomTagHandler,
|
||||
private val roomFullyReadHandler: RoomFullyReadHandler,
|
||||
private val cryptoService: DefaultCryptoService,
|
||||
private val roomEntityFactory: RoomEntityFactory,
|
||||
private val chunkEntityFactory: ChunkEntityFactory,
|
||||
private val tokenStore: SyncTokenStore,
|
||||
private val pushRuleService: DefaultPushRuleService,
|
||||
private val processForPushTask: ProcessEventForPushTask,
|
||||
@ -100,7 +102,7 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
||||
}
|
||||
is HandlingStrategy.INVITED ->
|
||||
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_invited_rooms, 0.1f) {
|
||||
handleInvitedRoom(realm, it.key, it.value)
|
||||
handleInvitedRoom(realm, it.key, it.value, isInitialSync)
|
||||
}
|
||||
|
||||
is HandlingStrategy.LEFT -> {
|
||||
@ -117,65 +119,18 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
||||
roomSync: RoomSync,
|
||||
isInitialSync: Boolean): RoomEntity {
|
||||
Timber.v("Handle join sync for room $roomId")
|
||||
|
||||
if (roomSync.ephemeral != null && roomSync.ephemeral.events.isNotEmpty()) {
|
||||
handleEphemeral(realm, roomId, roomSync.ephemeral, isInitialSync)
|
||||
}
|
||||
|
||||
if (roomSync.accountData != null && roomSync.accountData.events.isNullOrEmpty().not()) {
|
||||
handleRoomAccountDataEvents(realm, roomId, roomSync.accountData)
|
||||
}
|
||||
|
||||
val roomEntity = RoomEntity.where(realm, roomId).findFirst() ?: realm.createObject(roomId)
|
||||
|
||||
if (roomEntity.membership == Membership.INVITE) {
|
||||
roomEntity.chunks.deleteAllFromRealm()
|
||||
}
|
||||
roomEntity.membership = Membership.JOIN
|
||||
|
||||
// State event
|
||||
|
||||
if (roomSync.state != null && roomSync.state.events.isNotEmpty()) {
|
||||
val minStateIndex = roomEntity.untimelinedStateEvents.where().min(EventEntityFields.STATE_INDEX)?.toInt()
|
||||
?: Int.MIN_VALUE
|
||||
val untimelinedStateIndex = minStateIndex + 1
|
||||
roomSync.state.events.forEach { event ->
|
||||
roomEntity.addStateEvent(event, filterDuplicates = true, stateIndex = untimelinedStateIndex)
|
||||
// Give info to crypto module
|
||||
cryptoService.onStateEvent(roomId, event)
|
||||
UserEntityFactory.createOrNull(event)?.also {
|
||||
realm.insertOrUpdate(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (roomSync.timeline != null && roomSync.timeline.events.isNotEmpty()) {
|
||||
val chunkEntity = handleTimelineEvents(
|
||||
realm,
|
||||
roomEntity,
|
||||
roomSync.timeline.events,
|
||||
roomSync.timeline.prevToken,
|
||||
roomSync.timeline.limited
|
||||
)
|
||||
roomEntity.addOrUpdate(chunkEntity)
|
||||
}
|
||||
val hasRoomMember = roomSync.state?.events?.firstOrNull {
|
||||
it.type == EventType.STATE_ROOM_MEMBER
|
||||
} != null || roomSync.timeline?.events?.firstOrNull {
|
||||
it.type == EventType.STATE_ROOM_MEMBER
|
||||
} != null
|
||||
|
||||
roomSummaryUpdater.update(realm, roomId, Membership.JOIN, roomSync.summary, roomSync.unreadNotifications, updateMembers = hasRoomMember)
|
||||
return roomEntity
|
||||
return roomEntityFactory.create(realm, roomId, roomSync, Membership.JOIN, isInitialSync)
|
||||
}
|
||||
|
||||
private fun handleInvitedRoom(realm: Realm,
|
||||
roomId: String,
|
||||
roomSync: InvitedRoomSync): RoomEntity {
|
||||
roomSync: InvitedRoomSync,
|
||||
isInitialSync: Boolean): RoomEntity {
|
||||
Timber.v("Handle invited sync for room $roomId")
|
||||
val roomEntity = RoomEntity.where(realm, roomId).findFirst() ?: realm.createObject(roomId)
|
||||
roomEntity.membership = Membership.INVITE
|
||||
if (roomSync.inviteState != null && roomSync.inviteState.events.isNotEmpty()) {
|
||||
val chunkEntity = handleTimelineEvents(realm, roomEntity, roomSync.inviteState.events)
|
||||
val chunkEntity = chunkEntityFactory.create(realm, roomId, roomSync.inviteState.events, isInitialSync = isInitialSync)
|
||||
roomEntity.addOrUpdate(chunkEntity)
|
||||
}
|
||||
val hasRoomMember = roomSync.inviteState?.events?.firstOrNull {
|
||||
@ -196,70 +151,4 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
||||
return roomEntity
|
||||
}
|
||||
|
||||
private fun handleTimelineEvents(realm: Realm,
|
||||
roomEntity: RoomEntity,
|
||||
eventList: List<Event>,
|
||||
prevToken: String? = null,
|
||||
isLimited: Boolean = true): ChunkEntity {
|
||||
val lastChunk = ChunkEntity.findLastLiveChunkFromRoom(realm, roomEntity.roomId)
|
||||
var stateIndexOffset = 0
|
||||
val chunkEntity = if (!isLimited && lastChunk != null) {
|
||||
lastChunk
|
||||
} else {
|
||||
realm.createObject<ChunkEntity>().apply { this.prevToken = prevToken }
|
||||
}
|
||||
if (isLimited && lastChunk != null) {
|
||||
stateIndexOffset = lastChunk.lastStateIndex(PaginationDirection.FORWARDS)
|
||||
}
|
||||
lastChunk?.isLastForward = false
|
||||
chunkEntity.isLastForward = true
|
||||
|
||||
val eventIds = ArrayList<String>(eventList.size)
|
||||
for (event in eventList) {
|
||||
event.eventId?.also { eventIds.add(it) }
|
||||
chunkEntity.add(roomEntity.roomId, event, PaginationDirection.FORWARDS, stateIndexOffset)
|
||||
// Give info to crypto module
|
||||
cryptoService.onLiveEvent(roomEntity.roomId, event)
|
||||
// Try to remove local echo
|
||||
event.unsignedData?.transactionId?.also {
|
||||
val sendingEventEntity = roomEntity.sendingTimelineEvents.find(it)
|
||||
if (sendingEventEntity != null) {
|
||||
Timber.v("Remove local echo for tx:$it")
|
||||
roomEntity.sendingTimelineEvents.remove(sendingEventEntity)
|
||||
} else {
|
||||
Timber.v("Can't find corresponding local echo for tx:$it")
|
||||
}
|
||||
}
|
||||
UserEntityFactory.createOrNull(event)?.also {
|
||||
realm.insertOrUpdate(it)
|
||||
}
|
||||
}
|
||||
chunkEntity.updateSenderDataFor(eventIds)
|
||||
return chunkEntity
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
private fun handleEphemeral(realm: Realm,
|
||||
roomId: String,
|
||||
ephemeral: RoomSyncEphemeral,
|
||||
isInitialSync: Boolean) {
|
||||
for (event in ephemeral.events) {
|
||||
if (event.type != EventType.RECEIPT) continue
|
||||
val readReceiptContent = event.content as? ReadReceiptContent ?: continue
|
||||
readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync)
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleRoomAccountDataEvents(realm: Realm, roomId: String, accountData: RoomSyncAccountData) {
|
||||
for (event in accountData.events) {
|
||||
val eventType = event.getClearType()
|
||||
if (eventType == EventType.TAG) {
|
||||
val content = event.getClearContent().toModel<RoomTagContent>()
|
||||
roomTagHandler.handle(realm, roomId, content)
|
||||
} else if (eventType == EventType.FULLY_READ) {
|
||||
val content = event.getClearContent().toModel<FullyReadContent>()
|
||||
roomFullyReadHandler.handle(realm, roomId, content)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import im.vector.matrix.android.api.failure.MatrixError
|
||||
import im.vector.matrix.android.api.util.Cancelable
|
||||
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
||||
import im.vector.matrix.android.internal.session.sync.SyncTask
|
||||
import im.vector.matrix.android.internal.task.TaskConstraints
|
||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||
import im.vector.matrix.android.internal.task.TaskThread
|
||||
import im.vector.matrix.android.internal.task.configureWith
|
||||
@ -108,6 +109,7 @@ open class SyncService : Service() {
|
||||
.configureWith(params) {
|
||||
callbackThread = TaskThread.SYNC
|
||||
executionThread = TaskThread.SYNC
|
||||
constraints = TaskConstraints(connectedToNetwork = true)
|
||||
callback = object : MatrixCallback<Unit> {
|
||||
override fun onSuccess(data: Unit) {
|
||||
cancelableTask = null
|
||||
|
Loading…
x
Reference in New Issue
Block a user