diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 750f3b9af4..75bd7be22e 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -38,6 +38,7 @@ #include "frozen_schema.hh" #include "repair/repair.hh" #include "digest_algorithm.hh" +#include "idl/consistency_level.dist.hh" #include "idl/tracing.dist.hh" #include "idl/result.dist.hh" #include "idl/reconcilable_result.dist.hh" @@ -55,6 +56,7 @@ #include "idl/query.dist.hh" #include "serializer_impl.hh" #include "serialization_visitors.hh" +#include "idl/consistency_level.dist.impl.hh" #include "idl/tracing.dist.impl.hh" #include "idl/result.dist.impl.hh" #include "idl/reconcilable_result.dist.impl.hh" @@ -822,6 +824,16 @@ future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point ti std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info)); } +void messaging_service::register_counter_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, std::vector fms, db::consistency_level cl, stdx::optional trace_info)>&& func) { + register_handler(this, net::messaging_verb::COUNTER_MUTATION, std::move(func)); +} +void messaging_service::unregister_counter_mutation() { + _rpc->unregister_handler(net::messaging_verb::COUNTER_MUTATION); +} +future<> messaging_service::send_counter_mutation(msg_addr id, clock_type::time_point timeout, std::vector fms, db::consistency_level cl, stdx::optional trace_info) { + return send_message_timeout(this, messaging_verb::COUNTER_MUTATION, std::move(id), timeout, std::move(fms), cl, std::move(trace_info)); +} + void messaging_service::register_mutation_done(std::function (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id)>&& func) { register_handler(this, net::messaging_verb::MUTATION_DONE, std::move(func)); } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 4d5f76c453..f858351473 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -107,7 +107,8 @@ enum class messaging_verb : int32_t { REPAIR_CHECKSUM_RANGE = 20, GET_SCHEMA_VERSION = 21, SCHEMA_CHECK = 22, - LAST = 23, + COUNTER_MUTATION = 23, + LAST = 24, }; } // namespace net @@ -292,6 +293,11 @@ public: future<> send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::experimental::optional trace_info = std::experimental::nullopt); + // Wrapper for COUNTER_MUTATION + void register_counter_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, std::vector fms, db::consistency_level cl, stdx::optional trace_info)>&& func); + void unregister_counter_mutation(); + future<> send_counter_mutation(msg_addr id, clock_type::time_point timeout, std::vector fms, db::consistency_level cl, stdx::optional trace_info = std::experimental::nullopt); + // Wrapper for MUTATION_DONE void register_mutation_done(std::function (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id)>&& func); void unregister_mutation_done();