test: manual: modernize message test

Basically, make it work (great) again.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-12-01 11:15:40 +02:00
parent f9acc90926
commit eabd4570da

View File

@@ -12,7 +12,9 @@
#include <seastar/core/reactor.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/thread.hh>
#include <seastar/rpc/rpc_types.hh>
#include <seastar/util/closeable.hh>
#include "message/messaging_service.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"
@@ -21,10 +23,15 @@
#include "api/api.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
#include "log.hh"
#include "locator/token_metadata.hh"
#include "db/schema_tables.hh"
using namespace std::chrono_literals;
using namespace netw;
logging::logger test_logger("message_test");
class tester {
private:
messaging_service& ms;
@@ -53,7 +60,7 @@ public:
public:
void init_handler() {
ms.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gms::gossip_digest_syn msg) {
fmt::print("Server got syn msg = {}\n", msg);
test_logger.info("Server got syn msg = {}", msg);
auto from = netw::messaging_service::get_source(cinfo);
auto ep1 = inet_address("1.1.1.1");
@@ -70,13 +77,13 @@ public:
gms::gossip_digest_ack ack(std::move(digests), std::move(eps));
// FIXME: discarded future.
(void)ms.send_gossip_digest_ack(from, std::move(ack)).handle_exception([] (auto ep) {
fmt::print("Fail to send ack : {}", ep);
test_logger.error("Fail to send ack : {}", ep);
});
return messaging_service::no_wait();
});
ms.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gms::gossip_digest_ack msg) {
fmt::print("Server got ack msg = {}\n", msg);
test_logger.info("Server got ack msg = {}", msg);
auto from = netw::messaging_service::get_source(cinfo);
// Prepare gossip_digest_ack2 message
auto ep1 = inet_address("3.3.3.3");
@@ -86,32 +93,41 @@ public:
gms::gossip_digest_ack2 ack2(std::move(eps));
// FIXME: discarded future.
(void)ms.send_gossip_digest_ack2(from, std::move(ack2)).handle_exception([] (auto ep) {
fmt::print("Fail to send ack2 : {}", ep);
test_logger.error("Fail to send ack2 : {}", ep);
});
digest_test_done.set_value();
return messaging_service::no_wait();
});
ms.register_gossip_digest_ack2([] (const rpc::client_info& cinfo, gms::gossip_digest_ack2 msg) {
fmt::print("Server got ack2 msg = {}\n", msg);
test_logger.info("Server got ack2 msg = {}", msg);
return messaging_service::no_wait();
});
ms.register_gossip_shutdown([] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
fmt::print("Server got shutdown msg = {}\n", from);
test_logger.info("Server got shutdown msg = {}", from);
return messaging_service::no_wait();
});
ms.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional<int64_t> gen_opt) {
fmt::print("Server got gossip echo msg\n");
test_logger.info("Server got gossip echo msg");
throw std::runtime_error("I'm throwing runtime_error exception");
return make_ready_future<>();
});
}
future<> deinit_handler() {
co_await ms.unregister_gossip_digest_syn();
co_await ms.unregister_gossip_digest_ack();
co_await ms.unregister_gossip_digest_ack2();
co_await ms.unregister_gossip_shutdown();
co_await ms.unregister_gossip_echo();
test_logger.info("tester deinit_hadler done");
}
public:
future<> test_gossip_digest() {
fmt::print("=== {} ===\n", __func__);
test_logger.info("=== {} ===", __func__);
// Prepare gossip_digest_syn message
auto id = get_msg_addr();
auto ep1 = inet_address("1.1.1.1");
@@ -123,23 +139,24 @@ public:
digests.push_back(gms::gossip_digest(ep2, gen++, ver++));
gms::gossip_digest_syn syn("my_cluster", "my_partition", digests, utils::null_uuid());
return ms.send_gossip_digest_syn(id, std::move(syn)).then([this] {
test_logger.info("Sent gossip sigest syn. Waiting for digest_test_done...");
return digest_test_done.get_future();
});
}
future<> test_gossip_shutdown() {
fmt::print("=== {} ===\n", __func__);
test_logger.info("=== {} ===", __func__);
auto id = get_msg_addr();
inet_address from("127.0.0.1");
int64_t gen = 0x1;
return ms.send_gossip_shutdown(id, from, gen).then([] () {
fmt::print("Client sent gossip_shutdown got reply = void\n");
test_logger.info("Client sent gossip_shutdown got reply = void");
return make_ready_future<>();
});
}
future<> test_echo() {
fmt::print("=== {} ===\n", __func__);
test_logger.info("=== {} ===", __func__);
auto id = get_msg_addr();
int64_t gen = 0x1;
return ms.send_gossip_echo(id, gen, std::chrono::seconds(10)).then_wrapped([] (auto&& f) {
@@ -147,7 +164,7 @@ public:
f.get();
return make_ready_future<>();
} catch (std::runtime_error& e) {
fmt::print("test_echo: {}\n", e.what());
test_logger.error("test_echo: {}", e.what());
}
return make_ready_future<>();
});
@@ -156,6 +173,7 @@ public:
namespace bpo = boost::program_options;
// Usage example: build/dev/test/manual/message --listen 127.0.0.1 --server 127.0.0.1
int main(int ac, char ** av) {
app_template app;
app.add_options()
@@ -168,45 +186,48 @@ int main(int ac, char ** av) {
distributed<replica::database> db;
return app.run_deprecated(ac, av, [&app] {
auto config = app.configuration();
bool stay_alive = config["stay-alive"].as<bool>();
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
utils::fb_utilities::set_broadcast_address(listen);
seastar::sharded<netw::messaging_service> messaging;
return messaging.start(locator::host_id{}, listen, 7000).then([config, stay_alive, &messaging] () {
auto testers = new distributed<tester>;
return testers->start(std::ref(messaging)).then([testers]{
auto port = testers->local().port();
std::cout << "Messaging server listening on port " << port << " ...\n";
return testers->invoke_on_all(&tester::init_handler);
}).then([testers, config, stay_alive, &messaging] {
auto t = &testers->local();
if (!config.contains("server")) {
return make_ready_future<>();
}
return seastar::async([&app] {
auto config = app.configuration();
bool stay_alive = config["stay-alive"].as<bool>();
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
auto my_address = listen != gms::inet_address("0.0.0.0") ? listen : gms::inet_address("localhost");
utils::fb_utilities::set_broadcast_address(my_address);
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
sharded<locator::shared_token_metadata> token_metadata;
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_tm = deferred_stop(token_metadata);
seastar::sharded<netw::messaging_service> messaging;
messaging.start(locator::host_id{}, listen, 7000).get();
auto stop_messaging = deferred_stop(messaging);
seastar::sharded<tester> testers;
testers.start(std::ref(messaging)).get();
auto stop_testers = deferred_stop(testers);
auto port = testers.local().port();
test_logger.info("Messaging server listening on {} port {}", listen, port);
testers.invoke_on_all(&tester::init_handler).get();
auto deinit_testers = deferred_action([&testers] {
testers.invoke_on_all(&tester::deinit_handler).get();
});
messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata)).get();
if (config.contains("server")) {
auto ip = config["server"].as<std::string>();
auto cpuid = config["cpuid"].as<uint32_t>();
auto t = &testers.local();
t->set_server_ip(ip);
t->set_server_cpuid(cpuid);
fmt::print("=============TEST START===========\n");
fmt::print("Sending to server ....\n");
return t->test_gossip_digest().then([t] {
return t->test_gossip_shutdown();
}).then([t] {
return t->test_echo();
}).then([testers, stay_alive, &messaging] {
if (stay_alive) {
return make_ready_future<>();
}
fmt::print("=============TEST DONE===========\n");
return testers->stop().then([testers, &messaging] {
delete testers;
return messaging.stop().then([]{
engine().exit(0);
});
});
});
});
test_logger.info("=============TEST START===========");
test_logger.info("Sending to server ....");
t->test_gossip_digest().get();
t->test_gossip_shutdown().get();
t->test_echo().get();
test_logger.info("=============TEST DONE===========");
}
while (stay_alive) {
seastar::sleep(1s).get();
}
}).finally([] {
exit(0);
});
});
}