memtables: make memtable inherit from region
The LSA memory pressure mechanism will let us know which region is the best candidate for eviction when under pressure. We need to somehow then translate region -> memtable -> column family. The easiest way to convert from region to memtable, is having memtable inherit from region. Despite the fact that this requires multiple inheritance, which always raise a flag a bit, the other class we inherit from is enable_shared_from_this, which has a very simple and well defined interface. So I think it is worthy for us to do it. Once we have the memtable, grabing the column family is easy provided we have a database object. We can grab it from the schema. Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
32
memtable.cc
32
memtable.cc
@@ -26,20 +26,20 @@
|
||||
namespace stdx = std::experimental;
|
||||
|
||||
memtable::memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group)
|
||||
: _schema(std::move(schema))
|
||||
, _region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region())
|
||||
: logalloc::region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region())
|
||||
, _schema(std::move(schema))
|
||||
, partitions(memtable_entry::compare(_schema)) {
|
||||
}
|
||||
|
||||
memtable::~memtable() {
|
||||
with_allocator(_region.allocator(), [this] {
|
||||
with_allocator(allocator(), [this] {
|
||||
partitions.clear_and_dispose(current_deleter<memtable_entry>());
|
||||
});
|
||||
}
|
||||
|
||||
partition_entry&
|
||||
memtable::find_or_create_partition_slow(partition_key_view key) {
|
||||
assert(!_region.reclaiming_enabled());
|
||||
assert(!reclaiming_enabled());
|
||||
|
||||
// FIXME: Perform lookup using std::pair<token, partition_key_view>
|
||||
// to avoid unconditional copy of the partition key.
|
||||
@@ -59,7 +59,7 @@ memtable::find_or_create_partition_slow(partition_key_view key) {
|
||||
|
||||
partition_entry&
|
||||
memtable::find_or_create_partition(const dht::decorated_key& key) {
|
||||
assert(!_region.reclaiming_enabled());
|
||||
assert(!reclaiming_enabled());
|
||||
|
||||
// call lower_bound so we have a hint for the insert, just in case.
|
||||
auto i = partitions.lower_bound(key, memtable_entry::compare(_schema));
|
||||
@@ -127,7 +127,7 @@ private:
|
||||
}
|
||||
void update_iterators() {
|
||||
// We must be prepared that iterators may get invalidated during compaction.
|
||||
auto current_reclaim_counter = _memtable->_region.reclaim_counter();
|
||||
auto current_reclaim_counter = _memtable->reclaim_counter();
|
||||
auto cmp = memtable_entry::compare(_memtable->_schema);
|
||||
if (_last) {
|
||||
if (current_reclaim_counter != _last_reclaim_counter ||
|
||||
@@ -177,7 +177,7 @@ public:
|
||||
return _delegate();
|
||||
}
|
||||
|
||||
logalloc::reclaim_lock _(_memtable->_region);
|
||||
logalloc::reclaim_lock _(*_memtable);
|
||||
managed_bytes::linearization_context_guard lcg;
|
||||
update_iterators();
|
||||
if (_i == _end) {
|
||||
@@ -202,7 +202,7 @@ memtable::make_reader(schema_ptr s,
|
||||
|
||||
if (query::is_single_partition(range)) {
|
||||
const query::ring_position& pos = range.start()->value();
|
||||
return _read_section(_region, [&] {
|
||||
return _read_section(*this, [&] {
|
||||
managed_bytes::linearization_context_guard lcg;
|
||||
auto i = partitions.find(pos, memtable_entry::compare(_schema));
|
||||
if (i != partitions.end()) {
|
||||
@@ -236,8 +236,8 @@ memtable::apply(memtable& mt) {
|
||||
|
||||
void
|
||||
memtable::apply(const mutation& m, const db::replay_position& rp) {
|
||||
with_allocator(_region.allocator(), [this, &m] {
|
||||
_allocating_section(_region, [&, this] {
|
||||
with_allocator(allocator(), [this, &m] {
|
||||
_allocating_section(*this, [&, this] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
auto& p = find_or_create_partition(m.decorated_key());
|
||||
p.apply(*_schema, m.partition(), *m.schema());
|
||||
@@ -249,8 +249,8 @@ memtable::apply(const mutation& m, const db::replay_position& rp) {
|
||||
|
||||
void
|
||||
memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) {
|
||||
with_allocator(_region.allocator(), [this, &m, &m_schema] {
|
||||
_allocating_section(_region, [&, this] {
|
||||
with_allocator(allocator(), [this, &m, &m_schema] {
|
||||
_allocating_section(*this, [&, this] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
auto& p = find_or_create_partition_slow(m.key(*_schema));
|
||||
p.apply(*_schema, m.partition(), *m_schema);
|
||||
@@ -261,7 +261,7 @@ memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::
|
||||
}
|
||||
|
||||
logalloc::occupancy_stats memtable::occupancy() const {
|
||||
return _region.occupancy();
|
||||
return logalloc::region::occupancy();
|
||||
}
|
||||
|
||||
mutation_source memtable::as_data_source() {
|
||||
@@ -308,13 +308,13 @@ memtable_entry::read(lw_shared_ptr<memtable> mtbl, const schema_ptr& target_sche
|
||||
}
|
||||
auto& cr = ck_filtering.get_ranges(_key.key());
|
||||
auto snp = _pe.read(_schema);
|
||||
return make_partition_snapshot_reader(_schema, _key, ck_filtering, cr, snp, mtbl->_region, mtbl->_read_section, mtbl);
|
||||
return make_partition_snapshot_reader(_schema, _key, ck_filtering, cr, snp, *mtbl, mtbl->_read_section, mtbl);
|
||||
}
|
||||
|
||||
void memtable::upgrade_entry(memtable_entry& e) {
|
||||
if (e._schema != _schema) {
|
||||
assert(!_region.reclaiming_enabled());
|
||||
with_allocator(_region.allocator(), [this, &e] {
|
||||
assert(!reclaiming_enabled());
|
||||
with_allocator(allocator(), [this, &e] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
e.partition().upgrade(e._schema, _schema);
|
||||
e._schema = _schema;
|
||||
|
||||
10
memtable.hh
10
memtable.hh
@@ -91,7 +91,7 @@ public:
|
||||
};
|
||||
|
||||
// Managed by lw_shared_ptr<>.
|
||||
class memtable final : public enable_lw_shared_from_this<memtable> {
|
||||
class memtable final : public enable_lw_shared_from_this<memtable>, private logalloc::region {
|
||||
public:
|
||||
using partitions_type = bi::set<memtable_entry,
|
||||
bi::member_hook<memtable_entry, bi::set_member_hook<>, &memtable_entry::_link>,
|
||||
@@ -99,7 +99,6 @@ public:
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
logalloc::allocating_section _read_section;
|
||||
mutable logalloc::region _region;
|
||||
logalloc::allocating_section _allocating_section;
|
||||
partitions_type partitions;
|
||||
db::replay_position _replay_position;
|
||||
@@ -123,8 +122,13 @@ public:
|
||||
void apply(const mutation& m, const db::replay_position& = db::replay_position());
|
||||
// The mutation is upgraded to current schema.
|
||||
void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position());
|
||||
|
||||
static memtable& from_region(logalloc::region& r) {
|
||||
return static_cast<memtable&>(r);
|
||||
}
|
||||
|
||||
const logalloc::region& region() const {
|
||||
return _region;
|
||||
return *this;
|
||||
}
|
||||
public:
|
||||
size_t partition_count() const;
|
||||
|
||||
@@ -698,7 +698,7 @@ future<> row_cache::clear() {
|
||||
}
|
||||
|
||||
future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
|
||||
_tracker.region().merge(m._region); // Now all data in memtable belongs to cache
|
||||
_tracker.region().merge(m); // Now all data in memtable belongs to cache
|
||||
auto attr = seastar::thread_attributes();
|
||||
attr.scheduling_group = &_update_thread_scheduling_group;
|
||||
auto t = seastar::thread(attr, [this, &m, presence_checker = std::move(presence_checker)] {
|
||||
|
||||
Reference in New Issue
Block a user