crypto: Send out some of our requests in parallel

This commit is contained in:
Damir Jelić 2021-04-09 19:10:25 +02:00
parent 99477914df
commit e9e3d129ba

View File

@ -28,9 +28,12 @@ import java.util.concurrent.ConcurrentHashMap
import javax.inject.Inject
import kotlin.jvm.Throws
import kotlin.math.max
import kotlinx.coroutines.async
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
@ -671,14 +674,22 @@ internal class DefaultCryptoService @Inject constructor(
val keyShareLock = roomKeyShareLocks.getOrDefault(roomId, Mutex())
keyShareLock.withLock {
for (toDeviceRequest in olmMachine!!.shareGroupSession(roomId, roomMembers)) {
// TODO these requests should be sent out in parallel
// This request can only be a to-device request.
when (toDeviceRequest) {
coroutineScope {
olmMachine!!.shareGroupSession(roomId, roomMembers).map {
when (it) {
is Request.ToDevice -> {
sendToDevice(toDeviceRequest)
async {
sendToDevice(it)
}
}
else -> {
// This request can only be a to-device request but
// we need to handle all our cases and put this
// async block for our joinAll to work.
async {}
}
}
}.joinAll()
}
}
}
@ -751,19 +762,30 @@ internal class DefaultCryptoService @Inject constructor(
private suspend fun sendOutgoingRequests() {
outgointRequestsLock.withLock {
// TODO these requests should be sent out in parallel
for (outgoingRequest in olmMachine!!.outgoingRequests()) {
when (outgoingRequest) {
coroutineScope {
olmMachine!!.outgoingRequests().map {
when (it) {
is Request.KeysUpload -> {
uploadKeys(outgoingRequest)
async {
uploadKeys(it)
}
}
is Request.KeysQuery -> {
queryKeys(outgoingRequest)
async {
queryKeys(it)
}
}
is Request.ToDevice -> {
// Timber.v("HELLO TO DEVICE REQUEST ${outgoingRequest.body}")
// TODO this sends out mostly key requests, it's a
// bit spammy as of now so it's disabled, needs to
// be fixed on the Rust side.
async {}
}
else -> {
async {}
}
}
}.joinAll()
}
}
}