Merge 'Make DESCRIBE CLUSTER get cluster information from storage_service' from Pavel Emelyanov
Currently the statement returns cluster, partitioner and snitch names by accessing global db::config via database. As the part of an effort to detach components from global db::config, this PR tweaks the statement handler to get the cluster information from some other source. Currently the needed cluster information is stored in different components, but they are all under storage_service umbrella which seems to be a good central source of this truth. Unit test included. Cleaning components inter-dependencies, not backporting Closes scylladb/scylladb#29429 * github.com:scylladb/scylladb: test: Add test_describe_cluster_sanity for DESCRIBE CLUSTER validation describe_statement: Get cluster info from storage_service storage_service: Add describe_cluster() method query_processor: Expose storage_service accessor
This commit is contained in:
@@ -560,6 +560,11 @@ query_processor::acquire_strongly_consistent_coordinator() {
|
||||
return {remote_.get().sc_coordinator, std::move(holder)};
|
||||
}
|
||||
|
||||
service::storage_service& query_processor::storage_service() {
|
||||
auto [remote_, holder] = remote();
|
||||
return remote_.get().ss;
|
||||
}
|
||||
|
||||
void query_processor::start_remote(service::migration_manager& mm, service::mapreduce_service& mapreducer,
|
||||
service::storage_service& ss, service::raft_group0_client& group0_client,
|
||||
service::strong_consistency::coordinator& sc_coordinator) {
|
||||
|
||||
@@ -209,6 +209,8 @@ public:
|
||||
return _proxy;
|
||||
}
|
||||
|
||||
service::storage_service& storage_service();
|
||||
|
||||
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
|
||||
acquire_strongly_consistent_coordinator();
|
||||
|
||||
|
||||
@@ -49,7 +49,6 @@
|
||||
#include "cql3/functions/user_function.hh"
|
||||
#include "cql3/functions/user_aggregate.hh"
|
||||
#include "utils/overloaded_functor.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/extensions.hh"
|
||||
#include "utils/sorting.hh"
|
||||
@@ -614,17 +613,14 @@ future<managed_bytes_opt> cluster_describe_statement::range_ownership(const serv
|
||||
}
|
||||
|
||||
future<std::vector<std::vector<managed_bytes_opt>>> cluster_describe_statement::describe(cql3::query_processor& qp, const service::client_state& client_state) const {
|
||||
auto db = qp.db();
|
||||
auto& proxy = qp.proxy();
|
||||
auto& ss = qp.storage_service();
|
||||
|
||||
auto cluster = to_managed_bytes(db.get_config().cluster_name());
|
||||
auto partitioner = to_managed_bytes(db.get_config().partitioner());
|
||||
auto snitch = to_managed_bytes(db.get_config().endpoint_snitch());
|
||||
|
||||
auto cluster_info = ss.describe_cluster();
|
||||
std::vector<managed_bytes_opt> row {
|
||||
{cluster},
|
||||
{partitioner},
|
||||
{snitch}
|
||||
{to_managed_bytes(cluster_info.cluster_name)},
|
||||
{to_managed_bytes(cluster_info.partitioner)},
|
||||
{to_managed_bytes(cluster_info.snitch_name)}
|
||||
};
|
||||
|
||||
if (should_add_range_ownership(proxy.local_db(), client_state)) {
|
||||
|
||||
@@ -2552,6 +2552,14 @@ sstring storage_service::get_schema_version() {
|
||||
return _db.local().get_version().to_sstring();
|
||||
}
|
||||
|
||||
cluster_info storage_service::describe_cluster() const {
|
||||
return cluster_info{
|
||||
.cluster_name = _gossiper.get_cluster_name(),
|
||||
.partitioner = _gossiper.get_partitioner_name(),
|
||||
.snitch_name = _snitch.local()->get_name()
|
||||
};
|
||||
}
|
||||
|
||||
static constexpr auto UNREACHABLE = "UNREACHABLE";
|
||||
|
||||
future<std::unordered_map<sstring, std::vector<sstring>>> storage_service::describe_schema_versions() {
|
||||
|
||||
@@ -157,6 +157,12 @@ struct token_metadata_change {
|
||||
future<> destroy();
|
||||
};
|
||||
|
||||
struct cluster_info {
|
||||
sstring cluster_name;
|
||||
sstring partitioner;
|
||||
sstring snitch_name;
|
||||
};
|
||||
|
||||
class schema_getter {
|
||||
public:
|
||||
virtual flat_hash_map<sstring, locator::replication_strategy_ptr> get_keyspaces_replication() const = 0;
|
||||
@@ -644,6 +650,8 @@ public:
|
||||
|
||||
sstring get_schema_version();
|
||||
|
||||
cluster_info describe_cluster() const;
|
||||
|
||||
future<std::unordered_map<sstring, std::vector<sstring>>> describe_schema_versions();
|
||||
|
||||
|
||||
|
||||
@@ -8,6 +8,12 @@ import asyncio
|
||||
import pytest
|
||||
from test.cluster.util import new_test_keyspace, new_test_table
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for
|
||||
from cassandra.connection import UnixSocketEndPoint
|
||||
from cassandra.policies import WhiteListRoundRobinPolicy
|
||||
from test.cluster.conftest import cluster_con
|
||||
from time import time
|
||||
import os
|
||||
|
||||
# The following test verifies that Scylla avoids making an oversized allocation
|
||||
# when generating a large create statement when performing a DESCRIBE statement.
|
||||
@@ -46,3 +52,41 @@ async def test_large_create_statement(manager: ManagerClient):
|
||||
|
||||
matches = await log.grep("oversized allocation", from_mark=marker)
|
||||
assert len(matches) == 0
|
||||
|
||||
@pytest.mark.parametrize("mode", ["normal", "maintenance"])
|
||||
@pytest.mark.asyncio
|
||||
async def test_describe_cluster_sanity(manager: ManagerClient, mode: str):
|
||||
"""
|
||||
Parametrized test that DESCRIBE CLUSTER returns correct cluster information
|
||||
in both normal and maintenance modes.
|
||||
|
||||
This test verifies that cluster metadata from gossiper is properly initialized
|
||||
and the cluster name is consistent with system.local in both:
|
||||
- normal mode: standard cluster operation
|
||||
- maintenance mode: node isolated from the cluster
|
||||
"""
|
||||
|
||||
if mode == "normal":
|
||||
await manager.server_add()
|
||||
cql = manager.get_cql()
|
||||
else: # maintenance mode
|
||||
srv = await manager.server_add(config={"maintenance_mode": True}, connect_driver=False)
|
||||
maintenance_socket_path = await manager.server_get_maintenance_socket_path(srv.server_id)
|
||||
async def socket_exists():
|
||||
return True if os.path.exists(maintenance_socket_path) else None
|
||||
await wait_for(socket_exists, time() + 30)
|
||||
socket_endpoint = UnixSocketEndPoint(maintenance_socket_path)
|
||||
cluster = cluster_con([socket_endpoint], load_balancing_policy=WhiteListRoundRobinPolicy([socket_endpoint]))
|
||||
cql = cluster.connect()
|
||||
|
||||
try:
|
||||
system_local_results = await cql.run_async("SELECT cluster_name FROM system.local")
|
||||
assert system_local_results[0].cluster_name != "" # sanity check
|
||||
|
||||
describe_results = await cql.run_async("DESCRIBE CLUSTER")
|
||||
assert describe_results[0].partitioner == 'org.apache.cassandra.dht.Murmur3Partitioner'
|
||||
assert describe_results[0].snitch == 'org.apache.cassandra.locator.SimpleSnitch'
|
||||
assert describe_results[0].cluster == system_local_results[0].cluster_name
|
||||
finally:
|
||||
if mode == "maintenance":
|
||||
cluster.shutdown()
|
||||
|
||||
Reference in New Issue
Block a user