storage_proxy: Propagate virtual table exceptions messages

The intention is to return some meaningful info to the CQL caller
if a virtual table update fails. Unfortunately the "generic" error
reporting in CQL is not extremely flexible, so the best option
seems to report regular write failre with custom message in it.

For now this only works for virtual table errors.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-11-04 14:38:21 +03:00
parent 5aefc48e28
commit 1ea301ad07
4 changed files with 35 additions and 9 deletions

View File

@@ -134,6 +134,11 @@ public:
{ }
virtual const char* what() const noexcept override { return _cause.c_str(); }
// This method is to avoid potential exceptions while copying the string
// and thus to be used when the exception is handled and is about to
// be thrown away
sstring grab_cause() noexcept { return std::move(_cause); }
};
}

View File

@@ -211,6 +211,11 @@ struct mutation_write_failure_exception : public request_failure_exception {
request_failure_exception(exception_code::WRITE_FAILURE, ks, cf, consistency_, received_, failures_, block_for_)
, type{std::move(type_)}
{ }
mutation_write_failure_exception(const sstring& msg, db::consistency_level consistency_, int32_t received_, int32_t failures_, int32_t block_for_, db::write_type type_) noexcept :
request_failure_exception(exception_code::WRITE_FAILURE, msg, consistency_, received_, failures_, block_for_)
, type{std::move(type_)}
{ }
};
struct read_failure_exception : public request_failure_exception {

View File

@@ -106,6 +106,7 @@
#include "mutation_partition_view.hh"
#include "service/paxos/paxos_state.hh"
#include "gms/feature_service.hh"
#include "db/virtual_table.hh"
namespace bi = boost::intrusive;
@@ -381,6 +382,7 @@ protected:
bool _cl_achieved = false;
bool _throttled = false;
error _error = error::NONE;
std::optional<sstring> _message;
size_t _failed = 0; // only failures that may impact consistency
size_t _all_failures = 0; // total amount of failures
size_t _total_endpoints = 0;
@@ -424,7 +426,11 @@ public:
if (_error == error::TIMEOUT) {
_ready.set_exception(mutation_write_timeout_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, _total_block_for, _type));
} else if (_error == error::FAILURE) {
_ready.set_exception(mutation_write_failure_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, _failed, _total_block_for, _type));
if (!_message) {
_ready.set_exception(mutation_write_failure_exception(get_schema()->ks_name(), get_schema()->cf_name(), _cl, _cl_acks, _failed, _total_block_for, _type));
} else {
_ready.set_exception(mutation_write_failure_exception(*_message, _cl, _cl_acks, _failed, _total_block_for, _type));
}
}
if (_cdc_operation_result_tracker) {
_cdc_operation_result_tracker->on_mutation_failed();
@@ -461,17 +467,24 @@ public:
});
}
}
virtual bool failure(gms::inet_address from, size_t count, error err) {
bool failure(gms::inet_address from, size_t count, error err, std::optional<sstring> msg) {
if (waited_for(from)) {
_failed += count;
if (_total_block_for + _failed > _total_endpoints) {
_error = err;
_message = std::move(msg);
delay(get_trace_state(), [] (abstract_write_response_handler*) { });
return true;
}
}
return false;
}
virtual bool failure(gms::inet_address from, size_t count, error err) {
return failure(std::move(from), count, std::move(err), {});
}
void on_timeout() {
if (_cl_achieved) {
slogger.trace("Write is not acknowledged by {} replicas after achieving CL", get_targets());
@@ -494,7 +507,7 @@ public:
}
// return true if handler is no longer needed because
// CL cannot be reached
bool failure_response(gms::inet_address from, size_t count, error err) {
bool failure_response(gms::inet_address from, size_t count, error err, std::optional<sstring> msg) {
if (boost::find(_targets, from) == _targets.end()) {
// There is a little change we can get outdated reply
// if the coordinator was restarted after sending a request and
@@ -506,7 +519,7 @@ public:
_all_failures += count;
// we should not fail CL=ANY requests since they may succeed after
// writing hints
return _cl != db::consistency_level::ANY && failure(from, count, err);
return _cl != db::consistency_level::ANY && failure(from, count, err, std::move(msg));
}
void check_for_early_completion() {
if (_all_failures == _targets.size()) {
@@ -1386,11 +1399,11 @@ void storage_proxy::got_response(storage_proxy::response_id_type id, gms::inet_a
maybe_update_view_backlog_of(std::move(from), std::move(backlog));
}
void storage_proxy::got_failure_response(storage_proxy::response_id_type id, gms::inet_address from, size_t count, std::optional<db::view::update_backlog> backlog, error err) {
void storage_proxy::got_failure_response(storage_proxy::response_id_type id, gms::inet_address from, size_t count, std::optional<db::view::update_backlog> backlog, error err, std::optional<sstring> msg) {
auto it = _response_handlers.find(id);
if (it != _response_handlers.end()) {
tracing::trace(it->second->get_trace_state(), "Got {} failures from /{}", count, from);
if (it->second->failure_response(from, count, err)) {
if (it->second->failure_response(from, count, err, std::move(msg))) {
remove_response_handler_entry(std::move(it));
} else {
it->second->check_for_early_completion();
@@ -2739,6 +2752,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
(void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) {
++stats.writes_errors.get_ep_stat(coordinator);
error err = error::FAILURE;
std::optional<sstring> msg;
try {
std::rethrow_exception(eptr);
} catch(rpc::closed_error&) {
@@ -2750,10 +2764,12 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
// database total_writes_timedout counter was incremented.
// It needs to be recorded that the timeout occurred locally though.
err = error::TIMEOUT;
} catch(db::virtual_table_update_exception& e) {
msg = e.grab_cause();
} catch(...) {
slogger.error("exception during mutation write to {}: {}", coordinator, std::current_exception());
}
p->got_failure_response(response_id, coordinator, forward_size + 1, std::nullopt, err);
p->got_failure_response(response_id, coordinator, forward_size + 1, std::nullopt, err, std::move(msg));
});
}
}
@@ -5018,7 +5034,7 @@ void storage_proxy::init_messaging_service(shared_ptr<migration_manager> mm) {
auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
get_stats().replica_cross_shard_ops += shard != this_shard_id();
return container().invoke_on(shard, _write_ack_smp_service_group, [from, response_id, num_failed, backlog = std::move(backlog)] (storage_proxy& sp) mutable {
sp.got_failure_response(response_id, from, num_failed, std::move(backlog), error::FAILURE);
sp.got_failure_response(response_id, from, num_failed, std::move(backlog), error::FAILURE, std::nullopt);
return netw::messaging_service::no_wait();
});
});

View File

@@ -331,7 +331,7 @@ private:
void remove_response_handler(response_id_type id);
void remove_response_handler_entry(response_handlers_map::iterator entry);
void got_response(response_id_type id, gms::inet_address from, std::optional<db::view::update_backlog> backlog);
void got_failure_response(response_id_type id, gms::inet_address from, size_t count, std::optional<db::view::update_backlog> backlog, error err);
void got_failure_response(response_id_type id, gms::inet_address from, size_t count, std::optional<db::view::update_backlog> backlog, error err, std::optional<sstring> msg);
future<> response_wait(response_id_type id, clock_type::time_point timeout);
::shared_ptr<abstract_write_response_handler>& get_write_response_handler(storage_proxy::response_id_type id);
response_id_type create_write_response_handler_helper(schema_ptr s, const dht::token& token,