db: add row locking metrics

This commit adds statistics to row_locker class. Metrics are
independendly counted for all lock types: row<->partition and
exclusive<->shared.

Metrics gathered:
 - total acquisitions
 - operations that wait on the lock
 - histogram of the time spent on waiting on this type of lock

References #3385
References #3416
This commit is contained in:
Piotr Sarna
2018-05-07 16:11:14 +02:00
parent 49bebcfa25
commit 9246bb36bc
5 changed files with 103 additions and 55 deletions

View File

@@ -1249,6 +1249,20 @@ void column_family::set_metrics() {
ms::make_gauge("pending_compaction", ms::description("Estimated number of compactions pending for this column family"), _stats.pending_compactions)(cf)(ks)
});
// Metrics related to row locking
auto add_row_lock_metrics = [this, ks, cf] (row_locker::single_lock_stats& stats, sstring stat_name) {
_metrics.add_group("column_family", {
ms::make_total_operations(sprint("row_lock_%s_acquisitions", stat_name), stats.lock_acquisitions, ms::description(sprint("Row lock acquisitions for %s lock", stat_name)))(cf)(ks),
ms::make_queue_length(sprint("row_lock_%s_operations_currently_waiting_for_lock", stat_name), stats.operations_currently_waiting_for_lock, ms::description(sprint("Operations currently waiting for %s lock", stat_name)))(cf)(ks),
ms::make_histogram(sprint("row_lock_%s_waiting_time", stat_name), ms::description(sprint("Histogram representing time that operations spent on waiting for %s lock", stat_name)),
[&stats] {return stats.estimated_waiting_for_lock.get_histogram(std::chrono::microseconds(100));})(cf)(ks)
});
};
add_row_lock_metrics(_row_locker_stats.exclusive_row, "exclusive_row");
add_row_lock_metrics(_row_locker_stats.shared_row, "shared_row");
add_row_lock_metrics(_row_locker_stats.exclusive_partition, "exclusive_partition");
add_row_lock_metrics(_row_locker_stats.shared_partition, "shared_partition");
// View metrics are created only for base tables, so there's no point in adding them to views (which cannot act as base tables for other views)
if (!_schema->is_view()) {
_metrics.add_group("column_family", {
@@ -4486,14 +4500,14 @@ column_family::local_base_lock(
_row_locker.upgrade(s);
if (rows.size() == 1 && rows[0].is_singular() && rows[0].start() && !rows[0].start()->value().is_empty(*s)) {
// A single clustering row is involved.
return _row_locker.lock_ck(pk, rows[0].start()->value(), true, timeout);
return _row_locker.lock_ck(pk, rows[0].start()->value(), true, timeout, _row_locker_stats);
} else {
// More than a single clustering row is involved. Most commonly it's
// the entire partition, so let's lock the entire partition. We could
// lock less than the entire partition in more elaborate cases where
// just a few individual rows are involved, or row ranges, but we
// don't think this will make a practical difference.
return _row_locker.lock_pk(pk, true, timeout);
return _row_locker.lock_pk(pk, true, timeout, _row_locker_stats);
}
}

View File

@@ -347,6 +347,7 @@ private:
config _config;
mutable stats _stats;
mutable db::view::stats _view_stats;
mutable row_locker::stats _row_locker_stats;
uint64_t _failed_counter_applies_to_memtable = 0;

View File

@@ -63,24 +63,32 @@ row_locker::lock_holder::lock_holder(row_locker* locker, const dht::decorated_ke
}
future<row_locker::lock_holder>
row_locker::lock_pk(const dht::decorated_key& pk, bool exclusive, db::timeout_clock::time_point timeout) {
row_locker::lock_pk(const dht::decorated_key& pk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats) {
mylog.debug("taking {} lock on entire partition {}", (exclusive ? "exclusive" : "shared"), pk);
auto i = _two_level_locks.find(pk);
if (i == _two_level_locks.end()) {
// Lock doesn't exist, we need to create it first
i = _two_level_locks.emplace(pk, this).first;
}
single_lock_stats &single_lock_stats = exclusive ? stats.exclusive_partition : stats.shared_partition;
single_lock_stats.operations_currently_waiting_for_lock++;
utils::latency_counter waiting_latency;
waiting_latency.start();
auto f = exclusive ? i->second._partition_lock.write_lock(timeout) : i->second._partition_lock.read_lock(timeout);
// Note: we rely on the fact that &i->first, the pointer to a key, never
// becomes invalid (as long as the item is actually in the hash table),
// even in the case of rehashing.
return f.then([this, pk = &i->first, exclusive] () {
return f.then([this, pk = &i->first, exclusive, &single_lock_stats, waiting_latency = std::move(waiting_latency)] () mutable {
waiting_latency.stop();
single_lock_stats.estimated_waiting_for_lock.add(waiting_latency.latency(), single_lock_stats.operations_currently_waiting_for_lock);
single_lock_stats.lock_acquisitions++;
single_lock_stats.operations_currently_waiting_for_lock--;
return lock_holder(this, pk, exclusive);
});
}
future<row_locker::lock_holder>
row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& cpk, bool exclusive, db::timeout_clock::time_point timeout) {
row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& cpk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats) {
mylog.debug("taking shared lock on partition {}, and {} lock on row {} in it", pk, (exclusive ? "exclusive" : "shared"), cpk);
auto i = _two_level_locks.find(pk);
if (i == _two_level_locks.end()) {
@@ -108,10 +116,19 @@ row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& c
});
}
}
single_lock_stats &single_lock_stats = exclusive ? stats.exclusive_row : stats.shared_row;
single_lock_stats.operations_currently_waiting_for_lock++;
utils::latency_counter waiting_latency;
waiting_latency.start();
future<lock_type::holder> lock_row = exclusive ? j->second.hold_write_lock(timeout) : j->second.hold_read_lock(timeout);
return when_all_succeed(std::move(lock_partition), std::move(lock_row)).then([this, pk = &i->first, cpk = &j->first, exclusive] (auto lock1, auto lock2) {
return when_all_succeed(std::move(lock_partition), std::move(lock_row))
.then([this, pk = &i->first, cpk = &j->first, exclusive, &single_lock_stats, waiting_latency = std::move(waiting_latency)] (auto lock1, auto lock2) mutable {
lock1.release();
lock2.release();
waiting_latency.stop();
single_lock_stats.estimated_waiting_for_lock.add(waiting_latency.latency(), single_lock_stats.operations_currently_waiting_for_lock);
single_lock_stats.lock_acquisitions++;
single_lock_stats.operations_currently_waiting_for_lock--;
return lock_holder(this, pk, cpk, exclusive);
});
}

View File

@@ -42,9 +42,23 @@
#include "schema.hh"
#include "dht/i_partitioner.hh"
#include "query-request.hh"
#include "utils/estimated_histogram.hh"
#include "utils/histogram.hh"
#include "utils/latency.hh"
class row_locker {
public:
struct single_lock_stats {
uint64_t lock_acquisitions = 0;
uint64_t operations_currently_waiting_for_lock = 0;
utils::estimated_histogram estimated_waiting_for_lock;
};
struct stats {
single_lock_stats exclusive_row;
single_lock_stats shared_row;
single_lock_stats exclusive_partition;
single_lock_stats shared_partition;
};
// row_locker's locking functions lock_pk(), lock_ck() return a
// "lock_holder" object. When the caller destroys the object it received,
// the lock is released. The same type "lock_holder" is used regardless
@@ -116,14 +130,14 @@ public:
// The key is assumed to belong to the schema saved by row_locker. If you
// got a schema with the key, and not sure it's not a new version of the
// schema, call upgrade() before taking the lock.
future<lock_holder> lock_pk(const dht::decorated_key& pk, bool exclusive, db::timeout_clock::time_point timeout);
future<lock_holder> lock_pk(const dht::decorated_key& pk, bool exclusive, db::timeout_clock::time_point timeout, stats& stats);
// Lock a clustering row with a shared or exclusive lock.
// Also, first, takes a shared lock on the partition.
// The key is assumed to belong to the schema saved by row_locker. If you
// got a schema with the key, and not sure it's not a new version of the
// schema, call upgrade() before taking the lock.
future<lock_holder> lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& ckp, bool exclusive, db::timeout_clock::time_point timeout);
future<lock_holder> lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& ckp, bool exclusive, db::timeout_clock::time_point timeout, stats& stats);
bool empty() const { return _two_level_locks.empty(); }
};

View File

@@ -26,6 +26,8 @@
#include "db/view/row_locking.hh"
#include "schema_builder.hh"
static row_locker::stats row_locker_stats;
static schema_ptr make_schema()
{
return schema_builder("ks", "cf")
@@ -60,27 +62,27 @@ SEASTAR_TEST_CASE(test_nonblock_exclusive) {
row_locker rl(s);
auto pk = make_pk(s, "pk1");
auto ck = make_ck(s, "ck1") ;
auto lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()).get0();
auto lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto ignore = [] (auto) { };
// move out the lock object, thereby releasing the lock
ignore(std::move(lock));
// after we released the lock, we can take it again.
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()).get0();
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock));
// now do the same, but locking an entire partition. Should
// be fine after we unlocked the row.
lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max()).get0();
lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock));
lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max()).get0();
lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock));
// After we unlock the partition, we can lock the row
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()).get0();
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock));
// Check that we can hold an exclusive lock for two rows in the
// same partition, and it doesn't hang.
auto ck2 = make_ck(s, "ck2") ;
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()).get0();
auto lock2 = rl.lock_ck(pk, ck2, true, db::timeout_clock::time_point::max()).get0();
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto lock2 = rl.lock_ck(pk, ck2, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock));
ignore(std::move(lock2));
BOOST_REQUIRE(rl.empty() == true);
@@ -98,39 +100,39 @@ SEASTAR_TEST_CASE(test_nonblock_shared) {
auto pk = make_pk(s, "pk1");
auto ck = make_ck(s, "ck1") ;
// Check that we can lock the same row multiple times with a shared lock:
auto lock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max()).get0();
auto lock2 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max()).get0();
auto lock3 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max()).get0();
auto lock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto lock2 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto lock3 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto ignore = [] (auto) { };
ignore(std::move(lock1));
ignore(std::move(lock2));
ignore(std::move(lock3));
// Check that after unlocking, we can lock again. Also for exclusive lock:
lock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max()).get0();
lock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock1));
lock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()).get0();
lock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock1));
// Same test but for the partition lock level
lock1 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max()).get0();
lock2 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max()).get0();
lock3 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max()).get0();
lock1 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
lock2 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
lock3 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock1));
ignore(std::move(lock2));
ignore(std::move(lock3));
lock1 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max()).get0();
lock1 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock1));
lock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max()).get0();
lock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock1));
// Check that we can hold a shared lock for a partition and a row
// in it concurrently.
lock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max()).get0();
lock2 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max()).get0();
lock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
lock2 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock1));
ignore(std::move(lock2));
// Check that the above is fine also if the row lock is exclusive
// (the "exclusivity" is only for the row).
lock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()).get0();
lock2 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max()).get0();
lock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
lock2 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock1));
ignore(std::move(lock2));
BOOST_REQUIRE(rl.empty() == true);
@@ -150,7 +152,7 @@ SEASTAR_TEST_CASE(test_nonblock_many) {
for (int i = 0; i < N; i++) {
if (i % 2) {
// lock the entire partition
auto lock = rl.lock_pk(make_pk(s, to_sstring(i)), true, db::timeout_clock::time_point::max()).get0();
auto lock = rl.lock_pk(make_pk(s, to_sstring(i)), true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
if (i % 4) {
// drop half of locks immediately, half kept until end.
locks.push_back(std::move(lock));
@@ -159,7 +161,7 @@ SEASTAR_TEST_CASE(test_nonblock_many) {
// lock M rows, drop half of the locks immediately, keep half
// until the end.
for (int j = 0; j < M; j++) {
auto lock = rl.lock_ck(make_pk(s, to_sstring(i)), make_ck(s, to_sstring(j)), true, db::timeout_clock::time_point::max()).get0();
auto lock = rl.lock_ck(make_pk(s, to_sstring(i)), make_ck(s, to_sstring(j)), true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
if (j % 2) {
locks.push_back(std::move(lock));
}
@@ -190,21 +192,21 @@ SEASTAR_TEST_CASE(test_nonblock_upgrade) {
auto s = make_schema();
auto s2 = make_alternative_schema();
row_locker rl(s);
auto lock = rl.lock_ck(make_pk(s, "pk1"), make_ck(s, "ck1"), true, db::timeout_clock::time_point::max()).get0();
auto lock = rl.lock_ck(make_pk(s, "pk1"), make_ck(s, "ck1"), true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto ignore = [] (auto) { };
ignore(std::move(lock));
rl.upgrade(s2);
// verify that the row_locker does not not keep a reference to s any
// more, so the only remaining reference is ours.
BOOST_REQUIRE(s.use_count() == 1);
lock = rl.lock_ck(make_pk(s2, "pk1"), make_ck(s2, "ck1"), true, db::timeout_clock::time_point::max()).get0();
lock = rl.lock_ck(make_pk(s2, "pk1"), make_ck(s2, "ck1"), true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock));
// Same test, but upgrade the schema while a lock is still taken
lock = rl.lock_ck(make_pk(s2, "pk1"), make_ck(s2, "ck1"), true, db::timeout_clock::time_point::max()).get0();
lock = rl.lock_ck(make_pk(s2, "pk1"), make_ck(s2, "ck1"), true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
rl.upgrade(s);
BOOST_REQUIRE(s2.use_count() == 1);
ignore(std::move(lock));
lock = rl.lock_ck(make_pk(s, "pk1"), make_ck(s, "ck1"), true, db::timeout_clock::time_point::max()).get0();
lock = rl.lock_ck(make_pk(s, "pk1"), make_ck(s, "ck1"), true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
ignore(std::move(lock));
BOOST_REQUIRE(rl.empty() == true);
});
@@ -223,11 +225,11 @@ SEASTAR_TEST_CASE(test_block_exclusive_twice_row) {
row_locker rl(s);
auto pk = make_pk(s, "pk1");
auto ck = make_ck(s, "ck1") ;
auto lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()).get0();
auto lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
// If we try to lock again *cannot* be ready now. It will
// become ready (and get0() won't hang) when we drop
// the first lock
auto flock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max());
auto flock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats);
BOOST_REQUIRE(!flock1.available());
auto ignore = [] (auto) { };
ignore(std::move(lock));
@@ -241,8 +243,8 @@ SEASTAR_TEST_CASE(test_block_exclusive_twice_partition) {
auto s = make_schema();
row_locker rl(s);
auto pk = make_pk(s, "pk1");
auto lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max()).get0();
auto flock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max());
auto lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto flock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats);
BOOST_REQUIRE(!flock1.available());
auto ignore = [] (auto) { };
ignore(std::move(lock));
@@ -258,15 +260,15 @@ SEASTAR_TEST_CASE(test_block_exclusive_and_shared_row) {
auto pk = make_pk(s, "pk1");
auto ck = make_ck(s, "ck1") ;
// shared lock first, exclusive lock second:
auto lock = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max()).get0();
auto flock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max());
auto lock = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto flock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats);
BOOST_REQUIRE(!flock1.available());
auto ignore = [] (auto) { };
ignore(std::move(lock));
flock1.get0();
// exclusive lock first, shared lock second
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()).get0();
flock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max());
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
flock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max(), row_locker_stats);
BOOST_REQUIRE(!flock1.available());
ignore(std::move(lock));
flock1.get0();
@@ -277,14 +279,14 @@ SEASTAR_TEST_CASE(test_block_exclusive_and_shared_partition) {
auto s = make_schema();
row_locker rl(s);
auto pk = make_pk(s, "pk1");
auto lock = rl.lock_pk(pk, false, db::timeout_clock::time_point::max()).get0();
auto flock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max());
auto lock = rl.lock_pk(pk, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto flock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats);
BOOST_REQUIRE(!flock1.available());
auto ignore = [] (auto) { };
ignore(std::move(lock));
flock1.get0();
lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max()).get0();
flock1 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max());
lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
flock1 = rl.lock_pk(pk, false, db::timeout_clock::time_point::max(), row_locker_stats);
BOOST_REQUIRE(!flock1.available());
ignore(std::move(lock));
flock1.get0();
@@ -298,26 +300,26 @@ SEASTAR_TEST_CASE(test_block_partition_row) {
row_locker rl(s);
auto pk = make_pk(s, "pk1");
auto ck = make_ck(s, "ck1") ;
auto lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max()).get0();
auto flock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()); // try exclusive row lock
auto lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
auto flock1 = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats); // try exclusive row lock
BOOST_REQUIRE(!flock1.available());
auto ignore = [] (auto) { };
ignore(std::move(lock));
flock1.get0();
lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max()).get0();
flock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max()); // also try shared row lock
lock = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
flock1 = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max(), row_locker_stats); // also try shared row lock
BOOST_REQUIRE(!flock1.available());
ignore(std::move(lock));
flock1.get0();
// Now try the same in opposite order (the row lock first, then the
// partition lock).
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max()).get0();
flock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max());
lock = rl.lock_ck(pk, ck, true, db::timeout_clock::time_point::max(), row_locker_stats).get0();
flock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats);
BOOST_REQUIRE(!flock1.available());
ignore(std::move(lock));
flock1.get0();
lock = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max()).get0();
flock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max());
lock = rl.lock_ck(pk, ck, false, db::timeout_clock::time_point::max(), row_locker_stats).get0();
flock1 = rl.lock_pk(pk, true, db::timeout_clock::time_point::max(), row_locker_stats);
BOOST_REQUIRE(!flock1.available());
ignore(std::move(lock));
flock1.get0();