Materialized views: implement row and partition locking mechanism

This patch adds a "row_locker" class providing locking (shard-locally) of
individual clustering rows or entire partitions, and both exclusive and
shared locks (a.k.a. reader/writer lock).

As we'll see in a following patch, we need this locking capability for
materialized views, to serialize the read-modify-update modifications
which involve the same rows or partitions.

The new row_locker is significantly different from the existing cell_locker.
The two main differences are that 1. row_locker also supports locking the
entire partition, not just individual rows (or cells in them), and that
2. row_locker supports also shared (reader) locks, not just exclusive locks.
For this reason we opted for a new implementation, instead of making large
modificiations to the existing cell_locker. And we put the source files
in the view/ directory, because row_locker's requirements are pretty
specific to the needs of materialized views.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
This commit is contained in:
Nadav Har'El
2018-01-30 16:16:27 +02:00
parent bec2b015e3
commit 31d0a1dd0c
3 changed files with 326 additions and 0 deletions

View File

@@ -500,6 +500,7 @@ scylla_core = (['database.cc',
'db/marshal/type_parser.cc',
'db/batchlog_manager.cc',
'db/view/view.cc',
'db/view/row_locking.cc',
'index/secondary_index_manager.cc',
'io/io.cc',
'utils/utils.cc',

198
db/view/row_locking.cc Normal file
View File

@@ -0,0 +1,198 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "row_locking.hh"
#include "log.hh"
static logging::logger mylog("row_locking");
row_locker::row_locker(schema_ptr s)
: _schema(s)
, _two_level_locks(1, decorated_key_hash(), decorated_key_equals_comparator(this))
{
}
void row_locker::upgrade(schema_ptr new_schema) {
if (new_schema == _schema) {
return;
}
mylog.debug("row_locker::upgrade from {} to {}", _schema.get(), new_schema.get());
_schema = new_schema;
}
row_locker::lock_holder::lock_holder()
: _locker(nullptr)
, _partition(nullptr)
, _partition_exclusive(true)
, _row(nullptr)
, _row_exclusive(true) {
}
row_locker::lock_holder::lock_holder(row_locker* locker, const dht::decorated_key* pk, bool exclusive)
: _locker(locker)
, _partition(pk)
, _partition_exclusive(exclusive)
, _row(nullptr)
, _row_exclusive(true) {
}
row_locker::lock_holder::lock_holder(row_locker* locker, const dht::decorated_key* pk, const clustering_key_prefix* cpk, bool exclusive)
: _locker(locker)
, _partition(pk)
, _partition_exclusive(false)
, _row(cpk)
, _row_exclusive(exclusive) {
}
future<row_locker::lock_holder>
row_locker::lock_pk(const dht::decorated_key& pk, bool exclusive) {
mylog.debug("taking {} lock on entire partition {}", (exclusive ? "exclusive" : "shared"), pk);
auto i = _two_level_locks.find(pk);
if (i == _two_level_locks.end()) {
// Lock doesn't exist, we need to create it first
i = _two_level_locks.emplace(pk, this).first;
}
auto f = exclusive ? i->second._partition_lock.write_lock() : i->second._partition_lock.read_lock();
// Note: we rely on the fact that &i->first, the pointer to a key, never
// becomes invalid (as long as the item is actually in the hash table),
// even in the case of rehashing.
return f.then([this, pk = &i->first, exclusive] () {
return lock_holder(this, pk, exclusive);
});
}
future<row_locker::lock_holder>
row_locker::lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& cpk, bool exclusive) {
mylog.debug("taking shared lock on partition {}, and {} lock on row {} in it", pk, (exclusive ? "exclusive" : "shared"), cpk);
auto i = _two_level_locks.find(pk);
if (i == _two_level_locks.end()) {
// Not yet locked, we need to create the lock. This makes a copy of pk.
i = _two_level_locks.emplace(pk, this).first;
}
future<rwlock::holder> lock_partition = i->second._partition_lock.hold_read_lock();
auto j = i->second._row_locks.find(cpk);
if (j == i->second._row_locks.end()) {
// Not yet locked, need to create the lock. This makes a copy of cpk.
try {
j = i->second._row_locks.emplace(cpk, rwlock()).first;
} catch(...) {
// If this emplace() failed, e.g., out of memory, we fail. We
// could do nothing - the partition lock we already started
// taking will be unlocked automatically after being locked.
// But it's better form to wait for the work we started, and it
// will also allow us to remove the hash-table row we added.
return lock_partition.then([ex = std::current_exception()] (auto lock) {
// The lock is automatically released when "lock" goes out of scope.
// TODO: unlock (lock = {}) now, search for the partition in the
// hash table (we know it's still there, because we held the lock until
// now) and remove the unused lock from the hash table if still unused.
return make_exception_future<row_locker::lock_holder>(std::current_exception());
});
}
}
future<rwlock::holder> lock_row = exclusive ? j->second.hold_write_lock() : j->second.hold_read_lock();
return when_all_succeed(std::move(lock_partition), std::move(lock_row)).then([this, pk = &i->first, cpk = &j->first, exclusive] (auto lock1, auto lock2) {
lock1.release();
lock2.release();
return lock_holder(this, pk, cpk, exclusive);
});
}
row_locker::lock_holder::lock_holder(row_locker::lock_holder&& old) noexcept
: _locker(old._locker)
, _partition(old._partition)
, _partition_exclusive(old._partition_exclusive)
, _row(old._row)
, _row_exclusive(old._row_exclusive)
{
// We also need to zero old's _partition and _row, so when destructed
// the destructor will do nothing and further moves will not create
// duplicates.
old._partition = nullptr;
old._row = nullptr;
}
row_locker::lock_holder& row_locker::lock_holder::operator=(row_locker::lock_holder&& old) noexcept {
if (this != &old) {
this->~lock_holder();
_locker = old._locker;
_partition = old._partition;
_partition_exclusive = old._partition_exclusive;
_row = old._row;
_row_exclusive = old._row_exclusive;
// As above, need to also zero other's data
old._partition = nullptr;
old._row = nullptr;
}
return *this;
}
void
row_locker::unlock(const dht::decorated_key* pk, bool partition_exclusive,
const clustering_key_prefix* cpk, bool row_exclusive) {
// Look for the partition and/or row locks given keys, release the locks,
// and if nobody is using one of lock objects any more, delete it:
if (pk) {
auto pli = _two_level_locks.find(*pk);
if (pli == _two_level_locks.end()) {
// This shouldn't happen... We can't unlock this lock if we can't find it...
mylog.error("column_family::local_base_lock_holder::~local_base_lock_holder() can't find lock for partition", *pk);
return;
}
assert(&pli->first == pk);
if (cpk) {
auto rli = pli->second._row_locks.find(*cpk);
if (rli == pli->second._row_locks.end()) {
mylog.error("column_family::local_base_lock_holder::~local_base_lock_holder() can't find lock for row", *cpk);
return;
}
assert(&rli->first == cpk);
mylog.debug("releasing {} lock for row {} in partition {}", (row_exclusive ? "exclusive" : "shared"), *cpk, *pk);
rwlock& lock = rli->second;
if (row_exclusive) {
lock.write_unlock();
} else {
lock.read_unlock();
}
if (!lock.locked()) {
mylog.debug("Erasing lock object for row {} in partition {}", *cpk, *pk);
pli->second._row_locks.erase(rli);
}
}
mylog.debug("releasing {} lock for entire partition {}", (partition_exclusive ? "exclusive" : "shared"), *pk);
rwlock& lock = pli->second._partition_lock;
if (partition_exclusive) {
lock.write_unlock();
} else {
lock.read_unlock();
}
if (!lock.locked()) {
mylog.debug("Erasing lock object for partition {}", *pk);
_two_level_locks.erase(pli);
}
}
}
row_locker::lock_holder::~lock_holder() {
if (_locker) {
_locker->unlock(_partition, _partition_exclusive, _row, _row_exclusive);
}
}

127
db/view/row_locking.hh Normal file
View File

@@ -0,0 +1,127 @@
/*
* Copyright (C) 2018 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
// row_locker provides a mechanism needed by the Materialized Views code to
// lock clustering rows or entire partitions. The locks are shared/exclusive
// (a.k.a. read/write) locks, and locking a row always first locks the
// partition containing it with a shared lock.
//
// Each row_locker is local to a shard (obviously), and to one specific
// column_family. row_locker needs to know the column_family's schema, and
// if that schema is updated the upgrade() method should be called so that
// row_locker could release its shared-pointer to the old schema, and take
// the new.
#include <unordered_map>
#include <memory>
#include <seastar/core/rwlock.hh>
#include <seastar/core/future.hh>
#include "schema.hh"
#include "dht/i_partitioner.hh"
#include "query-request.hh"
class row_locker {
public:
// row_locker's locking functions lock_pk(), lock_ck() return a
// "lock_holder" object. When the caller destroys the object it received,
// the lock is released. The same type "lock_holder" is used regardless
// of whether a row or partition was locked, for read or write.
class lock_holder {
row_locker* _locker;
// The lock holder pointers to the partition and clustering keys,
// which are stored inside the _two_level_locks hash table (we may
// only drop them from the hash table when all the lock holders for
// this partition or row are released).
const dht::decorated_key* _partition;
bool _partition_exclusive;
const clustering_key_prefix* _row;
bool _row_exclusive;
public:
lock_holder();
lock_holder(row_locker* locker, const dht::decorated_key* pk, bool exclusive);
lock_holder(row_locker* locker, const dht::decorated_key* pk, const clustering_key_prefix* cpk, bool exclusive);
~lock_holder();
// Allow move (noexcept) but disallow copy
lock_holder(lock_holder&&) noexcept;
lock_holder& operator=(lock_holder&&) noexcept;
};
private:
schema_ptr _schema;
struct two_level_lock {
rwlock _partition_lock;
struct clustering_key_prefix_less {
// Since the schema object may change, we need to use the
// row_locker's current schema every time.
const row_locker* locker;
clustering_key_prefix_less(const row_locker* rl) : locker(rl) {}
bool operator()(const clustering_key_prefix& k1, const clustering_key_prefix& k2) const {
return clustering_key_prefix::less_compare(*locker->_schema)(k1, k2);
}
};
std::map<clustering_key_prefix, rwlock, clustering_key_prefix_less> _row_locks;
two_level_lock(row_locker* locker)
: _row_locks(locker) { }
};
struct decorated_key_hash {
size_t operator()(const dht::decorated_key& k) const {
return std::hash<dht::token>()(k.token());
}
};
struct decorated_key_equals_comparator {
const row_locker* locker;
explicit decorated_key_equals_comparator(const row_locker* rl) : locker(rl) {}
bool operator()(const dht::decorated_key& k1, const dht::decorated_key& k2) const {
return k1.equal(*locker->_schema, k2);
}
};
std::unordered_map<dht::decorated_key, two_level_lock, decorated_key_hash, decorated_key_equals_comparator> _two_level_locks;
void unlock(const dht::decorated_key* pk, bool partition_exclusive, const clustering_key_prefix* cpk, bool row_exclusive);
public:
// row_locker needs to know the column_family's schema because key
// comparisons needs the schema.
explicit row_locker(schema_ptr s);
// If new_schema is different from the current schema, convert this
// row_locker to use the new schema, and hold the shared pointer to the
// new schema instead of the old schema. This is a trivial operation
// requiring just comparison/assignment - the hash tables do not need
// to be rebuilt on upgrade().
void upgrade(schema_ptr new_schema);
// Lock an entire partition with a shared or exclusive lock.
// The key is assumed to belong to the schema saved by row_locker. If you
// got a schema with the key, and not sure it's not a new version of the
// schema, call upgrade() before taking the lock.
future<lock_holder> lock_pk(const dht::decorated_key& pk, bool exclusive);
// Lock a clustering row with a shared or exclusive lock.
// Also, first, takes a shared lock on the partition.
// The key is assumed to belong to the schema saved by row_locker. If you
// got a schema with the key, and not sure it's not a new version of the
// schema, call upgrade() before taking the lock.
future<lock_holder> lock_ck(const dht::decorated_key& pk, const clustering_key_prefix& ckp, bool exclusive);
bool empty() const { return _two_level_locks.empty(); }
};