Change sstable_list from a map to a set
sstable_list is now a map<generation, sstable>; change it to a set in preparation for replacing it with sstable_set. The change simplifies a lot of code; the only casualty is the code that computes the highest generation number.
This commit is contained in:
@@ -169,7 +169,7 @@ static future<json::json_return_type> get_cf_unleveled_sstables(http_context& ct
|
||||
static int64_t min_row_size(column_family& cf) {
|
||||
int64_t res = INT64_MAX;
|
||||
for (auto i: *cf.get_sstables() ) {
|
||||
res = std::min(res, i.second->get_stats_metadata().estimated_row_size.min());
|
||||
res = std::min(res, i->get_stats_metadata().estimated_row_size.min());
|
||||
}
|
||||
return (res == INT64_MAX) ? 0 : res;
|
||||
}
|
||||
@@ -177,7 +177,7 @@ static int64_t min_row_size(column_family& cf) {
|
||||
static int64_t max_row_size(column_family& cf) {
|
||||
int64_t res = 0;
|
||||
for (auto i: *cf.get_sstables() ) {
|
||||
res = std::max(i.second->get_stats_metadata().estimated_row_size.max(), res);
|
||||
res = std::max(i->get_stats_metadata().estimated_row_size.max(), res);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
@@ -194,8 +194,8 @@ static double update_ratio(double acc, double f, double total) {
|
||||
static ratio_holder mean_row_size(column_family& cf) {
|
||||
ratio_holder res;
|
||||
for (auto i: *cf.get_sstables() ) {
|
||||
auto c = i.second->get_stats_metadata().estimated_row_size.count();
|
||||
res.sub += i.second->get_stats_metadata().estimated_row_size.mean() * c;
|
||||
auto c = i->get_stats_metadata().estimated_row_size.count();
|
||||
res.sub += i->get_stats_metadata().estimated_row_size.mean() * c;
|
||||
res.total += c;
|
||||
}
|
||||
return res;
|
||||
@@ -221,7 +221,7 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, const sstr
|
||||
std::unordered_map<sstring, uint64_t> m;
|
||||
for (auto t :*((total) ? db.find_column_family(uuid).get_sstables_including_compacted_undeleted() :
|
||||
db.find_column_family(uuid).get_sstables()).get()) {
|
||||
m[t.second->get_filename()] = t.second->bytes_on_disk();
|
||||
m[t->get_filename()] = t->bytes_on_disk();
|
||||
}
|
||||
return m;
|
||||
}, std::unordered_map<sstring, uint64_t>(), merge_maps).
|
||||
@@ -236,7 +236,7 @@ static future<json::json_return_type> sum_sstable(http_context& ctx, bool total)
|
||||
std::unordered_map<sstring, uint64_t> m;
|
||||
for (auto t :*((total) ? cf.get_sstables_including_compacted_undeleted() :
|
||||
cf.get_sstables()).get()) {
|
||||
m[t.second->get_filename()] = t.second->bytes_on_disk();
|
||||
m[t->get_filename()] = t->bytes_on_disk();
|
||||
}
|
||||
return m;
|
||||
},merge_maps).then([](const std::unordered_map<sstring, uint64_t>& val) {
|
||||
@@ -265,7 +265,7 @@ public:
|
||||
static double get_compression_ratio(column_family& cf) {
|
||||
sum_ratio<double> result;
|
||||
for (auto i : *cf.get_sstables()) {
|
||||
auto compression_ratio = i.second->get_compression_ratio();
|
||||
auto compression_ratio = i->get_compression_ratio();
|
||||
if (compression_ratio != sstables::metadata_collector::NO_COMPRESSION_RATIO) {
|
||||
result(compression_ratio);
|
||||
}
|
||||
@@ -396,7 +396,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
return map_reduce_cf(ctx, req->param["name"], sstables::estimated_histogram(0), [](column_family& cf) {
|
||||
sstables::estimated_histogram res(0);
|
||||
for (auto i: *cf.get_sstables() ) {
|
||||
res.merge(i.second->get_stats_metadata().estimated_row_size);
|
||||
res.merge(i->get_stats_metadata().estimated_row_size);
|
||||
}
|
||||
return res;
|
||||
},
|
||||
@@ -407,7 +407,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
return map_reduce_cf(ctx, req->param["name"], int64_t(0), [](column_family& cf) {
|
||||
uint64_t res = 0;
|
||||
for (auto i: *cf.get_sstables() ) {
|
||||
res += i.second->get_stats_metadata().estimated_row_size.count();
|
||||
res += i->get_stats_metadata().estimated_row_size.count();
|
||||
}
|
||||
return res;
|
||||
},
|
||||
@@ -418,7 +418,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
return map_reduce_cf(ctx, req->param["name"], sstables::estimated_histogram(0), [](column_family& cf) {
|
||||
sstables::estimated_histogram res(0);
|
||||
for (auto i: *cf.get_sstables() ) {
|
||||
res.merge(i.second->get_stats_metadata().estimated_column_count);
|
||||
res.merge(i->get_stats_metadata().estimated_column_count);
|
||||
}
|
||||
return res;
|
||||
},
|
||||
@@ -558,7 +558,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_bloom_filter_false_positives.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return s + sst.second->filter_get_false_positive();
|
||||
return s + sst->filter_get_false_positive();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -566,7 +566,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_bloom_filter_false_positives.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return s + sst.second->filter_get_false_positive();
|
||||
return s + sst->filter_get_false_positive();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -574,7 +574,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_recent_bloom_filter_false_positives.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return s + sst.second->filter_get_recent_false_positive();
|
||||
return s + sst->filter_get_recent_false_positive();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -582,7 +582,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_recent_bloom_filter_false_positives.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return s + sst.second->filter_get_recent_false_positive();
|
||||
return s + sst->filter_get_recent_false_positive();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -590,8 +590,8 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_bloom_filter_false_ratio.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], double(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), double(0), [](double s, auto& sst) {
|
||||
double f = sst.second->filter_get_false_positive();
|
||||
return update_ratio(s, f, f + sst.second->filter_get_true_positive());
|
||||
double f = sst->filter_get_false_positive();
|
||||
return update_ratio(s, f, f + sst->filter_get_true_positive());
|
||||
});
|
||||
}, std::plus<double>());
|
||||
});
|
||||
@@ -599,8 +599,8 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_bloom_filter_false_ratio.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, double(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), double(0), [](double s, auto& sst) {
|
||||
double f = sst.second->filter_get_false_positive();
|
||||
return update_ratio(s, f, f + sst.second->filter_get_true_positive());
|
||||
double f = sst->filter_get_false_positive();
|
||||
return update_ratio(s, f, f + sst->filter_get_true_positive());
|
||||
});
|
||||
}, std::plus<double>());
|
||||
});
|
||||
@@ -608,8 +608,8 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_recent_bloom_filter_false_ratio.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], double(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), double(0), [](double s, auto& sst) {
|
||||
double f = sst.second->filter_get_recent_false_positive();
|
||||
return update_ratio(s, f, f + sst.second->filter_get_recent_true_positive());
|
||||
double f = sst->filter_get_recent_false_positive();
|
||||
return update_ratio(s, f, f + sst->filter_get_recent_true_positive());
|
||||
});
|
||||
}, std::plus<double>());
|
||||
});
|
||||
@@ -617,8 +617,8 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_recent_bloom_filter_false_ratio.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, double(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), double(0), [](double s, auto& sst) {
|
||||
double f = sst.second->filter_get_recent_false_positive();
|
||||
return update_ratio(s, f, f + sst.second->filter_get_recent_true_positive());
|
||||
double f = sst->filter_get_recent_false_positive();
|
||||
return update_ratio(s, f, f + sst->filter_get_recent_true_positive());
|
||||
});
|
||||
}, std::plus<double>());
|
||||
});
|
||||
@@ -626,7 +626,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_bloom_filter_disk_space_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst.second->filter_size();
|
||||
return sst->filter_size();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -634,7 +634,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_bloom_filter_disk_space_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst.second->filter_size();
|
||||
return sst->filter_size();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -642,7 +642,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_bloom_filter_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst.second->filter_memory_size();
|
||||
return sst->filter_memory_size();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -650,7 +650,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_bloom_filter_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst.second->filter_memory_size();
|
||||
return sst->filter_memory_size();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -658,7 +658,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_index_summary_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst.second->get_summary().memory_footprint();
|
||||
return sst->get_summary().memory_footprint();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
@@ -666,7 +666,7 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_all_index_summary_off_heap_memory_used.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [] (column_family& cf) {
|
||||
return std::accumulate(cf.get_sstables()->begin(), cf.get_sstables()->end(), uint64_t(0), [](uint64_t s, auto& sst) {
|
||||
return sst.second->get_summary().memory_footprint();
|
||||
return sst->get_summary().memory_footprint();
|
||||
});
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
|
||||
43
database.cc
43
database.cc
@@ -47,6 +47,7 @@
|
||||
#include <boost/range/algorithm/heap_algorithm.hpp>
|
||||
#include <boost/range/algorithm/remove_if.hpp>
|
||||
#include <boost/range/algorithm/find.hpp>
|
||||
#include <boost/range/algorithm/find_if.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "frozen_mutation.hh"
|
||||
#include "mutation_partition_applier.hh"
|
||||
@@ -128,7 +129,7 @@ partition_presence_checker
|
||||
column_family::make_partition_presence_checker(sstables::shared_sstable exclude_sstable) {
|
||||
return [this, exclude_sstable = std::move(exclude_sstable)] (partition_key_view key) {
|
||||
auto exclude = [e = std::move(exclude_sstable)] (auto s) { return s != e; };
|
||||
for (auto&& s : boost::make_iterator_range(*_sstables) | boost::adaptors::map_values | boost::adaptors::filtered(exclude)) {
|
||||
for (auto&& s : *_sstables | boost::adaptors::filtered(exclude)) {
|
||||
if (s->filter_has_key(*_schema, key)) {
|
||||
return partition_presence_checker_result::maybe_exists;
|
||||
}
|
||||
@@ -188,7 +189,7 @@ public:
|
||||
, _ck_filtering(ck_filtering)
|
||||
{
|
||||
std::vector<mutation_reader> readers;
|
||||
for (const lw_shared_ptr<sstables::sstable>& sst : *_sstables | boost::adaptors::map_values) {
|
||||
for (const lw_shared_ptr<sstables::sstable>& sst : *_sstables) {
|
||||
// FIXME: make sstable::read_range_rows() return ::mutation_reader so that we can drop this wrapper.
|
||||
mutation_reader reader =
|
||||
make_mutation_reader<sstable_range_wrapping_reader>(sst, s, pr, _ck_filtering, _pc);
|
||||
@@ -234,7 +235,7 @@ public:
|
||||
if (_done) {
|
||||
return make_ready_future<streamed_mutation_opt>();
|
||||
}
|
||||
return parallel_for_each(*_sstables | boost::adaptors::map_values,
|
||||
return parallel_for_each(*_sstables,
|
||||
[this](const lw_shared_ptr<sstables::sstable>& sstable) {
|
||||
return sstable->read_row(_schema, _key, _ck_filtering, _pc).then([this](auto smo) {
|
||||
if (smo) {
|
||||
@@ -281,8 +282,7 @@ key_source column_family::sstables_as_key_source() const {
|
||||
return key_source([this] (const query::partition_range& range, const io_priority_class& pc) {
|
||||
std::vector<key_reader> readers;
|
||||
readers.reserve(_sstables->size());
|
||||
std::transform(_sstables->begin(), _sstables->end(), std::back_inserter(readers), [&] (auto&& entry) {
|
||||
auto& sst = entry.second;
|
||||
std::transform(_sstables->begin(), _sstables->end(), std::back_inserter(readers), [&] (auto&& sst) {
|
||||
auto rd = sstables::make_key_reader(_schema, sst, range, pc);
|
||||
if (sst->is_shared()) {
|
||||
rd = make_filtering_reader(std::move(rd), [] (const dht::decorated_key& dk) {
|
||||
@@ -587,11 +587,11 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
|
||||
update_sstables_known_generation(comps.generation);
|
||||
|
||||
{
|
||||
auto i = _sstables->find(comps.generation);
|
||||
auto i = boost::range::find_if(*_sstables, [gen = comps.generation] (sstables::shared_sstable sst) { return sst->generation() == gen; });
|
||||
if (i != _sstables->end()) {
|
||||
auto new_toc = sstdir + "/" + fname;
|
||||
throw std::runtime_error(sprint("Attempted to add sstable generation %d twice: new=%s existing=%s",
|
||||
comps.generation, new_toc, i->second->toc_filename()));
|
||||
comps.generation, new_toc, (*i)->toc_filename()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -623,11 +623,10 @@ void column_family::add_sstable(sstables::sstable&& sstable) {
|
||||
}
|
||||
|
||||
void column_family::add_sstable(lw_shared_ptr<sstables::sstable> sstable) {
|
||||
auto generation = sstable->generation();
|
||||
// allow in-progress reads to continue using old list
|
||||
_sstables = make_lw_shared<sstable_list>(*_sstables);
|
||||
update_stats_for_new_sstable(sstable->bytes_on_disk());
|
||||
_sstables->emplace(generation, std::move(sstable));
|
||||
_sstables->insert(std::move(sstable));
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -851,7 +850,7 @@ column_family::stop() {
|
||||
|
||||
future<std::vector<sstables::entry_descriptor>> column_family::flush_upload_dir() {
|
||||
struct work {
|
||||
sstable_list sstables;
|
||||
std::map<int64_t, sstables::shared_sstable> sstables;
|
||||
std::unordered_map<int64_t, sstables::entry_descriptor> descriptors;
|
||||
std::vector<sstables::entry_descriptor> flushed;
|
||||
};
|
||||
@@ -900,7 +899,7 @@ column_family::reshuffle_sstables(std::set<int64_t> all_generations, int64_t sta
|
||||
struct work {
|
||||
int64_t current_gen;
|
||||
std::set<int64_t> all_generations; // Stores generation of all live sstables in the system.
|
||||
sstable_list sstables;
|
||||
std::map<int64_t, sstables::shared_sstable> sstables;
|
||||
std::unordered_map<int64_t, sstables::entry_descriptor> descriptors;
|
||||
std::vector<sstables::entry_descriptor> reshuffled;
|
||||
work(int64_t start, std::set<int64_t> gens)
|
||||
@@ -963,7 +962,7 @@ void column_family::rebuild_statistics() {
|
||||
// this might seem dangerous, but "move" here just avoids constness,
|
||||
// making the two ranges compatible when compiling with boost 1.55.
|
||||
// Noone is actually moving anything...
|
||||
std::move(*_sstables) | boost::adaptors::map_values)) {
|
||||
std::move(*_sstables))) {
|
||||
update_stats_for_new_sstable(tab->data_size());
|
||||
}
|
||||
}
|
||||
@@ -994,10 +993,10 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
|
||||
// this might seem dangerous, but "move" here just avoids constness,
|
||||
// making the two ranges compatible when compiling with boost 1.55.
|
||||
// Noone is actually moving anything...
|
||||
for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables) | boost::adaptors::map_values)) {
|
||||
for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables))) {
|
||||
// Checks if oldtab is a sstable not being compacted.
|
||||
if (!s.count(tab)) {
|
||||
new_sstable_list->emplace(tab->generation(), tab);
|
||||
new_sstable_list->insert(tab);
|
||||
} else {
|
||||
new_compacted_but_not_deleted.push_back(tab);
|
||||
}
|
||||
@@ -1110,8 +1109,8 @@ future<>
|
||||
column_family::compact_all_sstables() {
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(_sstables->size());
|
||||
for (auto&& entry : *_sstables) {
|
||||
sstables.push_back(entry.second);
|
||||
for (auto&& sst : *_sstables) {
|
||||
sstables.push_back(sst);
|
||||
}
|
||||
// FIXME: check if the lower bound min_compaction_threshold() from schema
|
||||
// should be taken into account before proceeding with compaction.
|
||||
@@ -1171,7 +1170,7 @@ lw_shared_ptr<sstable_list> column_family::get_sstables_including_compacted_unde
|
||||
}
|
||||
auto ret = make_lw_shared(*_sstables);
|
||||
for (auto&& s : _sstables_compacted_but_not_deleted) {
|
||||
ret->insert(std::make_pair(s->generation(), s));
|
||||
ret->insert(s);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@@ -2476,7 +2475,7 @@ seal_snapshot(sstring jsondir) {
|
||||
|
||||
future<> column_family::snapshot(sstring name) {
|
||||
return flush().then([this, name = std::move(name)]() {
|
||||
auto tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables | boost::adaptors::map_values);
|
||||
auto tables = boost::copy_range<std::vector<sstables::shared_sstable>>(*_sstables);
|
||||
return do_with(std::move(tables), [this, name](std::vector<sstables::shared_sstable> & tables) {
|
||||
auto jsondir = _config.datadir + "/snapshots/" + name;
|
||||
|
||||
@@ -2743,12 +2742,12 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
|
||||
std::vector<sstables::shared_sstable> remove;
|
||||
|
||||
for (auto&p : *_sstables) {
|
||||
if (p.second->max_data_age() <= gc_trunc) {
|
||||
rp = std::max(p.second->get_stats_metadata().position, rp);
|
||||
remove.emplace_back(p.second);
|
||||
if (p->max_data_age() <= gc_trunc) {
|
||||
rp = std::max(p->get_stats_metadata().position, rp);
|
||||
remove.emplace_back(p);
|
||||
continue;
|
||||
}
|
||||
pruned->emplace(p.first, p.second);
|
||||
pruned->insert(p);
|
||||
}
|
||||
|
||||
_sstables = std::move(pruned);
|
||||
|
||||
@@ -516,7 +516,11 @@ public:
|
||||
if (_sstables->empty()) {
|
||||
return make_ready_future<int64_t>(0);
|
||||
}
|
||||
return make_ready_future<int64_t>((*_sstables->rbegin()).first);
|
||||
int64_t max = 0;
|
||||
for (auto&& s : *_sstables) {
|
||||
max = std::max(max, s->generation());
|
||||
}
|
||||
return make_ready_future<int64_t>(max);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -124,7 +124,7 @@ future<> db::commitlog_replayer::impl::init() {
|
||||
return do_with(shard_rpm_map{}, [this, &qp](shard_rpm_map& map) {
|
||||
return parallel_for_each(qp.db().local().get_column_families(), [&map, &qp](auto& cfp) {
|
||||
auto uuid = cfp.first;
|
||||
for (auto& sst : *cfp.second->get_sstables() | boost::adaptors::map_values) {
|
||||
for (auto& sst : *cfp.second->get_sstables()) {
|
||||
try {
|
||||
auto p = sst->get_stats_metadata().position;
|
||||
logger.trace("sstable {} -> rp {}", sst->get_filename(), p);
|
||||
|
||||
@@ -467,7 +467,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
|
||||
auto sstables = db.local().find_column_family(keyspace, cf).get_sstables();
|
||||
uint64_t estimated_partitions = 0;
|
||||
for (auto sst : *sstables) {
|
||||
estimated_partitions += sst.second->get_estimated_key_count();
|
||||
estimated_partitions += sst->get_estimated_key_count();
|
||||
}
|
||||
// This node contains replicas of rf * vnodes ranges like this one, so
|
||||
// estimate the number of partitions in just this range:
|
||||
|
||||
@@ -2609,7 +2609,7 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
|
||||
auto& cf = db.find_column_family(ks_name, cf_name);
|
||||
std::set<int64_t> generations;
|
||||
for (auto& p : *(cf.get_sstables())) {
|
||||
generations.insert(p.second->generation());
|
||||
generations.insert(p->generation());
|
||||
}
|
||||
return make_ready_future<std::set<int64_t>>(std::move(generations));
|
||||
}).then([this, max_seen_sstable, ks_name, cf_name] (std::set<int64_t> all_generations) {
|
||||
|
||||
@@ -145,7 +145,7 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
|
||||
return x->generation() < y->generation();
|
||||
});
|
||||
std::vector<shared_sstable> not_compacted_sstables;
|
||||
boost::set_difference(*all_sstables | boost::adaptors::map_values, sstables,
|
||||
boost::set_difference(*all_sstables, sstables,
|
||||
std::back_inserter(not_compacted_sstables), [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
return x->generation() < y->generation();
|
||||
});
|
||||
|
||||
@@ -135,8 +135,7 @@ lw_shared_ptr<compaction_manager::task> compaction_manager::task_start(column_fa
|
||||
|
||||
candidates.reserve(cf.sstables_count());
|
||||
// Filter out sstables that are being compacted.
|
||||
for (auto& entry : *cf.get_sstables()) {
|
||||
auto& sst = entry.second;
|
||||
for (auto& sst : *cf.get_sstables()) {
|
||||
if (!_compacting_sstables.count(sst)) {
|
||||
candidates.push_back(sst);
|
||||
}
|
||||
|
||||
@@ -358,7 +358,7 @@ std::vector<sstables::shared_sstable> size_tiered_most_interesting_bucket(lw_sha
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(candidates->size());
|
||||
for (auto& entry : *candidates) {
|
||||
sstables.push_back(entry.second);
|
||||
sstables.push_back(entry);
|
||||
}
|
||||
|
||||
auto buckets = cs.get_buckets(sstables, DEFAULT_MAX_COMPACTION_THRESHOLD);
|
||||
|
||||
@@ -571,7 +571,7 @@ public:
|
||||
};
|
||||
|
||||
using shared_sstable = lw_shared_ptr<sstable>;
|
||||
using sstable_list = std::map<int64_t, shared_sstable>;
|
||||
using sstable_list = std::unordered_set<shared_sstable>;
|
||||
|
||||
::key_reader make_key_reader(schema_ptr s, shared_sstable sst, const query::partition_range& range,
|
||||
const io_priority_class& pc = default_priority_class());
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
#include <stdio.h>
|
||||
#include <ftw.h>
|
||||
#include <unistd.h>
|
||||
#include <boost/range/algorithm/find_if.hpp>
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
@@ -1160,7 +1161,7 @@ SEASTAR_TEST_CASE(compact) {
|
||||
static lw_shared_ptr<sstable_list> create_sstable_list(std::vector<sstables::shared_sstable>& sstables) {
|
||||
sstable_list list;
|
||||
for (auto& sst : sstables) {
|
||||
list.insert({sst->generation(), sst});
|
||||
list.insert(sst);
|
||||
}
|
||||
return make_lw_shared<sstable_list>(std::move(list));
|
||||
}
|
||||
@@ -1170,7 +1171,7 @@ static std::vector<sstables::shared_sstable> get_candidates_for_leveled_strategy
|
||||
std::vector<sstables::shared_sstable> candidates;
|
||||
candidates.reserve(cf.sstables_count());
|
||||
for (auto& entry : *cf.get_sstables()) {
|
||||
candidates.push_back(entry.second);
|
||||
candidates.push_back(entry);
|
||||
}
|
||||
return candidates;
|
||||
}
|
||||
@@ -1676,11 +1677,10 @@ static bool key_range_overlaps(sstring a, sstring b, sstring c, sstring d) {
|
||||
|
||||
static shared_sstable get_sstable(const lw_shared_ptr<column_family>& cf, int64_t generation) {
|
||||
auto sstables = cf->get_sstables();
|
||||
auto entry = sstables->find(generation);
|
||||
auto entry = boost::range::find_if(*sstables, [generation] (shared_sstable sst) { return generation == sst->generation(); });
|
||||
assert(entry != sstables->end());
|
||||
assert(entry->first == generation);
|
||||
assert(entry->second->generation() == generation);
|
||||
return entry->second;
|
||||
assert((*entry)->generation() == generation);
|
||||
return *entry;
|
||||
}
|
||||
|
||||
static bool sstable_overlaps(const lw_shared_ptr<column_family>& cf, int64_t gen1, int64_t gen2) {
|
||||
|
||||
@@ -37,8 +37,7 @@ public:
|
||||
column_family_test(lw_shared_ptr<column_family> cf) : _cf(cf) {}
|
||||
|
||||
void add_sstable(sstables::sstable&& sstable) {
|
||||
auto generation = sstable.generation();
|
||||
_cf->_sstables->emplace(generation, make_lw_shared(std::move(sstable)));
|
||||
_cf->_sstables->insert(make_lw_shared(std::move(sstable)));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user