From ec0ada7b8cbed840dd08a7a8dfb6ea25a69196a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9B=D0=B5=D0=BE=D0=BD=D0=B8=D0=B4=20=D0=AE=D1=80=D1=8C?= =?UTF-8?q?=D0=B5=D0=B2=20=28Leonid=20Yuriev=29?= Date: Tue, 9 Jul 2024 16:04:01 +0300 Subject: [PATCH] =?UTF-8?q?mdbx:=20=D0=BF=D0=B0=D1=80=D0=BA=D0=BE=D0=B2?= =?UTF-8?q?=D0=BA=D0=B0=20=D1=87=D0=B8=D1=82=D0=B0=D1=8E=D1=89=D0=B8=D1=85?= =?UTF-8?q?=20=D1=82=D1=80=D0=B0=D0=BD=D0=B7=D0=B0=D0=BA=D1=86=D0=B8=D0=B9?= =?UTF-8?q?.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mdbx.h | 135 +++++++++++++++++++++++++++++++++++---- src/api-env.c | 3 +- src/bits.md | 6 +- src/cogs.h | 10 ++- src/dbi.c | 2 +- src/internals.h | 3 +- src/layout-lck.h | 8 ++- src/lck-windows.c | 3 +- src/mvcc-readers.c | 154 +++++++++++++++++++++++++++++++++++++++------ src/proto.h | 12 ++-- src/txn.c | 143 +++++++++++++++++++++++++++++++---------- 11 files changed, 399 insertions(+), 80 deletions(-) diff --git a/mdbx.h b/mdbx.h index 8ee45375..5a956e64 100644 --- a/mdbx.h +++ b/mdbx.h @@ -1556,34 +1556,54 @@ typedef enum MDBX_txn_flags { MDBX_TXN_INVALID = INT32_MIN, /** Transaction is finished or never began. - * \note Transaction state flag. Returned from \ref mdbx_txn_flags() + * \note This is a transaction state flag. Returned from \ref mdbx_txn_flags() * but can't be used with \ref mdbx_txn_begin(). */ MDBX_TXN_FINISHED = 0x01, /** Transaction is unusable after an error. - * \note Transaction state flag. Returned from \ref mdbx_txn_flags() + * \note This is a transaction state flag. Returned from \ref mdbx_txn_flags() * but can't be used with \ref mdbx_txn_begin(). */ MDBX_TXN_ERROR = 0x02, /** Transaction must write, even if dirty list is empty. - * \note Transaction state flag. Returned from \ref mdbx_txn_flags() + * \note This is a transaction state flag. Returned from \ref mdbx_txn_flags() * but can't be used with \ref mdbx_txn_begin(). */ MDBX_TXN_DIRTY = 0x04, /** Transaction or a parent has spilled pages. - * \note Transaction state flag. Returned from \ref mdbx_txn_flags() + * \note This is a transaction state flag. Returned from \ref mdbx_txn_flags() * but can't be used with \ref mdbx_txn_begin(). */ MDBX_TXN_SPILLS = 0x08, /** Transaction has a nested child transaction. - * \note Transaction state flag. Returned from \ref mdbx_txn_flags() + * \note This is a transaction state flag. Returned from \ref mdbx_txn_flags() * but can't be used with \ref mdbx_txn_begin(). */ MDBX_TXN_HAS_CHILD = 0x10, - /** Most operations on the transaction are currently illegal. - * \note Transaction state flag. Returned from \ref mdbx_txn_flags() + /** Transaction is parked by \ref mdbx_txn_park(). + * \note This is a transaction state flag. Returned from \ref mdbx_txn_flags() * but can't be used with \ref mdbx_txn_begin(). */ - MDBX_TXN_BLOCKED = MDBX_TXN_FINISHED | MDBX_TXN_ERROR | MDBX_TXN_HAS_CHILD + MDBX_TXN_PARKED = 0x20, + + /** Transaction is parked by \ref mdbx_txn_park() with `autounpark=true`, + * and therefore it can be used without explicitly calling + * \ref mdbx_txn_unpark() first. + * \note This is a transaction state flag. Returned from \ref mdbx_txn_flags() + * but can't be used with \ref mdbx_txn_begin(). */ + MDBX_TXN_AUTOUNPARK = 0x40, + + /** The transaction was blocked using the \ref mdbx_txn_park() function, + * and then ousted by a write transaction because + * this transaction was interfered with garbage recycling. + * \note This is a transaction state flag. Returned from \ref mdbx_txn_flags() + * but can't be used with \ref mdbx_txn_begin(). */ + MDBX_TXN_OUSTED = 0x80, + + /** Most operations on the transaction are currently illegal. + * \note This is a transaction state flag. Returned from \ref mdbx_txn_flags() + * but can't be used with \ref mdbx_txn_begin(). */ + MDBX_TXN_BLOCKED = + MDBX_TXN_FINISHED | MDBX_TXN_ERROR | MDBX_TXN_HAS_CHILD | MDBX_TXN_PARKED } MDBX_txn_flags_t; DEFINE_ENUM_FLAG_OPERATORS(MDBX_txn_flags) @@ -1962,8 +1982,11 @@ typedef enum MDBX_error { * corresponding DBI-handle could be (re)used */ MDBX_DANGLING_DBI = -30412, + /** Транзакция была асинхронно отменена/вытеснена */ + MDBX_OUSTED = -30411, + /* The last of MDBX-added error codes */ - MDBX_LAST_ADDED_ERRCODE = MDBX_DANGLING_DBI, + MDBX_LAST_ADDED_ERRCODE = MDBX_OUSTED, #if defined(_WIN32) || defined(_WIN64) MDBX_ENODATA = ERROR_HANDLE_EOF, @@ -3972,7 +3995,8 @@ mdbx_txn_env(const MDBX_txn *txn); * * \returns A transaction flags, valid if input is an valid transaction, * otherwise \ref MDBX_TXN_INVALID. */ -MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API int mdbx_txn_flags(const MDBX_txn *txn); +MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API MDBX_txn_flags_t +mdbx_txn_flags(const MDBX_txn *txn); /** \brief Return the transaction's ID. * \ingroup c_statinfo @@ -4190,8 +4214,8 @@ LIBMDBX_API int mdbx_txn_break(MDBX_txn *txn); * transaction soon, and also locking overhead if \ref MDBX_NOSTICKYTHREADS is * in use. The reader table lock is released, but the table slot stays tied to * its thread or \ref MDBX_txn. Use \ref mdbx_txn_abort() to discard a reset - * handle, and to free its lock table slot if \ref MDBX_NOSTICKYTHREADS is in - * use. + * handle, and to free its lock table slot if \ref MDBX_NOSTICKYTHREADS + * is in use. * * Cursors opened within the transaction must not be used again after this * call, except with \ref mdbx_cursor_renew() and \ref mdbx_cursor_close(). @@ -4216,6 +4240,93 @@ LIBMDBX_API int mdbx_txn_break(MDBX_txn *txn); * \retval MDBX_EINVAL Transaction handle is NULL. */ LIBMDBX_API int mdbx_txn_reset(MDBX_txn *txn); +/** \brief Переводит читающую транзакцию в "припаркованное" состояние. + * \ingroup c_transactions + * + * Выполняющиеся читающие транзакции не позволяют перерабатывать старые + * MVCC-снимки данных, начиная с самой старой используемой/читаемой версии и все + * последующие. Припаркованная же транзакция может быть вытеснена транзакцией + * записи, если будет мешать переработке мусора (старых MVCC-снимков данных). + * А если вытеснения не произойдет, то восстановление (перевод в рабочее + * состояние и продолжение выполнение) читающей транзакции будет существенно + * дешевле. Таким образом, парковка транзакций позволяет предотвратить + * негативные последствия связанные с остановкой переработки мусора, + * одновременно сохранив накладные расходы на минимальном уровне. + * + * Для продолжения выполнения (чтения и/или использования данных) припаркованная + * транзакция должна быть восстановлена посредством \ref mdbx_txn_unpark(). + * Для удобства использования и предотвращения лишних вызовов API, посредством + * параметра `autounpark`, предусмотрена возможность автоматической + * «распарковки» при использовании припаркованной транзакции в функциях API + * предполагающих чтение данных. + * + * \warning До восстановления/распарковки транзакции, вне зависимости от + * аргумента `autounpark`, нельзя допускать разыменования указателей полученных + * ранее при чтении данных в рамках припаркованной транзакции, так как + * MVCC-снимок в котором размещены эти данные не удерживается и может + * переработан в любой момент. + * + * Припаркованная транзакция без "распарковки" может быть прервана, сброшена + * или перезапущена в любой момент посредством \ref mdbx_txn_abort(), + * \ref mdbx_txn_reset() и \ref mdbx_txn_renew(), соответственно. + * + * \see long-lived-read + * \see mdbx_txn_unpark() + * \see mdbx_txn_flags() + * + * \param [in] txn Транзакция чтения запущенная посредством + * \ref mdbx_txn_begin(). + * + * \param [in] autounpark Позволяет включить автоматическую + * распарковку/восстановление транзакции при вызове + * функций API предполагающих чтение данных. + * + * \returns Ненулевое значение кода ошибки, либо 0 при успешном выполнении. */ +LIBMDBX_API int mdbx_txn_park(MDBX_txn *txn, bool autounpark); + +/** \brief Распарковывает ранее припаркованную читающую транзакцию. + * \ingroup c_transactions + * + * Функция пытается восстановить ранее припаркованную транзакцию. Если + * припаркованная транзакция была вытеснена ради переработки старых + * MVCC-снимков, то в зависимости от аргумента `restart_if_ousted` выполняется + * её перезапуск аналогично \ref mdbx_txn_renew(), либо транзакция сбрасывается + * и возвращается код ошибки \ref MDBX_OUSTED. + * + * \see long-lived-read + * \see mdbx_txn_park() + * \see mdbx_txn_flags() + * + * \param [in] txn Транзакция чтения запущенная посредством + * \ref mdbx_txn_begin() и затем припаркованная + * посредством \ref mdbx_txn_park. + * + * \param [in] restart_if_ousted Позволяет сразу выполнить перезапуск + * транзакции, если она была вынестена. + * + * \returns Ненулевое значение кода ошибки, либо 0 при успешном выполнении. + * Некоторые специфичекие коды результата: + * + * \retval MDBX_SUCCESS Припаркованная транзакция успешно восстановлена, + * либо она не была припаркована. + * + * \retval MDBX_OUSTED Читающая транзакция была вытеснена пишущей + * транзакцией ради переработки старых MVCC-снимков, + * а аргумент `restart_if_ousted` был задан `false`. + * Транзакция сбрасывается в состояние аналогичное + * после вызова \ref mdbx_txn_reset(), но экземпляр + * (хендл) не освобождается и может быть использован + * повторно посредством \ref mdbx_txn_renew(), либо + * освобожден посредством \ref mdbx_txn_abort(). + * + * \retval MDBX_RESULT_TRUE Читающая транзакция была вынеснена, но теперь + * перезапущена для чтения другого (последнего) + * MVCC-снимка, так как restart_if_ousted` был задан + * `true`. + * + * \retval MDBX_BAD_TXN Транзакция уже завершена, либо не была запущена. */ +LIBMDBX_API int mdbx_txn_unpark(MDBX_txn *txn, bool restart_if_ousted); + /** \brief Renew a read-only transaction. * \ingroup c_transactions * diff --git a/src/api-env.c b/src/api-env.c index ce2d0755..852d3394 100644 --- a/src/api-env.c +++ b/src/api-env.c @@ -1350,8 +1350,7 @@ __cold int mdbx_env_set_geometry(MDBX_env *env, intptr_t size_lower, begin + atomic_load32(&env->lck_mmap.lck->rdt_length, mo_AcquireRelease); for (const reader_slot_t *reader = begin; reader < end; ++reader) { - if (reader->pid.weak == env->pid && reader->tid.weak && - reader->tid.weak != CurrentTid) { + if (reader->pid.weak == env->pid && reader->tid.weak != CurrentTid) { /* At least one thread may don't use SRWL */ rc = MDBX_EPERM; break; diff --git a/src/bits.md b/src/bits.md index 29154b67..b0712ee8 100644 --- a/src/bits.md +++ b/src/bits.md @@ -5,9 +5,9 @@ N | MASK | ENV | TXN | DB | PUT | DBI | NOD 2 |0000 0004|ALLOC_COLSC|TXN_DIRTY |DUPSORT | |DBI_FRESH |N_DUPDATA|P_LARGE | | 3 |0000 0008|ALLOC_SSCAN|TXN_SPILLS |INTEGERKEY| |DBI_CREAT | |P_META | | 4 |0000 0010|ALLOC_FIFO |TXN_HAS_CHILD |DUPFIXED |NOOVERWRITE|DBI_VALID | |P_BAD | | -5 |0000 0020| |TXN_DRAINED_GC|INTEGERDUP|NODUPDATA | | |P_DUPFIX | | -6 |0000 0040| | |REVERSEDUP|CURRENT |DBI_OLDEN | |P_SUBP | | -7 |0000 0080| | |DB_VALID |ALLDUPS |DBI_LINDO | | | | +5 |0000 0020| |TXN_PARKED |INTEGERDUP|NODUPDATA | | |P_DUPFIX | | +6 |0000 0040| |TXN_AUTOUNPARK|REVERSEDUP|CURRENT |DBI_OLDEN | |P_SUBP | | +7 |0000 0080| |TXN_DRAINED_GC|DB_VALID |ALLDUPS |DBI_LINDO | | | | 8 |0000 0100| _MAY_MOVE | | | | | | | <= | 9 |0000 0200| _MAY_UNMAP| | | | | | | <= | 10|0000 0400| | | | | | | | | diff --git a/src/cogs.h b/src/cogs.h index caaed0bd..f0677f23 100644 --- a/src/cogs.h +++ b/src/cogs.h @@ -469,8 +469,12 @@ static inline int check_txn(const MDBX_txn *txn, int bad_bits) { if (unlikely(txn->signature != txn_signature)) return MDBX_EBADSIGN; - if (unlikely(txn->flags & bad_bits)) - return MDBX_BAD_TXN; + if (bad_bits && unlikely(txn->flags & bad_bits)) { + if ((bad_bits & MDBX_TXN_PARKED) == 0) + return MDBX_BAD_TXN; + else + return txn_check_badbits_parked(txn, bad_bits); + } tASSERT(txn, (txn->flags & MDBX_TXN_FINISHED) || (txn->flags & MDBX_NOSTICKYTHREADS) == @@ -490,7 +494,7 @@ static inline int check_txn(const MDBX_txn *txn, int bad_bits) { } static inline int check_txn_rw(const MDBX_txn *txn, int bad_bits) { - int err = check_txn(txn, bad_bits); + int err = check_txn(txn, bad_bits & ~MDBX_TXN_PARKED); if (unlikely(err)) return err; diff --git a/src/dbi.c b/src/dbi.c index 5f2c8ccb..9a6d0169 100644 --- a/src/dbi.c +++ b/src/dbi.c @@ -864,7 +864,7 @@ int mdbx_dbi_close(MDBX_env *env, MDBX_dbi dbi) { int mdbx_dbi_flags_ex(const MDBX_txn *txn, MDBX_dbi dbi, unsigned *flags, unsigned *state) { - int rc = check_txn(txn, MDBX_TXN_BLOCKED - MDBX_TXN_ERROR); + int rc = check_txn(txn, MDBX_TXN_BLOCKED - MDBX_TXN_ERROR - MDBX_TXN_PARKED); if (unlikely(rc != MDBX_SUCCESS)) return rc; diff --git a/src/internals.h b/src/internals.h index e986e6c5..e15f7c09 100644 --- a/src/internals.h +++ b/src/internals.h @@ -154,7 +154,8 @@ enum txn_flags { txn_ro_begin_flags = MDBX_TXN_RDONLY | MDBX_TXN_RDONLY_PREPARE, txn_rw_begin_flags = MDBX_TXN_NOMETASYNC | MDBX_TXN_NOSYNC | MDBX_TXN_TRY, txn_shrink_allowed = UINT32_C(0x40000000), - txn_gc_drained = 0x20 /* GC was depleted up to oldest reader */, + txn_parked = MDBX_TXN_PARKED, + txn_gc_drained = 0x40 /* GC was depleted up to oldest reader */, txn_state_flags = MDBX_TXN_FINISHED | MDBX_TXN_ERROR | MDBX_TXN_DIRTY | MDBX_TXN_SPILLS | MDBX_TXN_HAS_CHILD | MDBX_TXN_INVALID | txn_gc_drained diff --git a/src/layout-lck.h b/src/layout-lck.h index 27edec08..b24b20c6 100644 --- a/src/layout-lck.h +++ b/src/layout-lck.h @@ -8,7 +8,7 @@ #include "essentials.h" /* The version number for a database's lockfile format. */ -#define MDBX_LOCK_VERSION 5 +#define MDBX_LOCK_VERSION 6 #if MDBX_LOCKING == MDBX_LOCKING_WIN32FILES @@ -159,6 +159,12 @@ typedef struct reader_slot { * We simply re-init the table when we know that we're the only process * opening the lock file. */ + /* Псевдо thread_id для пометки вытесненных читающих транзакций. */ +#define MDBX_TID_TXN_OUSTED (UINT64_MAX - 1) + + /* Псевдо thread_id для пометки припаркованных читающих транзакций. */ +#define MDBX_TID_TXN_PARKED UINT64_MAX + /* The thread ID of the thread owning this txn. */ mdbx_atomic_uint64_t tid; diff --git a/src/lck-windows.c b/src/lck-windows.c index 62710142..fb8aa78e 100644 --- a/src/lck-windows.c +++ b/src/lck-windows.c @@ -240,7 +240,8 @@ osal_suspend_threads_before_remap(MDBX_env *env, mdbx_handle_array_t **array) { atomic_load32(&env->lck_mmap.lck->rdt_length, mo_AcquireRelease); const uintptr_t WriteTxnOwner = env->basal_txn ? env->basal_txn->owner : 0; for (const reader_slot_t *reader = begin; reader < end; ++reader) { - if (reader->pid.weak != env->pid || !reader->tid.weak) { + if (reader->pid.weak != env->pid || !reader->tid.weak || + reader->tid.weak >= MDBX_TID_TXN_OUSTED) { skip_lck: continue; } diff --git a/src/mvcc-readers.c b/src/mvcc-readers.c index 025881df..4bfdfa5b 100644 --- a/src/mvcc-readers.c +++ b/src/mvcc-readers.c @@ -3,7 +3,7 @@ #include "internals.h" -bsr_t mvcc_bind_slot(MDBX_env *env, const uintptr_t tid) { +bsr_t mvcc_bind_slot(MDBX_env *env) { eASSERT(env, env->lck_mmap.lck); eASSERT(env, env->lck->magic_and_version == MDBX_LOCK_MAGIC); eASSERT(env, env->lck->os_and_format == MDBX_LOCK_FORMAT); @@ -61,7 +61,8 @@ bsr_t mvcc_bind_slot(MDBX_env *env, const uintptr_t tid) { safe64_reset(&result.rslot->txnid, true); if (slot == nreaders) env->lck->rdt_length.weak = (uint32_t)++nreaders; - result.rslot->tid.weak = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : tid; + result.rslot->tid.weak = + (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self(); atomic_store32(&result.rslot->pid, env->pid, mo_AcquireRelease); lck_rdt_unlock(env); @@ -318,6 +319,92 @@ __cold MDBX_INTERNAL int mvcc_cleanup_dead(MDBX_env *env, int rdt_locked, return rc; } +int txn_park(MDBX_txn *txn, bool autounpark) { + reader_slot_t *const rslot = txn->to.reader; + tASSERT(txn, (txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY | + MDBX_TXN_PARKED)) == MDBX_TXN_RDONLY); + tASSERT(txn, txn->to.reader->tid.weak < MDBX_TID_TXN_OUSTED); + if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY | + MDBX_TXN_PARKED)) != MDBX_TXN_RDONLY)) + return MDBX_BAD_TXN; + + const uint32_t pid = atomic_load32(&rslot->pid, mo_Relaxed); + const uint64_t tid = atomic_load64(&rslot->tid, mo_Relaxed); + const uint64_t txnid = atomic_load64(&rslot->txnid, mo_Relaxed); + if (unlikely(pid != txn->env->pid)) { + ERROR("unexpected pid %u%s%u", pid, " != must ", txn->env->pid); + return MDBX_PROBLEM; + } + if (unlikely(tid != txn->owner || txnid != txn->txnid)) { + ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%0zx" + " and/or txn-id %" PRIaTXN "%s%" PRIaTXN, + tid, " != must ", txn->owner, txnid, " != must ", txn->txnid); + return MDBX_BAD_RSLOT; + } + + atomic_store64(&rslot->tid, MDBX_TID_TXN_PARKED, mo_AcquireRelease); + atomic_store32(&txn->env->lck->rdt_refresh_flag, true, mo_Relaxed); + txn->flags += + autounpark ? MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK : MDBX_TXN_PARKED; + return MDBX_SUCCESS; +} + +int txn_unpark(MDBX_txn *txn) { + if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_HAS_CHILD | + MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) != + (MDBX_TXN_RDONLY | MDBX_TXN_PARKED))) + return MDBX_BAD_TXN; + + for (reader_slot_t *const rslot = txn->to.reader; rslot; atomic_yield()) { + const uint32_t pid = atomic_load32(&rslot->pid, mo_Relaxed); + uint64_t tid = safe64_read(&rslot->tid); + uint64_t txnid = safe64_read(&rslot->txnid); + if (unlikely(pid != txn->env->pid)) { + ERROR("unexpected pid %u%s%u", pid, " != expected ", txn->env->pid); + return MDBX_PROBLEM; + } + if (unlikely(tid == MDBX_TID_TXN_OUSTED || + txnid >= SAFE64_INVALID_THRESHOLD)) + break; + if (unlikely(tid != MDBX_TID_TXN_PARKED || txnid != txn->txnid)) { + ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%" PRIx64 + " and/or txn-id %" PRIaTXN "%s%" PRIaTXN, + tid, " != must ", MDBX_TID_TXN_OUSTED, txnid, " != must ", + txn->txnid); + break; + } + if (unlikely((txn->flags & MDBX_TXN_ERROR))) + break; + +#if MDBX_64BIT_CAS + if (unlikely(!atomic_cas64(&rslot->tid, MDBX_TID_TXN_PARKED, txn->owner))) + continue; +#else + atomic_store32(&rslot->tid.high, (uint32_t)((uint64_t)txn->owner >> 32), + mo_Relaxed); + if (unlikely(!atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED, + (uint32_t)txn->owner))) { + atomic_store32(&rslot->tid.high, (uint32_t)(MDBX_TID_TXN_PARKED >> 32), + mo_AcquireRelease); + continue; + } +#endif + txnid = safe64_read(&rslot->txnid); + tid = safe64_read(&rslot->tid); + if (unlikely(txnid != txn->txnid || tid != txn->owner)) { + ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%zx" + " and/or txn-id %" PRIaTXN "%s%" PRIaTXN, + tid, " != must ", txn->owner, txnid, " != must ", txn->txnid); + break; + } + txn->flags &= ~(MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK); + return MDBX_SUCCESS; + } + + int err = txn_end(txn, TXN_END_OUSTED | TXN_END_RESET | TXN_END_UPDATE); + return err ? err : MDBX_OUSTED; +} + __cold txnid_t mvcc_kick_laggards(MDBX_env *env, const txnid_t straggler) { DEBUG("DB size maxed out by reading #%" PRIaTXN, straggler); osal_memory_fence(mo_AcquireRelease, false); @@ -341,29 +428,61 @@ __cold txnid_t mvcc_kick_laggards(MDBX_env *env, const txnid_t straggler) { if (MDBX_IS_ERROR(mvcc_cleanup_dead(env, false, nullptr))) break; - if (!callback) - break; - reader_slot_t *stucked = nullptr; uint64_t hold_retired = 0; for (size_t i = 0; i < lck->rdt_length.weak; ++i) { - const uint64_t snap_retired = - atomic_load64(&lck->rdt[i].snapshot_pages_retired, mo_Relaxed); - const txnid_t rtxn = safe64_read(&lck->rdt[i].txnid); + uint32_t pid; + reader_slot_t *const rslot = &lck->rdt[i]; + txnid_t rtxn = safe64_read(&rslot->txnid); + retry: if (rtxn == straggler && - atomic_load32(&lck->rdt[i].pid, mo_AcquireRelease)) { - hold_retired = snap_retired; - stucked = &lck->rdt[i]; + (pid = atomic_load32(&rslot->pid, mo_AcquireRelease)) != 0) { + const uint64_t tid = safe64_read(&rslot->tid); + if (tid == MDBX_TID_TXN_PARKED) { + /* Читающая транзакция была помечена владельцем как "припаркованная", + * т.е. подлежащая асинхронному прерыванию, либо восстановлению + * по активности читателя. + * + * Если первый CAS(slot->tid) будет успешным, то + * safe64_reset_compare() безопасно очистит txnid, либо откажется + * из-за того что читатель сбросил и/или перезапустил транзакцию. + * При этом читатеть может не заметить вытестения, если приступит + * к завершению транзакции. Все эти исходы нас устраивют. + * + * Если первый CAS(slot->tid) будет НЕ успешным, то значит читатеть + * восстановил транзакцию, либо завершил её, либо даже освободил слот. + */ + bool ousted = +#if MDBX_64BIT_CAS + atomic_cas64(&rslot->tid, MDBX_TID_TXN_PARKED, + MDBX_TID_TXN_OUSTED); +#else + atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED, + (uint32_t)MDBX_TID_TXN_OUSTED); +#endif + if (likely(ousted)) { + ousted = safe64_reset_compare(&rslot->txnid, rtxn); + NOTICE("ousted-%s parked read-txn %" PRIaTXN + ", pid %u, tid 0x%" PRIx64, + ousted ? "complete" : "half", rtxn, pid, tid); + eASSERT(env, ousted || safe64_read(&rslot->txnid) > straggler); + continue; + } + rtxn = safe64_read(&rslot->txnid); + goto retry; + } + hold_retired = + atomic_load64(&lck->rdt[i].snapshot_pages_retired, mo_Relaxed); + stucked = rslot; } } - if (!stucked) + if (!callback || !stucked) break; uint32_t pid = atomic_load32(&stucked->pid, mo_AcquireRelease); - uint64_t tid = atomic_load64(&stucked->tid, mo_AcquireRelease); - if (safe64_read(&stucked->txnid) != straggler || !pid || - stucked->snapshot_pages_retired.weak != hold_retired) + uint64_t tid = safe64_read(&stucked->tid); + if (safe64_read(&stucked->txnid) != straggler || !pid) continue; const meta_ptr_t head = meta_recent(env, &env->txn->tw.troika); @@ -437,10 +556,7 @@ __cold int mdbx_thread_register(const MDBX_env *env) { return MDBX_RESULT_TRUE /* already registered */; } - const uintptr_t tid = osal_thread_self(); - if (env->txn && unlikely(env->basal_txn->owner == tid)) - return MDBX_TXN_OVERLAPPING; - return mvcc_bind_slot((MDBX_env *)env, tid).err; + return mvcc_bind_slot((MDBX_env *)env).err; } __cold int mdbx_thread_unregister(const MDBX_env *env) { diff --git a/src/proto.h b/src/proto.h index ebee21a3..a2aaa3e1 100644 --- a/src/proto.h +++ b/src/proto.h @@ -12,7 +12,7 @@ MDBX_INTERNAL int audit_ex(MDBX_txn *txn, size_t retired_stored, bool dont_filter_gc); /* mvcc-readers.c */ -MDBX_INTERNAL bsr_t mvcc_bind_slot(MDBX_env *env, const uintptr_t tid); +MDBX_INTERNAL bsr_t mvcc_bind_slot(MDBX_env *env); MDBX_MAYBE_UNUSED MDBX_INTERNAL pgno_t mvcc_largest_this(MDBX_env *env, pgno_t largest); MDBX_INTERNAL txnid_t mvcc_shapshot_oldest(MDBX_env *const env, @@ -56,10 +56,13 @@ MDBX_INTERNAL bool txn_refund(MDBX_txn *txn); MDBX_INTERNAL txnid_t txn_snapshot_oldest(const MDBX_txn *const txn); MDBX_INTERNAL int txn_abort(MDBX_txn *txn); MDBX_INTERNAL int txn_renew(MDBX_txn *txn, unsigned flags); +MDBX_INTERNAL int txn_park(MDBX_txn *txn, bool autounpark); +MDBX_INTERNAL int txn_unpark(MDBX_txn *txn); +MDBX_INTERNAL int txn_check_badbits_parked(const MDBX_txn *txn, int bad_bits); #define TXN_END_NAMES \ - {"committed", "empty-commit", "abort", "reset", \ - "reset-tmp", "fail-begin", "fail-beginchild"} + {"committed", "empty-commit", "abort", "reset", \ + "reset-tmp", "fail-begin", "fail-beginchild", "ousted"} enum { /* txn_end operation number, for logging */ TXN_END_COMMITTED, @@ -69,6 +72,7 @@ enum { TXN_END_RESET_TMP, TXN_END_FAIL_BEGIN, TXN_END_FAIL_BEGINCHILD, + TXN_END_OUSTED, TXN_END_OPMASK = 0x0F /* mask for txn_end() operation number */, TXN_END_UPDATE = 0x10 /* update env state (DBIs) */, @@ -76,7 +80,7 @@ enum { TXN_END_EOTDONE = 0x40 /* txn's cursors already closed */, TXN_END_SLOT = 0x80 /* release any reader slot if NOSTICKYTHREADS */ }; -MDBX_INTERNAL int txn_end(MDBX_txn *txn, const unsigned mode); +MDBX_INTERNAL int txn_end(MDBX_txn *txn, unsigned mode); MDBX_INTERNAL int txn_write(MDBX_txn *txn, iov_ctx_t *ctx); /* env.c */ diff --git a/src/txn.c b/src/txn.c index c3d32eb5..59c661f1 100644 --- a/src/txn.c +++ b/src/txn.c @@ -453,8 +453,8 @@ static void take_gcprof(MDBX_txn *txn, MDBX_commit_latency *latency) { } int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) { - STATIC_ASSERT(MDBX_TXN_FINISHED == - MDBX_TXN_BLOCKED - MDBX_TXN_HAS_CHILD - MDBX_TXN_ERROR); + STATIC_ASSERT(MDBX_TXN_FINISHED == MDBX_TXN_BLOCKED - MDBX_TXN_HAS_CHILD - + MDBX_TXN_ERROR - MDBX_TXN_PARKED); const uint64_t ts_0 = latency ? osal_monotime() : 0; uint64_t ts_1 = 0, ts_2 = 0, ts_3 = 0, ts_4 = 0, ts_5 = 0, gc_cputime = 0; @@ -919,7 +919,6 @@ int txn_renew(MDBX_txn *txn, unsigned flags) { } #endif /* MDBX_ENV_CHECKPID */ - const uintptr_t tid = osal_thread_self(); flags |= env->flags & (MDBX_NOSTICKYTHREADS | MDBX_WRITEMAP); if (flags & MDBX_TXN_RDONLY) { eASSERT(env, (flags & ~(txn_ro_begin_flags | MDBX_WRITEMAP | @@ -949,7 +948,7 @@ int txn_renew(MDBX_txn *txn, unsigned flags) { r->txnid.weak < SAFE64_INVALID_THRESHOLD)) return MDBX_BAD_RSLOT; } else if (env->lck_mmap.lck) { - bsr_t brs = mvcc_bind_slot(env, tid); + bsr_t brs = mvcc_bind_slot(env); if (unlikely(brs.err != MDBX_SUCCESS)) return brs.err; r = brs.rslot; @@ -968,7 +967,11 @@ int txn_renew(MDBX_txn *txn, unsigned flags) { txn->flags = MDBX_TXN_RDONLY | MDBX_TXN_FINISHED; return MDBX_SUCCESS; } - txn->owner = tid; + txn->owner = (uintptr_t)r->tid.weak; + if ((env->flags & MDBX_NOSTICKYTHREADS) == 0 && env->txn && + unlikely(env->basal_txn->owner == txn->owner) && + (globals.runtime_flags & MDBX_DBG_LEGACY_OVERLAP) == 0) + return MDBX_TXN_OVERLAPPING; /* Seek & fetch the last meta */ uint64_t timestamp = 0; @@ -980,7 +983,7 @@ int txn_renew(MDBX_txn *txn, unsigned flags) { ? /* regular */ meta_recent(env, &troika) : /* recovery mode */ meta_ptr(env, env->stuck_meta); if (likely(r)) { - safe64_reset(&r->txnid, false); + safe64_reset(&r->txnid, true); atomic_store32(&r->snapshot_pages_used, head.ptr_v->geometry.first_unallocated, mo_Relaxed); atomic_store64( @@ -1014,7 +1017,7 @@ int txn_renew(MDBX_txn *txn, unsigned flags) { rc = MDBX_PROBLEM; txn->txnid = INVALID_TXNID; if (likely(r)) - safe64_reset(&r->txnid, false); + safe64_reset(&r->txnid, true); goto bailout; } timestamp = 0; @@ -1029,7 +1032,7 @@ int txn_renew(MDBX_txn *txn, unsigned flags) { if (unlikely(rc != MDBX_RESULT_TRUE)) { txn->txnid = INVALID_TXNID; if (likely(r)) - safe64_reset(&r->txnid, false); + safe64_reset(&r->txnid, true); goto bailout; } } @@ -1037,7 +1040,7 @@ int txn_renew(MDBX_txn *txn, unsigned flags) { if (unlikely(txn->txnid < MIN_TXNID || txn->txnid > MAX_TXNID)) { ERROR("%s", "environment corrupted by died writer, must shutdown!"); if (likely(r)) - safe64_reset(&r->txnid, false); + safe64_reset(&r->txnid, true); txn->txnid = INVALID_TXNID; rc = MDBX_CORRUPTED; goto bailout; @@ -1050,6 +1053,7 @@ int txn_renew(MDBX_txn *txn, unsigned flags) { } else { eASSERT(env, (flags & ~(txn_rw_begin_flags | MDBX_TXN_SPILLS | MDBX_WRITEMAP | MDBX_NOSTICKYTHREADS)) == 0); + const uintptr_t tid = osal_thread_self(); if (unlikely(txn->owner == tid || /* not recovery mode */ env->stuck_meta >= 0)) return MDBX_BUSY; @@ -1165,7 +1169,8 @@ int txn_renew(MDBX_txn *txn, unsigned flags) { if (unlikely(env->dbs_flags[MAIN_DBI] != (DB_VALID | txn->dbs[MAIN_DBI].flags))) { - const bool need_txn_lock = env->basal_txn && env->basal_txn->owner != tid; + const bool need_txn_lock = + env->basal_txn && env->basal_txn->owner != osal_thread_self(); bool should_unlock = false; if (need_txn_lock) { rc = lck_txn_lock(env, true); @@ -1330,7 +1335,7 @@ bailout: return rc; } -int txn_end(MDBX_txn *txn, const unsigned mode) { +int txn_end(MDBX_txn *txn, unsigned mode) { MDBX_env *env = txn->env; static const char *const names[] = TXN_END_NAMES; @@ -1349,14 +1354,27 @@ int txn_end(MDBX_txn *txn, const unsigned mode) { reader_slot_t *slot = txn->to.reader; eASSERT(env, slot->pid.weak == env->pid); if (likely(!(txn->flags & MDBX_TXN_FINISHED))) { - ENSURE(env, txn->txnid >= - /* paranoia is appropriate here */ env->lck - ->cached_oldest.weak); - eASSERT(env, txn->txnid == slot->txnid.weak && - slot->txnid.weak >= env->lck->cached_oldest.weak); + if (likely((txn->flags & MDBX_TXN_PARKED) == 0)) { + ENSURE(env, txn->txnid >= + /* paranoia is appropriate here */ env->lck + ->cached_oldest.weak); + eASSERT(env, txn->txnid == slot->txnid.weak && + slot->txnid.weak >= env->lck->cached_oldest.weak); + } else { + if ((mode & TXN_END_OUSTED) == 0 && + safe64_read(&slot->tid) == MDBX_TID_TXN_OUSTED) + mode += TXN_END_OUSTED; + do { + safe64_reset(&slot->txnid, false); + atomic_store64(&slot->tid, txn->owner, mo_AcquireRelease); + atomic_yield(); + } while ( + unlikely(safe64_read(&slot->txnid) < SAFE64_INVALID_THRESHOLD || + safe64_read(&slot->tid) != txn->owner)); + } dxb_sanitize_tail(env, nullptr); atomic_store32(&slot->snapshot_pages_used, 0, mo_Relaxed); - safe64_reset(&slot->txnid, false); + safe64_reset(&slot->txnid, true); atomic_store32(&env->lck->rdt_refresh_flag, true, mo_Relaxed); } else { eASSERT(env, slot->pid.weak == env->pid); @@ -1373,7 +1391,9 @@ int txn_end(MDBX_txn *txn, const unsigned mode) { imports.srwl_ReleaseShared(&env->remap_guard); #endif txn->n_dbi = 0; /* prevent further DBI activity */ - txn->flags = MDBX_TXN_RDONLY | MDBX_TXN_FINISHED; + txn->flags = (mode & TXN_END_OUSTED) + ? MDBX_TXN_RDONLY | MDBX_TXN_FINISHED | MDBX_TXN_OUSTED + : MDBX_TXN_RDONLY | MDBX_TXN_FINISHED; txn->owner = 0; } else if (!(txn->flags & MDBX_TXN_FINISHED)) { ENSURE(env, @@ -1483,16 +1503,17 @@ int mdbx_txn_renew(MDBX_txn *txn) { if (unlikely((txn->flags & MDBX_TXN_RDONLY) == 0)) return MDBX_EINVAL; - int rc; if (unlikely(txn->owner != 0 || !(txn->flags & MDBX_TXN_FINISHED))) { - rc = mdbx_txn_reset(txn); + int rc = mdbx_txn_reset(txn); if (unlikely(rc != MDBX_SUCCESS)) return rc; } - rc = txn_renew(txn, MDBX_TXN_RDONLY); + int rc = txn_renew(txn, MDBX_TXN_RDONLY); if (rc == MDBX_SUCCESS) { - tASSERT(txn, txn->owner == osal_thread_self()); + tASSERT(txn, txn->owner == (txn->flags & MDBX_NOSTICKYTHREADS) + ? 0 + : osal_thread_self()); DEBUG("renew txn %" PRIaTXN "%c %p on env %p, root page %" PRIaPGNO "/%" PRIaPGNO, txn->txnid, (txn->flags & MDBX_TXN_RDONLY) ? 'r' : 'w', (void *)txn, @@ -1550,12 +1571,7 @@ int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, flags |= parent->flags & (txn_rw_begin_flags | MDBX_TXN_SPILLS | MDBX_NOSTICKYTHREADS | MDBX_WRITEMAP); - } else if (flags & MDBX_TXN_RDONLY) { - if ((env->flags & MDBX_NOSTICKYTHREADS) == 0 && env->txn && - unlikely(env->basal_txn->owner == osal_thread_self()) && - (globals.runtime_flags & MDBX_DBG_LEGACY_OVERLAP) == 0) - return MDBX_TXN_OVERLAPPING; - } else { + } else if ((flags & MDBX_TXN_RDONLY) == 0) { /* Reuse preallocated write txn. However, do not touch it until * txn_renew() succeeds, since it currently may be active. */ txn = env->basal_txn; @@ -1789,8 +1805,10 @@ int mdbx_txn_info(const MDBX_txn *txn, MDBX_txn_info *info, bool scan_rlt) { info->txn_reader_lag = head.txnid - info->txn_id; info->txn_space_dirty = info->txn_space_retired = 0; - uint64_t reader_snapshot_pages_retired; + uint64_t reader_snapshot_pages_retired = 0; if (txn->to.reader && + ((txn->flags & MDBX_TXN_PARKED) == 0 || + safe64_read(&txn->to.reader->tid) != MDBX_TID_TXN_OUSTED) && head_retired > (reader_snapshot_pages_retired = atomic_load64( &txn->to.reader->snapshot_pages_retired, mo_Relaxed))) { @@ -1808,19 +1826,21 @@ int mdbx_txn_info(const MDBX_txn *txn, MDBX_txn_info *info, bool scan_rlt) { retry: if (atomic_load32(&lck->rdt[i].pid, mo_AcquireRelease)) { jitter4testing(true); + const uint64_t snap_tid = safe64_read(&lck->rdt[i].tid); const txnid_t snap_txnid = safe64_read(&lck->rdt[i].txnid); const uint64_t snap_retired = atomic_load64( &lck->rdt[i].snapshot_pages_retired, mo_AcquireRelease); if (unlikely(snap_retired != atomic_load64(&lck->rdt[i].snapshot_pages_retired, mo_Relaxed)) || - snap_txnid != safe64_read(&lck->rdt[i].txnid)) + snap_txnid != safe64_read(&lck->rdt[i].txnid) || + snap_tid != safe64_read(&lck->rdt[i].tid)) goto retry; if (snap_txnid <= txn->txnid) { retired_next_reader = 0; break; } - if (snap_txnid < next_reader) { + if (snap_txnid < next_reader && snap_tid >= MDBX_TID_TXN_OUSTED) { next_reader = snap_txnid; retired_next_reader = pgno2bytes( env, (pgno_t)(snap_retired - @@ -1885,7 +1905,7 @@ uint64_t mdbx_txn_id(const MDBX_txn *txn) { return txn->txnid; } -int mdbx_txn_flags(const MDBX_txn *txn) { +MDBX_txn_flags_t mdbx_txn_flags(const MDBX_txn *txn) { STATIC_ASSERT( (MDBX_TXN_INVALID & (MDBX_TXN_FINISHED | MDBX_TXN_ERROR | MDBX_TXN_DIRTY | MDBX_TXN_SPILLS | @@ -1894,7 +1914,12 @@ int mdbx_txn_flags(const MDBX_txn *txn) { if (unlikely(!txn || txn->signature != txn_signature)) return MDBX_TXN_INVALID; assert(0 == (int)(txn->flags & MDBX_TXN_INVALID)); - return txn->flags; + + MDBX_txn_flags_t flags = txn->flags; + if (F_ISSET(flags, MDBX_TXN_PARKED | MDBX_TXN_RDONLY) && txn->to.reader && + safe64_read(&txn->to.reader->tid) == MDBX_TID_TXN_OUSTED) + flags |= MDBX_TXN_OUSTED; + return flags; } int mdbx_txn_reset(MDBX_txn *txn) { @@ -1946,3 +1971,55 @@ int mdbx_txn_abort(MDBX_txn *txn) { return txn_abort(txn); } + +int mdbx_txn_park(MDBX_txn *txn, bool autounpark) { + STATIC_ASSERT(MDBX_TXN_BLOCKED > MDBX_TXN_ERROR); + int rc = check_txn(txn, MDBX_TXN_BLOCKED - MDBX_TXN_ERROR); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + if (unlikely((txn->flags & MDBX_TXN_RDONLY) == 0)) + return MDBX_TXN_INVALID; + + if (unlikely((txn->flags & MDBX_TXN_ERROR))) { + rc = txn_end(txn, TXN_END_RESET | TXN_END_UPDATE); + return rc ? rc : MDBX_OUSTED; + } + + return txn_park(txn, autounpark); +} + +int mdbx_txn_unpark(MDBX_txn *txn, bool restart_if_ousted) { + STATIC_ASSERT(MDBX_TXN_BLOCKED > MDBX_TXN_PARKED + MDBX_TXN_ERROR); + int rc = check_txn(txn, MDBX_TXN_BLOCKED - MDBX_TXN_PARKED - MDBX_TXN_ERROR); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + if (unlikely(!F_ISSET(txn->flags, MDBX_TXN_RDONLY | MDBX_TXN_PARKED))) + return MDBX_SUCCESS; + + rc = txn_unpark(txn); + if (likely(rc != MDBX_OUSTED) || !restart_if_ousted) + return rc; + + tASSERT(txn, txn->flags & MDBX_TXN_FINISHED); + rc = txn_renew(txn, MDBX_TXN_RDONLY); + return (rc == MDBX_SUCCESS) ? MDBX_RESULT_TRUE : rc; +} + +int txn_check_badbits_parked(const MDBX_txn *txn, int bad_bits) { + tASSERT(txn, (bad_bits & MDBX_TXN_PARKED) && (txn->flags & bad_bits)); + /* Здесь осознано заложено отличие в поведении припаркованных транзакций: + * - некоторые функции (например mdbx_env_info_ex()), допускают + * использование поломанных транзакций (с флагом MDBX_TXN_ERROR), но + * не могут работать с припаркованными транзакциями (требуют распарковки). + * - но при распарковке поломанные транзакции завершаются. + * - получается что транзакцию можно припарковать, потом поломать вызвав + * mdbx_txn_break(), но далее любое её использование приведет к завершению + * при распарковке. */ + if ((txn->flags & (bad_bits | MDBX_TXN_AUTOUNPARK)) != + (MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK)) + return MDBX_BAD_TXN; + + tASSERT(txn, bad_bits == MDBX_TXN_BLOCKED || + bad_bits == MDBX_TXN_BLOCKED - MDBX_TXN_ERROR); + return mdbx_txn_unpark((MDBX_txn *)txn, false); +}