From eabd4570daccb8d242eb713b6a7158403eddbaa6 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Fri, 1 Dec 2023 11:15:40 +0200 Subject: [PATCH] test: manual: modernize message test Basically, make it work (great) again. Signed-off-by: Benny Halevy --- test/manual/message.cc | 115 ++++++++++++++++++++++++----------------- 1 file changed, 68 insertions(+), 47 deletions(-) diff --git a/test/manual/message.cc b/test/manual/message.cc index a9f25f6132..53e9781f1f 100644 --- a/test/manual/message.cc +++ b/test/manual/message.cc @@ -12,7 +12,9 @@ #include #include #include +#include #include +#include #include "message/messaging_service.hh" #include "gms/gossip_digest_syn.hh" #include "gms/gossip_digest_ack.hh" @@ -21,10 +23,15 @@ #include "api/api.hh" #include "utils/UUID.hh" #include "utils/fb_utilities.hh" +#include "log.hh" +#include "locator/token_metadata.hh" +#include "db/schema_tables.hh" using namespace std::chrono_literals; using namespace netw; +logging::logger test_logger("message_test"); + class tester { private: messaging_service& ms; @@ -53,7 +60,7 @@ public: public: void init_handler() { ms.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gms::gossip_digest_syn msg) { - fmt::print("Server got syn msg = {}\n", msg); + test_logger.info("Server got syn msg = {}", msg); auto from = netw::messaging_service::get_source(cinfo); auto ep1 = inet_address("1.1.1.1"); @@ -70,13 +77,13 @@ public: gms::gossip_digest_ack ack(std::move(digests), std::move(eps)); // FIXME: discarded future. (void)ms.send_gossip_digest_ack(from, std::move(ack)).handle_exception([] (auto ep) { - fmt::print("Fail to send ack : {}", ep); + test_logger.error("Fail to send ack : {}", ep); }); return messaging_service::no_wait(); }); ms.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gms::gossip_digest_ack msg) { - fmt::print("Server got ack msg = {}\n", msg); + test_logger.info("Server got ack msg = {}", msg); auto from = netw::messaging_service::get_source(cinfo); // Prepare gossip_digest_ack2 message auto ep1 = inet_address("3.3.3.3"); @@ -86,32 +93,41 @@ public: gms::gossip_digest_ack2 ack2(std::move(eps)); // FIXME: discarded future. (void)ms.send_gossip_digest_ack2(from, std::move(ack2)).handle_exception([] (auto ep) { - fmt::print("Fail to send ack2 : {}", ep); + test_logger.error("Fail to send ack2 : {}", ep); }); digest_test_done.set_value(); return messaging_service::no_wait(); }); ms.register_gossip_digest_ack2([] (const rpc::client_info& cinfo, gms::gossip_digest_ack2 msg) { - fmt::print("Server got ack2 msg = {}\n", msg); + test_logger.info("Server got ack2 msg = {}", msg); return messaging_service::no_wait(); }); ms.register_gossip_shutdown([] (inet_address from, rpc::optional generation_number_opt) { - fmt::print("Server got shutdown msg = {}\n", from); + test_logger.info("Server got shutdown msg = {}", from); return messaging_service::no_wait(); }); ms.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional gen_opt) { - fmt::print("Server got gossip echo msg\n"); + test_logger.info("Server got gossip echo msg"); throw std::runtime_error("I'm throwing runtime_error exception"); return make_ready_future<>(); }); } + future<> deinit_handler() { + co_await ms.unregister_gossip_digest_syn(); + co_await ms.unregister_gossip_digest_ack(); + co_await ms.unregister_gossip_digest_ack2(); + co_await ms.unregister_gossip_shutdown(); + co_await ms.unregister_gossip_echo(); + test_logger.info("tester deinit_hadler done"); + } + public: future<> test_gossip_digest() { - fmt::print("=== {} ===\n", __func__); + test_logger.info("=== {} ===", __func__); // Prepare gossip_digest_syn message auto id = get_msg_addr(); auto ep1 = inet_address("1.1.1.1"); @@ -123,23 +139,24 @@ public: digests.push_back(gms::gossip_digest(ep2, gen++, ver++)); gms::gossip_digest_syn syn("my_cluster", "my_partition", digests, utils::null_uuid()); return ms.send_gossip_digest_syn(id, std::move(syn)).then([this] { + test_logger.info("Sent gossip sigest syn. Waiting for digest_test_done..."); return digest_test_done.get_future(); }); } future<> test_gossip_shutdown() { - fmt::print("=== {} ===\n", __func__); + test_logger.info("=== {} ===", __func__); auto id = get_msg_addr(); inet_address from("127.0.0.1"); int64_t gen = 0x1; return ms.send_gossip_shutdown(id, from, gen).then([] () { - fmt::print("Client sent gossip_shutdown got reply = void\n"); + test_logger.info("Client sent gossip_shutdown got reply = void"); return make_ready_future<>(); }); } future<> test_echo() { - fmt::print("=== {} ===\n", __func__); + test_logger.info("=== {} ===", __func__); auto id = get_msg_addr(); int64_t gen = 0x1; return ms.send_gossip_echo(id, gen, std::chrono::seconds(10)).then_wrapped([] (auto&& f) { @@ -147,7 +164,7 @@ public: f.get(); return make_ready_future<>(); } catch (std::runtime_error& e) { - fmt::print("test_echo: {}\n", e.what()); + test_logger.error("test_echo: {}", e.what()); } return make_ready_future<>(); }); @@ -156,6 +173,7 @@ public: namespace bpo = boost::program_options; +// Usage example: build/dev/test/manual/message --listen 127.0.0.1 --server 127.0.0.1 int main(int ac, char ** av) { app_template app; app.add_options() @@ -168,45 +186,48 @@ int main(int ac, char ** av) { distributed db; return app.run_deprecated(ac, av, [&app] { - auto config = app.configuration(); - bool stay_alive = config["stay-alive"].as(); - const gms::inet_address listen = gms::inet_address(config["listen-address"].as()); - utils::fb_utilities::set_broadcast_address(listen); - seastar::sharded messaging; - return messaging.start(locator::host_id{}, listen, 7000).then([config, stay_alive, &messaging] () { - auto testers = new distributed; - return testers->start(std::ref(messaging)).then([testers]{ - auto port = testers->local().port(); - std::cout << "Messaging server listening on port " << port << " ...\n"; - return testers->invoke_on_all(&tester::init_handler); - }).then([testers, config, stay_alive, &messaging] { - auto t = &testers->local(); - if (!config.contains("server")) { - return make_ready_future<>(); - } + return seastar::async([&app] { + auto config = app.configuration(); + bool stay_alive = config["stay-alive"].as(); + const gms::inet_address listen = gms::inet_address(config["listen-address"].as()); + auto my_address = listen != gms::inet_address("0.0.0.0") ? listen : gms::inet_address("localhost"); + utils::fb_utilities::set_broadcast_address(my_address); + locator::token_metadata::config tm_cfg; + tm_cfg.topo_cfg.this_endpoint = my_address; + sharded token_metadata; + token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get(); + auto stop_tm = deferred_stop(token_metadata); + seastar::sharded messaging; + messaging.start(locator::host_id{}, listen, 7000).get(); + auto stop_messaging = deferred_stop(messaging); + seastar::sharded testers; + testers.start(std::ref(messaging)).get(); + auto stop_testers = deferred_stop(testers); + auto port = testers.local().port(); + test_logger.info("Messaging server listening on {} port {}", listen, port); + testers.invoke_on_all(&tester::init_handler).get(); + auto deinit_testers = deferred_action([&testers] { + testers.invoke_on_all(&tester::deinit_handler).get(); + }); + messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata)).get(); + if (config.contains("server")) { auto ip = config["server"].as(); auto cpuid = config["cpuid"].as(); + auto t = &testers.local(); t->set_server_ip(ip); t->set_server_cpuid(cpuid); - fmt::print("=============TEST START===========\n"); - fmt::print("Sending to server ....\n"); - return t->test_gossip_digest().then([t] { - return t->test_gossip_shutdown(); - }).then([t] { - return t->test_echo(); - }).then([testers, stay_alive, &messaging] { - if (stay_alive) { - return make_ready_future<>(); - } - fmt::print("=============TEST DONE===========\n"); - return testers->stop().then([testers, &messaging] { - delete testers; - return messaging.stop().then([]{ - engine().exit(0); - }); - }); - }); - }); + test_logger.info("=============TEST START==========="); + test_logger.info("Sending to server ...."); + t->test_gossip_digest().get(); + t->test_gossip_shutdown().get(); + t->test_echo().get(); + test_logger.info("=============TEST DONE==========="); + } + while (stay_alive) { + seastar::sleep(1s).get(); + } + }).finally([] { + exit(0); }); }); }