Merge "educe overhead of partition presence checker during cache update" from Tomasz
Refs #1943. * 'tgrabiec/optimize-bloom-filter' of github.com:cloudius-systems/seastar-dev: db: Compute key hash once in partition_presence_checker bloom_filter: Allow checking presence using pre-hashed key db: Use incremental selector in partition_presence_checker
This commit is contained in:
41
database.cc
41
database.cc
@@ -139,11 +139,16 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl
|
||||
}
|
||||
|
||||
partition_presence_checker
|
||||
column_family::make_partition_presence_checker(sstables::shared_sstable exclude_sstable) {
|
||||
return [this, exclude_sstable = std::move(exclude_sstable)] (partition_key_view key) {
|
||||
auto exclude = [e = std::move(exclude_sstable)] (auto s) { return s != e; };
|
||||
for (auto&& s : *_sstables->all() | boost::adaptors::filtered(exclude)) {
|
||||
if (s->filter_has_key(*_schema, key)) {
|
||||
column_family::make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set> sstables) {
|
||||
auto sel = make_lw_shared(sstables->make_incremental_selector());
|
||||
return [this, sstables = std::move(sstables), sel = std::move(sel)] (const dht::decorated_key& key) {
|
||||
auto& sst = sel->select(key.token());
|
||||
if (sst.empty()) {
|
||||
return partition_presence_checker_result::definitely_doesnt_exist;
|
||||
}
|
||||
auto hk = sstables::sstable::make_hashed_key(*_schema, key.key());
|
||||
for (auto&& s : sst) {
|
||||
if (s->filter_has_key(hk)) {
|
||||
return partition_presence_checker_result::maybe_exists;
|
||||
}
|
||||
}
|
||||
@@ -885,11 +890,12 @@ void column_family::add_sstable(lw_shared_ptr<sstables::sstable> sstable) {
|
||||
}
|
||||
|
||||
future<>
|
||||
column_family::update_cache(memtable& m, sstables::shared_sstable exclude_sstable) {
|
||||
column_family::update_cache(memtable& m, lw_shared_ptr<sstables::sstable_set> old_sstables) {
|
||||
if (_config.enable_cache) {
|
||||
// be careful to use the old sstable list, since the new one will hit every
|
||||
// mutation in m.
|
||||
return _cache.update(m, make_partition_presence_checker(std::move(exclude_sstable)));
|
||||
return _cache.update(m, make_partition_presence_checker(std::move(old_sstables)));
|
||||
|
||||
} else {
|
||||
return m.clear_gently();
|
||||
}
|
||||
@@ -1082,15 +1088,20 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
|
||||
try {
|
||||
ret.get();
|
||||
|
||||
// We must add sstable before we call update_cache(), because
|
||||
// memtable's data after moving to cache can be evicted at any time.
|
||||
auto old_sstables = _sstables;
|
||||
add_sstable(newtab);
|
||||
old->mark_flushed(newtab);
|
||||
// Cache updates are serialized because partition_presence_checker
|
||||
// is using data source snapshot created before the update starts, so that
|
||||
// we can use incremental_selector. If updates were done concurrently we
|
||||
// could mispopulate due to stale presence information.
|
||||
return with_semaphore(_cache_update_sem, 1, [this, old, newtab] {
|
||||
// We must add sstable before we call update_cache(), because
|
||||
// memtable's data after moving to cache can be evicted at any time.
|
||||
auto old_sstables = _sstables;
|
||||
add_sstable(newtab);
|
||||
old->mark_flushed(newtab);
|
||||
|
||||
trigger_compaction();
|
||||
|
||||
return update_cache(*old, newtab).then_wrapped([this, newtab, old] (future<> f) {
|
||||
trigger_compaction();
|
||||
return update_cache(*old, std::move(old_sstables));
|
||||
}).then_wrapped([this, newtab, old] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch(...) {
|
||||
|
||||
@@ -513,6 +513,7 @@ private:
|
||||
seastar::gate _streaming_flush_gate;
|
||||
std::unordered_map<sstring, db::view::view> _views;
|
||||
std::vector<view_ptr> _view_schemas;
|
||||
semaphore _cache_update_sem{1};
|
||||
private:
|
||||
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable);
|
||||
void add_sstable(sstables::sstable&& sstable);
|
||||
@@ -524,7 +525,7 @@ private:
|
||||
lw_shared_ptr<memtable> new_memtable();
|
||||
lw_shared_ptr<memtable> new_streaming_memtable();
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
|
||||
future<> update_cache(memtable&, sstables::shared_sstable exclude_sstable);
|
||||
future<> update_cache(memtable&, lw_shared_ptr<sstables::sstable_set> old_sstables);
|
||||
struct merge_comparator;
|
||||
|
||||
// update the sstable generation, making sure that new new sstables don't overwrite this one.
|
||||
@@ -560,7 +561,7 @@ private:
|
||||
tracing::trace_state_ptr trace_state) const;
|
||||
|
||||
mutation_source sstables_as_mutation_source();
|
||||
partition_presence_checker make_partition_presence_checker(sstables::shared_sstable exclude_sstable);
|
||||
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set>);
|
||||
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
|
||||
void do_trigger_compaction();
|
||||
public:
|
||||
|
||||
@@ -290,11 +290,11 @@ enum class partition_presence_checker_result {
|
||||
definitely_doesnt_exist,
|
||||
maybe_exists
|
||||
};
|
||||
using partition_presence_checker = std::function<partition_presence_checker_result (const partition_key& key)>;
|
||||
using partition_presence_checker = std::function<partition_presence_checker_result (const dht::decorated_key& key)>;
|
||||
|
||||
inline
|
||||
partition_presence_checker make_default_partition_presence_checker() {
|
||||
return [] (partition_key_view key) { return partition_presence_checker_result::maybe_exists; };
|
||||
return [] (const dht::decorated_key&) { return partition_presence_checker_result::maybe_exists; };
|
||||
}
|
||||
|
||||
template<typename Consumer>
|
||||
|
||||
@@ -848,7 +848,7 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec
|
||||
_tracker.touch(entry);
|
||||
_tracker.on_merge();
|
||||
}
|
||||
} else if (presence_checker(mem_e.key().key()) ==
|
||||
} else if (presence_checker(mem_e.key()) ==
|
||||
partition_presence_checker_result::definitely_doesnt_exist) {
|
||||
cache_entry* entry = current_allocator().construct<cache_entry>(
|
||||
mem_e.schema(), std::move(mem_e.key()), std::move(mem_e.partition()));
|
||||
|
||||
@@ -2627,6 +2627,10 @@ sstable::get_shards_for_this_sstable() const {
|
||||
return boost::copy_range<std::vector<unsigned>>(shards);
|
||||
}
|
||||
|
||||
utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key& key) {
|
||||
return utils::make_hashed_key(static_cast<bytes_view>(key::from_partition_key(s, key)));
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
operator<<(std::ostream& os, const sstable_to_delete& std) {
|
||||
return os << std.name << "(" << (std.shared ? "shared" : "unshared") << ")";
|
||||
|
||||
@@ -607,10 +607,16 @@ public:
|
||||
return _filter->is_present(bytes_view(key));
|
||||
}
|
||||
|
||||
bool filter_has_key(utils::hashed_key key) {
|
||||
return _filter->is_present(key);
|
||||
}
|
||||
|
||||
bool filter_has_key(const schema& s, partition_key_view key) {
|
||||
return filter_has_key(key::from_partition_key(s, key));
|
||||
}
|
||||
|
||||
static utils::hashed_key make_hashed_key(const schema& s, const partition_key& key);
|
||||
|
||||
uint64_t filter_get_false_positive() {
|
||||
return _filter_tracker.false_positive;
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ int main(int argc, char** argv) {
|
||||
mt->apply(m);
|
||||
}
|
||||
|
||||
auto checker = [](const partition_key& key) {
|
||||
auto checker = [](auto) {
|
||||
return partition_presence_checker_result::maybe_exists;
|
||||
};
|
||||
|
||||
|
||||
@@ -123,7 +123,7 @@ int main(int argc, char** argv) {
|
||||
// When this assertion fails, increase amount of memory
|
||||
assert(mt->occupancy().used_space() < reclaimable_memory());
|
||||
|
||||
auto checker = [](const partition_key& key) {
|
||||
auto checker = [](auto) {
|
||||
return partition_presence_checker_result::maybe_exists;
|
||||
};
|
||||
|
||||
|
||||
@@ -51,36 +51,41 @@
|
||||
|
||||
namespace utils {
|
||||
namespace filter {
|
||||
static thread_local auto reusable_indexes = std::vector<int64_t>();
|
||||
|
||||
void bloom_filter::set_indexes(int64_t base, int64_t inc, int count, int64_t max, std::vector<int64_t>& results) {
|
||||
template<typename Func>
|
||||
void for_each_index(hashed_key hk, int count, int64_t max, Func&& func) {
|
||||
auto h = hk.hash();
|
||||
int64_t base = h[0];
|
||||
int64_t inc = h[1];
|
||||
for (int i = 0; i < count; i++) {
|
||||
results[i] = std::abs(base % max);
|
||||
if (func(std::abs(base % max)) == stop_iteration::yes) {
|
||||
break;
|
||||
}
|
||||
base = static_cast<int64_t>(static_cast<uint64_t>(base) + static_cast<uint64_t>(inc));
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<int64_t> bloom_filter::get_hash_buckets(const bytes_view& key, int hash_count, int64_t max) {
|
||||
std::array<uint64_t, 2> h;
|
||||
hash(key, 0, h);
|
||||
|
||||
auto indexes = std::vector<int64_t>();
|
||||
|
||||
indexes.resize(hash_count);
|
||||
set_indexes(h[0], h[1], hash_count, max, indexes);
|
||||
return indexes;
|
||||
bool bloom_filter::is_present(hashed_key key) {
|
||||
bool result = true;
|
||||
for_each_index(key, _hash_count, _bitset.size(), [this, &result] (auto i) {
|
||||
if (!_bitset.test(i)) {
|
||||
result = false;
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
return stop_iteration::no;
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
std::vector<int64_t> bloom_filter::indexes(const bytes_view& key) {
|
||||
// we use the same array both for storing the hash result, and for storing the indexes we return,
|
||||
// so that we do not need to allocate two arrays.
|
||||
auto& idx = reusable_indexes;
|
||||
std::array<uint64_t, 2> h;
|
||||
hash(key, 0, h);
|
||||
void bloom_filter::add(const bytes_view& key) {
|
||||
for_each_index(make_hashed_key(key), _hash_count, _bitset.size(), [this] (auto i) {
|
||||
_bitset.set(i);
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
idx.resize(_hash_count);
|
||||
set_indexes(h[0], h[1], _hash_count, _bitset.size(), idx);
|
||||
return idx;
|
||||
bool bloom_filter::is_present(const bytes_view& key) {
|
||||
return is_present(make_hashed_key(key));
|
||||
}
|
||||
|
||||
filter_ptr create_filter(int hash, large_bitset&& bitset) {
|
||||
|
||||
@@ -58,11 +58,6 @@ public:
|
||||
private:
|
||||
bitmap _bitset;
|
||||
int _hash_count;
|
||||
|
||||
void set_indexes(int64_t base, int64_t inc, int count, int64_t max, std::vector<int64_t>& results);
|
||||
std::vector<int64_t> get_hash_buckets(const bytes_view& key, int hash_count, int64_t max);
|
||||
std::vector<int64_t> indexes(const bytes_view& key);
|
||||
|
||||
public:
|
||||
int num_hashes() { return _hash_count; }
|
||||
bitmap& bits() { return _bitset; }
|
||||
@@ -70,26 +65,11 @@ public:
|
||||
bloom_filter(int hashes, bitmap&& bs) : _bitset(std::move(bs)), _hash_count(hashes) {
|
||||
}
|
||||
|
||||
virtual void hash(const bytes_view& b, int64_t seed, std::array<uint64_t, 2>& result) = 0;
|
||||
virtual void add(const bytes_view& key) override;
|
||||
|
||||
virtual void add(const bytes_view& key) override {
|
||||
virtual bool is_present(const bytes_view& key) override;
|
||||
|
||||
auto idx = indexes(key);
|
||||
for (int i = 0; i < _hash_count; i++) {
|
||||
_bitset.set(idx[i]);
|
||||
}
|
||||
}
|
||||
|
||||
virtual bool is_present(const bytes_view& key) override {
|
||||
|
||||
auto idx = indexes(key);
|
||||
for (int i = 0; i < _hash_count; i++) {
|
||||
if (!_bitset.test(idx[i])) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
virtual bool is_present(hashed_key key) override;
|
||||
|
||||
virtual void clear() override {
|
||||
_bitset.clear();
|
||||
@@ -106,9 +86,6 @@ struct murmur3_bloom_filter: public bloom_filter {
|
||||
|
||||
murmur3_bloom_filter(int hashes, bitmap&& bs) : bloom_filter(hashes, std::move(bs)) {}
|
||||
|
||||
virtual void hash(const bytes_view& b, int64_t seed, std::array<uint64_t, 2>& result) {
|
||||
utils::murmur_hash::hash3_x64_128(b, seed, result);
|
||||
}
|
||||
};
|
||||
|
||||
struct always_present_filter: public i_filter {
|
||||
@@ -117,6 +94,10 @@ struct always_present_filter: public i_filter {
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual bool is_present(hashed_key key) override {
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual void add(const bytes_view& key) override { }
|
||||
|
||||
virtual void clear() override { }
|
||||
|
||||
@@ -51,4 +51,11 @@ filter_ptr i_filter::get_filter(int64_t num_elements, int target_buckets_per_ele
|
||||
auto spec = bloom_calculations::compute_bloom_spec(buckets_per_element);
|
||||
return filter::create_filter(spec.K, num_elements, spec.buckets_per_element);
|
||||
}
|
||||
|
||||
hashed_key make_hashed_key(bytes_view b) {
|
||||
std::array<uint64_t, 2> h;
|
||||
utils::murmur_hash::hash3_x64_128(b, 0, h);
|
||||
return { h };
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -48,6 +48,16 @@ namespace utils {
|
||||
struct i_filter;
|
||||
using filter_ptr = std::unique_ptr<i_filter>;
|
||||
|
||||
class hashed_key {
|
||||
private:
|
||||
std::array<uint64_t, 2> _hash;
|
||||
public:
|
||||
hashed_key(std::array<uint64_t, 2> h) : _hash(h) {}
|
||||
std::array<uint64_t, 2> hash() const { return _hash; };
|
||||
};
|
||||
|
||||
hashed_key make_hashed_key(bytes_view key);
|
||||
|
||||
// FIXME: serialize() and serialized_size() not implemented. We should only be serializing to
|
||||
// disk, not in the wire.
|
||||
struct i_filter {
|
||||
@@ -55,6 +65,7 @@ struct i_filter {
|
||||
|
||||
virtual void add(const bytes_view& key) = 0;
|
||||
virtual bool is_present(const bytes_view& key) = 0;
|
||||
virtual bool is_present(hashed_key) = 0;
|
||||
virtual void clear() = 0;
|
||||
virtual void close() = 0;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user