messaging_service: add COUNTER_MUTATION verb

This verb is going to be used for coordinator<->leader communication
during counter updates.
This commit is contained in:
Paweł Dziepak
2017-01-24 22:47:04 +00:00
parent 67ca6959bd
commit bf60b7844b
2 changed files with 19 additions and 1 deletions

View File

@@ -38,6 +38,7 @@
#include "frozen_schema.hh"
#include "repair/repair.hh"
#include "digest_algorithm.hh"
#include "idl/consistency_level.dist.hh"
#include "idl/tracing.dist.hh"
#include "idl/result.dist.hh"
#include "idl/reconcilable_result.dist.hh"
@@ -55,6 +56,7 @@
#include "idl/query.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/consistency_level.dist.impl.hh"
#include "idl/tracing.dist.impl.hh"
#include "idl/result.dist.impl.hh"
#include "idl/reconcilable_result.dist.impl.hh"
@@ -822,6 +824,16 @@ future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point ti
std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info));
}
void messaging_service::register_counter_mutation(std::function<future<> (const rpc::client_info&, rpc::opt_time_point, std::vector<frozen_mutation> fms, db::consistency_level cl, stdx::optional<tracing::trace_info> trace_info)>&& func) {
register_handler(this, net::messaging_verb::COUNTER_MUTATION, std::move(func));
}
void messaging_service::unregister_counter_mutation() {
_rpc->unregister_handler(net::messaging_verb::COUNTER_MUTATION);
}
future<> messaging_service::send_counter_mutation(msg_addr id, clock_type::time_point timeout, std::vector<frozen_mutation> fms, db::consistency_level cl, stdx::optional<tracing::trace_info> trace_info) {
return send_message_timeout<void>(this, messaging_verb::COUNTER_MUTATION, std::move(id), timeout, std::move(fms), cl, std::move(trace_info));
}
void messaging_service::register_mutation_done(std::function<future<rpc::no_wait_type> (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id)>&& func) {
register_handler(this, net::messaging_verb::MUTATION_DONE, std::move(func));
}

View File

@@ -107,7 +107,8 @@ enum class messaging_verb : int32_t {
REPAIR_CHECKSUM_RANGE = 20,
GET_SCHEMA_VERSION = 21,
SCHEMA_CHECK = 22,
LAST = 23,
COUNTER_MUTATION = 23,
LAST = 24,
};
} // namespace net
@@ -292,6 +293,11 @@ public:
future<> send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, std::experimental::optional<tracing::trace_info> trace_info = std::experimental::nullopt);
// Wrapper for COUNTER_MUTATION
void register_counter_mutation(std::function<future<> (const rpc::client_info&, rpc::opt_time_point, std::vector<frozen_mutation> fms, db::consistency_level cl, stdx::optional<tracing::trace_info> trace_info)>&& func);
void unregister_counter_mutation();
future<> send_counter_mutation(msg_addr id, clock_type::time_point timeout, std::vector<frozen_mutation> fms, db::consistency_level cl, stdx::optional<tracing::trace_info> trace_info = std::experimental::nullopt);
// Wrapper for MUTATION_DONE
void register_mutation_done(std::function<future<rpc::no_wait_type> (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id)>&& func);
void unregister_mutation_done();