thrift: switch from replica module to data_dictionary module

Thrift is a coordinator-side service and should not touch the replica
module. Switch it to data_dictionary.

The switch is straightforward with two exceptions:
 - client_state still receives replica::database parameters. After
   this change it will be easier to adapt client_state too.
 - calls to replica::database::get_version() remain. They should be
   rerouted to migration_manager instead, as that deals with schema
   management.
This commit is contained in:
Avi Kivity
2022-01-07 17:31:04 +02:00
parent 85061b694b
commit 6205d40d5f
5 changed files with 68 additions and 62 deletions

View File

@@ -85,7 +85,7 @@ future<> thrift_controller::do_start_server() {
tsc.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20);
return utils::resolve(cfg.rpc_address, family, preferred).then([this, tserver, port = cfg.rpc_port(), keepalive, tsc] (gms::inet_address ip) {
_addr.emplace(ip, port);
return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_ss), std::ref(_proxy), std::ref(_auth_service), std::ref(_mem_limiter), tsc).then([tserver, port, ip, keepalive] {
return tserver->start(sharded_parameter([this] { return _db.local().as_data_dictionary(); }), std::ref(_qp), std::ref(_ss), std::ref(_proxy), std::ref(_auth_service), std::ref(_mem_limiter), tsc).then([tserver, port, ip, keepalive] {
// #293 - do not stop anything
//engine().at_exit([tserver] {
// return tserver->stop();

View File

@@ -27,7 +27,8 @@
#include "Cassandra.h"
#include <seastar/core/distributed.hh>
#include <seastar/core/coroutine.hh>
#include "replica/database.hh"
#include "replica/database.hh" // for database::get_version()
#include "data_dictionary/data_dictionary.hh"
#include <seastar/core/sstring.hh>
#include <seastar/core/print.hh>
#include "frozen_mutation.hh"
@@ -48,6 +49,7 @@
#include "cql3/query_processor.hh"
#include "cql3/column_identifier.hh"
#include "timeout_config.hh"
#include "mutation.hh"
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/indirected.hpp>
@@ -107,9 +109,9 @@ public:
throw make_exception<InvalidRequestException>(ce.what());
} catch (exceptions::invalid_request_exception& ire) {
throw make_exception<InvalidRequestException>(ire.what());
} catch (replica::no_such_column_family& nocf) {
} catch (data_dictionary::no_such_column_family& nocf) {
throw make_exception<InvalidRequestException>(nocf.what());
} catch (replica::no_such_keyspace&) {
} catch (data_dictionary::no_such_keyspace&) {
throw NotFoundException();
} catch (exceptions::syntax_exception& se) {
throw make_exception<InvalidRequestException>("syntax error: {}", se.what());
@@ -203,7 +205,7 @@ concept Aggregator =
enum class query_order { no, yes };
class thrift_handler : public CassandraCobSvIf {
distributed<replica::database>& _db;
data_dictionary::database _db;
distributed<cql3::query_processor>& _query_processor;
sharded<service::storage_service>& _ss;
sharded<service::storage_proxy>& _proxy;
@@ -219,12 +221,12 @@ private:
const std::string& cf,
Func&& func) {
with_cob(std::move(cob), std::move(exn_cob), [this, &cf, func = std::move(func)] {
auto schema = lookup_schema(_db.local(), current_keyspace(), cf);
auto schema = lookup_schema(_db, current_keyspace(), cf);
return func(std::move(schema));
});
}
public:
explicit thrift_handler(distributed<replica::database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy,
explicit thrift_handler(data_dictionary::database db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy,
auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
: _db(db)
, _query_processor(qp)
@@ -261,7 +263,7 @@ public:
void set_keyspace(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
_query_state.get_client_state().set_keyspace(_db.local(), keyspace);
_query_state.get_client_state().set_keyspace(_db.real_database(), keyspace);
});
}
@@ -302,7 +304,7 @@ public:
auto cmd = slice_pred_to_read_cmd(proxy, *schema, predicate);
auto cell_limit = predicate.__isset.slice_range ? static_cast<uint32_t>(predicate.slice_range.count) : std::numeric_limits<uint32_t>::max();
auto pranges = make_partition_ranges(*schema, keys);
auto f = _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::SELECT);
auto f = _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::SELECT);
return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys, permit = std::move(permit)]() mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout;
return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then(
@@ -332,7 +334,7 @@ public:
auto cmd = slice_pred_to_read_cmd(proxy, *schema, predicate);
auto cell_limit = predicate.__isset.slice_range ? static_cast<uint32_t>(predicate.slice_range.count) : std::numeric_limits<uint32_t>::max();
auto pranges = make_partition_ranges(*schema, keys);
auto f = _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::SELECT);
auto f = _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::SELECT);
return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys, permit = std::move(permit)]() mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout;
return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then(
@@ -371,7 +373,7 @@ public:
// For static CFs each thrift row maps to a CQL row.
cmd->set_row_limit(static_cast<uint64_t>(range.count));
}
auto f = _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::SELECT);
auto f = _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::SELECT);
return f.then([this, &proxy, schema, cmd, prange = std::move(prange), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.range_read_timeout;
return proxy.query(schema, cmd, std::move(prange), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then(
@@ -488,7 +490,7 @@ public:
throw make_exception<InvalidRequestException>("If start column is provided, so must the start key");
}
}
auto f = _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::SELECT);
auto f = _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::SELECT);
return f.then([this, schema, count = range.count, start_column, prange = std::move(prange), consistency_level, &output, permit = std::move(permit)] () mutable {
return do_get_paged_slice(_proxy, std::move(schema), count, std::move(prange), &start_column,
cl_from_thrift(consistency_level), _timeout_config, output, _query_state, std::move(permit)).then([&output] {
@@ -520,7 +522,7 @@ public:
mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key)));
add_to_mutation(*schema, column, m_to_apply);
return _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
return _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
});
@@ -536,7 +538,7 @@ public:
mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key)));
add_to_mutation(*schema, column, m_to_apply);
return _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
return _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
});
@@ -573,7 +575,7 @@ public:
m_to_apply.partition().apply(tombstone(timestamp, gc_clock::now()));
}
return _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
return _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
});
@@ -599,7 +601,7 @@ public:
m_to_apply.partition().apply(tombstone(timestamp, gc_clock::now()));
}
return _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
return _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
// This mutation contains only counter tombstones so it can be applied like non-counter mutations.
auto timeout = db::timeout_clock::now() + _timeout_config.counter_write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
@@ -610,9 +612,9 @@ public:
void batch_mutate(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::map<std::string, std::map<std::string, std::vector<Mutation> > > & mutation_map, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auto p = prepare_mutations(_db.local(), current_keyspace(), mutation_map);
auto p = prepare_mutations(_db, current_keyspace(), mutation_map);
return parallel_for_each(std::move(p.second), [this](auto&& schema) {
return _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::MODIFY);
return _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::MODIFY);
}).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
@@ -623,9 +625,9 @@ public:
void atomic_batch_mutate(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::map<std::string, std::map<std::string, std::vector<Mutation> > > & mutation_map, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auto p = prepare_mutations(_db.local(), current_keyspace(), mutation_map);
auto p = prepare_mutations(_db, current_keyspace(), mutation_map);
return parallel_for_each(std::move(p.second), [this](auto&& schema) {
return _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::MODIFY);
return _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::MODIFY);
}).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate_atomically(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
@@ -640,8 +642,8 @@ public:
throw make_exception<InvalidRequestException>("keyspace not set");
}
return _query_state.get_client_state().has_column_family_access(_db.local(), current_keyspace(), cfname, auth::permission::MODIFY).then([this, cfname] {
if (_db.local().find_schema(current_keyspace(), cfname)->is_view()) {
return _query_state.get_client_state().has_column_family_access(_db.real_database(), current_keyspace(), cfname, auth::permission::MODIFY).then([this, cfname] {
if (_db.find_schema(current_keyspace(), cfname)->is_view()) {
throw make_exception<InvalidRequestException>("Cannot truncate Materialized Views");
}
return _proxy.local().truncate_blocking(current_keyspace(), cfname);
@@ -703,7 +705,7 @@ public:
auto& proxy = _proxy.local();
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.get_max_result_size(slice),
query::row_limit(row_limit));
auto f = _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::SELECT);
auto f = _query_state.get_client_state().has_schema_access(_db.real_database(), *schema, auth::permission::SELECT);
return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout;
return proxy.query(schema, cmd, {dht::partition_range::make_singular(dk)}, cl_from_thrift(cl), {timeout, std::move(permit), _query_state.get_client_state()}).then(
@@ -737,8 +739,8 @@ public:
with_cob(std::move(cob), std::move(exn_cob), [&] {
validate_login();
std::vector<KsDef> ret;
for (auto&& ks : _db.local().get_keyspaces()) {
ret.emplace_back(get_keyspace_definition(ks.second));
for (auto&& ks : _db.get_keyspaces()) {
ret.emplace_back(get_keyspace_definition(ks));
}
return ret;
});
@@ -746,7 +748,7 @@ public:
void describe_cluster_name(thrift_fn::function<void(std::string const& _return)> cob) {
service_permit permit = obtain_permit();
cob(_db.local().get_config().cluster_name());
cob(_db.get_config().cluster_name());
}
void describe_version(thrift_fn::function<void(std::string const& _return)> cob) {
@@ -757,7 +759,7 @@ public:
void do_describe_ring(thrift_fn::function<void(std::vector<TokenRange> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace, bool local) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] () -> future<std::vector<TokenRange>> {
auto& ks = _db.local().find_keyspace(keyspace);
auto ks = _db.find_keyspace(keyspace);
if (ks.get_replication_strategy().get_type() == locator::replication_strategy_type::local) {
throw make_exception<InvalidRequestException>("There is no ring for the keyspace: {}", keyspace);
}
@@ -808,19 +810,19 @@ public:
void describe_partitioner(thrift_fn::function<void(std::string const& _return)> cob) {
service_permit permit = obtain_permit();
cob(_db.local().get_config().partitioner());
cob(_db.get_config().partitioner());
}
void describe_snitch(thrift_fn::function<void(std::string const& _return)> cob) {
service_permit permit = obtain_permit();
cob(format("org.apache.cassandra.locator.{}", _db.local().get_snitch_name()));
cob(format("org.apache.cassandra.locator.{}", _db.real_database().get_snitch_name()));
}
void describe_keyspace(thrift_fn::function<void(KsDef const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
validate_login();
auto& ks = _db.local().find_keyspace(keyspace);
auto ks = _db.find_keyspace(keyspace);
return get_keyspace_definition(ks);
});
}
@@ -869,18 +871,18 @@ public:
});
}
future<std::string> execute_schema_command(std::function<future<std::vector<mutation>>(service::migration_manager&, replica::database&)> ddl) {
future<std::string> execute_schema_command(std::function<future<std::vector<mutation>>(service::migration_manager&, data_dictionary::database)> ddl) {
distributed<service::migration_manager>& dmm = _query_processor.local().get_migration_manager().container();
auto func = [ddl, &dmm] (replica::database& db) -> future<std::string> {
auto func = [ddl, &dmm] (cql3::query_processor& qp) -> future<std::string> {
auto& mm = dmm.local();
co_await mm.schema_read_barrier();
co_await mm.announce(co_await ddl(mm, db));
co_await mm.announce(co_await ddl(mm, qp.db()));
co_return std::string(db.get_version().to_sstring());
co_return std::string(qp.db().real_database().get_version().to_sstring());
};
co_return co_await _db.invoke_on(0, func);
co_return co_await _query_processor.invoke_on(0, func);
}
void system_add_column_family(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const CfDef& cf_def) {
@@ -889,9 +891,9 @@ public:
auto& t = *this;
auto cf_def = def;
co_await t._query_state.get_client_state().has_keyspace_access(t._db.local(), cf_def.keyspace, auth::permission::CREATE);
co_await t._query_state.get_client_state().has_keyspace_access(t._db.real_database(), cf_def.keyspace, auth::permission::CREATE);
co_return co_await t.execute_schema_command([&cf_def] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&cf_def] (service::migration_manager& mm, data_dictionary::database db) -> future<std::vector<mutation>> {
if (!db.has_keyspace(cf_def.keyspace)) {
throw NotFoundException();
}
@@ -909,11 +911,11 @@ public:
with_cob(std::move(cob), std::move(exn_cob), [this, cfm = column_family] () -> future<std::string> {
auto& t = *this;
auto column_family = cfm;
co_await t._query_state.get_client_state().has_column_family_access(t._db.local(), t.current_keyspace(), column_family, auth::permission::DROP);
co_await t._query_state.get_client_state().has_column_family_access(t._db.real_database(), t.current_keyspace(), column_family, auth::permission::DROP);
co_return co_await t.execute_schema_command(
[&column_family, &current_keyspace = t.current_keyspace()] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
auto& cf = db.find_column_family(current_keyspace, column_family);
[&column_family, &current_keyspace = t.current_keyspace()] (service::migration_manager& mm, data_dictionary::database db) -> future<std::vector<mutation>> {
auto cf = db.find_table(current_keyspace, column_family);
if (cf.schema()->is_view()) {
throw make_exception<InvalidRequestException>("Cannot drop Materialized Views from Thrift");
}
@@ -934,7 +936,7 @@ public:
co_await t._query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE);
co_return co_await t.execute_schema_command([&ks_def] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&ks_def] (service::migration_manager& mm, data_dictionary::database db) -> future<std::vector<mutation>> {
co_return mm.prepare_new_keyspace_announcement(keyspace_from_thrift(ks_def));
});
});
@@ -946,9 +948,9 @@ public:
auto& t = *this;
auto keyspace = ks;
co_await t._query_state.get_client_state().has_keyspace_access(t._db.local(), keyspace, auth::permission::DROP);
co_await t._query_state.get_client_state().has_keyspace_access(t._db.real_database(), keyspace, auth::permission::DROP);
co_return co_await t.execute_schema_command([&keyspace] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&keyspace] (service::migration_manager& mm, data_dictionary::database db) -> future<std::vector<mutation>> {
thrift_validation::validate_keyspace_not_system(keyspace);
if (!db.has_keyspace(keyspace)) {
throw NotFoundException();
@@ -966,9 +968,9 @@ public:
auto ks_def = def;
thrift_validation::validate_keyspace_not_system(ks_def.name);
co_await t._query_state.get_client_state().has_keyspace_access(t._db.local(), ks_def.name, auth::permission::ALTER);
co_await t._query_state.get_client_state().has_keyspace_access(t._db.real_database(), ks_def.name, auth::permission::ALTER);
co_return co_await t.execute_schema_command([&ks_def] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&ks_def] (service::migration_manager& mm, data_dictionary::database db) -> future<std::vector<mutation>> {
if (db.has_keyspace(ks_def.name)) {
throw NotFoundException();
}
@@ -988,10 +990,10 @@ public:
auto& t = *this;
auto cf_def = def;
co_await t._query_state.get_client_state().has_schema_access(t._db.local(), cf_def.keyspace, cf_def.name, auth::permission::ALTER);
co_await t._query_state.get_client_state().has_schema_access(t._db.real_database(), cf_def.keyspace, cf_def.name, auth::permission::ALTER);
co_return co_await t.execute_schema_command([&cf_def] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
auto& cf = db.find_column_family(cf_def.keyspace, cf_def.name);
co_return co_await t.execute_schema_command([&cf_def] (service::migration_manager& mm, data_dictionary::database db) -> future<std::vector<mutation>> {
auto cf = db.find_table(cf_def.keyspace, cf_def.name);
auto schema = cf.schema();
if (schema->is_cql3_table()) {
@@ -1237,7 +1239,7 @@ private:
result.__set_rows(std::move(v._rows));
return result;
}
static KsDef get_keyspace_definition(const replica::keyspace& ks) {
static KsDef get_keyspace_definition(const data_dictionary::keyspace& ks) {
auto make_options = [](auto&& m) {
return std::map<std::string, std::string>(m.begin(), m.end());
};
@@ -1430,7 +1432,7 @@ private:
ks_def.durable_writes,
std::move(cf_defs));
}
static schema_ptr lookup_schema(replica::database& db, const sstring& ks_name, const sstring& cf_name) {
static schema_ptr lookup_schema(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) {
if (ks_name.empty()) {
throw make_exception<InvalidRequestException>("keyspace not set");
}
@@ -2024,7 +2026,7 @@ private:
}
return ret;
}
static std::pair<std::vector<mutation>, std::vector<schema_ptr>> prepare_mutations(replica::database& db, const sstring& ks_name, const mutation_map& m) {
static std::pair<std::vector<mutation>, std::vector<schema_ptr>> prepare_mutations(data_dictionary::database db, const sstring& ks_name, const mutation_map& m) {
std::vector<mutation> muts;
std::vector<schema_ptr> schemas;
auto m_by_cf = group_by_cf(const_cast<mutation_map&>(m));
@@ -2051,7 +2053,7 @@ protected:
};
class handler_factory : public CassandraCobSvIfFactory {
distributed<replica::database>& _db;
data_dictionary::database _db;
distributed<cql3::query_processor>& _query_processor;
sharded<service::storage_service>& _ss;
sharded<service::storage_proxy>& _proxy;
@@ -2059,7 +2061,7 @@ class handler_factory : public CassandraCobSvIfFactory {
timeout_config _timeout_config;
service_permit& _current_permit;
public:
explicit handler_factory(distributed<replica::database>& db,
explicit handler_factory(data_dictionary::database db,
distributed<cql3::query_processor>& qp,
sharded<service::storage_service>& ss,
sharded<service::storage_proxy>& proxy,
@@ -2077,7 +2079,7 @@ public:
};
std::unique_ptr<CassandraCobSvIfFactory>
create_handler_factory(distributed<replica::database>& db, distributed<cql3::query_processor>& qp,
create_handler_factory(data_dictionary::database db, distributed<cql3::query_processor>& qp,
sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy,
auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit) {
return std::make_unique<handler_factory>(db, qp, ss, proxy, auth_service, timeout_config, current_permit);

View File

@@ -24,8 +24,6 @@
#include "Cassandra.h"
#include "auth/service.hh"
#include "replica/database_fwd.hh"
#include <seastar/core/distributed.hh>
#include "cql3/query_processor.hh"
#include <memory>
@@ -33,6 +31,10 @@ struct timeout_config;
class service_permit;
namespace service { class storage_service; }
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<replica::database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy, auth::service&, timeout_config, service_permit& current_permit);
namespace data_dictionary {
class database;
}
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(data_dictionary::database db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy, auth::service&, timeout_config, service_permit& current_permit);
#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */

View File

@@ -22,7 +22,6 @@
#include "server.hh"
#include "handler.hh"
#include "db/config.hh"
#include "replica/database.hh"
#include <seastar/core/future-util.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/metrics.hh>
@@ -66,7 +65,7 @@ public:
thrift_stats(thrift_server& server);
};
thrift_server::thrift_server(distributed<replica::database>& db,
thrift_server::thrift_server(data_dictionary::database db,
distributed<cql3::query_processor>& qp,
sharded<service::storage_service>& ss,
sharded<service::storage_proxy>& proxy,
@@ -78,7 +77,7 @@ thrift_server::thrift_server(distributed<replica::database>& db,
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory))
, _memory_available(ml.get_semaphore())
, _max_concurrent_requests(db.local().get_config().max_concurrent_requests_per_shard)
, _max_concurrent_requests(db.get_config().max_concurrent_requests_per_shard)
, _config(config) {
}

View File

@@ -31,7 +31,6 @@
#include <memory>
#include <cstdint>
#include <boost/intrusive/list.hpp>
#include "replica/database_fwd.hh"
#include "utils/updateable_value.hh"
#include "service_permit.hh"
@@ -78,6 +77,10 @@ class service;
namespace service { class storage_service; }
namespace data_dictionary {
class database;
}
struct thrift_server_config {
::timeout_config timeout_config;
uint64_t max_request_size;
@@ -129,7 +132,7 @@ private:
boost::intrusive::list<connection> _connections_list;
seastar::gate _stop_gate;
public:
thrift_server(distributed<replica::database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy, auth::service&, service::memory_limiter& ml, thrift_server_config config);
thrift_server(data_dictionary::database db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy, auth::service&, service::memory_limiter& ml, thrift_server_config config);
~thrift_server();
future<> listen(socket_address addr, bool keepalive);
future<> stop();