Remove run blocking from realm tx

This commit is contained in:
valere 2022-12-15 18:44:43 +01:00
parent 49239e6bf2
commit 3efaa8e171
6 changed files with 101 additions and 84 deletions

View File

@ -193,7 +193,7 @@ internal class DefaultCryptoService @Inject constructor(
private val isStarting = AtomicBoolean(false) private val isStarting = AtomicBoolean(false)
private val isStarted = AtomicBoolean(false) private val isStarted = AtomicBoolean(false)
override fun onStateEvent(roomId: String, event: Event) { override suspend fun onStateEvent(roomId: String, event: Event) {
when (event.type) { when (event.type) {
EventType.STATE_ROOM_ENCRYPTION -> onRoomEncryptionEvent(roomId, event) EventType.STATE_ROOM_ENCRYPTION -> onRoomEncryptionEvent(roomId, event)
EventType.STATE_ROOM_MEMBER -> onRoomMembershipEvent(roomId, event) EventType.STATE_ROOM_MEMBER -> onRoomMembershipEvent(roomId, event)
@ -201,7 +201,7 @@ internal class DefaultCryptoService @Inject constructor(
} }
} }
override fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) { override suspend fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) {
// handle state events // handle state events
if (event.isStateEvent()) { if (event.isStateEvent()) {
when (event.type) { when (event.type) {
@ -214,7 +214,7 @@ internal class DefaultCryptoService @Inject constructor(
// handle verification // handle verification
if (!initialSync) { if (!initialSync) {
if (event.type != null && verificationMessageProcessor.shouldProcess(event.type)) { if (event.type != null && verificationMessageProcessor.shouldProcess(event.type)) {
cryptoCoroutineScope.launch(coroutineDispatchers.dmVerif) { withContext(coroutineDispatchers.dmVerif) {
verificationMessageProcessor.process(roomId, event) verificationMessageProcessor.process(roomId, event)
} }
} }
@ -788,7 +788,7 @@ internal class DefaultCryptoService @Inject constructor(
*/ */
@Throws(MXCryptoError::class) @Throws(MXCryptoError::class)
private suspend fun internalDecryptEvent(event: Event, timeline: String): MXEventDecryptionResult { private suspend fun internalDecryptEvent(event: Event, timeline: String): MXEventDecryptionResult {
return eventDecryptor.decryptEvent(event, timeline) return withContext(coroutineDispatchers.crypto) { eventDecryptor.decryptEvent(event, timeline) }
} }
/** /**
@ -937,13 +937,13 @@ internal class DefaultCryptoService @Inject constructor(
* @param roomId the room Id * @param roomId the room Id
* @param event the encryption event. * @param event the encryption event.
*/ */
private fun onRoomEncryptionEvent(roomId: String, event: Event) { private suspend fun onRoomEncryptionEvent(roomId: String, event: Event) {
if (!event.isStateEvent()) { if (!event.isStateEvent()) {
// Ignore // Ignore
Timber.tag(loggerTag.value).w("Invalid encryption event") Timber.tag(loggerTag.value).w("Invalid encryption event")
return return
} }
cryptoCoroutineScope.launch(coroutineDispatchers.crypto) { withContext(coroutineDispatchers.io) {
val userIds = getRoomUserIds(roomId) val userIds = getRoomUserIds(roomId)
setEncryptionInRoom(roomId, event.content?.get("algorithm")?.toString(), true, userIds) setEncryptionInRoom(roomId, event.content?.get("algorithm")?.toString(), true, userIds)
} }
@ -961,7 +961,7 @@ internal class DefaultCryptoService @Inject constructor(
* @param roomId the room Id * @param roomId the room Id
* @param event the membership event causing the change * @param event the membership event causing the change
*/ */
private fun onRoomMembershipEvent(roomId: String, event: Event) { private suspend fun onRoomMembershipEvent(roomId: String, event: Event) {
// because the encryption event can be after the join/invite in the same batch // because the encryption event can be after the join/invite in the same batch
event.stateKey?.let { _ -> event.stateKey?.let { _ ->
val roomMember: RoomMemberContent? = event.content.toModel() val roomMember: RoomMemberContent? = event.content.toModel()
@ -970,37 +970,39 @@ internal class DefaultCryptoService @Inject constructor(
unrequestedForwardManager.onInviteReceived(roomId, event.senderId.orEmpty(), clock.epochMillis()) unrequestedForwardManager.onInviteReceived(roomId, event.senderId.orEmpty(), clock.epochMillis())
} }
} }
roomEncryptorsStore.get(roomId) ?: /* No encrypting in this room */ return roomEncryptorsStore.get(roomId) ?: /* No encrypting in this room */ return
withContext(coroutineDispatchers.io) {
event.stateKey?.let { userId -> event.stateKey?.let { userId ->
val roomMember: RoomMemberContent? = event.content.toModel() val roomMember: RoomMemberContent? = event.content.toModel()
val membership = roomMember?.membership val membership = roomMember?.membership
if (membership == Membership.JOIN) { if (membership == Membership.JOIN) {
// make sure we are tracking the deviceList for this user. // make sure we are tracking the deviceList for this user.
deviceListManager.startTrackingDeviceList(listOf(userId)) deviceListManager.startTrackingDeviceList(listOf(userId))
} else if (membership == Membership.INVITE && } else if (membership == Membership.INVITE &&
shouldEncryptForInvitedMembers(roomId) && shouldEncryptForInvitedMembers(roomId) &&
isEncryptionEnabledForInvitedUser()) { isEncryptionEnabledForInvitedUser()) {
// track the deviceList for this invited user. // track the deviceList for this invited user.
// Caution: there's a big edge case here in that federated servers do not // Caution: there's a big edge case here in that federated servers do not
// know what other servers are in the room at the time they've been invited. // know what other servers are in the room at the time they've been invited.
// They therefore will not send device updates if a user logs in whilst // They therefore will not send device updates if a user logs in whilst
// their state is invite. // their state is invite.
deviceListManager.startTrackingDeviceList(listOf(userId)) deviceListManager.startTrackingDeviceList(listOf(userId))
}
} }
} }
} }
private fun onRoomHistoryVisibilityEvent(roomId: String, event: Event) { private suspend fun onRoomHistoryVisibilityEvent(roomId: String, event: Event) {
if (!event.isStateEvent()) return if (!event.isStateEvent()) return
val eventContent = event.content.toModel<RoomHistoryVisibilityContent>() val eventContent = event.content.toModel<RoomHistoryVisibilityContent>()
val historyVisibility = eventContent?.historyVisibility val historyVisibility = eventContent?.historyVisibility
if (historyVisibility == null) { withContext(coroutineDispatchers.io) {
cryptoStore.setShouldShareHistory(roomId, false) if (historyVisibility == null) {
} else { cryptoStore.setShouldShareHistory(roomId, false)
cryptoStore.setShouldEncryptForInvitedMembers(roomId, historyVisibility != RoomHistoryVisibility.JOINED) } else {
cryptoStore.setShouldShareHistory(roomId, historyVisibility.shouldShareHistory()) cryptoStore.setShouldEncryptForInvitedMembers(roomId, historyVisibility != RoomHistoryVisibility.JOINED)
cryptoStore.setShouldShareHistory(roomId, historyVisibility.shouldShareHistory())
}
} }
} }

View File

@ -222,8 +222,8 @@ interface CryptoService {
suspend fun onSyncWillProcess(isInitialSync: Boolean) suspend fun onSyncWillProcess(isInitialSync: Boolean)
fun isStarted(): Boolean fun isStarted(): Boolean
suspend fun receiveSyncChanges(toDevice: ToDeviceSyncResponse?, deviceChanges: DeviceListResponse?, keyCounts: DeviceOneTimeKeysCountSyncResponse?) suspend fun receiveSyncChanges(toDevice: ToDeviceSyncResponse?, deviceChanges: DeviceListResponse?, keyCounts: DeviceOneTimeKeysCountSyncResponse?)
fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) suspend fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean)
fun onStateEvent(roomId: String, event: Event) {} suspend fun onStateEvent(roomId: String, event: Event) {}
suspend fun onSyncCompleted(syncResponse: SyncResponse) suspend fun onSyncCompleted(syncResponse: SyncResponse)
fun logDbUsageInfo() fun logDbUsageInfo()
suspend fun setRoomUnBlacklistUnverifiedDevices(roomId: String) suspend fun setRoomUnBlacklistUnverifiedDevices(roomId: String)

View File

@ -179,7 +179,9 @@ internal class DefaultCreateLocalRoomTask @Inject constructor(
} }
// Give info to crypto module // Give info to crypto module
cryptoService.onStateEvent(roomId, event) runBlocking {
cryptoService.onStateEvent(roomId, event)
}
} }
roomMemberContentsByUser.getOrPut(event.senderId) { roomMemberContentsByUser.getOrPut(event.senderId) {

View File

@ -23,6 +23,9 @@ import org.matrix.android.sdk.api.extensions.measureMetric
import org.matrix.android.sdk.api.extensions.measureSpan import org.matrix.android.sdk.api.extensions.measureSpan
import org.matrix.android.sdk.api.metrics.SyncDurationMetricPlugin import org.matrix.android.sdk.api.metrics.SyncDurationMetricPlugin
import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.pushrules.PushRuleService import org.matrix.android.sdk.api.session.pushrules.PushRuleService
import org.matrix.android.sdk.api.session.pushrules.RuleScope import org.matrix.android.sdk.api.session.pushrules.RuleScope
import org.matrix.android.sdk.api.session.sync.InitialSyncStep import org.matrix.android.sdk.api.session.sync.InitialSyncStep
@ -39,6 +42,7 @@ import org.matrix.android.sdk.internal.session.sync.handler.SyncResponsePostTrea
import org.matrix.android.sdk.internal.session.sync.handler.UserAccountDataSyncHandler import org.matrix.android.sdk.internal.session.sync.handler.UserAccountDataSyncHandler
import org.matrix.android.sdk.internal.session.sync.handler.room.RoomSyncHandler import org.matrix.android.sdk.internal.session.sync.handler.room.RoomSyncHandler
import org.matrix.android.sdk.internal.util.awaitTransaction import org.matrix.android.sdk.internal.util.awaitTransaction
import org.matrix.android.sdk.internal.util.time.Clock
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
import kotlin.system.measureTimeMillis import kotlin.system.measureTimeMillis
@ -56,6 +60,7 @@ internal class SyncResponseHandler @Inject constructor(
private val processEventForPushTask: ProcessEventForPushTask, private val processEventForPushTask: ProcessEventForPushTask,
private val pushRuleService: PushRuleService, private val pushRuleService: PushRuleService,
private val presenceSyncHandler: PresenceSyncHandler, private val presenceSyncHandler: PresenceSyncHandler,
private val clock: Clock,
matrixConfiguration: MatrixConfiguration, matrixConfiguration: MatrixConfiguration,
) { ) {
@ -67,7 +72,6 @@ internal class SyncResponseHandler @Inject constructor(
reporter: ProgressReporter? reporter: ProgressReporter?
) { ) {
val isInitialSync = fromToken == null val isInitialSync = fromToken == null
Timber.v("Start handling sync, is InitialSync: $isInitialSync")
relevantPlugins.measureMetric { relevantPlugins.measureMetric {
startCryptoService(isInitialSync) startCryptoService(isInitialSync)
@ -78,6 +82,25 @@ internal class SyncResponseHandler @Inject constructor(
relevantPlugins.measureSpan("task", "handle_to_device") { relevantPlugins.measureSpan("task", "handle_to_device") {
handleToDevice(syncResponse) handleToDevice(syncResponse)
} }
val syncLocalTimestampMillis = clock.epochMillis()
// pass live state/crypto related event to crypto
syncResponse.rooms?.join?.entries?.map { (roomId, roomSync) ->
roomSync.state
?.events
?.filter { it.isStateEvent() }
?.forEach {
cryptoService.onStateEvent(roomId, it)
}
roomSync.timeline?.events?.forEach {
if (it.isEncrypted() && !isInitialSync) {
decryptIfNeeded(it, roomId)
}
it.ageLocalTs = syncLocalTimestampMillis - (it.unsignedData?.age ?: 0)
cryptoService.onLiveEvent(roomId, it, isInitialSync)
}
}
val aggregator = SyncResponsePostTreatmentAggregator() val aggregator = SyncResponsePostTreatmentAggregator()
@ -101,6 +124,32 @@ internal class SyncResponseHandler @Inject constructor(
} }
} }
private suspend fun decryptIfNeeded(event: Event, roomId: String) {
try {
val timelineId = generateTimelineId(roomId)
// Event from sync does not have roomId, so add it to the event first
// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching
val result = cryptoService.decryptEvent(event.copy(roomId = roomId), timelineId)
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain,
isSafe = result.isSafe
)
} catch (e: MXCryptoError) {
Timber.v(e, "Failed to decrypt $roomId")
if (e is MXCryptoError.Base) {
event.mCryptoError = e.errorType
event.mCryptoErrorReason = e.technicalMessage.takeIf { it.isNotEmpty() } ?: e.detailedErrorDescription
}
}
}
private fun generateTimelineId(roomId: String): String {
return "RoomSyncHandler$roomId"
}
private suspend fun startCryptoService(isInitialSync: Boolean) { private suspend fun startCryptoService(isInitialSync: Boolean) {
relevantPlugins.measureSpan("task", "start_crypto_service") { relevantPlugins.measureSpan("task", "start_crypto_service") {
measureTimeMillis { measureTimeMillis {

View File

@ -19,10 +19,7 @@ package org.matrix.android.sdk.internal.session.sync.handler.room
import dagger.Lazy import dagger.Lazy
import io.realm.Realm import io.realm.Realm
import io.realm.kotlin.createObject import io.realm.kotlin.createObject
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.crypto.MXCRYPTO_ALGORITHM_MEGOLM import org.matrix.android.sdk.api.crypto.MXCRYPTO_ALGORITHM_MEGOLM
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult import org.matrix.android.sdk.api.session.crypto.model.OlmDecryptionResult
import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.EventType import org.matrix.android.sdk.api.session.events.model.EventType
@ -92,7 +89,6 @@ internal class RoomSyncHandler @Inject constructor(
private val readReceiptHandler: ReadReceiptHandler, private val readReceiptHandler: ReadReceiptHandler,
private val roomSummaryUpdater: RoomSummaryUpdater, private val roomSummaryUpdater: RoomSummaryUpdater,
private val roomAccountDataHandler: RoomSyncAccountDataHandler, private val roomAccountDataHandler: RoomSyncAccountDataHandler,
private val cryptoService: CryptoService,
private val roomMemberEventHandler: RoomMemberEventHandler, private val roomMemberEventHandler: RoomMemberEventHandler,
private val roomTypingUsersHandler: RoomTypingUsersHandler, private val roomTypingUsersHandler: RoomTypingUsersHandler,
private val threadsAwarenessHandler: ThreadsAwarenessHandler, private val threadsAwarenessHandler: ThreadsAwarenessHandler,
@ -258,7 +254,6 @@ internal class RoomSyncHandler @Inject constructor(
root = eventEntity root = eventEntity
} }
// Give info to crypto module // Give info to crypto module
cryptoService.onStateEvent(roomId, event)
roomMemberEventHandler.handle(realm, roomId, event, isInitialSync, aggregator) roomMemberEventHandler.handle(realm, roomId, event, isInitialSync, aggregator)
} }
} }
@ -423,15 +418,9 @@ internal class RoomSyncHandler @Inject constructor(
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC val isInitialSync = insertType == EventInsertType.INITIAL_SYNC
eventIds.add(event.eventId) eventIds.add(event.eventId)
liveEventService.get().dispatchLiveEventReceived(event, roomId, isInitialSync) liveEventService.get().dispatchLiveEventReceived(event, roomId, isInitialSync)
if (event.isEncrypted() && !isInitialSync) {
try {
decryptIfNeeded(event, roomId)
} catch (e: InterruptedException) {
Timber.i("Decryption got interrupted")
}
}
var contentToInject: String? = null var contentToInject: String? = null
if (!isInitialSync) { if (!isInitialSync) {
contentToInject = threadsAwarenessHandler.makeEventThreadAware(realm, roomId, event) contentToInject = threadsAwarenessHandler.makeEventThreadAware(realm, roomId, event)
@ -486,7 +475,9 @@ internal class RoomSyncHandler @Inject constructor(
} }
} }
// Give info to crypto module // Give info to crypto module
cryptoService.onLiveEvent(roomEntity.roomId, event, isInitialSync) // runBlocking {
// cryptoService.onLiveEvent(roomEntity.roomId, event, isInitialSync)
// }
// Try to remove local echo // Try to remove local echo
event.unsignedData?.transactionId?.also { txId -> event.unsignedData?.transactionId?.also { txId ->
@ -567,31 +558,6 @@ internal class RoomSyncHandler @Inject constructor(
} }
} }
private fun decryptIfNeeded(event: Event, roomId: String) {
try {
val timelineId = generateTimelineId(roomId)
// Event from sync does not have roomId, so add it to the event first
// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching
val result = runBlocking { cryptoService.decryptEvent(event.copy(roomId = roomId), timelineId) }
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain,
isSafe = result.isSafe
)
} catch (e: MXCryptoError) {
if (e is MXCryptoError.Base) {
event.mCryptoError = e.errorType
event.mCryptoErrorReason = e.technicalMessage.takeIf { it.isNotEmpty() } ?: e.detailedErrorDescription
}
}
}
private fun generateTimelineId(roomId: String): String {
return "RoomSyncHandler$roomId"
}
data class EphemeralResult( data class EphemeralResult(
val typingUserIds: List<String> = emptyList() val typingUserIds: List<String> = emptyList()
) )

View File

@ -137,7 +137,7 @@ internal class RustCryptoService @Inject constructor(
private val isStarting = AtomicBoolean(false) private val isStarting = AtomicBoolean(false)
private val isStarted = AtomicBoolean(false) private val isStarted = AtomicBoolean(false)
override fun onStateEvent(roomId: String, event: Event) { override suspend fun onStateEvent(roomId: String, event: Event) {
when (event.type) { when (event.type) {
EventType.STATE_ROOM_ENCRYPTION -> onRoomEncryptionEvent(roomId, event) EventType.STATE_ROOM_ENCRYPTION -> onRoomEncryptionEvent(roomId, event)
EventType.STATE_ROOM_MEMBER -> onRoomMembershipEvent(roomId, event) EventType.STATE_ROOM_MEMBER -> onRoomMembershipEvent(roomId, event)
@ -145,7 +145,7 @@ internal class RustCryptoService @Inject constructor(
} }
} }
override fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) { override suspend fun onLiveEvent(roomId: String, event: Event, initialSync: Boolean) {
if (event.isStateEvent()) { if (event.isStateEvent()) {
when (event.getClearType()) { when (event.getClearType()) {
EventType.STATE_ROOM_ENCRYPTION -> onRoomEncryptionEvent(roomId, event) EventType.STATE_ROOM_ENCRYPTION -> onRoomEncryptionEvent(roomId, event)
@ -153,9 +153,7 @@ internal class RustCryptoService @Inject constructor(
EventType.STATE_ROOM_HISTORY_VISIBILITY -> onRoomHistoryVisibilityEvent(roomId, event) EventType.STATE_ROOM_HISTORY_VISIBILITY -> onRoomHistoryVisibilityEvent(roomId, event)
} }
} else { } else {
cryptoCoroutineScope.launch { verificationService.onEvent(roomId, event)
verificationService.onEvent(roomId, event)
}
} }
} }
@ -487,7 +485,7 @@ internal class RustCryptoService @Inject constructor(
* *
* @param event the encryption event. * @param event the encryption event.
*/ */
private fun onRoomEncryptionEvent(roomId: String, event: Event) { private suspend fun onRoomEncryptionEvent(roomId: String, event: Event) {
if (!event.isStateEvent()) { if (!event.isStateEvent()) {
// Ignore // Ignore
Timber.tag(loggerTag.value).w("Invalid encryption event") Timber.tag(loggerTag.value).w("Invalid encryption event")
@ -495,7 +493,7 @@ internal class RustCryptoService @Inject constructor(
} }
// Do not load members here, would defeat lazy loading // Do not load members here, would defeat lazy loading
cryptoCoroutineScope.launch(coroutineDispatchers.crypto) { // cryptoCoroutineScope.launch(coroutineDispatchers.crypto) {
// val params = LoadRoomMembersTask.Params(roomId) // val params = LoadRoomMembersTask.Params(roomId)
// try { // try {
// loadRoomMembersTask.execute(params) // loadRoomMembersTask.execute(params)
@ -505,7 +503,7 @@ internal class RustCryptoService @Inject constructor(
val userIds = getRoomUserIds(roomId) val userIds = getRoomUserIds(roomId)
setEncryptionInRoom(roomId, event.content?.get("algorithm")?.toString(), userIds) setEncryptionInRoom(roomId, event.content?.get("algorithm")?.toString(), userIds)
// } // }
} // }
} }
override fun onE2ERoomMemberLoadedFromServer(roomId: String) { override fun onE2ERoomMemberLoadedFromServer(roomId: String) {
@ -530,7 +528,7 @@ internal class RustCryptoService @Inject constructor(
* *
* @param event the membership event causing the change * @param event the membership event causing the change
*/ */
private fun onRoomMembershipEvent(roomId: String, event: Event) { private suspend fun onRoomMembershipEvent(roomId: String, event: Event) {
// We only care about the memberships if this room is encrypted // We only care about the memberships if this room is encrypted
if (!isRoomEncrypted(roomId)) { if (!isRoomEncrypted(roomId)) {
return return
@ -551,9 +549,7 @@ internal class RustCryptoService @Inject constructor(
// know what other servers are in the room at the time they've been invited. // know what other servers are in the room at the time they've been invited.
// They therefore will not send device updates if a user logs in whilst // They therefore will not send device updates if a user logs in whilst
// their state is invite. // their state is invite.
cryptoCoroutineScope.launch {
olmMachine.updateTrackedUsers(listOf(userId)) olmMachine.updateTrackedUsers(listOf(userId))
}
} else { } else {
// nop // nop
} }
@ -699,7 +695,9 @@ internal class RustCryptoService @Inject constructor(
} }
override fun enableShareKeyOnInvite(enable: Boolean) { override fun enableShareKeyOnInvite(enable: Boolean) {
TODO("Enable share key on invite not implemented") if (enable) {
TODO("Enable share key on invite not implemented")
}
} }
override fun isShareKeysOnInviteEnabled(): Boolean { override fun isShareKeysOnInviteEnabled(): Boolean {