storage_proxy: add allow rate limit flag to mutate/mutate_result

Now, mutate/mutate_result accept a flag which decides whether the write
should be rate limited or not.

The new parameter is mandatory and all call sites were updated.
This commit is contained in:
Piotr Dulikowski
2022-04-28 13:58:49 +02:00
parent 1f65c4e001
commit e6beab3106
11 changed files with 38 additions and 31 deletions

View File

@@ -1543,7 +1543,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
if (!m) {
return make_ready_future<executor::request_return_type>(api_error::conditional_check_failed("Failed condition."));
}
return proxy.mutate(std::vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit)).then([this] () mutable {
return proxy.mutate(std::vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit), db::allow_per_partition_rate_limit::yes).then([this] () mutable {
return rmw_operation_return(std::move(_return_attributes));
});
});
@@ -1551,7 +1551,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
} else if (_write_isolation != write_isolation::LWT_ALWAYS) {
std::optional<mutation> m = apply(nullptr, api::new_timestamp());
assert(m); // !needs_read_before_write, so apply() did not check a condition
return proxy.mutate(std::vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit)).then([this] () mutable {
return proxy.mutate(std::vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit), db::allow_per_partition_rate_limit::yes).then([this] () mutable {
return rmw_operation_return(std::move(_return_attributes));
});
}
@@ -1896,7 +1896,8 @@ static future<> do_batch_write(service::storage_proxy& proxy,
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(),
trace_state,
std::move(permit));
std::move(permit),
db::allow_per_partition_rate_limit::yes);
} else {
// Do the write via LWT:
// Multiple mutations may be destined for the same partition, adding

View File

@@ -284,7 +284,8 @@ static future<> expire_item(service::storage_proxy& proxy,
return proxy.mutate(std::vector<mutation>{std::move(m)},
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(), // FIXME - which timeout?
qs.get_trace_state(), qs.get_permit());
qs.get_trace_state(), qs.get_permit(),
db::allow_per_partition_rate_limit::no);
}
static size_t random_offset(size_t min, size_t max) {

View File

@@ -317,7 +317,7 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
mutate_atomic = false;
}
}
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, mutate_atomic, std::move(tr_state), std::move(permit));
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, mutate_atomic, std::move(tr_state), std::move(permit), db::allow_per_partition_rate_limit::yes);
}
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_with_conditions(

View File

@@ -284,7 +284,7 @@ modification_statement::execute_without_condition(query_processor& qp, service::
return make_ready_future<coordinator_result<>>(bo::success());
}
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, false, qs.get_trace_state(), qs.get_permit(), this->is_raw_counter_shard_write());
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, false, qs.get_trace_state(), qs.get_permit(), db::allow_per_partition_rate_limit::yes, this->is_raw_counter_shard_write());
});
}

View File

@@ -243,7 +243,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
// send to partially or wholly fail in actually sending stuff. Since we don't
// have hints (yet), send with CL=ALL, and hope we can re-do this soon.
// See below, we use retry on write failure.
return _qp.proxy().mutate(mutations, db::consistency_level::ALL, db::no_timeout, nullptr, empty_service_permit());
return _qp.proxy().mutate(mutations, db::consistency_level::ALL, db::no_timeout, nullptr, empty_service_permit(), db::allow_per_partition_rate_limit::no);
});
}).then_wrapped([this, id](future<> batch_result) {
try {

View File

@@ -565,6 +565,7 @@ system_distributed_keyspace::insert_cdc_generation(
db::timeout_clock::now() + 60s,
nullptr, // trace_state
empty_service_permit(),
db::allow_per_partition_rate_limit::no,
false // raw_counters
);
});
@@ -661,6 +662,7 @@ system_distributed_keyspace::create_cdc_desc(
db::timeout_clock::now() + 30s,
nullptr, // trace_state
empty_service_permit(),
db::allow_per_partition_rate_limit::no,
false // raw_counters
);
});
@@ -704,6 +706,7 @@ system_distributed_keyspace::cdc_desc_exists(
db::timeout_clock::now() + 10s,
nullptr, // trace_state
empty_service_permit(),
db::allow_per_partition_rate_limit::no,
false // raw_counters
);

View File

@@ -2248,7 +2248,7 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
auto& row = m.partition().clustered_row(*_view, ck);
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
timeout = db::timeout_clock::now() + _timeout_duration;
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit()).get();
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get();
}
}

View File

@@ -50,7 +50,7 @@ future<> write_hashes(service::storage_proxy& proxy, redis::redis_options& optio
m.set_clustered_cell(ckey, column, std::move(cell));
auto write_consistency_level = options.get_write_consistency_level();
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit);
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit, db::allow_per_partition_rate_limit::yes);
}
@@ -68,7 +68,7 @@ future<> write_strings(service::storage_proxy& proxy, redis::redis_options& opti
db::timeout_clock::time_point timeout = db::timeout_clock::now() + options.get_write_timeout();
auto m = make_mutation(proxy, options, std::move(key), std::move(data), ttl);
auto write_consistency_level = options.get_write_consistency_level();
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit);
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit, db::allow_per_partition_rate_limit::yes);
}
@@ -87,7 +87,7 @@ future<> delete_objects(service::storage_proxy& proxy, redis::redis_options& opt
auto remove = [&proxy, timeout, write_consistency_level, permit, &options, keys = std::move(keys)] (const sstring& cf_name) {
return parallel_for_each(keys.begin(), keys.end(), [&proxy, timeout, write_consistency_level, &options, permit, cf_name] (const bytes& key) {
auto m = make_tombstone(proxy, options, cf_name, key);
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit);
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit, db::allow_per_partition_rate_limit::yes);
});
};
return parallel_for_each(tables.begin(), tables.end(), remove);
@@ -107,7 +107,7 @@ future<> delete_fields(service::storage_proxy& proxy, redis::redis_options& opti
m.partition().apply_delete(*schema, ckey, tombstone { ts, clk });
mutations.push_back(m);
}
return proxy.mutate(mutations, write_consistency_level, timeout, nullptr, permit);
return proxy.mutate(mutations, write_consistency_level, timeout, nullptr, permit, db::allow_per_partition_rate_limit::yes);
}
}

View File

@@ -2414,29 +2414,29 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
* @param consistency_level the consistency level for the operation
* @param tr_state trace state handle
*/
future<> storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters) {
return mutate_result(std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters)
future<> storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, bool raw_counters) {
return mutate_result(std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), allow_limit, raw_counters)
.then(utils::result_into_future<result<>>);
}
future<result<>> storage_proxy::mutate_result(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters) {
future<result<>> storage_proxy::mutate_result(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, bool raw_counters) {
if (_cdc && _cdc->needs_cdc_augmentation(mutations)) {
return _cdc->augment_mutation_call(timeout, std::move(mutations), tr_state, cl).then([this, cl, timeout, tr_state, permit = std::move(permit), raw_counters, cdc = _cdc->shared_from_this()](std::tuple<std::vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>&& t) mutable {
return _cdc->augment_mutation_call(timeout, std::move(mutations), tr_state, cl).then([this, cl, timeout, tr_state, permit = std::move(permit), raw_counters, cdc = _cdc->shared_from_this(), allow_limit](std::tuple<std::vector<mutation>, lw_shared_ptr<cdc::operation_result_tracker>>&& t) mutable {
auto mutations = std::move(std::get<0>(t));
auto tracker = std::move(std::get<1>(t));
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, std::move(tracker));
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, std::move(tracker));
});
}
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, nullptr);
return _mutate_stage(this, std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters, allow_limit, nullptr);
}
future<result<>> storage_proxy::do_mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker) {
future<result<>> storage_proxy::do_mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters, db::allow_per_partition_rate_limit allow_limit, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker) {
auto mid = raw_counters ? mutations.begin() : boost::range::partition(mutations, [] (auto&& m) {
return m.schema()->is_counter();
});
return seastar::when_all_succeed(
mutate_counters(boost::make_iterator_range(mutations.begin(), mid), cl, tr_state, permit, timeout),
mutate_internal(boost::make_iterator_range(mid, mutations.end()), cl, false, tr_state, permit, timeout, std::move(cdc_tracker))
mutate_internal(boost::make_iterator_range(mid, mutations.end()), cl, false, tr_state, permit, timeout, std::move(cdc_tracker), allow_limit)
).then([] (std::tuple<result<>> res) {
// For now, only mutate_internal returns a result<>
return std::get<0>(std::move(res));
@@ -2488,13 +2488,13 @@ storage_proxy::mutate_internal(Range mutations, db::consistency_level cl, bool c
future<result<>>
storage_proxy::mutate_with_triggers(std::vector<mutation> mutations, db::consistency_level cl,
clock_type::time_point timeout,
bool should_mutate_atomically, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters) {
bool should_mutate_atomically, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, bool raw_counters) {
warn(unimplemented::cause::TRIGGERS);
if (should_mutate_atomically) {
assert(!raw_counters);
return mutate_atomically_result(std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit));
}
return mutate_result(std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), raw_counters);
return mutate_result(std::move(mutations), cl, timeout, std::move(tr_state), std::move(permit), allow_limit, raw_counters);
}
/**

View File

@@ -271,6 +271,7 @@ private:
tracing::trace_state_ptr,
service_permit,
bool,
db::allow_per_partition_rate_limit,
lw_shared_ptr<cdc::operation_result_tracker>> _mutate_stage;
netw::connection_drop_slot_t _connection_dropped;
netw::connection_drop_registration_t _condrop_registration;
@@ -404,7 +405,7 @@ private:
gms::inet_address find_leader_for_counter_update(const mutation& m, db::consistency_level cl);
future<result<>> do_mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker);
future<result<>> do_mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool, db::allow_per_partition_rate_limit allow_limit, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker);
future<> send_to_endpoint(
std::unique_ptr<mutation_holder> m,
@@ -537,14 +538,14 @@ public:
* @param consistency_level the consistency level for the operation
* @param tr_state trace state handle
*/
future<> mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters = false);
future<> mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, bool raw_counters = false);
/**
* See mutate. Does the same, but returns some exceptions
* through the result<>, which allows for efficient inspection
* of the exception on the exception handling path.
*/
future<result<>> mutate_result(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters = false);
future<result<>> mutate_result(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, bool raw_counters = false);
paxos_participants
get_paxos_participants(const sstring& ks_name, const dht::token& token, db::consistency_level consistency_for_paxos);
@@ -553,7 +554,8 @@ public:
clock_type::time_point timeout, service_permit permit);
future<result<>> mutate_with_triggers(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout,
bool should_mutate_atomically, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters = false);
bool should_mutate_atomically, tracing::trace_state_ptr tr_state, service_permit permit,
db::allow_per_partition_rate_limit allow_limit, bool raw_counters = false);
/**
* See mutate. Adds additional steps before and after writing a batch.

View File

@@ -511,7 +511,7 @@ public:
add_to_mutation(*schema, column, m_to_apply);
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}
@@ -527,7 +527,7 @@ public:
add_to_mutation(*schema, column, m_to_apply);
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}
@@ -564,7 +564,7 @@ public:
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}
@@ -591,7 +591,7 @@ public:
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
// This mutation contains only counter tombstones so it can be applied like non-counter mutations.
auto timeout = db::timeout_clock::now() + _timeout_config.counter_write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}
@@ -604,7 +604,7 @@ public:
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY);
}).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}