From 4cb761860180f8a058b27cca220edc6ef1806e67 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 30 Jun 2016 17:40:23 +0300 Subject: [PATCH] Convert column_family::_sstables to sstable_set Using sstable_set will allow us to filter sstables during a query before actually creating a reader (this is left to the next patch; here we just convert the users of the _sstables field). --- database.cc | 57 +++++++++++++++++++++++++++-------------------------- database.hh | 9 +++++---- 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/database.cc b/database.cc index 27f1f11600..6448f6d04c 100644 --- a/database.cc +++ b/database.cc @@ -114,7 +114,8 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl , _config(std::move(config)) , _memtables(_config.enable_disk_writes ? make_memtable_list() : make_memory_only_memtable_list()) , _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list()) - , _sstables(make_lw_shared()) + , _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options())) + , _sstables(make_lw_shared(_compaction_strategy.make_sstable_set(_schema))) , _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker()) , _commitlog(cl) , _compaction_manager(compaction_manager) @@ -129,7 +130,7 @@ partition_presence_checker column_family::make_partition_presence_checker(sstables::shared_sstable exclude_sstable) { return [this, exclude_sstable = std::move(exclude_sstable)] (partition_key_view key) { auto exclude = [e = std::move(exclude_sstable)] (auto s) { return s != e; }; - for (auto&& s : *_sstables | boost::adaptors::filtered(exclude)) { + for (auto&& s : *_sstables->all() | boost::adaptors::filtered(exclude)) { if (s->filter_has_key(*_schema, key)) { return partition_presence_checker_result::maybe_exists; } @@ -171,7 +172,7 @@ bool belongs_to_current_shard(const streamed_mutation& m) { class range_sstable_reader final : public mutation_reader::impl { const query::partition_range& _pr; - lw_shared_ptr _sstables; + lw_shared_ptr _sstables; mutation_reader _reader; // Use a pointer instead of copying, so we don't need to regenerate the reader if // the priority changes. @@ -179,7 +180,7 @@ class range_sstable_reader final : public mutation_reader::impl { query::clustering_key_filtering_context _ck_filtering; public: range_sstable_reader(schema_ptr s, - lw_shared_ptr sstables, + lw_shared_ptr sstables, const query::partition_range& pr, query::clustering_key_filtering_context ck_filtering, const io_priority_class& pc) @@ -189,7 +190,7 @@ public: , _ck_filtering(ck_filtering) { std::vector readers; - for (const lw_shared_ptr& sst : *_sstables) { + for (const lw_shared_ptr& sst : *_sstables->all()) { // FIXME: make sstable::read_range_rows() return ::mutation_reader so that we can drop this wrapper. mutation_reader reader = make_mutation_reader(sst, s, pr, _ck_filtering, _pc); @@ -213,14 +214,14 @@ class single_key_sstable_reader final : public mutation_reader::impl { sstables::key _key; std::vector _mutations; bool _done = false; - lw_shared_ptr _sstables; + lw_shared_ptr _sstables; // Use a pointer instead of copying, so we don't need to regenerate the reader if // the priority changes. const io_priority_class& _pc; query::clustering_key_filtering_context _ck_filtering; public: single_key_sstable_reader(schema_ptr schema, - lw_shared_ptr sstables, + lw_shared_ptr sstables, const partition_key& key, query::clustering_key_filtering_context ck_filtering, const io_priority_class& pc) @@ -235,7 +236,7 @@ public: if (_done) { return make_ready_future(); } - return parallel_for_each(*_sstables, + return parallel_for_each(*_sstables->all(), [this](const lw_shared_ptr& sstable) { return sstable->read_row(_schema, _key, _ck_filtering, _pc).then([this](auto smo) { if (smo) { @@ -281,8 +282,8 @@ column_family::make_sstable_reader(schema_ptr s, key_source column_family::sstables_as_key_source() const { return key_source([this] (const query::partition_range& range, const io_priority_class& pc) { std::vector readers; - readers.reserve(_sstables->size()); - std::transform(_sstables->begin(), _sstables->end(), std::back_inserter(readers), [&] (auto&& sst) { + readers.reserve(_sstables->all()->size()); + std::transform(_sstables->all()->begin(), _sstables->all()->end(), std::back_inserter(readers), [&] (auto&& sst) { auto rd = sstables::make_key_reader(_schema, sst, range, pc); if (sst->is_shared()) { rd = make_filtering_reader(std::move(rd), [] (const dht::decorated_key& dk) { @@ -344,7 +345,7 @@ column_family::make_reader(schema_ptr s, } std::vector readers; - readers.reserve(_memtables->size() + _sstables->size()); + readers.reserve(_memtables->size() + _sstables->all()->size()); // We're assuming that cache and memtables are both read atomically // for single-key queries, so we don't need to special case memtable @@ -587,8 +588,8 @@ future column_family::probe_file(sstring sstdir, sst update_sstables_known_generation(comps.generation); { - auto i = boost::range::find_if(*_sstables, [gen = comps.generation] (sstables::shared_sstable sst) { return sst->generation() == gen; }); - if (i != _sstables->end()) { + auto i = boost::range::find_if(*_sstables->all(), [gen = comps.generation] (sstables::shared_sstable sst) { return sst->generation() == gen; }); + if (i != _sstables->all()->end()) { auto new_toc = sstdir + "/" + fname; throw std::runtime_error(sprint("Attempted to add sstable generation %d twice: new=%s existing=%s", comps.generation, new_toc, (*i)->toc_filename())); @@ -624,7 +625,7 @@ void column_family::add_sstable(sstables::sstable&& sstable) { void column_family::add_sstable(lw_shared_ptr sstable) { // allow in-progress reads to continue using old list - _sstables = make_lw_shared(*_sstables); + _sstables = make_lw_shared(*_sstables); update_stats_for_new_sstable(sstable->bytes_on_disk()); _sstables->insert(std::move(sstable)); } @@ -962,7 +963,7 @@ void column_family::rebuild_statistics() { // this might seem dangerous, but "move" here just avoids constness, // making the two ranges compatible when compiling with boost 1.55. // Noone is actually moving anything... - std::move(*_sstables))) { + std::move(*_sstables->all()))) { update_stats_for_new_sstable(tab->data_size()); } } @@ -981,7 +982,7 @@ column_family::rebuild_sstable_list(const std::vector& // if the deletion fails (note deletion of shared sstables can take // unbounded time, because all shards must agree on the deletion). auto current_sstables = _sstables; - auto new_sstable_list = make_lw_shared(); + auto new_sstable_list = _compaction_strategy.make_sstable_set(_schema); auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted; @@ -993,15 +994,15 @@ column_family::rebuild_sstable_list(const std::vector& // this might seem dangerous, but "move" here just avoids constness, // making the two ranges compatible when compiling with boost 1.55. // Noone is actually moving anything... - for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables))) { + for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables->all()))) { // Checks if oldtab is a sstable not being compacted. if (!s.count(tab)) { - new_sstable_list->insert(tab); + new_sstable_list.insert(tab); } else { new_compacted_but_not_deleted.push_back(tab); } } - _sstables = std::move(new_sstable_list); + _sstables = make_lw_shared(std::move(new_sstable_list)); _sstables_compacted_but_not_deleted = std::move(new_compacted_but_not_deleted); rebuild_statistics(); @@ -1108,8 +1109,8 @@ column_family::load_new_sstables(std::vector new_tab future<> column_family::compact_all_sstables() { std::vector sstables; - sstables.reserve(_sstables->size()); - for (auto&& sst : *_sstables) { + sstables.reserve(_sstables->all()->size()); + for (auto&& sst : *_sstables->all()) { sstables.push_back(sst); } // FIXME: check if the lower bound min_compaction_threshold() from schema @@ -1143,7 +1144,7 @@ void column_family::set_compaction_strategy(sstables::compaction_strategy_type s } size_t column_family::sstables_count() { - return _sstables->size(); + return _sstables->all()->size(); } int64_t column_family::get_unleveled_sstables() const { @@ -1154,7 +1155,7 @@ int64_t column_family::get_unleveled_sstables() const { } lw_shared_ptr column_family::get_sstables() { - return _sstables; + return _sstables->all(); } // Gets the list of all sstables in the column family, including ones that are @@ -1166,9 +1167,9 @@ lw_shared_ptr column_family::get_sstables() { // successfully deleted. lw_shared_ptr column_family::get_sstables_including_compacted_undeleted() { if (_sstables_compacted_but_not_deleted.empty()) { - return _sstables; + return get_sstables(); } - auto ret = make_lw_shared(*_sstables); + auto ret = make_lw_shared(*_sstables->all()); for (auto&& s : _sstables_compacted_but_not_deleted) { ret->insert(s); } @@ -2475,7 +2476,7 @@ seal_snapshot(sstring jsondir) { future<> column_family::snapshot(sstring name) { return flush().then([this, name = std::move(name)]() { - auto tables = boost::copy_range>(*_sstables); + auto tables = boost::copy_range>(*_sstables->all()); return do_with(std::move(tables), [this, name](std::vector & tables) { auto jsondir = _config.datadir + "/snapshots/" + name; @@ -2738,10 +2739,10 @@ future column_family::discard_sstables(db_clock::time_point db::replay_position rp; auto gc_trunc = to_gc_clock(truncated_at); - auto pruned = make_lw_shared(); + auto pruned = make_lw_shared(_compaction_strategy.make_sstable_set(_schema)); std::vector remove; - for (auto&p : *_sstables) { + for (auto&p : *_sstables->all()) { if (p->max_data_age() <= gc_trunc) { rp = std::max(p->get_stats_metadata().position, rp); remove.emplace_back(p); diff --git a/database.hh b/database.hh index 3ccbbf7b2e..5c113fe08a 100644 --- a/database.hh +++ b/database.hh @@ -69,6 +69,7 @@ #include "utils/histogram.hh" #include "sstables/estimated_histogram.hh" #include "sstables/compaction.hh" +#include "sstables/sstable_set.hh" #include "key_reader.hh" #include #include @@ -338,8 +339,9 @@ private: lw_shared_ptr make_memtable_list(); lw_shared_ptr make_streaming_memtable_list(); + sstables::compaction_strategy _compaction_strategy; // generation -> sstable. Ordered by key so we can easily get the most recent. - lw_shared_ptr _sstables; + lw_shared_ptr _sstables; // sstables that have been compacted (so don't look up in query) but // have not been deleted yet, so must not GC any tombstones in other sstables // that may delete data in these sstables: @@ -360,7 +362,6 @@ private: db::replay_position _highest_flushed_rp; // Provided by the database that owns this commitlog db::commitlog* _commitlog; - sstables::compaction_strategy _compaction_strategy; compaction_manager& _compaction_manager; int _compaction_disabled = 0; class memtable_flush_queue; @@ -513,11 +514,11 @@ public: future disable_sstable_write() { _sstable_writes_disabled_at = std::chrono::steady_clock::now(); return _sstables_lock.write_lock().then([this] { - if (_sstables->empty()) { + if (_sstables->all()->empty()) { return make_ready_future(0); } int64_t max = 0; - for (auto&& s : *_sstables) { + for (auto&& s : *_sstables->all()) { max = std::max(max, s->generation()); } return make_ready_future(max);