diff --git a/database.cc b/database.cc index 603663f207..5da96bbf63 100644 --- a/database.cc +++ b/database.cc @@ -1067,7 +1067,7 @@ column_family::load_new_sstables(std::vector new_tab }).then([this] { // Drop entire cache for this column family because it may be populated // with stale data. - get_row_cache().clear(); + return get_row_cache().clear(); }); } @@ -2258,7 +2258,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun // gotten all things to disk. Again, need queue-ish or something. f = cf.flush(); } else { - cf.clear(); + f = cf.clear(); } return cf.run_with_compaction_disabled([f = std::move(f), &cf, auto_snapshot, tsf = std::move(tsf)]() mutable { @@ -2634,21 +2634,24 @@ future<> column_family::flush_streaming_mutations(std::vectorseal_active_memtable(memtable_list::flush_behavior::delayed).finally([this, ranges = std::move(ranges)] { - if (_config.enable_cache) { - for (auto& range : ranges) { - _cache.invalidate(range); - } + if (!_config.enable_cache) { + return make_ready_future<>(); } + return do_with(std::move(ranges), [this] (auto& ranges) { + return parallel_for_each(ranges, [this](auto&& range) { + return _cache.invalidate(range); + }); + }); }); }); } -void column_family::clear() { - _cache.clear(); +future<> column_family::clear() { _memtables->clear(); _memtables->add_memtable(); _streaming_memtables->clear(); _streaming_memtables->add_memtable(); + return _cache.clear(); } // NOTE: does not need to be futurized, but might eventually, depending on @@ -2674,13 +2677,13 @@ future column_family::discard_sstables(db_clock::time_point _sstables = std::move(pruned); dblog.debug("cleaning out row cache"); - _cache.clear(); - - return parallel_for_each(remove, [](sstables::shared_sstable s) { - return sstables::delete_atomically({s}); - }).then([rp] { - return make_ready_future(rp); - }).finally([remove] {}); // keep the objects alive until here. + return _cache.clear().then([rp, remove = std::move(remove)] () mutable { + return parallel_for_each(remove, [](sstables::shared_sstable s) { + return sstables::delete_atomically({s}); + }).then([rp] { + return make_ready_future(rp); + }).finally([remove] {}); // keep the objects alive until here. + }); }); } diff --git a/database.hh b/database.hh index dec6bdd468..db71222835 100644 --- a/database.hh +++ b/database.hh @@ -497,7 +497,7 @@ public: future<> flush(); future<> flush(const db::replay_position&); future<> flush_streaming_mutations(std::vector ranges = std::vector{}); - void clear(); // discards memtable(s) without flushing them to disk. + future<> clear(); // discards memtable(s) without flushing them to disk. future discard_sstables(db_clock::time_point); // Important warning: disabling writes will only have an effect in the current shard. diff --git a/row_cache.cc b/row_cache.cc index 4304ad3427..6792906418 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -456,7 +456,16 @@ row_cache::make_reader(schema_ptr s, } row_cache::~row_cache() { - clear(); + clear_now(); +} + +void row_cache::clear_now() noexcept { + with_allocator(_tracker.allocator(), [this] { + _partitions.clear_and_dispose([this, deleter = current_deleter()] (auto&& p) mutable { + _tracker.on_erase(); + deleter(p); + }); + }); } void row_cache::populate(const mutation& m) { @@ -480,16 +489,8 @@ void row_cache::populate(const mutation& m) { }); } -void row_cache::clear() { - with_allocator(_tracker.allocator(), [this] { - // We depend on clear_and_dispose() below not looking up any keys. - // Using with_linearized_managed_bytes() is no helps, because we don't - // want to propagate an exception from here. - _partitions.clear_and_dispose([this, deleter = current_deleter()] (auto&& p) mutable { - _tracker.on_erase(); - deleter(p); - }); - }); +future<> row_cache::clear() { + return invalidate(query::full_partition_range); } future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) { @@ -515,8 +516,8 @@ future<> row_cache::update(memtable& m, partition_presence_checker presence_chec }); if (blow_cache) { // We failed to invalidate the key, presumably due to with_linearized_managed_bytes() - // running out of memory. Recover using clear(), which doesn't throw. - clear(); + // running out of memory. Recover using clear_now(), which doesn't throw. + clear_now(); } }); }); @@ -590,7 +591,8 @@ void row_cache::invalidate_locked(const dht::decorated_key& dk) { }); } -void row_cache::invalidate(const dht::decorated_key& dk) { +future<> row_cache::invalidate(const dht::decorated_key& dk) { +return _populate_phaser.advance_and_await().then([this, &dk] { _read_section(_tracker.region(), [&] { with_allocator(_tracker.allocator(), [this, &dk] { with_linearized_managed_bytes([&] { @@ -598,17 +600,24 @@ void row_cache::invalidate(const dht::decorated_key& dk) { }); }); }); +}); } -void row_cache::invalidate(const query::partition_range& range) { - with_linearized_managed_bytes([&] { - if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) { - auto unwrapped = range.unwrap(); - invalidate(unwrapped.first); - invalidate(unwrapped.second); - return; - } +future<> row_cache::invalidate(const query::partition_range& range) { + return _populate_phaser.advance_and_await().then([this, &range] { + with_linearized_managed_bytes([&] { + if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) { + auto unwrapped = range.unwrap(); + invalidate_unwrapped(unwrapped.first); + invalidate_unwrapped(unwrapped.second); + } else { + invalidate_unwrapped(range); + } + }); + }); +} +void row_cache::invalidate_unwrapped(const query::partition_range& range) { logalloc::reclaim_lock _(_tracker.region()); auto cmp = cache_entry::compare(_schema); @@ -634,7 +643,6 @@ void row_cache::invalidate(const query::partition_range& range) { deleter(p); }); }); - }); } row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys, diff --git a/row_cache.hh b/row_cache.hh index 8a7e03c45f..23659201c9 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -186,13 +186,13 @@ private: mutation_source _underlying; key_source _underlying_keys; - // Synchronizes populating reads with update() to ensure that cache + // Synchronizes populating reads with updates of underlying data source to ensure that cache // remains consistent across flushes with the underlying data source. // Readers obtained from the underlying data source in earlier than // current phases must not be used to populate the cache, unless they hold // phaser::operation created in the reader's phase of origin. Readers // should hold to a phase only briefly because this inhibits progress of - // update(). Phase changes occur only in update(), which can be assumed to + // updates. Phase changes occur in update()/clear(), which can be assumed to // be asynchronous wrt invoking of the underlying data source. utils::phased_barrier _populate_phaser; @@ -206,6 +206,8 @@ private: void on_miss(); void upgrade_entry(cache_entry&); void invalidate_locked(const dht::decorated_key&); + void invalidate_unwrapped(const query::partition_range&); + void clear_now() noexcept; static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group; public: ~row_cache(); @@ -230,7 +232,9 @@ public: void populate(const mutation& m); // Clears the cache. - void clear(); + // Guarantees that cache will not be populated using readers created + // before this method was invoked. + future<> clear(); // Synchronizes cache with the underlying data source from a memtable which // has just been flushed to the underlying data source. @@ -242,11 +246,21 @@ public: void touch(const dht::decorated_key&); // Removes given partition from cache. - void invalidate(const dht::decorated_key&); + // + // Guarantees that cache will not be populated with given key + // using readers created before this method was invoked. + // + // The key must be kept alive until method resolves. + future<> invalidate(const dht::decorated_key& key); // Removes given range of partitions from cache. // The range can be a wrap around. - void invalidate(const query::partition_range&); + // + // Guarantees that cache will not be populated with partitions from that range + // using readers created before this method was invoked. + // + // The range must be kept alive until method resolves. + future<> invalidate(const query::partition_range&); auto num_entries() const { return _partitions.size(); diff --git a/tests/row_cache_test.cc b/tests/row_cache_test.cc index a4f8537df1..42c2aad084 100644 --- a/tests/row_cache_test.cc +++ b/tests/row_cache_test.cc @@ -546,27 +546,33 @@ static std::vector updated_ring(std::vector& mutations) { return result; } +static mutation_source make_mutation_source(std::vector>& memtables) { + return mutation_source([&memtables] (schema_ptr s, const query::partition_range& pr) { + std::vector readers; + for (auto&& mt : memtables) { + readers.emplace_back(mt->make_reader(s, pr)); + } + return make_combined_reader(std::move(readers)); + }); +} + +static key_source make_key_source(schema_ptr s, std::vector>& memtables) { + return key_source([s, &memtables] (const query::partition_range& pr) { + std::vector readers; + for (auto&& mt : memtables) { + readers.emplace_back(mt->as_key_source()(pr)); + } + return make_combined_reader(s, std::move(readers)); + }); +} + SEASTAR_TEST_CASE(test_cache_population_and_update_race) { return seastar::async([] { auto s = make_schema(); std::vector> memtables; - auto memtables_data_source = mutation_source([&] (schema_ptr s, const query::partition_range& pr) { - std::vector readers; - for (auto&& mt : memtables) { - readers.emplace_back(mt->make_reader(s, pr)); - } - return make_combined_reader(std::move(readers)); - }); - auto memtables_key_source = key_source([&] (const query::partition_range& pr) { - std::vector readers; - for (auto&& mt : memtables) { - readers.emplace_back(mt->as_key_source()(pr)); - } - return make_combined_reader(s, std::move(readers)); - }); - throttled_mutation_source cache_source(memtables_data_source); + throttled_mutation_source cache_source(make_mutation_source(memtables)); cache_tracker tracker; - row_cache cache(s, cache_source, memtables_key_source, tracker); + row_cache cache(s, cache_source, make_key_source(s, memtables), tracker); auto mt1 = make_lw_shared(s); memtables.push_back(mt1); @@ -656,7 +662,7 @@ SEASTAR_TEST_CASE(test_invalidate) { auto some_element = keys_in_cache.begin() + 547; std::vector keys_not_in_cache; keys_not_in_cache.push_back(*some_element); - cache.invalidate(*some_element); + cache.invalidate(*some_element).get(); keys_in_cache.erase(some_element); for (auto&& key : keys_in_cache) { @@ -676,7 +682,7 @@ SEASTAR_TEST_CASE(test_invalidate) { { *some_range_begin, true }, { *some_range_end, false } ); keys_not_in_cache.insert(keys_not_in_cache.end(), some_range_begin, some_range_end); - cache.invalidate(range); + cache.invalidate(range).get(); keys_in_cache.erase(some_range_begin, some_range_end); for (auto&& key : keys_in_cache) { @@ -688,6 +694,72 @@ SEASTAR_TEST_CASE(test_invalidate) { }); } +SEASTAR_TEST_CASE(test_cache_population_and_clear_race) { + return seastar::async([] { + auto s = make_schema(); + std::vector> memtables; + throttled_mutation_source cache_source(make_mutation_source(memtables)); + cache_tracker tracker; + row_cache cache(s, cache_source, make_key_source(s, memtables), tracker); + + auto mt1 = make_lw_shared(s); + memtables.push_back(mt1); + auto ring = make_ring(s, 3); + for (auto&& m : ring) { + mt1->apply(m); + } + + auto mt2 = make_lw_shared(s); + auto ring2 = updated_ring(ring); + for (auto&& m : ring2) { + mt2->apply(m); + } + + cache_source.block(); + + auto rd1 = cache.make_reader(s); + auto rd1_result = rd1(); + + sleep(10ms).get(); + + memtables.clear(); + memtables.push_back(mt2); + + // This update should miss on all partitions + auto cache_cleared = cache.clear(); + + auto rd2 = cache.make_reader(s); + + // rd1, which is in progress, should not prevent forward progress of clear() + cache_source.unblock(); + cache_cleared.get(); + + // Reads started before memtable flush should return previous value, otherwise this test + // doesn't trigger the conditions it is supposed to protect against. + + assert_that(rd1_result.get0()).has_mutation().is_equal_to(ring[0]); + assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[1]); + assert_that(rd1().get0()).has_mutation().is_equal_to(ring2[2]); + assert_that(rd1().get0()).has_no_mutation(); + + // Reads started after clear but before previous populations completed + // should already see the new data + assert_that(std::move(rd2)) + .produces(ring2[0]) + .produces(ring2[1]) + .produces(ring2[2]) + .produces_end_of_stream(); + + // Reads started after clear should see new data + assert_that(cache.make_reader(s)) + .produces(ring2[0]) + .produces(ring2[1]) + .produces(ring2[2]) + .produces_end_of_stream(); + }); +} + + SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) { return seastar::async([] { auto s = make_schema(); @@ -707,7 +779,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) { } // wrap-around - cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})); + cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get(); verify_does_not_have(cache, ring[0].decorated_key()); verify_does_not_have(cache, ring[1].decorated_key()); @@ -719,7 +791,7 @@ SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) { verify_does_not_have(cache, ring[7].decorated_key()); // not wrap-around - cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})); + cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get(); verify_does_not_have(cache, ring[0].decorated_key()); verify_does_not_have(cache, ring[1].decorated_key());