Merge "Make stronger guarantees in row_cache's clear/invalidate" from Tomasz

"Correctness of current uses of clear() and invalidate() relies on fact
that cache is not populated using readers created before
invalidation. Sstables are first modified and then cache is
invalidated. This is not guaranteed by current implementation
though. As pointed out by Avi, a populating read may race with the
call to clear(). If that read started before clear() and completed
after it, the cache may be populated with data which does not
correspond to the new sstable set.

To provide such guarantee, invalidate() variants were adjusted to
synchronize using _populate_phaser, similarly like row_cache::update()
does.

Fixes #1291."
This commit is contained in:
Avi Kivity
2016-06-13 09:55:29 +03:00
5 changed files with 161 additions and 64 deletions

View File

@@ -1067,7 +1067,7 @@ column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tab
}).then([this] {
// Drop entire cache for this column family because it may be populated
// with stale data.
get_row_cache().clear();
return get_row_cache().clear();
});
}
@@ -2258,7 +2258,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
// gotten all things to disk. Again, need queue-ish or something.
f = cf.flush();
} else {
cf.clear();
f = cf.clear();
}
return cf.run_with_compaction_disabled([f = std::move(f), &cf, auto_snapshot, tsf = std::move(tsf)]() mutable {
@@ -2634,21 +2634,24 @@ future<> column_family::flush_streaming_mutations(std::vector<query::partition_r
// temporary counter measure.
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed).finally([this, ranges = std::move(ranges)] {
if (_config.enable_cache) {
for (auto& range : ranges) {
_cache.invalidate(range);
}
if (!_config.enable_cache) {
return make_ready_future<>();
}
return do_with(std::move(ranges), [this] (auto& ranges) {
return parallel_for_each(ranges, [this](auto&& range) {
return _cache.invalidate(range);
});
});
});
});
}
void column_family::clear() {
_cache.clear();
future<> column_family::clear() {
_memtables->clear();
_memtables->add_memtable();
_streaming_memtables->clear();
_streaming_memtables->add_memtable();
return _cache.clear();
}
// NOTE: does not need to be futurized, but might eventually, depending on
@@ -2674,13 +2677,13 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point
_sstables = std::move(pruned);
dblog.debug("cleaning out row cache");
_cache.clear();
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.
return _cache.clear().then([rp, remove = std::move(remove)] () mutable {
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
return make_ready_future<db::replay_position>(rp);
}).finally([remove] {}); // keep the objects alive until here.
});
});
}

View File

@@ -497,7 +497,7 @@ public:
future<> flush();
future<> flush(const db::replay_position&);
future<> flush_streaming_mutations(std::vector<query::partition_range> ranges = std::vector<query::partition_range>{});
void clear(); // discards memtable(s) without flushing them to disk.
future<> clear(); // discards memtable(s) without flushing them to disk.
future<db::replay_position> discard_sstables(db_clock::time_point);
// Important warning: disabling writes will only have an effect in the current shard.

View File

@@ -456,7 +456,16 @@ row_cache::make_reader(schema_ptr s,
}
row_cache::~row_cache() {
clear();
clear_now();
}
void row_cache::clear_now() noexcept {
with_allocator(_tracker.allocator(), [this] {
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
_tracker.on_erase();
deleter(p);
});
});
}
void row_cache::populate(const mutation& m) {
@@ -480,16 +489,8 @@ void row_cache::populate(const mutation& m) {
});
}
void row_cache::clear() {
with_allocator(_tracker.allocator(), [this] {
// We depend on clear_and_dispose() below not looking up any keys.
// Using with_linearized_managed_bytes() is no helps, because we don't
// want to propagate an exception from here.
_partitions.clear_and_dispose([this, deleter = current_deleter<cache_entry>()] (auto&& p) mutable {
_tracker.on_erase();
deleter(p);
});
});
future<> row_cache::clear() {
return invalidate(query::full_partition_range);
}
future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
@@ -515,8 +516,8 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec
});
if (blow_cache) {
// We failed to invalidate the key, presumably due to with_linearized_managed_bytes()
// running out of memory. Recover using clear(), which doesn't throw.
clear();
// running out of memory. Recover using clear_now(), which doesn't throw.
clear_now();
}
});
});
@@ -590,7 +591,8 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) {
});
}
void row_cache::invalidate(const dht::decorated_key& dk) {
future<> row_cache::invalidate(const dht::decorated_key& dk) {
return _populate_phaser.advance_and_await().then([this, &dk] {
_read_section(_tracker.region(), [&] {
with_allocator(_tracker.allocator(), [this, &dk] {
with_linearized_managed_bytes([&] {
@@ -598,17 +600,24 @@ void row_cache::invalidate(const dht::decorated_key& dk) {
});
});
});
});
}
void row_cache::invalidate(const query::partition_range& range) {
with_linearized_managed_bytes([&] {
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
auto unwrapped = range.unwrap();
invalidate(unwrapped.first);
invalidate(unwrapped.second);
return;
}
future<> row_cache::invalidate(const query::partition_range& range) {
return _populate_phaser.advance_and_await().then([this, &range] {
with_linearized_managed_bytes([&] {
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
auto unwrapped = range.unwrap();
invalidate_unwrapped(unwrapped.first);
invalidate_unwrapped(unwrapped.second);
} else {
invalidate_unwrapped(range);
}
});
});
}
void row_cache::invalidate_unwrapped(const query::partition_range& range) {
logalloc::reclaim_lock _(_tracker.region());
auto cmp = cache_entry::compare(_schema);
@@ -634,7 +643,6 @@ void row_cache::invalidate(const query::partition_range& range) {
deleter(p);
});
});
});
}
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,

View File

@@ -186,13 +186,13 @@ private:
mutation_source _underlying;
key_source _underlying_keys;
// Synchronizes populating reads with update() to ensure that cache
// Synchronizes populating reads with updates of underlying data source to ensure that cache
// remains consistent across flushes with the underlying data source.
// Readers obtained from the underlying data source in earlier than
// current phases must not be used to populate the cache, unless they hold
// phaser::operation created in the reader's phase of origin. Readers
// should hold to a phase only briefly because this inhibits progress of
// update(). Phase changes occur only in update(), which can be assumed to
// updates. Phase changes occur in update()/clear(), which can be assumed to
// be asynchronous wrt invoking of the underlying data source.
utils::phased_barrier _populate_phaser;
@@ -206,6 +206,8 @@ private:
void on_miss();
void upgrade_entry(cache_entry&);
void invalidate_locked(const dht::decorated_key&);
void invalidate_unwrapped(const query::partition_range&);
void clear_now() noexcept;
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
public:
~row_cache();
@@ -230,7 +232,9 @@ public:
void populate(const mutation& m);
// Clears the cache.
void clear();
// Guarantees that cache will not be populated using readers created
// before this method was invoked.
future<> clear();
// Synchronizes cache with the underlying data source from a memtable which
// has just been flushed to the underlying data source.
@@ -242,11 +246,21 @@ public:
void touch(const dht::decorated_key&);
// Removes given partition from cache.
void invalidate(const dht::decorated_key&);
//
// Guarantees that cache will not be populated with given key
// using readers created before this method was invoked.
//
// The key must be kept alive until method resolves.
future<> invalidate(const dht::decorated_key& key);
// Removes given range of partitions from cache.
// The range can be a wrap around.
void invalidate(const query::partition_range&);
//
// Guarantees that cache will not be populated with partitions from that range
// using readers created before this method was invoked.
//
// The range must be kept alive until method resolves.
future<> invalidate(const query::partition_range&);
auto num_entries() const {
return _partitions.size();

View File

@@ -546,27 +546,33 @@ static std::vector<mutation> updated_ring(std::vector<mutation>& mutations) {
return result;
}
static mutation_source make_mutation_source(std::vector<lw_shared_ptr<memtable>>& memtables) {
return mutation_source([&memtables] (schema_ptr s, const query::partition_range& pr) {
std::vector<mutation_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->make_reader(s, pr));
}
return make_combined_reader(std::move(readers));
});
}
static key_source make_key_source(schema_ptr s, std::vector<lw_shared_ptr<memtable>>& memtables) {
return key_source([s, &memtables] (const query::partition_range& pr) {
std::vector<key_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->as_key_source()(pr));
}
return make_combined_reader(s, std::move(readers));
});
}
SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
return seastar::async([] {
auto s = make_schema();
std::vector<lw_shared_ptr<memtable>> memtables;
auto memtables_data_source = mutation_source([&] (schema_ptr s, const query::partition_range& pr) {
std::vector<mutation_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->make_reader(s, pr));
}
return make_combined_reader(std::move(readers));
});
auto memtables_key_source = key_source([&] (const query::partition_range& pr) {
std::vector<key_reader> readers;
for (auto&& mt : memtables) {
readers.emplace_back(mt->as_key_source()(pr));
}
return make_combined_reader(s, std::move(readers));
});
throttled_mutation_source cache_source(memtables_data_source);
throttled_mutation_source cache_source(make_mutation_source(memtables));
cache_tracker tracker;
row_cache cache(s, cache_source, memtables_key_source, tracker);
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
auto mt1 = make_lw_shared<memtable>(s);
memtables.push_back(mt1);
@@ -656,7 +662,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
auto some_element = keys_in_cache.begin() + 547;
std::vector<dht::decorated_key> keys_not_in_cache;
keys_not_in_cache.push_back(*some_element);
cache.invalidate(*some_element);
cache.invalidate(*some_element).get();
keys_in_cache.erase(some_element);
for (auto&& key : keys_in_cache) {
@@ -676,7 +682,7 @@ SEASTAR_TEST_CASE(test_invalidate) {
{ *some_range_begin, true }, { *some_range_end, false }
);
keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end);
cache.invalidate(range);
cache.invalidate(range).get();
keys_in_cache.erase(some_range_begin, some_range_end);
for (auto&& key : keys_in_cache) {
@@ -688,6 +694,72 @@ SEASTAR_TEST_CASE(test_invalidate) {
});
}
SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
return seastar::async([] {
auto s = make_schema();
std::vector<lw_shared_ptr<memtable>> memtables;
throttled_mutation_source cache_source(make_mutation_source(memtables));
cache_tracker tracker;
row_cache cache(s, cache_source, make_key_source(s, memtables), tracker);
auto mt1 = make_lw_shared<memtable>(s);
memtables.push_back(mt1);
auto ring = make_ring(s, 3);
for (auto&& m : ring) {
mt1->apply(m);
}
auto mt2 = make_lw_shared<memtable>(s);
auto ring2 = updated_ring(ring);
for (auto&& m : ring2) {
mt2->apply(m);
}
cache_source.block();
auto rd1 = cache.make_reader(s);
auto rd1_result = rd1();
sleep(10ms).get();
memtables.clear();
memtables.push_back(mt2);
// This update should miss on all partitions
auto cache_cleared = cache.clear();
auto rd2 = cache.make_reader(s);
// rd1, which is in progress, should not prevent forward progress of clear()
cache_source.unblock();
cache_cleared.get();
// Reads started before memtable flush should return previous value, otherwise this test
// doesn't trigger the conditions it is supposed to protect against.
assert_that(rd1_result.get0()).has_mutation().is_equal_to(ring[0]);
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[1]);
assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[2]);
assert_that(rd1().get0()).has_no_mutation();
// Reads started after clear but before previous populations completed
// should already see the new data
assert_that(std::move(rd2))
.produces(ring2[0])
.produces(ring2[1])
.produces(ring2[2])
.produces_end_of_stream();
// Reads started after clear should see new data
assert_that(cache.make_reader(s))
.produces(ring2[0])
.produces(ring2[1])
.produces(ring2[2])
.produces_end_of_stream();
});
}
SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
return seastar::async([] {
auto s = make_schema();
@@ -707,7 +779,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
}
// wrap-around
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()}));
cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get();
verify_does_not_have(cache, ring[0].decorated_key());
verify_does_not_have(cache, ring[1].decorated_key());
@@ -719,7 +791,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
verify_does_not_have(cache, ring[7].decorated_key());
// not wrap-around
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()}));
cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get();
verify_does_not_have(cache, ring[0].decorated_key());
verify_does_not_have(cache, ring[1].decorated_key());