mv, test: add injection point to delay remove view update
It's difficult to write a test (as we plan to do in to in the next patch) that verifies that synchronous view updates are indeed synchronous, i.e., that write with CL=QUORUM on the base-table write returns only after CL=QUORUM was also achieved in the view table. The difficulty is that in a fast test machine, even if the synchronous-view-update is completely buggy, it's likely that by the time the test reads from the view, all view updates will have been completed anyway. So in this patch we introduce an injection point, for testing, named "delay_before_remote_view_update", which adds a delay before the base replica sends its update to the remote view replica (in case the view replica is indeed remote). As usual, this injection point isn't configurable - when enabled it adds a fixed (0.5 second) delay, on all view updates on all tables. The existing code used continuation-style Seastar programming, and the addition of the injection point in this patch made it even uglier, so in the next patch we will coroutine-ize this code. Signed-off-by: Nadav Har'El <nyh@scylladb.com>
This commit is contained in:
@@ -1746,26 +1746,33 @@ future<> view_update_generator::mutate_MV(
|
||||
stats.view_updates_pushed_remote += updates_pushed_remote;
|
||||
cf_stats.total_view_updates_pushed_remote += updates_pushed_remote;
|
||||
schema_ptr s = mut.s;
|
||||
future<> view_update = apply_to_remote_endpoints(_proxy.local(), std::move(view_ermp), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
|
||||
[s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote,
|
||||
units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable {
|
||||
if (f.failed()) {
|
||||
stats.view_updates_failed_remote += updates_pushed_remote;
|
||||
cf_stats.total_view_updates_failed_remote += updates_pushed_remote;
|
||||
auto ep = f.get_exception();
|
||||
tracing::trace(tr_state, "Failed to apply view update for {} and {} remote endpoints",
|
||||
// The "delay_before_remote_view_update" injection point can be
|
||||
// used to add a short delay (currently 0.5 seconds) before a base
|
||||
// replica sends its update to the remove view replica.
|
||||
future<> view_update = utils::get_local_injector().inject("delay_before_remote_view_update", 500ms).then(
|
||||
[this, s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint,
|
||||
updates_pushed_remote, units = sem_units.split(sem_units.count()), apply_update_synchronously,
|
||||
view_ermp = std::move(view_ermp), remote_endpoints = std::move(remote_endpoints), mut = std::move(mut), allow_hints] () mutable {
|
||||
return apply_to_remote_endpoints(_proxy.local(), std::move(view_ermp), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
|
||||
[s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote, units = std::move(units), apply_update_synchronously] (future<>&& f) mutable {
|
||||
if (f.failed()) {
|
||||
stats.view_updates_failed_remote += updates_pushed_remote;
|
||||
cf_stats.total_view_updates_failed_remote += updates_pushed_remote;
|
||||
auto ep = f.get_exception();
|
||||
tracing::trace(tr_state, "Failed to apply view update for {} and {} remote endpoints",
|
||||
*target_endpoint, updates_pushed_remote);
|
||||
|
||||
// Printing an error on every failed view mutation would cause log spam, so a rate limit is needed.
|
||||
static thread_local logger::rate_limit view_update_error_rate_limit(std::chrono::seconds(4));
|
||||
vlogger.log(log_level::warn, view_update_error_rate_limit,
|
||||
// Printing an error on every failed view mutation would cause log spam, so a rate limit is needed.
|
||||
static thread_local logger::rate_limit view_update_error_rate_limit(std::chrono::seconds(4));
|
||||
vlogger.log(log_level::warn, view_update_error_rate_limit,
|
||||
"Error applying view update to {} (view: {}.{}, base token: {}, view token: {}): {}",
|
||||
*target_endpoint, s->ks_name(), s->cf_name(), base_token, view_token, ep);
|
||||
return apply_update_synchronously ? make_exception_future<>(std::move(ep)) : make_ready_future<>();
|
||||
}
|
||||
tracing::trace(tr_state, "Successfully applied view update for {} and {} remote endpoints",
|
||||
return apply_update_synchronously ? make_exception_future<>(std::move(ep)) : make_ready_future<>();
|
||||
}
|
||||
tracing::trace(tr_state, "Successfully applied view update for {} and {} remote endpoints",
|
||||
*target_endpoint, updates_pushed_remote);
|
||||
return make_ready_future<>();
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
if (apply_update_synchronously) {
|
||||
remote_view_update = std::move(view_update);
|
||||
|
||||
Reference in New Issue
Block a user