Merge pull request #6917 from vector-im/feature/bma/incr_sync_perf
Feature/bma/incr sync perf
This commit is contained in:
commit
456d831a7d
|
@ -0,0 +1 @@
|
||||||
|
Fix long incremental sync.
|
|
@ -779,6 +779,11 @@ internal class DefaultCrossSigningService @Inject constructor(
|
||||||
|
|
||||||
override fun onUsersDeviceUpdate(userIds: List<String>) {
|
override fun onUsersDeviceUpdate(userIds: List<String>) {
|
||||||
Timber.d("## CrossSigning - onUsersDeviceUpdate for users: ${userIds.logLimit()}")
|
Timber.d("## CrossSigning - onUsersDeviceUpdate for users: ${userIds.logLimit()}")
|
||||||
|
checkTrustAndAffectedRoomShields(userIds)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun checkTrustAndAffectedRoomShields(userIds: List<String>) {
|
||||||
|
Timber.d("## CrossSigning - checkTrustAndAffectedRoomShields for users: ${userIds.logLimit()}")
|
||||||
val workerParams = UpdateTrustWorker.Params(
|
val workerParams = UpdateTrustWorker.Params(
|
||||||
sessionId = sessionId,
|
sessionId = sessionId,
|
||||||
filename = updateTrustWorkerDataRepository.createParam(userIds)
|
filename = updateTrustWorkerDataRepository.createParam(userIds)
|
||||||
|
|
|
@ -207,6 +207,7 @@ internal class UpdateTrustWorker(context: Context, params: WorkerParameters, ses
|
||||||
private suspend fun updateTrustStep2(userList: List<String>, myCrossSigningInfo: MXCrossSigningInfo?) {
|
private suspend fun updateTrustStep2(userList: List<String>, myCrossSigningInfo: MXCrossSigningInfo?) {
|
||||||
Timber.d("## CrossSigning - Updating shields for impacted rooms...")
|
Timber.d("## CrossSigning - Updating shields for impacted rooms...")
|
||||||
awaitTransaction(sessionRealmConfiguration) { sessionRealm ->
|
awaitTransaction(sessionRealmConfiguration) { sessionRealm ->
|
||||||
|
Timber.d("## CrossSigning - Updating shields for impacted rooms - in transaction")
|
||||||
Realm.getInstance(cryptoRealmConfiguration).use { cryptoRealm ->
|
Realm.getInstance(cryptoRealmConfiguration).use { cryptoRealm ->
|
||||||
sessionRealm.where(RoomMemberSummaryEntity::class.java)
|
sessionRealm.where(RoomMemberSummaryEntity::class.java)
|
||||||
.`in`(RoomMemberSummaryEntityFields.USER_ID, userList.toTypedArray())
|
.`in`(RoomMemberSummaryEntityFields.USER_ID, userList.toTypedArray())
|
||||||
|
@ -239,6 +240,7 @@ internal class UpdateTrustWorker(context: Context, params: WorkerParameters, ses
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Timber.d("## CrossSigning - Updating shields for impacted rooms - END")
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getCrossSigningInfo(cryptoRealm: Realm, userId: String): MXCrossSigningInfo? {
|
private fun getCrossSigningInfo(cryptoRealm: Realm, userId: String): MXCrossSigningInfo? {
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.matrix.android.sdk.internal.session.space.SpaceModule
|
||||||
import org.matrix.android.sdk.internal.session.sync.SyncModule
|
import org.matrix.android.sdk.internal.session.sync.SyncModule
|
||||||
import org.matrix.android.sdk.internal.session.sync.SyncTask
|
import org.matrix.android.sdk.internal.session.sync.SyncTask
|
||||||
import org.matrix.android.sdk.internal.session.sync.SyncTokenStore
|
import org.matrix.android.sdk.internal.session.sync.SyncTokenStore
|
||||||
|
import org.matrix.android.sdk.internal.session.sync.handler.UpdateUserWorker
|
||||||
import org.matrix.android.sdk.internal.session.sync.job.SyncWorker
|
import org.matrix.android.sdk.internal.session.sync.job.SyncWorker
|
||||||
import org.matrix.android.sdk.internal.session.terms.TermsModule
|
import org.matrix.android.sdk.internal.session.terms.TermsModule
|
||||||
import org.matrix.android.sdk.internal.session.thirdparty.ThirdPartyModule
|
import org.matrix.android.sdk.internal.session.thirdparty.ThirdPartyModule
|
||||||
|
@ -128,6 +129,8 @@ internal interface SessionComponent {
|
||||||
|
|
||||||
fun inject(worker: UpdateTrustWorker)
|
fun inject(worker: UpdateTrustWorker)
|
||||||
|
|
||||||
|
fun inject(worker: UpdateUserWorker)
|
||||||
|
|
||||||
fun inject(worker: DeactivateLiveLocationShareWorker)
|
fun inject(worker: DeactivateLiveLocationShareWorker)
|
||||||
|
|
||||||
@Component.Factory
|
@Component.Factory
|
||||||
|
|
|
@ -140,7 +140,8 @@ internal class RoomMemberEventHandler @Inject constructor(
|
||||||
val previousDisplayName = prevContent?.get("displayname") as? String
|
val previousDisplayName = prevContent?.get("displayname") as? String
|
||||||
val previousAvatar = prevContent?.get("avatar_url") as? String
|
val previousAvatar = prevContent?.get("avatar_url") as? String
|
||||||
|
|
||||||
if (previousDisplayName != roomMember.displayName || previousAvatar != roomMember.avatarUrl) {
|
if ((previousDisplayName != null && previousDisplayName != roomMember.displayName) ||
|
||||||
|
(previousAvatar != null && previousAvatar != roomMember.avatarUrl)) {
|
||||||
aggregator.userIdsToFetch.add(eventUserId)
|
aggregator.userIdsToFetch.add(eventUserId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,6 +63,7 @@ import org.matrix.android.sdk.internal.session.room.accountdata.RoomAccountDataD
|
||||||
import org.matrix.android.sdk.internal.session.room.membership.RoomDisplayNameResolver
|
import org.matrix.android.sdk.internal.session.room.membership.RoomDisplayNameResolver
|
||||||
import org.matrix.android.sdk.internal.session.room.membership.RoomMemberHelper
|
import org.matrix.android.sdk.internal.session.room.membership.RoomMemberHelper
|
||||||
import org.matrix.android.sdk.internal.session.room.relationship.RoomChildRelationInfo
|
import org.matrix.android.sdk.internal.session.room.relationship.RoomChildRelationInfo
|
||||||
|
import org.matrix.android.sdk.internal.session.sync.SyncResponsePostTreatmentAggregator
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
import kotlin.system.measureTimeMillis
|
import kotlin.system.measureTimeMillis
|
||||||
|
@ -91,7 +92,8 @@ internal class RoomSummaryUpdater @Inject constructor(
|
||||||
roomSummary: RoomSyncSummary? = null,
|
roomSummary: RoomSyncSummary? = null,
|
||||||
unreadNotifications: RoomSyncUnreadNotifications? = null,
|
unreadNotifications: RoomSyncUnreadNotifications? = null,
|
||||||
updateMembers: Boolean = false,
|
updateMembers: Boolean = false,
|
||||||
inviterId: String? = null
|
inviterId: String? = null,
|
||||||
|
aggregator: SyncResponsePostTreatmentAggregator? = null
|
||||||
) {
|
) {
|
||||||
val roomSummaryEntity = RoomSummaryEntity.getOrCreate(realm, roomId)
|
val roomSummaryEntity = RoomSummaryEntity.getOrCreate(realm, roomId)
|
||||||
if (roomSummary != null) {
|
if (roomSummary != null) {
|
||||||
|
@ -180,8 +182,14 @@ internal class RoomSummaryUpdater @Inject constructor(
|
||||||
roomSummaryEntity.otherMemberIds.clear()
|
roomSummaryEntity.otherMemberIds.clear()
|
||||||
roomSummaryEntity.otherMemberIds.addAll(otherRoomMembers)
|
roomSummaryEntity.otherMemberIds.addAll(otherRoomMembers)
|
||||||
if (roomSummaryEntity.isEncrypted && otherRoomMembers.isNotEmpty()) {
|
if (roomSummaryEntity.isEncrypted && otherRoomMembers.isNotEmpty()) {
|
||||||
// mmm maybe we could only refresh shield instead of checking trust also?
|
if (aggregator == null) {
|
||||||
crossSigningService.onUsersDeviceUpdate(otherRoomMembers)
|
// Do it now
|
||||||
|
// mmm maybe we could only refresh shield instead of checking trust also?
|
||||||
|
crossSigningService.checkTrustAndAffectedRoomShields(otherRoomMembers)
|
||||||
|
} else {
|
||||||
|
// Schedule it
|
||||||
|
aggregator.userIdsForCheckingTrustAndAffectedRoomShields.addAll(otherRoomMembers)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,21 +126,33 @@ internal class SyncResponseHandler @Inject constructor(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Everything else we need to do outside the transaction
|
// Everything else we need to do outside the transaction
|
||||||
aggregatorHandler.handle(aggregator)
|
measureTimeMillis {
|
||||||
|
aggregatorHandler.handle(aggregator)
|
||||||
syncResponse.rooms?.let {
|
}.also {
|
||||||
checkPushRules(it, isInitialSync)
|
Timber.v("Aggregator management took $it ms")
|
||||||
userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite)
|
|
||||||
dispatchInvitedRoom(it)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Timber.v("On sync completed")
|
measureTimeMillis {
|
||||||
cryptoSyncHandler.onSyncCompleted(syncResponse)
|
syncResponse.rooms?.let {
|
||||||
|
checkPushRules(it, isInitialSync)
|
||||||
|
userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite)
|
||||||
|
dispatchInvitedRoom(it)
|
||||||
|
}
|
||||||
|
}.also {
|
||||||
|
Timber.v("SyncResponse.rooms post treatment took $it ms")
|
||||||
|
}
|
||||||
|
|
||||||
|
measureTimeMillis {
|
||||||
|
cryptoSyncHandler.onSyncCompleted(syncResponse)
|
||||||
|
}.also {
|
||||||
|
Timber.v("cryptoSyncHandler.onSyncCompleted took $it ms")
|
||||||
|
}
|
||||||
|
|
||||||
// post sync stuffs
|
// post sync stuffs
|
||||||
monarchy.writeAsync {
|
monarchy.writeAsync {
|
||||||
roomSyncHandler.postSyncSpaceHierarchyHandle(it)
|
roomSyncHandler.postSyncSpaceHierarchyHandle(it)
|
||||||
}
|
}
|
||||||
|
Timber.v("On sync completed")
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun dispatchInvitedRoom(roomsSyncResponse: RoomsSyncResponse) {
|
private fun dispatchInvitedRoom(roomsSyncResponse: RoomsSyncResponse) {
|
||||||
|
|
|
@ -23,6 +23,9 @@ internal class SyncResponsePostTreatmentAggregator {
|
||||||
// Map of roomId to directUserId
|
// Map of roomId to directUserId
|
||||||
val directChatsToCheck = mutableMapOf<String, String>()
|
val directChatsToCheck = mutableMapOf<String, String>()
|
||||||
|
|
||||||
// List of userIds to fetch and update at the end of incremental syncs
|
// Set of userIds to fetch and update at the end of incremental syncs
|
||||||
val userIdsToFetch = mutableListOf<String>()
|
val userIdsToFetch = mutableSetOf<String>()
|
||||||
|
|
||||||
|
// Set of users to call `crossSigningService.checkTrustAndAffectedRoomShields` once per sync
|
||||||
|
val userIdsForCheckingTrustAndAffectedRoomShields = mutableSetOf<String>()
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,32 +16,39 @@
|
||||||
|
|
||||||
package org.matrix.android.sdk.internal.session.sync.handler
|
package org.matrix.android.sdk.internal.session.sync.handler
|
||||||
|
|
||||||
import com.zhuinden.monarchy.Monarchy
|
import androidx.work.BackoffPolicy
|
||||||
|
import androidx.work.ExistingWorkPolicy
|
||||||
import org.matrix.android.sdk.api.MatrixPatterns
|
import org.matrix.android.sdk.api.MatrixPatterns
|
||||||
import org.matrix.android.sdk.api.extensions.tryOrNull
|
import org.matrix.android.sdk.internal.crypto.crosssigning.DefaultCrossSigningService
|
||||||
import org.matrix.android.sdk.api.session.user.model.User
|
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorker
|
||||||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorkerDataRepository
|
||||||
import org.matrix.android.sdk.internal.session.profile.GetProfileInfoTask
|
import org.matrix.android.sdk.internal.di.SessionId
|
||||||
|
import org.matrix.android.sdk.internal.di.WorkManagerProvider
|
||||||
import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore
|
import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore
|
||||||
import org.matrix.android.sdk.internal.session.sync.SyncResponsePostTreatmentAggregator
|
import org.matrix.android.sdk.internal.session.sync.SyncResponsePostTreatmentAggregator
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.accountdata.toMutable
|
import org.matrix.android.sdk.internal.session.sync.model.accountdata.toMutable
|
||||||
import org.matrix.android.sdk.internal.session.user.UserEntityFactory
|
|
||||||
import org.matrix.android.sdk.internal.session.user.accountdata.DirectChatsHelper
|
import org.matrix.android.sdk.internal.session.user.accountdata.DirectChatsHelper
|
||||||
import org.matrix.android.sdk.internal.session.user.accountdata.UpdateUserAccountDataTask
|
import org.matrix.android.sdk.internal.session.user.accountdata.UpdateUserAccountDataTask
|
||||||
import org.matrix.android.sdk.internal.util.awaitTransaction
|
import org.matrix.android.sdk.internal.util.logLimit
|
||||||
|
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
|
||||||
|
import timber.log.Timber
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
|
||||||
internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor(
|
internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor(
|
||||||
private val directChatsHelper: DirectChatsHelper,
|
private val directChatsHelper: DirectChatsHelper,
|
||||||
private val ephemeralTemporaryStore: RoomSyncEphemeralTemporaryStore,
|
private val ephemeralTemporaryStore: RoomSyncEphemeralTemporaryStore,
|
||||||
private val updateUserAccountDataTask: UpdateUserAccountDataTask,
|
private val updateUserAccountDataTask: UpdateUserAccountDataTask,
|
||||||
private val getProfileInfoTask: GetProfileInfoTask,
|
private val crossSigningService: DefaultCrossSigningService,
|
||||||
@SessionDatabase private val monarchy: Monarchy,
|
private val updateTrustWorkerDataRepository: UpdateTrustWorkerDataRepository,
|
||||||
|
private val workManagerProvider: WorkManagerProvider,
|
||||||
|
@SessionId private val sessionId: String,
|
||||||
) {
|
) {
|
||||||
suspend fun handle(aggregator: SyncResponsePostTreatmentAggregator) {
|
suspend fun handle(aggregator: SyncResponsePostTreatmentAggregator) {
|
||||||
cleanupEphemeralFiles(aggregator.ephemeralFilesToDelete)
|
cleanupEphemeralFiles(aggregator.ephemeralFilesToDelete)
|
||||||
updateDirectUserIds(aggregator.directChatsToCheck)
|
updateDirectUserIds(aggregator.directChatsToCheck)
|
||||||
fetchAndUpdateUsers(aggregator.userIdsToFetch)
|
fetchAndUpdateUsers(aggregator.userIdsToFetch)
|
||||||
|
handleUserIdsForCheckingTrustAndAffectedRoomShields(aggregator.userIdsForCheckingTrustAndAffectedRoomShields)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun cleanupEphemeralFiles(ephemeralFilesToDelete: List<String>) {
|
private fun cleanupEphemeralFiles(ephemeralFilesToDelete: List<String>) {
|
||||||
|
@ -79,23 +86,26 @@ internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun fetchAndUpdateUsers(userIdsToFetch: List<String>) {
|
private fun fetchAndUpdateUsers(userIdsToFetch: Collection<String>) {
|
||||||
fetchUsers(userIdsToFetch)
|
if (userIdsToFetch.isEmpty()) return
|
||||||
.takeIf { it.isNotEmpty() }
|
Timber.d("## Configure Worker to update users: ${userIdsToFetch.logLimit()}")
|
||||||
?.saveLocally()
|
val workerParams = UpdateTrustWorker.Params(
|
||||||
|
sessionId = sessionId,
|
||||||
|
filename = updateTrustWorkerDataRepository.createParam(userIdsToFetch.toList())
|
||||||
|
)
|
||||||
|
val workerData = WorkerParamsFactory.toData(workerParams)
|
||||||
|
|
||||||
|
val workRequest = workManagerProvider.matrixOneTimeWorkRequestBuilder<UpdateUserWorker>()
|
||||||
|
.setInputData(workerData)
|
||||||
|
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY_MILLIS, TimeUnit.MILLISECONDS)
|
||||||
|
.build()
|
||||||
|
|
||||||
|
workManagerProvider.workManager
|
||||||
|
.beginUniqueWork("USER_UPDATE_QUEUE", ExistingWorkPolicy.APPEND_OR_REPLACE, workRequest)
|
||||||
|
.enqueue()
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun fetchUsers(userIdsToFetch: List<String>) = userIdsToFetch.mapNotNull {
|
private fun handleUserIdsForCheckingTrustAndAffectedRoomShields(userIdsWithDeviceUpdate: Iterable<String>) {
|
||||||
tryOrNull {
|
crossSigningService.checkTrustAndAffectedRoomShields(userIdsWithDeviceUpdate.toList())
|
||||||
val profileJson = getProfileInfoTask.execute(GetProfileInfoTask.Params(it))
|
|
||||||
User.fromJson(it, profileJson)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private suspend fun List<User>.saveLocally() {
|
|
||||||
val userEntities = map { user -> UserEntityFactory.create(user) }
|
|
||||||
monarchy.awaitTransaction {
|
|
||||||
it.insertOrUpdate(userEntities)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.matrix.android.sdk.internal.session.sync.handler
|
||||||
|
|
||||||
|
import android.content.Context
|
||||||
|
import androidx.work.WorkerParameters
|
||||||
|
import com.zhuinden.monarchy.Monarchy
|
||||||
|
import org.matrix.android.sdk.api.extensions.tryOrNull
|
||||||
|
import org.matrix.android.sdk.api.session.user.model.User
|
||||||
|
import org.matrix.android.sdk.internal.SessionManager
|
||||||
|
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorker
|
||||||
|
import org.matrix.android.sdk.internal.crypto.crosssigning.UpdateTrustWorkerDataRepository
|
||||||
|
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||||
|
import org.matrix.android.sdk.internal.session.SessionComponent
|
||||||
|
import org.matrix.android.sdk.internal.session.profile.GetProfileInfoTask
|
||||||
|
import org.matrix.android.sdk.internal.session.user.UserEntityFactory
|
||||||
|
import org.matrix.android.sdk.internal.util.awaitTransaction
|
||||||
|
import org.matrix.android.sdk.internal.util.logLimit
|
||||||
|
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
|
||||||
|
import timber.log.Timber
|
||||||
|
import javax.inject.Inject
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Note: We reuse the same type [UpdateTrustWorker.Params], since the input data are the same.
|
||||||
|
*/
|
||||||
|
internal class UpdateUserWorker(context: Context, params: WorkerParameters, sessionManager: SessionManager) :
|
||||||
|
SessionSafeCoroutineWorker<UpdateTrustWorker.Params>(context, params, sessionManager, UpdateTrustWorker.Params::class.java) {
|
||||||
|
|
||||||
|
@SessionDatabase
|
||||||
|
@Inject lateinit var monarchy: Monarchy
|
||||||
|
@Inject lateinit var updateTrustWorkerDataRepository: UpdateTrustWorkerDataRepository
|
||||||
|
@Inject lateinit var getProfileInfoTask: GetProfileInfoTask
|
||||||
|
|
||||||
|
override fun injectWith(injector: SessionComponent) {
|
||||||
|
injector.inject(this)
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun doSafeWork(params: UpdateTrustWorker.Params): Result {
|
||||||
|
val userList = params.filename
|
||||||
|
?.let { updateTrustWorkerDataRepository.getParam(it) }
|
||||||
|
?.userIds
|
||||||
|
?: params.updatedUserIds.orEmpty()
|
||||||
|
|
||||||
|
// List should not be empty, but let's avoid go further in case of empty list
|
||||||
|
if (userList.isNotEmpty()) {
|
||||||
|
Timber.v("## UpdateUserWorker - updating users: ${userList.logLimit()}")
|
||||||
|
fetchAndUpdateUsers(userList)
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup(params)
|
||||||
|
return Result.success()
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun fetchAndUpdateUsers(userIdsToFetch: Collection<String>) {
|
||||||
|
fetchUsers(userIdsToFetch)
|
||||||
|
.takeIf { it.isNotEmpty() }
|
||||||
|
?.saveLocally()
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun fetchUsers(userIdsToFetch: Collection<String>) = userIdsToFetch.mapNotNull {
|
||||||
|
tryOrNull {
|
||||||
|
val profileJson = getProfileInfoTask.execute(GetProfileInfoTask.Params(it))
|
||||||
|
User.fromJson(it, profileJson)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun List<User>.saveLocally() {
|
||||||
|
val userEntities = map { user -> UserEntityFactory.create(user) }
|
||||||
|
Timber.d("## saveLocally()")
|
||||||
|
monarchy.awaitTransaction {
|
||||||
|
Timber.d("## saveLocally() - in transaction")
|
||||||
|
it.insertOrUpdate(userEntities)
|
||||||
|
}
|
||||||
|
Timber.d("## saveLocally() - END")
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun cleanup(params: UpdateTrustWorker.Params) {
|
||||||
|
params.filename
|
||||||
|
?.let { updateTrustWorkerDataRepository.delete(it) }
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun buildErrorParams(params: UpdateTrustWorker.Params, message: String): UpdateTrustWorker.Params {
|
||||||
|
return params.copy(lastFailureMessage = params.lastFailureMessage ?: message)
|
||||||
|
}
|
||||||
|
}
|
|
@ -154,12 +154,12 @@ internal class RoomSyncHandler @Inject constructor(
|
||||||
}
|
}
|
||||||
is HandlingStrategy.INVITED ->
|
is HandlingStrategy.INVITED ->
|
||||||
handlingStrategy.data.mapWithProgress(reporter, InitialSyncStep.ImportingAccountInvitedRooms, 0.1f) {
|
handlingStrategy.data.mapWithProgress(reporter, InitialSyncStep.ImportingAccountInvitedRooms, 0.1f) {
|
||||||
handleInvitedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis)
|
handleInvitedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis, aggregator)
|
||||||
}
|
}
|
||||||
|
|
||||||
is HandlingStrategy.LEFT -> {
|
is HandlingStrategy.LEFT -> {
|
||||||
handlingStrategy.data.mapWithProgress(reporter, InitialSyncStep.ImportingAccountLeftRooms, 0.3f) {
|
handlingStrategy.data.mapWithProgress(reporter, InitialSyncStep.ImportingAccountLeftRooms, 0.3f) {
|
||||||
handleLeftRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis)
|
handleLeftRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis, aggregator)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -285,7 +285,8 @@ internal class RoomSyncHandler @Inject constructor(
|
||||||
Membership.JOIN,
|
Membership.JOIN,
|
||||||
roomSync.summary,
|
roomSync.summary,
|
||||||
roomSync.unreadNotifications,
|
roomSync.unreadNotifications,
|
||||||
updateMembers = hasRoomMember
|
updateMembers = hasRoomMember,
|
||||||
|
aggregator = aggregator
|
||||||
)
|
)
|
||||||
return roomEntity
|
return roomEntity
|
||||||
}
|
}
|
||||||
|
@ -295,7 +296,8 @@ internal class RoomSyncHandler @Inject constructor(
|
||||||
roomId: String,
|
roomId: String,
|
||||||
roomSync: InvitedRoomSync,
|
roomSync: InvitedRoomSync,
|
||||||
insertType: EventInsertType,
|
insertType: EventInsertType,
|
||||||
syncLocalTimestampMillis: Long
|
syncLocalTimestampMillis: Long,
|
||||||
|
aggregator: SyncResponsePostTreatmentAggregator
|
||||||
): RoomEntity {
|
): RoomEntity {
|
||||||
Timber.v("Handle invited sync for room $roomId")
|
Timber.v("Handle invited sync for room $roomId")
|
||||||
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC
|
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC
|
||||||
|
@ -319,7 +321,7 @@ internal class RoomSyncHandler @Inject constructor(
|
||||||
it.type == EventType.STATE_ROOM_MEMBER
|
it.type == EventType.STATE_ROOM_MEMBER
|
||||||
}
|
}
|
||||||
roomChangeMembershipStateDataSource.setMembershipFromSync(roomId, Membership.INVITE)
|
roomChangeMembershipStateDataSource.setMembershipFromSync(roomId, Membership.INVITE)
|
||||||
roomSummaryUpdater.update(realm, roomId, Membership.INVITE, updateMembers = true, inviterId = inviterEvent?.senderId)
|
roomSummaryUpdater.update(realm, roomId, Membership.INVITE, updateMembers = true, inviterId = inviterEvent?.senderId, aggregator = aggregator)
|
||||||
return roomEntity
|
return roomEntity
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -328,7 +330,8 @@ internal class RoomSyncHandler @Inject constructor(
|
||||||
roomId: String,
|
roomId: String,
|
||||||
roomSync: RoomSync,
|
roomSync: RoomSync,
|
||||||
insertType: EventInsertType,
|
insertType: EventInsertType,
|
||||||
syncLocalTimestampMillis: Long
|
syncLocalTimestampMillis: Long,
|
||||||
|
aggregator: SyncResponsePostTreatmentAggregator
|
||||||
): RoomEntity {
|
): RoomEntity {
|
||||||
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC
|
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC
|
||||||
val roomEntity = RoomEntity.getOrCreate(realm, roomId)
|
val roomEntity = RoomEntity.getOrCreate(realm, roomId)
|
||||||
|
@ -366,7 +369,7 @@ internal class RoomSyncHandler @Inject constructor(
|
||||||
roomEntity.chunks.clearWith { it.deleteOnCascade(deleteStateEvents = true, canDeleteRoot = true) }
|
roomEntity.chunks.clearWith { it.deleteOnCascade(deleteStateEvents = true, canDeleteRoot = true) }
|
||||||
roomTypingUsersHandler.handle(realm, roomId, null)
|
roomTypingUsersHandler.handle(realm, roomId, null)
|
||||||
roomChangeMembershipStateDataSource.setMembershipFromSync(roomId, Membership.LEAVE)
|
roomChangeMembershipStateDataSource.setMembershipFromSync(roomId, Membership.LEAVE)
|
||||||
roomSummaryUpdater.update(realm, roomId, membership, roomSync.summary, roomSync.unreadNotifications)
|
roomSummaryUpdater.update(realm, roomId, membership, roomSync.summary, roomSync.unreadNotifications, aggregator = aggregator)
|
||||||
return roomEntity
|
return roomEntity
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.matrix.android.sdk.internal.session.room.aggregation.livelocation.Dea
|
||||||
import org.matrix.android.sdk.internal.session.room.send.MultipleEventSendingDispatcherWorker
|
import org.matrix.android.sdk.internal.session.room.send.MultipleEventSendingDispatcherWorker
|
||||||
import org.matrix.android.sdk.internal.session.room.send.RedactEventWorker
|
import org.matrix.android.sdk.internal.session.room.send.RedactEventWorker
|
||||||
import org.matrix.android.sdk.internal.session.room.send.SendEventWorker
|
import org.matrix.android.sdk.internal.session.room.send.SendEventWorker
|
||||||
|
import org.matrix.android.sdk.internal.session.sync.handler.UpdateUserWorker
|
||||||
import org.matrix.android.sdk.internal.session.sync.job.SyncWorker
|
import org.matrix.android.sdk.internal.session.sync.job.SyncWorker
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
@ -62,6 +63,8 @@ internal class MatrixWorkerFactory @Inject constructor(private val sessionManage
|
||||||
SyncWorker(appContext, workerParameters, sessionManager)
|
SyncWorker(appContext, workerParameters, sessionManager)
|
||||||
UpdateTrustWorker::class.java.name ->
|
UpdateTrustWorker::class.java.name ->
|
||||||
UpdateTrustWorker(appContext, workerParameters, sessionManager)
|
UpdateTrustWorker(appContext, workerParameters, sessionManager)
|
||||||
|
UpdateUserWorker::class.java.name ->
|
||||||
|
UpdateUserWorker(appContext, workerParameters, sessionManager)
|
||||||
UploadContentWorker::class.java.name ->
|
UploadContentWorker::class.java.name ->
|
||||||
UploadContentWorker(appContext, workerParameters, sessionManager)
|
UploadContentWorker(appContext, workerParameters, sessionManager)
|
||||||
DeactivateLiveLocationShareWorker::class.java.name ->
|
DeactivateLiveLocationShareWorker::class.java.name ->
|
||||||
|
|
Loading…
Reference in New Issue