merge: add error injection to mv
Merged pull request https://github.com/scylladb/scylla/pull/6516 from Piotr Sarna: This series adds error injection points to materialized view paths: view update generation from staging sstables; view building; generating view updates from user writes. This series comes with a corresponding dtest pull request which adds some test cases based on error injection. Fixes #6488
This commit is contained in:
@@ -72,11 +72,17 @@
|
||||
#include "view_update_checks.hh"
|
||||
#include "types/user.hh"
|
||||
#include "types/list.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
static logging::logger vlogger("view");
|
||||
|
||||
static inline void inject_failure(std::string_view operation) {
|
||||
utils::get_local_injector().inject(operation,
|
||||
[operation] { throw std::runtime_error(std::string(operation)); });
|
||||
}
|
||||
|
||||
view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
|
||||
: _schema(schema)
|
||||
, _raw(raw_view_info)
|
||||
@@ -1667,6 +1673,7 @@ public:
|
||||
}
|
||||
|
||||
void load_views_to_build() {
|
||||
inject_failure("view_builder_load_views");
|
||||
for (auto&& vs : _step.build_status) {
|
||||
if (_step.current_token() >= vs.next_token) {
|
||||
if (partition_key_matches(*_step.reader.schema(), *vs.view->view_info(), _step.current_key, _now)) {
|
||||
@@ -1682,6 +1689,7 @@ public:
|
||||
}
|
||||
|
||||
void check_for_built_views() {
|
||||
inject_failure("view_builder_check_for_built_views");
|
||||
for (auto it = _step.build_status.begin(); it != _step.build_status.end();) {
|
||||
// A view starts being built at token t1. Due to resharding, that may not necessarily be a
|
||||
// shard-owned token. We finish building the view when the next_token to build is just before
|
||||
@@ -1698,6 +1706,7 @@ public:
|
||||
}
|
||||
|
||||
stop_iteration consume_new_partition(const dht::decorated_key& dk) {
|
||||
inject_failure("view_builder_consume_new_partition");
|
||||
_step.current_key = std::move(dk);
|
||||
check_for_built_views();
|
||||
_views_to_build.clear();
|
||||
@@ -1706,14 +1715,17 @@ public:
|
||||
}
|
||||
|
||||
stop_iteration consume(tombstone) {
|
||||
inject_failure("view_builder_consume_tombstone");
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
stop_iteration consume(static_row&&, tombstone, bool) {
|
||||
inject_failure("view_builder_consume_static_row");
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
|
||||
inject_failure("view_builder_consume_clustering_row");
|
||||
if (_views_to_build.empty() || _builder._as.abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
@@ -1731,10 +1743,12 @@ public:
|
||||
}
|
||||
|
||||
stop_iteration consume(range_tombstone&&) {
|
||||
inject_failure("view_builder_consume_range_tombstone");
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
void flush_fragments() {
|
||||
inject_failure("view_builder_flush_fragments");
|
||||
_builder._as.check();
|
||||
if (!_fragments.empty()) {
|
||||
_fragments.push_front(partition_start(_step.current_key, tombstone()));
|
||||
@@ -1749,11 +1763,13 @@ public:
|
||||
}
|
||||
|
||||
stop_iteration consume_end_of_partition() {
|
||||
inject_failure("view_builder_consume_end_of_partition");
|
||||
flush_fragments();
|
||||
return stop_iteration(_step.build_status.empty());
|
||||
}
|
||||
|
||||
built_views consume_end_of_stream() {
|
||||
inject_failure("view_builder_consume_end_of_stream");
|
||||
if (vlogger.is_enabled(log_level::debug)) {
|
||||
auto view_names = boost::copy_range<std::vector<sstring>>(
|
||||
_views_to_build | boost::adaptors::transformed([](auto v) {
|
||||
@@ -1819,6 +1835,7 @@ future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_t
|
||||
return result && shard_complete;
|
||||
}).then([this, view, next_token = std::move(next_token)] (bool built) {
|
||||
if (built) {
|
||||
inject_failure("view_builder_mark_view_as_built");
|
||||
return container().invoke_on_all([view_id = view->id()] (view_builder& builder) {
|
||||
if (builder._built_views.erase(view_id) == 0 || this_shard_id() != 0) {
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -22,9 +22,15 @@
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "view_update_generator.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
static logging::logger vug_logger("view_update_generator");
|
||||
|
||||
static inline void inject_failure(std::string_view operation) {
|
||||
utils::get_local_injector().inject(operation,
|
||||
[operation] { throw std::runtime_error(std::string(operation)); });
|
||||
}
|
||||
|
||||
namespace db::view {
|
||||
|
||||
future<> view_update_generator::start() {
|
||||
@@ -63,6 +69,7 @@ future<> view_update_generator::start() {
|
||||
::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
inject_failure("view_update_generator_consume_staging_sstable");
|
||||
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as), db::no_timeout);
|
||||
if (result == stop_iteration::yes) {
|
||||
break;
|
||||
@@ -75,6 +82,7 @@ future<> view_update_generator::start() {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
inject_failure("view_update_generator_collect_consumed_sstables");
|
||||
// collect all staging sstables to move in a map, grouped by table.
|
||||
std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_to_move[t]));
|
||||
} catch (...) {
|
||||
@@ -87,6 +95,7 @@ future<> view_update_generator::start() {
|
||||
for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) {
|
||||
auto& [t, sstables] = *it;
|
||||
try {
|
||||
inject_failure("view_update_generator_move_staging_sstable");
|
||||
t->move_sstables_from_staging(sstables).get();
|
||||
} catch (...) {
|
||||
// Move from staging will be retried upon restart.
|
||||
@@ -115,6 +124,7 @@ future<> view_update_generator::register_staging_sstable(sstables::shared_sstabl
|
||||
if (_as.abort_requested()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
inject_failure("view_update_generator_registering_staging_sstable");
|
||||
_sstables_with_tables[table].push_back(std::move(sst));
|
||||
|
||||
_pending_sstables.signal();
|
||||
|
||||
9
table.cc
9
table.cc
@@ -44,6 +44,7 @@
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
static logging::logger tlogger("table");
|
||||
static seastar::metrics::label column_family_label("cf");
|
||||
@@ -2546,6 +2547,9 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
auto& base = schema();
|
||||
m.upgrade(base);
|
||||
gc_clock::time_point now = gc_clock::now();
|
||||
utils::get_local_injector().inject("table_push_view_replica_updates_stale_time_point", [&now] {
|
||||
now -= 10s;
|
||||
});
|
||||
auto views = affected_views(base, m, now);
|
||||
if (views.empty()) {
|
||||
return make_ready_future<row_locker::lock_holder>();
|
||||
@@ -2576,8 +2580,9 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
// We'll return this lock to the caller, which will release it after
|
||||
// writing the base-table update.
|
||||
future<row_locker::lock_holder> lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout);
|
||||
return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, now, source = std::move(source), tr_state = std::move(tr_state), &sem, &io_priority] (row_locker::lock_holder lock) mutable {
|
||||
tracing::trace(tr_state, "View updates for {}.{} require read-before-write - base table reader is created", base->ks_name(), base->cf_name());
|
||||
return utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout).then([lockf = std::move(lockf), timeout] () mutable {
|
||||
return std::move(lockf);
|
||||
}).then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, now, source = std::move(source), &sem, tr_state = std::move(tr_state), &io_priority] (row_locker::lock_holder lock) mutable {
|
||||
return do_with(
|
||||
dht::partition_range::make_singular(m.decorated_key()),
|
||||
std::move(slice),
|
||||
|
||||
Reference in New Issue
Block a user