diff --git a/db/view/row_locking.cc b/db/view/row_locking.cc index b02eb965d0..5310c66520 100644 --- a/db/view/row_locking.cc +++ b/db/view/row_locking.cc @@ -85,29 +85,22 @@ future row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& cpk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats) { mylog.debug("taking shared lock on partition {}, and {} lock on row {} in it", pk, (exclusive ? "exclusive" : "shared"), cpk); auto tracker = latency_stats_tracker(exclusive ? stats.exclusive_row : stats.shared_row); + auto ck = cpk; + // Create a two-level lock entry for the partition if it doesn't exist already. auto i = _two_level_locks.try_emplace(pk, this).first; + // The two-level lock entry we've just created is guaranteed to be kept alive as long as it's locked. + // Initiating read locking in the background below ensures that even if the two-level lock is currently + // write-locked, releasing the write-lock will synchronously engage any waiting + // locks and will keep the entry alive. future lock_partition = i->second._partition_lock.hold_read_lock(timeout); - auto j = i->second._row_locks.find(cpk); - if (j == i->second._row_locks.end()) { - // Not yet locked, need to create the lock. This makes a copy of cpk. - try { - j = i->second._row_locks.emplace(cpk, lock_type()).first; - } catch(...) { - // If this emplace() failed, e.g., out of memory, we fail. We - // could do nothing - the partition lock we already started - // taking will be unlocked automatically after being locked. - // But it's better form to wait for the work we started, and it - // will also allow us to remove the hash-table row we added. - return lock_partition.then([ex = std::current_exception()] (auto lock) { - // The lock is automatically released when "lock" goes out of scope. - // TODO: unlock (lock = {}) now, search for the partition in the - // hash table (we know it's still there, because we held the lock until - // now) and remove the unused lock from the hash table if still unused. - return make_exception_future(std::current_exception()); - }); - } - } - return lock_partition.then([this, pk = &i->first, cpk = &j->first, &row_lock = j->second, exclusive, tracker = std::move(tracker), timeout] (auto lock1) mutable { + return lock_partition.then([this, pk = &i->first, row_locks = &i->second._row_locks, ck = std::move(ck), exclusive, tracker = std::move(tracker), timeout] (auto lock1) mutable { + // Create a row_lock entry if it doesn't exist already. + auto j = row_locks->try_emplace(std::move(ck), lock_type()).first; + auto* cpk = &j->first; + auto& row_lock = j->second; + // Like to the two-level lock entry above, the row_lock entry we've just created + // is guaranteed to be kept alive as long as it's locked. + // Initiating read/write locking in the background below ensures that. auto lock_row = exclusive ? row_lock.hold_write_lock(timeout) : row_lock.hold_read_lock(timeout); return lock_row.then([this, pk, cpk, exclusive, tracker = std::move(tracker), lock1 = std::move(lock1)] (auto lock2) mutable { lock1.release();