From 2ab18dcd2d0228d4ef8e3945921b94f56244078f Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 3 Jun 2016 13:34:39 +0200 Subject: [PATCH 1/2] row_cache: Implement clear() using invalidate() Reduces code duplication. --- row_cache.cc | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/row_cache.cc b/row_cache.cc index 2d4a2f845a..827913d0ba 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -468,15 +468,7 @@ void row_cache::populate(const mutation& m) { } void row_cache::clear() { - with_allocator(_tracker.allocator(), [this] { - // We depend on clear_and_dispose() below not looking up any keys. - // Using with_linearized_managed_bytes() is no helps, because we don't - // want to propagate an exception from here. - _partitions.clear_and_dispose([this, deleter = current_deleter()] (auto&& p) mutable { - _tracker.on_erase(); - deleter(p); - }); - }); + invalidate(query::full_partition_range); } future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) { From 170a214628c2aec26dcc1d0c8d121213895b3a96 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 6 Jun 2016 13:21:06 +0200 Subject: [PATCH 2/2] row_cache: Make stronger guarantees in clear/invalidate Correctness of current uses of clear() and invalidate() relies on fact that cache is not populated using readers created before invalidation. Sstables are first modified and then cache is invalidated. This is not guaranteed by current implementation though. As pointed out by Avi, a populating read may race with the call to clear(). If that read started before clear() and completed after it, the cache may be populated with data which does not correspond to the new sstable set. To provide such guarantee, invalidate() variants were adjusted to synchronize using _populate_phaser, similarly like row_cache::update() does. --- database.cc | 33 ++++++------ database.hh | 2 +- row_cache.cc | 46 +++++++++++------ row_cache.hh | 24 +++++++-- tests/row_cache_test.cc | 112 +++++++++++++++++++++++++++++++++------- 5 files changed, 161 insertions(+), 56 deletions(-) diff --git a/database.cc b/database.cc index 0c39a8a16d..be6dd680f3 100644 --- a/database.cc +++ b/database.cc @@ -1049,7 +1049,7 @@ column_family::load_new_sstables(std::vector new_tab }).then([this] { // Drop entire cache for this column family because it may be populated // with stale data. - get_row_cache().clear(); + return get_row_cache().clear(); }); } @@ -2253,7 +2253,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun // gotten all things to disk. Again, need queue-ish or something. f = cf.flush(); } else { - cf.clear(); + f = cf.clear(); } return cf.run_with_compaction_disabled([f = std::move(f), &cf, auto_snapshot, tsf = std::move(tsf)]() mutable { @@ -2629,21 +2629,24 @@ future<> column_family::flush_streaming_mutations(std::vectorseal_active_memtable(memtable_list::flush_behavior::delayed).finally([this, ranges = std::move(ranges)] { - if (_config.enable_cache) { - for (auto& range : ranges) { - _cache.invalidate(range); - } + if (!_config.enable_cache) { + return make_ready_future<>(); } + return do_with(std::move(ranges), [this] (auto& ranges) { + return parallel_for_each(ranges, [this](auto&& range) { + return _cache.invalidate(range); + }); + }); }); }); } -void column_family::clear() { - _cache.clear(); +future<> column_family::clear() { _memtables->clear(); _memtables->add_memtable(); _streaming_memtables->clear(); _streaming_memtables->add_memtable(); + return _cache.clear(); } // NOTE: does not need to be futurized, but might eventually, depending on @@ -2669,13 +2672,13 @@ future column_family::discard_sstables(db_clock::time_point _sstables = std::move(pruned); dblog.debug("cleaning out row cache"); - _cache.clear(); - - return parallel_for_each(remove, [](sstables::shared_sstable s) { - return sstables::delete_atomically({s}); - }).then([rp] { - return make_ready_future(rp); - }).finally([remove] {}); // keep the objects alive until here. + return _cache.clear().then([rp, remove = std::move(remove)] () mutable { + return parallel_for_each(remove, [](sstables::shared_sstable s) { + return sstables::delete_atomically({s}); + }).then([rp] { + return make_ready_future(rp); + }).finally([remove] {}); // keep the objects alive until here. + }); }); } diff --git a/database.hh b/database.hh index 488a206801..9bc596e333 100644 --- a/database.hh +++ b/database.hh @@ -496,7 +496,7 @@ public: future<> flush(); future<> flush(const db::replay_position&); future<> flush_streaming_mutations(std::vector ranges = std::vector{}); - void clear(); // discards memtable(s) without flushing them to disk. + future<> clear(); // discards memtable(s) without flushing them to disk. future discard_sstables(db_clock::time_point); // Important warning: disabling writes will only have an effect in the current shard. diff --git a/row_cache.cc b/row_cache.cc index 827913d0ba..da16c09f1c 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -443,7 +443,16 @@ row_cache::make_reader(schema_ptr s, } row_cache::~row_cache() { - clear(); + clear_now(); +} + +void row_cache::clear_now() noexcept { + with_allocator(_tracker.allocator(), [this] { + _partitions.clear_and_dispose([this, deleter = current_deleter()] (auto&& p) mutable { + _tracker.on_erase(); + deleter(p); + }); + }); } void row_cache::populate(const mutation& m) { @@ -467,8 +476,8 @@ void row_cache::populate(const mutation& m) { }); } -void row_cache::clear() { - invalidate(query::full_partition_range); +future<> row_cache::clear() { + return invalidate(query::full_partition_range); } future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) { @@ -494,8 +503,8 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec }); if (blow_cache) { // We failed to invalidate the key, presumably due to with_linearized_managed_bytes() - // running out of memory. Recover using clear(), which doesn't throw. - clear(); + // running out of memory. Recover using clear_now(), which doesn't throw. + clear_now(); } }); }); @@ -569,7 +578,8 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) { }); } -void row_cache::invalidate(const dht::decorated_key& dk) { +future<> row_cache::invalidate(const dht::decorated_key& dk) { +return _populate_phaser.advance_and_await().then([this, &dk] { _read_section(_tracker.region(), [&] { with_allocator(_tracker.allocator(), [this, &dk] { with_linearized_managed_bytes([&] { @@ -577,17 +587,24 @@ void row_cache::invalidate(const dht::decorated_key& dk) { }); }); }); +}); } -void row_cache::invalidate(const query::partition_range& range) { - with_linearized_managed_bytes([&] { - if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) { - auto unwrapped = range.unwrap(); - invalidate(unwrapped.first); - invalidate(unwrapped.second); - return; - } +future<> row_cache::invalidate(const query::partition_range& range) { + return _populate_phaser.advance_and_await().then([this, &range] { + with_linearized_managed_bytes([&] { + if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) { + auto unwrapped = range.unwrap(); + invalidate_unwrapped(unwrapped.first); + invalidate_unwrapped(unwrapped.second); + } else { + invalidate_unwrapped(range); + } + }); + }); +} +void row_cache::invalidate_unwrapped(const query::partition_range& range) { logalloc::reclaim_lock _(_tracker.region()); auto cmp = cache_entry::compare(_schema); @@ -613,7 +630,6 @@ void row_cache::invalidate(const query::partition_range& range) { deleter(p); }); }); - }); } row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys, diff --git a/row_cache.hh b/row_cache.hh index 335626da5a..ae65a301f8 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -184,13 +184,13 @@ private: mutation_source _underlying; key_source _underlying_keys; - // Synchronizes populating reads with update() to ensure that cache + // Synchronizes populating reads with updates of underlying data source to ensure that cache // remains consistent across flushes with the underlying data source. // Readers obtained from the underlying data source in earlier than // current phases must not be used to populate the cache, unless they hold // phaser::operation created in the reader's phase of origin. Readers // should hold to a phase only briefly because this inhibits progress of - // update(). Phase changes occur only in update(), which can be assumed to + // updates. Phase changes occur in update()/clear(), which can be assumed to // be asynchronous wrt invoking of the underlying data source. utils::phased_barrier _populate_phaser; @@ -204,6 +204,8 @@ private: void on_miss(); void upgrade_entry(cache_entry&); void invalidate_locked(const dht::decorated_key&); + void invalidate_unwrapped(const query::partition_range&); + void clear_now() noexcept; static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group; public: ~row_cache(); @@ -228,7 +230,9 @@ public: void populate(const mutation& m); // Clears the cache. - void clear(); + // Guarantees that cache will not be populated using readers created + // before this method was invoked. + future<> clear(); // Synchronizes cache with the underlying data source from a memtable which // has just been flushed to the underlying data source. @@ -240,11 +244,21 @@ public: void touch(const dht::decorated_key&); // Removes given partition from cache. - void invalidate(const dht::decorated_key&); + // + // Guarantees that cache will not be populated with given key + // using readers created before this method was invoked. + // + // The key must be kept alive until method resolves. + future<> invalidate(const dht::decorated_key& key); // Removes given range of partitions from cache. // The range can be a wrap around. - void invalidate(const query::partition_range&); + // + // Guarantees that cache will not be populated with partitions from that range + // using readers created before this method was invoked. + // + // The range must be kept alive until method resolves. + future<> invalidate(const query::partition_range&); auto num_entries() const { return _partitions.size(); diff --git a/tests/row_cache_test.cc b/tests/row_cache_test.cc index a4f8537df1..42c2aad084 100644 --- a/tests/row_cache_test.cc +++ b/tests/row_cache_test.cc @@ -546,27 +546,33 @@ static std::vector updated_ring(std::vector& mutations) { return result; } +static mutation_source make_mutation_source(std::vector>& memtables) { + return mutation_source([&memtables] (schema_ptr s, const query::partition_range& pr) { + std::vector readers; + for (auto&& mt : memtables) { + readers.emplace_back(mt->make_reader(s, pr)); + } + return make_combined_reader(std::move(readers)); + }); +} + +static key_source make_key_source(schema_ptr s, std::vector>& memtables) { + return key_source([s, &memtables] (const query::partition_range& pr) { + std::vector readers; + for (auto&& mt : memtables) { + readers.emplace_back(mt->as_key_source()(pr)); + } + return make_combined_reader(s, std::move(readers)); + }); +} + SEASTAR_TEST_CASE(test_cache_population_and_update_race) { return seastar::async([] { auto s = make_schema(); std::vector> memtables; - auto memtables_data_source = mutation_source([&] (schema_ptr s, const query::partition_range& pr) { - std::vector readers; - for (auto&& mt : memtables) { - readers.emplace_back(mt->make_reader(s, pr)); - } - return make_combined_reader(std::move(readers)); - }); - auto memtables_key_source = key_source([&] (const query::partition_range& pr) { - std::vector readers; - for (auto&& mt : memtables) { - readers.emplace_back(mt->as_key_source()(pr)); - } - return make_combined_reader(s, std::move(readers)); - }); - throttled_mutation_source cache_source(memtables_data_source); + throttled_mutation_source cache_source(make_mutation_source(memtables)); cache_tracker tracker; - row_cache cache(s, cache_source, memtables_key_source, tracker); + row_cache cache(s, cache_source, make_key_source(s, memtables), tracker); auto mt1 = make_lw_shared(s); memtables.push_back(mt1); @@ -656,7 +662,7 @@ SEASTAR_TEST_CASE(test_invalidate) { auto some_element = keys_in_cache.begin() + 547; std::vector keys_not_in_cache; keys_not_in_cache.push_back(*some_element); - cache.invalidate(*some_element); + cache.invalidate(*some_element).get(); keys_in_cache.erase(some_element); for (auto&& key : keys_in_cache) { @@ -676,7 +682,7 @@ SEASTAR_TEST_CASE(test_invalidate) { { *some_range_begin, true }, { *some_range_end, false } ); keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end); - cache.invalidate(range); + cache.invalidate(range).get(); keys_in_cache.erase(some_range_begin, some_range_end); for (auto&& key : keys_in_cache) { @@ -688,6 +694,72 @@ SEASTAR_TEST_CASE(test_invalidate) { }); } +SEASTAR_TEST_CASE(test_cache_population_and_clear_race) { + return seastar::async([] { + auto s = make_schema(); + std::vector> memtables; + throttled_mutation_source cache_source(make_mutation_source(memtables)); + cache_tracker tracker; + row_cache cache(s, cache_source, make_key_source(s, memtables), tracker); + + auto mt1 = make_lw_shared(s); + memtables.push_back(mt1); + auto ring = make_ring(s, 3); + for (auto&& m : ring) { + mt1->apply(m); + } + + auto mt2 = make_lw_shared(s); + auto ring2 = updated_ring(ring); + for (auto&& m : ring2) { + mt2->apply(m); + } + + cache_source.block(); + + auto rd1 = cache.make_reader(s); + auto rd1_result = rd1(); + + sleep(10ms).get(); + + memtables.clear(); + memtables.push_back(mt2); + + // This update should miss on all partitions + auto cache_cleared = cache.clear(); + + auto rd2 = cache.make_reader(s); + + // rd1, which is in progress, should not prevent forward progress of clear() + cache_source.unblock(); + cache_cleared.get(); + + // Reads started before memtable flush should return previous value, otherwise this test + // doesn't trigger the conditions it is supposed to protect against. + + assert_that(rd1_result.get0()).has_mutation().is_equal_to(ring[0]); + assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[1]); + assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[2]); + assert_that(rd1().get0()).has_no_mutation(); + + // Reads started after clear but before previous populations completed + // should already see the new data + assert_that(std::move(rd2)) + .produces(ring2[0]) + .produces(ring2[1]) + .produces(ring2[2]) + .produces_end_of_stream(); + + // Reads started after clear should see new data + assert_that(cache.make_reader(s)) + .produces(ring2[0]) + .produces(ring2[1]) + .produces(ring2[2]) + .produces_end_of_stream(); + }); +} + + SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) { return seastar::async([] { auto s = make_schema(); @@ -707,7 +779,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) { } // wrap-around - cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})); + cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get(); verify_does_not_have(cache, ring[0].decorated_key()); verify_does_not_have(cache, ring[1].decorated_key()); @@ -719,7 +791,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) { verify_does_not_have(cache, ring[7].decorated_key()); // not wrap-around - cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})); + cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get(); verify_does_not_have(cache, ring[0].decorated_key()); verify_does_not_have(cache, ring[1].decorated_key());