service: strong_consistency: Use timeout when mutating
We remove the inconsistency between reads and writes to strongly consistent tables. Before the commit, only reads used a timeout. Now, writes do as well. Although the parameter isn't used yet, that will change in the following commit. This is a prerequisite for it.
This commit is contained in:
@@ -52,6 +52,7 @@ future<shared_ptr<result_message>> modification_statement::execute_without_check
|
||||
}
|
||||
|
||||
auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator();
|
||||
|
||||
const auto mutate_result = co_await coordinator.get().mutate(_statement->s,
|
||||
keys[0].start()->value().token(),
|
||||
[&](api::timestamp_type ts) {
|
||||
@@ -65,7 +66,7 @@ future<shared_ptr<result_message>> modification_statement::execute_without_check
|
||||
raw_cql_statement, muts.size()));
|
||||
}
|
||||
return std::move(*muts.begin());
|
||||
});
|
||||
}, timeout);
|
||||
|
||||
using namespace service::strong_consistency;
|
||||
if (const auto* redirect = get_if<need_redirect>(&mutate_result)) {
|
||||
|
||||
@@ -116,7 +116,8 @@ coordinator::coordinator(groups_manager& groups_manager, replica::database& db,
|
||||
|
||||
future<value_or_redirect<>> coordinator::mutate(schema_ptr schema,
|
||||
const dht::token& token,
|
||||
mutation_gen&& mutation_gen)
|
||||
mutation_gen&& mutation_gen,
|
||||
timeout_clock::time_point timeout)
|
||||
{
|
||||
try {
|
||||
auto op_result = co_await create_operation_ctx(*schema, token);
|
||||
@@ -194,7 +195,7 @@ auto coordinator::query(schema_ptr schema,
|
||||
const query::read_command& cmd,
|
||||
const dht::partition_range_vector& ranges,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout
|
||||
timeout_clock::time_point timeout
|
||||
) -> future<query_result_type>
|
||||
{
|
||||
try {
|
||||
|
||||
@@ -28,6 +28,10 @@ template <typename T = std::monostate>
|
||||
using value_or_redirect = std::variant<T, need_redirect>;
|
||||
|
||||
class coordinator : public peering_sharded_service<coordinator> {
|
||||
public:
|
||||
using timeout_clock = typename db::timeout_clock;
|
||||
|
||||
private:
|
||||
groups_manager& _groups_manager;
|
||||
replica::database& _db;
|
||||
gms::gossiper& _gossiper;
|
||||
@@ -40,14 +44,15 @@ public:
|
||||
using mutation_gen = noncopyable_function<mutation(api::timestamp_type)>;
|
||||
future<value_or_redirect<>> mutate(schema_ptr schema,
|
||||
const dht::token& token,
|
||||
mutation_gen&& mutation_gen);
|
||||
mutation_gen&& mutation_gen,
|
||||
timeout_clock::time_point timeout);
|
||||
|
||||
using query_result_type = value_or_redirect<lw_shared_ptr<query::result>>;
|
||||
future<query_result_type> query(schema_ptr schema,
|
||||
const query::read_command& cmd,
|
||||
const dht::partition_range_vector& ranges,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
db::timeout_clock::time_point timeout);
|
||||
timeout_clock::time_point timeout);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user