storage_proxy: mutate_atomically_result: keep schema of batchlog mutation in context

The batchlog mutation is for system.batchlog.
Rather than looking the schema up in multiple places
do that once and keep it in the context object.

It will be used in the next patch to get a respective
effective_replication_map_ptr.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-08-24 10:41:56 +03:00
parent 93be4c0cb0
commit 098dd5021a
2 changed files with 13 additions and 6 deletions

View File

@@ -3609,6 +3609,7 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
class context {
storage_proxy& _p;
schema_ptr _schema;
const locator::token_metadata_ptr _tmptr;
std::vector<mutation> _mutations;
lw_shared_ptr<cdc::operation_result_tracker> _cdc_tracker;
@@ -3624,6 +3625,7 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
public:
context(storage_proxy & p, std::vector<mutation>&& mutations, lw_shared_ptr<cdc::operation_result_tracker>&& cdc_tracker, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit)
: _p(p)
, _schema(_p.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG))
, _tmptr(p.get_token_metadata_ptr())
, _mutations(std::move(mutations))
, _cdc_tracker(std::move(cdc_tracker))
@@ -3657,7 +3659,7 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
future<result<>> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) {
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) {
auto& table = _p._db.local().find_column_family(m.schema()->id());
auto& table = _p._db.local().find_column_family(_schema->id());
auto ermp = table.get_effective_replication_map();
return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate(), is_cancellable::no);
}).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) {
@@ -3666,17 +3668,16 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
}));
}
future<result<>> sync_write_to_batchlog() {
auto m = _p.get_batchlog_mutation_for(_mutations, _batch_uuid, netw::messaging_service::current_version, db_clock::now());
auto m = _p.do_get_batchlog_mutation_for(_schema, _mutations, _batch_uuid, netw::messaging_service::current_version, db_clock::now());
tracing::trace(_trace_state, "Sending a batchlog write mutation");
return send_batchlog_mutation(std::move(m));
};
future<> async_remove_from_batchlog() {
// delete batch
auto schema = _p._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
auto key = partition_key::from_exploded(*schema, {uuid_type->decompose(_batch_uuid)});
auto key = partition_key::from_exploded(*_schema, {uuid_type->decompose(_batch_uuid)});
auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
mutation m(schema, key);
m.partition().apply_delete(*schema, clustering_key_prefix::make_empty(), tombstone(now, gc_clock::now()));
mutation m(_schema, key);
m.partition().apply_delete(*_schema, clustering_key_prefix::make_empty(), tombstone(now, gc_clock::now()));
tracing::trace(_trace_state, "Sending a batchlog remove mutation");
return send_batchlog_mutation(std::move(m), db::consistency_level::ANY).then_wrapped([] (future<result<>> f) {
@@ -3732,6 +3733,10 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
mutation storage_proxy::get_batchlog_mutation_for(const std::vector<mutation>& mutations, const utils::UUID& id, int32_t version, db_clock::time_point now) {
auto schema = local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
return do_get_batchlog_mutation_for(std::move(schema), mutations, id, version, now);
}
mutation storage_proxy::do_get_batchlog_mutation_for(schema_ptr schema, const std::vector<mutation>& mutations, const utils::UUID& id, int32_t version, db_clock::time_point now) {
auto key = partition_key::from_singular(*schema, id);
auto timestamp = api::new_timestamp();
auto data = [&mutations] {

View File

@@ -515,6 +515,8 @@ private:
// Do the same when the future is resolved without exception.
template <typename T>
future<T> apply_fence(future<T> future, fencing_token fence, gms::inet_address caller_address) const;
mutation do_get_batchlog_mutation_for(schema_ptr schema, const std::vector<mutation>& mutations, const utils::UUID& id, int32_t version, db_clock::time_point now);
public:
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.