Merge seatar upstream (seastar namespace)

- introcduced "seastarx.hh" header, which does a "using namespace seastar";
 - 'net' namespace conflicts with seastar::net, renamed to 'netw'.
 - 'transport' namespace conflicts with seastar::transport, renamed to
   cql_transport.
 - "logger" global variables now conflict with logger global type, renamed
   to xlogger.
 - other minor changes
This commit is contained in:
Avi Kivity
2017-05-21 10:07:17 +03:00
parent dab2783b58
commit ebaeefa02b
178 changed files with 1088 additions and 1000 deletions

View File

@@ -29,6 +29,7 @@
#include "utils/histogram.hh"
#include "http/exception.hh"
#include "api_init.hh"
#include "seastarx.hh"
namespace api {

View File

@@ -29,11 +29,11 @@
namespace api {
static logging::logger logger("lsa-api");
static logging::logger alogger("lsa-api");
void set_lsa(http_context& ctx, routes& r) {
httpd::lsa_json::lsa_compact.set(r, [&ctx](std::unique_ptr<request> req) {
logger.info("Triggering compaction");
alogger.info("Triggering compaction");
return ctx.db.invoke_on_all([] (database&) {
logalloc::shard_tracker().reclaim(std::numeric_limits<size_t>::max());
}).then([] {

View File

@@ -27,7 +27,7 @@
#include <sstream>
using namespace httpd::messaging_service_json;
using namespace net;
using namespace netw;
namespace api {
@@ -120,13 +120,13 @@ void set_messaging_service(http_context& ctx, routes& r) {
}));
get_version.set(r, [](const_req req) {
return net::get_local_messaging_service().get_raw_version(req.get_query_param("addr"));
return netw::get_local_messaging_service().get_raw_version(req.get_query_param("addr"));
});
get_dropped_messages_by_ver.set(r, [](std::unique_ptr<request> req) {
shared_ptr<std::vector<uint64_t>> map = make_shared<std::vector<uint64_t>>(num_verb);
return net::get_messaging_service().map_reduce([map](const uint64_t* local_map) mutable {
return netw::get_messaging_service().map_reduce([map](const uint64_t* local_map) mutable {
for (auto i = 0; i < num_verb; i++) {
(*map)[i]+= local_map[i];
}

View File

@@ -61,7 +61,7 @@ const sstring auth::auth::USERS_CF("users");
static const sstring USER_NAME("name");
static const sstring SUPER("super");
static logging::logger logger("auth");
static logging::logger alogger("auth");
// TODO: configurable
using namespace std::chrono_literals;
@@ -123,9 +123,9 @@ public:
}
permissions_cache(const db::config& cfg)
: _cache(cfg.permissions_cache_max_entries(), std::chrono::milliseconds(cfg.permissions_validity_in_ms()), std::chrono::milliseconds(cfg.permissions_update_interval_in_ms()), logger,
: _cache(cfg.permissions_cache_max_entries(), std::chrono::milliseconds(cfg.permissions_validity_in_ms()), std::chrono::milliseconds(cfg.permissions_update_interval_in_ms()), alogger,
[] (const key_type& k) {
logger.debug("Refreshing permissions for {}", k.first.name());
alogger.debug("Refreshing permissions for {}", k.first.name());
return authorizer::get().authorize(::make_shared<authenticated_user>(k.first), k.second);
}) {}
@@ -141,11 +141,15 @@ private:
cache_type _cache;
};
namespace std { // for ADL, yuch
std::ostream& operator<<(std::ostream& os, const std::pair<auth::authenticated_user, auth::data_resource>& p) {
os << "{user: " << p.first.name() << ", data_resource: " << p.second << "}";
return os;
}
}
static distributed<auth::auth::permissions_cache> perm_cache;
/**
@@ -172,7 +176,7 @@ struct waiter {
tmr.cancel();
done.set_exception(std::runtime_error("shutting down"));
}
logger.trace("Deleting scheduled task");
alogger.trace("Deleting scheduled task");
}
void kill() {
}
@@ -186,7 +190,7 @@ static std::vector<waiter_ptr> & thread_waiters() {
}
void auth::auth::schedule_when_up(scheduled_func f) {
logger.trace("Adding scheduled task");
alogger.trace("Adding scheduled task");
auto & waiters = thread_waiters();
@@ -202,7 +206,7 @@ void auth::auth::schedule_when_up(scheduled_func f) {
waiters.erase(i);
}
}).then([f = std::move(f)] {
logger.trace("Running scheduled task");
alogger.trace("Running scheduled task");
return f();
}).handle_exception([](auto ep) {
return make_ready_future();
@@ -262,12 +266,12 @@ future<> auth::auth::setup() {
auto query = sprint("INSERT INTO %s.%s (%s, %s) VALUES (?, ?) USING TIMESTAMP 0",
AUTH_KS, USERS_CF, USER_NAME, SUPER);
cql3::get_local_query_processor().process(query, db::consistency_level::ONE, {DEFAULT_SUPERUSER_NAME, true}).then([](auto) {
logger.info("Created default superuser '{}'", DEFAULT_SUPERUSER_NAME);
alogger.info("Created default superuser '{}'", DEFAULT_SUPERUSER_NAME);
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (exceptions::request_execution_exception&) {
logger.warn("Skipped default superuser setup: some nodes were not ready");
alogger.warn("Skipped default superuser setup: some nodes were not ready");
}
});
}

View File

@@ -43,6 +43,7 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include "seastarx.hh"
namespace auth {

View File

@@ -51,6 +51,8 @@
#include "permission.hh"
#include "data_resource.hh"
#include "seastarx.hh"
namespace auth {
class authenticated_user;

View File

@@ -45,6 +45,7 @@
#include <iosfwd>
#include <set>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
namespace auth {

View File

@@ -62,7 +62,7 @@ static const sstring RESOURCE_NAME = "resource";
static const sstring PERMISSIONS_NAME = "permissions";
static const sstring PERMISSIONS_CF = "permissions";
static logging::logger logger("default_authorizer");
static logging::logger alogger("default_authorizer");
auth::default_authorizer::default_authorizer() {
}
@@ -107,7 +107,7 @@ future<auth::permission_set> auth::default_authorizer::authorize(
}
return make_ready_future<permission_set>(permissions::from_strings(res->one().get_set<sstring>(PERMISSIONS_NAME)));
} catch (exceptions::request_execution_exception& e) {
logger.warn("CassandraAuthorizer failed to authorize {} for {}", user->name(), resource);
alogger.warn("CassandraAuthorizer failed to authorize {} for {}", user->name(), resource);
return make_ready_future<permission_set>(permissions::NONE);
}
});
@@ -196,7 +196,7 @@ future<> auth::default_authorizer::revoke_all(sstring dropped_user) {
try {
std::rethrow_exception(ep);
} catch (exceptions::request_execution_exception& e) {
logger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", dropped_user, e);
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", dropped_user, e);
}
});
}
@@ -217,13 +217,13 @@ future<> auth::default_authorizer::revoke_all(data_resource resource) {
try {
std::rethrow_exception(ep);
} catch (exceptions::request_execution_exception& e) {
logger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
}
});
});
} catch (exceptions::request_execution_exception& e) {
logger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
return make_ready_future();
}
});

View File

@@ -61,7 +61,7 @@ static const sstring DEFAULT_USER_NAME = auth::auth::DEFAULT_SUPERUSER_NAME;
static const sstring DEFAULT_USER_PASSWORD = auth::auth::DEFAULT_SUPERUSER_NAME;
static const sstring CREDENTIALS_CF = "credentials";
static logging::logger logger("password_authenticator");
static logging::logger plogger("password_authenticator");
auth::password_authenticator::~password_authenticator()
{}
@@ -169,7 +169,7 @@ future<> auth::password_authenticator::init() {
USER_NAME, SALTED_HASH
),
db::consistency_level::ONE, {DEFAULT_USER_NAME, hashpw(DEFAULT_USER_PASSWORD)}).then([](auto) {
logger.info("Created default user '{}'", DEFAULT_USER_NAME);
plogger.info("Created default user '{}'", DEFAULT_USER_NAME);
});
}
});
@@ -302,7 +302,7 @@ const auth::resource_ids& auth::password_authenticator::protected_resources() co
* @throws javax.security.sasl.SaslException
*/
bytes evaluate_response(bytes_view client_response) override {
logger.debug("Decoding credentials from client token");
plogger.debug("Decoding credentials from client token");
sstring username, password;

View File

@@ -44,6 +44,7 @@
#include <unordered_set>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
#include "enum_set.hh"
namespace auth {

View File

@@ -21,6 +21,7 @@
#pragma once
#include "seastarx.hh"
#include "core/sstring.hh"
#include "hashing.hh"
#include <experimental/optional>

View File

@@ -24,6 +24,7 @@
#include <boost/lexical_cast.hpp>
#include "exceptions/exceptions.hh"
#include "json.hh"
#include "seastarx.hh"
class schema;

View File

@@ -46,7 +46,7 @@
#include "service/storage_proxy.hh"
#include "cql3/query_options.hh"
namespace transport {
namespace cql_transport {
namespace messages {
@@ -89,7 +89,7 @@ public:
* @param state the current query state
* @param options options for this query (consistency, variables, pageSize, ...)
*/
virtual future<::shared_ptr<transport::messages::result_message>>
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) = 0;
/**
@@ -97,7 +97,7 @@ public:
*
* @param state the current query state
*/
virtual future<::shared_ptr<transport::messages::result_message>>
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute_internal(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) = 0;
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const = 0;

View File

@@ -41,6 +41,7 @@
#pragma once
#include "seastarx.hh"
#include <seastar/core/sstring.hh>
#include <antlr3.hpp>

View File

@@ -42,6 +42,7 @@
#pragma once
#include "core/sstring.hh"
#include "seastarx.hh"
#include <experimental/optional>

View File

@@ -44,6 +44,7 @@
#include <cstddef>
#include <iosfwd>
#include "core/sstring.hh"
#include "seastarx.hh"
namespace cql3 {

View File

@@ -54,7 +54,7 @@
namespace cql3 {
using namespace statements;
using namespace transport::messages;
using namespace cql_transport::messages;
logging::logger log("query_processor");
@@ -179,7 +179,7 @@ query_processor::process_statement(::shared_ptr<cql_statement> statement,
statement->validate(_proxy, client_state);
auto fut = make_ready_future<::shared_ptr<transport::messages::result_message>>();
auto fut = make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
if (client_state.is_internal()) {
fut = statement->execute_internal(_proxy, query_state, options);
} else {
@@ -196,24 +196,24 @@ query_processor::process_statement(::shared_ptr<cql_statement> statement,
});
}
future<::shared_ptr<transport::messages::result_message::prepared>>
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
query_processor::prepare(const std::experimental::string_view& query_string, service::query_state& query_state)
{
auto& client_state = query_state.get_client_state();
return prepare(query_string, client_state, client_state.is_thrift());
}
future<::shared_ptr<transport::messages::result_message::prepared>>
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
query_processor::prepare(const std::experimental::string_view& query_string,
const service::client_state& client_state,
bool for_thrift)
{
auto existing = get_stored_prepared_statement(query_string, client_state.get_raw_keyspace(), for_thrift);
if (existing) {
return make_ready_future<::shared_ptr<transport::messages::result_message::prepared>>(existing);
return make_ready_future<::shared_ptr<cql_transport::messages::result_message::prepared>>(existing);
}
return futurize<::shared_ptr<transport::messages::result_message::prepared>>::apply([this, &query_string, &client_state, for_thrift] {
return futurize<::shared_ptr<cql_transport::messages::result_message::prepared>>::apply([this, &query_string, &client_state, for_thrift] {
auto prepared = get_statement(query_string, client_state);
auto bound_terms = prepared->statement->get_bound_terms();
if (bound_terms > std::numeric_limits<uint16_t>::max()) {
@@ -224,7 +224,7 @@ query_processor::prepare(const std::experimental::string_view& query_string,
});
}
::shared_ptr<transport::messages::result_message::prepared>
::shared_ptr<cql_transport::messages::result_message::prepared>
query_processor::get_stored_prepared_statement(const std::experimental::string_view& query_string,
const sstring& keyspace,
bool for_thrift)
@@ -246,7 +246,7 @@ query_processor::get_stored_prepared_statement(const std::experimental::string_v
}
}
future<::shared_ptr<transport::messages::result_message::prepared>>
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
query_processor::store_prepared_statement(const std::experimental::string_view& query_string,
const sstring& keyspace,
std::unique_ptr<statements::prepared_statement> prepared,
@@ -427,7 +427,7 @@ query_processor::process(statements::prepared_statement::checked_weak_ptr p,
});
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
query_processor::process_batch(::shared_ptr<statements::batch_statement> batch,
service::query_state& query_state,
query_options& options)

View File

@@ -275,7 +275,7 @@ public:
}
#endif
public:
future<::shared_ptr<transport::messages::result_message>> process_statement(::shared_ptr<cql_statement> statement,
future<::shared_ptr<cql_transport::messages::result_message>> process_statement(::shared_ptr<cql_statement> statement,
service::query_state& query_state, const query_options& options);
#if 0
@@ -286,7 +286,7 @@ public:
}
#endif
future<::shared_ptr<transport::messages::result_message>> process(const std::experimental::string_view& query_string,
future<::shared_ptr<cql_transport::messages::result_message>> process(const std::experimental::string_view& query_string,
service::query_state& query_state, query_options& options);
#if 0
@@ -434,20 +434,20 @@ public:
}
#endif
future<::shared_ptr<transport::messages::result_message::prepared>>
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
prepare(const std::experimental::string_view& query_string, service::query_state& query_state);
future<::shared_ptr<transport::messages::result_message::prepared>>
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
prepare(const std::experimental::string_view& query_string, const service::client_state& client_state, bool for_thrift);
static bytes compute_id(const std::experimental::string_view& query_string, const sstring& keyspace);
static int32_t compute_thrift_id(const std::experimental::string_view& query_string, const sstring& keyspace);
private:
::shared_ptr<transport::messages::result_message::prepared>
::shared_ptr<cql_transport::messages::result_message::prepared>
get_stored_prepared_statement(const std::experimental::string_view& query_string, const sstring& keyspace, bool for_thrift);
future<::shared_ptr<transport::messages::result_message::prepared>>
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
store_prepared_statement(const std::experimental::string_view& query_string, const sstring& keyspace, std::unique_ptr<statements::prepared_statement> prepared, bool for_thrift);
// Erases the statements for which filter returns true.
@@ -497,7 +497,7 @@ private:
#endif
public:
future<::shared_ptr<transport::messages::result_message>> process_batch(::shared_ptr<statements::batch_statement>,
future<::shared_ptr<cql_transport::messages::result_message>> process_batch(::shared_ptr<statements::batch_statement>,
service::query_state& query_state, query_options& options);
std::unique_ptr<statements::prepared_statement> get_statement(const std::experimental::string_view& query,

View File

@@ -89,10 +89,10 @@ void cql3::statements::alter_keyspace_statement::validate(distributed<service::s
}
}
future<shared_ptr<transport::event::schema_change>> cql3::statements::alter_keyspace_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
future<shared_ptr<cql_transport::event::schema_change>> cql3::statements::alter_keyspace_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
auto old_ksm = service::get_local_storage_proxy().get_db().local().find_keyspace(_name).metadata();
return service::get_local_migration_manager().announce_keyspace_update(_attrs->as_ks_metadata_update(old_ksm), is_local_only).then([this] {
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,
keyspace());

View File

@@ -61,7 +61,7 @@ public:
future<> check_access(const service::client_state& state) override;
void validate(distributed<service::storage_proxy>& proxy, const service::client_state& state) override;
future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
};

View File

@@ -166,7 +166,7 @@ static void validate_column_rename(const schema& schema, const column_identifier
}
}
future<shared_ptr<transport::event::schema_change>> alter_table_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
auto& db = proxy.local().get_db().local();
auto schema = validation::validate_column_family(db, keyspace(), column_family());
@@ -370,7 +370,7 @@ future<shared_ptr<transport::event::schema_change>> alter_table_statement::annou
}
return service::get_local_migration_manager().announce_column_family_update(cfm.build(), false, std::move(view_updates), is_local_only).then([this] {
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,
event::schema_change::target_type::TABLE,

View File

@@ -78,7 +78,7 @@ public:
virtual future<> check_access(const service::client_state& state) override;
virtual void validate(distributed<service::storage_proxy>& proxy, const service::client_state& state) override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
};

View File

@@ -135,14 +135,14 @@ void alter_type_statement::do_announce_migration(database& db, ::keyspace& ks, b
}
}
future<shared_ptr<transport::event::schema_change>> alter_type_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> alter_type_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
return seastar::async([this, &proxy, is_local_only] {
auto&& db = proxy.local().get_db().local();
try {
auto&& ks = db.find_keyspace(keyspace());
do_announce_migration(db, ks, is_local_only);
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,
event::schema_change::target_type::TYPE,

View File

@@ -63,7 +63,7 @@ public:
virtual const sstring& keyspace() const override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
class add_or_alter;
class renames;

View File

@@ -92,7 +92,7 @@ future<> cql3::statements::alter_user_statement::check_access(const service::cli
});
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::alter_user_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return auth::auth::is_existing_user(_username).then([this](bool exists) {
if (!exists) {
@@ -104,7 +104,7 @@ cql3::statements::alter_user_statement::execute(distributed<service::storage_pro
return auth::auth::insert_user(_username, *_superuser);
});
}
return f.then([] { return make_ready_future<::shared_ptr<transport::messages::result_message>>(); });
return f.then([] { return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(); });
});
}

View File

@@ -62,7 +62,7 @@ public:
void validate(distributed<service::storage_proxy>&, const service::client_state&) override;
future<> check_access(const service::client_state&) override;
future<::shared_ptr<transport::messages::result_message>> execute(distributed<service::storage_proxy>&
future<::shared_ptr<cql_transport::messages::result_message>> execute(distributed<service::storage_proxy>&
, service::query_state&
, const query_options&) override;
};

View File

@@ -73,7 +73,7 @@ void alter_view_statement::validate(distributed<service::storage_proxy>&, const
// validated in announce_migration()
}
future<shared_ptr<transport::event::schema_change>> alter_view_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> alter_view_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
auto&& db = proxy.local().get_db().local();
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
@@ -105,7 +105,7 @@ future<shared_ptr<transport::event::schema_change>> alter_view_statement::announ
}
return service::get_local_migration_manager().announce_view_update(view_ptr(builder.build()), is_local_only).then([this] {
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,

View File

@@ -63,7 +63,7 @@ public:
virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
};

View File

@@ -75,7 +75,7 @@ future<> cql3::statements::authentication_statement::check_access(const service:
return make_ready_future<>();
}
future<::shared_ptr<transport::messages::result_message>> cql3::statements::authentication_statement::execute_internal(
future<::shared_ptr<cql_transport::messages::result_message>> cql3::statements::authentication_statement::execute_internal(
distributed<service::storage_proxy>& proxy,
service::query_state& state, const query_options& options) {
// Internal queries are exclusively on the system keyspace and makes no sense here

View File

@@ -66,7 +66,7 @@ public:
void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
execute_internal(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) override;
};

View File

@@ -75,7 +75,7 @@ future<> cql3::statements::authorization_statement::check_access(const service::
return make_ready_future<>();
}
future<::shared_ptr<transport::messages::result_message>> cql3::statements::authorization_statement::execute_internal(
future<::shared_ptr<cql_transport::messages::result_message>> cql3::statements::authorization_statement::execute_internal(
distributed<service::storage_proxy>& proxy,
service::query_state& state, const query_options& options) {
// Internal queries are exclusively on the system keyspace and makes no sense here

View File

@@ -70,7 +70,7 @@ public:
void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
execute_internal(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) override;
protected:

View File

@@ -273,14 +273,14 @@ struct batch_statement_executor {
};
static thread_local auto batch_stage = seastar::make_execution_stage("cql3_batch", batch_statement_executor::get());
future<shared_ptr<transport::messages::result_message>> batch_statement::execute(
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute(
distributed<service::storage_proxy>& storage, service::query_state& state, const query_options& options) {
++_stats.batches;
return batch_stage(this, seastar::ref(storage), seastar::ref(state),
seastar::cref(options), false, options.get_timestamp(state));
}
future<shared_ptr<transport::messages::result_message>> batch_statement::do_execute(
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_execute(
distributed<service::storage_proxy>& storage,
service::query_state& query_state, const query_options& options,
bool local, api::timestamp_type now)
@@ -299,8 +299,8 @@ future<shared_ptr<transport::messages::result_message>> batch_statement::do_exec
return get_mutations(storage, options, local, now, query_state.get_trace_state()).then([this, &storage, &options, tr_state = query_state.get_trace_state()] (std::vector<mutation> ms) mutable {
return execute_without_conditions(storage, std::move(ms), options.get_consistency(), std::move(tr_state));
}).then([] {
return make_ready_future<shared_ptr<transport::messages::result_message>>(
make_shared<transport::messages::result_message::void_message>());
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
make_shared<cql_transport::messages::result_message::void_message>());
});
}
@@ -338,7 +338,7 @@ future<> batch_statement::execute_without_conditions(
return storage.local().mutate_with_triggers(std::move(mutations), cl, mutate_atomic, std::move(tr_state));
}
future<shared_ptr<transport::messages::result_message>> batch_statement::execute_with_conditions(
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_with_conditions(
distributed<service::storage_proxy>& storage,
const query_options& options,
service::query_state& state)
@@ -391,7 +391,7 @@ future<shared_ptr<transport::messages::result_message>> batch_statement::execute
#endif
}
future<shared_ptr<transport::messages::result_message>> batch_statement::execute_internal(
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_internal(
distributed<service::storage_proxy>& proxy,
service::query_state& query_state, const query_options& options)
{

View File

@@ -120,11 +120,11 @@ public:
*/
static void verify_batch_size(const std::vector<mutation>& mutations);
virtual future<shared_ptr<transport::messages::result_message>> execute(
virtual future<shared_ptr<cql_transport::messages::result_message>> execute(
distributed<service::storage_proxy>& storage, service::query_state& state, const query_options& options) override;
private:
friend class batch_statement_executor;
future<shared_ptr<transport::messages::result_message>> do_execute(
future<shared_ptr<cql_transport::messages::result_message>> do_execute(
distributed<service::storage_proxy>& storage,
service::query_state& query_state, const query_options& options,
bool local, api::timestamp_type now);
@@ -135,12 +135,12 @@ private:
db::consistency_level cl,
tracing::trace_state_ptr tr_state);
future<shared_ptr<transport::messages::result_message>> execute_with_conditions(
future<shared_ptr<cql_transport::messages::result_message>> execute_with_conditions(
distributed<service::storage_proxy>& storage,
const query_options& options,
service::query_state& state);
public:
virtual future<shared_ptr<transport::messages::result_message>> execute_internal(
virtual future<shared_ptr<cql_transport::messages::result_message>> execute_internal(
distributed<service::storage_proxy>& proxy,
service::query_state& query_state, const query_options& options) override;

View File

@@ -202,7 +202,7 @@ void create_index_statement::validate_targets_for_multi_column_index(std::vector
}
}
future<::shared_ptr<transport::event::schema_change>>
future<::shared_ptr<cql_transport::event::schema_change>>
create_index_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
if (!service::get_local_storage_service().cluster_supports_indexes()) {
throw exceptions::invalid_request_exception("Index support is not enabled");
@@ -233,7 +233,7 @@ create_index_statement::announce_migration(distributed<service::storage_proxy>&
auto existing_index = schema->find_index_noname(index);
if (existing_index) {
if (_if_not_exists) {
return make_ready_future<::shared_ptr<transport::event::schema_change>>(nullptr);
return make_ready_future<::shared_ptr<cql_transport::event::schema_change>>(nullptr);
} else {
throw exceptions::invalid_request_exception(
sprint("Index %s is a duplicate of existing index %s", index.name(), existing_index.value().name()));
@@ -243,7 +243,7 @@ create_index_statement::announce_migration(distributed<service::storage_proxy>&
builder.with_index(index);
return service::get_local_migration_manager().announce_column_family_update(
builder.build(), false, {}, is_local_only).then([this]() {
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,
event::schema_change::target_type::TABLE,

View File

@@ -79,7 +79,7 @@ public:
future<> check_access(const service::client_state& state) override;
void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
future<::shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>&, bool is_local_only) override;
future<::shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>&, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
private:

View File

@@ -101,20 +101,20 @@ void create_keyspace_statement::validate(distributed<service::storage_proxy>&, c
#endif
}
future<shared_ptr<transport::event::schema_change>> create_keyspace_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> create_keyspace_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
return make_ready_future<>().then([this, is_local_only] {
return service::get_local_migration_manager().announce_new_keyspace(_attrs->as_ks_metadata(_name), is_local_only);
}).then_wrapped([this] (auto&& f) {
try {
f.get();
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,
this->keyspace());
} catch (const exceptions::already_exists_exception& e) {
if (_if_not_exists) {
return ::shared_ptr<transport::event::schema_change>();
return ::shared_ptr<cql_transport::event::schema_change>();
}
throw e;
}

View File

@@ -81,7 +81,7 @@ public:
*/
virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
};

View File

@@ -89,13 +89,13 @@ std::vector<column_definition> create_table_statement::get_columns()
return column_defs;
}
future<shared_ptr<transport::event::schema_change>> create_table_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
future<shared_ptr<cql_transport::event::schema_change>> create_table_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
return make_ready_future<>().then([this, is_local_only] {
return service::get_local_migration_manager().announce_new_column_family(get_cf_meta_data(), is_local_only);
}).then_wrapped([this] (auto&& f) {
try {
f.get();
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,
event::schema_change::target_type::TABLE,
@@ -103,7 +103,7 @@ future<shared_ptr<transport::event::schema_change>> create_table_statement::anno
this->column_family());
} catch (const exceptions::already_exists_exception& e) {
if (_if_not_exists) {
return ::shared_ptr<transport::event::schema_change>();
return ::shared_ptr<cql_transport::event::schema_change>();
}
throw e;
}

View File

@@ -100,7 +100,7 @@ public:
virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;

View File

@@ -129,7 +129,7 @@ inline user_type create_type_statement::create_type(database& db)
std::move(field_names), std::move(field_types));
}
future<shared_ptr<transport::event::schema_change>> create_type_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> create_type_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
auto&& db = proxy.local().get_db().local();
@@ -138,13 +138,13 @@ future<shared_ptr<transport::event::schema_change>> create_type_statement::annou
// Can happen with if_not_exists
if (type_exists_in(ks)) {
return make_ready_future<::shared_ptr<transport::event::schema_change>>();
return make_ready_future<::shared_ptr<cql_transport::event::schema_change>>();
}
auto type = create_type(db);
check_for_duplicate_names(type);
return service::get_local_migration_manager().announce_new_type(type, is_local_only).then([this] {
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,

View File

@@ -65,7 +65,7 @@ public:
virtual const sstring& keyspace() const override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;

View File

@@ -64,7 +64,7 @@ void cql3::statements::create_user_statement::validate(distributed<service::stor
// we need to query -> continuation, and this is not a continuation method
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::create_user_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return state.get_client_state().user()->is_super().then([this](bool is_super) {
if (!is_super) {
@@ -75,11 +75,11 @@ cql3::statements::create_user_statement::execute(distributed<service::storage_pr
throw exceptions::invalid_request_exception(sprint("User %s already exists", _username));
}
if (exists && _if_not_exists) {
make_ready_future<::shared_ptr<transport::messages::result_message>>();
make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
}
return auth::authenticator::get().create(_username, _opts->options()).then([this] {
return auth::auth::insert_user(_username, _superuser).then([] {
return make_ready_future<::shared_ptr<transport::messages::result_message>>();
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
});
});
});

View File

@@ -60,7 +60,7 @@ public:
void validate(distributed<service::storage_proxy>&, const service::client_state&) override;
future<::shared_ptr<transport::messages::result_message>> execute(distributed<service::storage_proxy>&
future<::shared_ptr<cql_transport::messages::result_message>> execute(distributed<service::storage_proxy>&
, service::query_state&
, const query_options&) override;
};

View File

@@ -139,7 +139,7 @@ static bool validate_primary_key(
return false;
}
future<shared_ptr<transport::event::schema_change>> create_view_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
future<shared_ptr<cql_transport::event::schema_change>> create_view_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) {
// We need to make sure that:
// - primary key includes all columns in base table's primary key
// - make sure that the select statement does not have anything other than columns
@@ -328,7 +328,7 @@ future<shared_ptr<transport::event::schema_change>> create_view_statement::annou
}).then_wrapped([this] (auto&& f) {
try {
f.get();
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,
event::schema_change::target_type::TABLE,
@@ -336,7 +336,7 @@ future<shared_ptr<transport::event::schema_change>> create_view_statement::annou
this->column_family());
} catch (const exceptions::already_exists_exception& e) {
if (_if_not_exists) {
return ::shared_ptr<transport::event::schema_change>();
return ::shared_ptr<cql_transport::event::schema_change>();
}
throw e;
}

View File

@@ -68,7 +68,7 @@ public:
// Functions we need to override to subclass schema_altering_statement
virtual future<> check_access(const service::client_state& state) override;
virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
// FIXME: continue here. See create_table_statement.hh and CreateViewStatement.java

View File

@@ -77,14 +77,14 @@ void drop_index_statement::validate(distributed<service::storage_proxy>&, const
// validated in lookup_indexed_table()
}
future<shared_ptr<transport::event::schema_change>> drop_index_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> drop_index_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
if (!service::get_local_storage_service().cluster_supports_indexes()) {
throw exceptions::invalid_request_exception("Index support is not enabled");
}
auto cfm = lookup_indexed_table();
if (!cfm) {
return make_ready_future<::shared_ptr<transport::event::schema_change>>(nullptr);
return make_ready_future<::shared_ptr<cql_transport::event::schema_change>>(nullptr);
}
auto builder = schema_builder(cfm);
builder.without_index(_index_name);
@@ -92,7 +92,7 @@ future<shared_ptr<transport::event::schema_change>> drop_index_statement::announ
// Dropping an index is akin to updating the CF
// Note that we shouldn't call columnFamily() at this point because the index has been dropped and the call to lookupIndexedTable()
// in that method would now throw.
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(event::schema_change::change_type::UPDATED,
event::schema_change::target_type::TABLE,
cfm->ks_name(),

View File

@@ -65,7 +65,7 @@ public:
virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
private:

View File

@@ -73,20 +73,20 @@ const sstring& drop_keyspace_statement::keyspace() const
return _keyspace;
}
future<shared_ptr<transport::event::schema_change>> drop_keyspace_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> drop_keyspace_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
return make_ready_future<>().then([this, is_local_only] {
return service::get_local_migration_manager().announce_keyspace_drop(_keyspace, is_local_only);
}).then_wrapped([this] (auto&& f) {
try {
f.get();
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::DROPPED,
this->keyspace());
} catch (const exceptions::configuration_exception& e) {
if (_if_exists) {
return ::shared_ptr<transport::event::schema_change>();
return ::shared_ptr<cql_transport::event::schema_change>();
}
throw e;
}

View File

@@ -59,7 +59,7 @@ public:
virtual const sstring& keyspace() const override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
};

View File

@@ -72,14 +72,14 @@ void drop_table_statement::validate(distributed<service::storage_proxy>&, const
// validated in announce_migration()
}
future<shared_ptr<transport::event::schema_change>> drop_table_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> drop_table_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
return make_ready_future<>().then([this, is_local_only] {
return service::get_local_migration_manager().announce_column_family_drop(keyspace(), column_family(), is_local_only);
}).then_wrapped([this] (auto&& f) {
try {
f.get();
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::DROPPED,
event::schema_change::target_type::TABLE,
@@ -87,7 +87,7 @@ future<shared_ptr<transport::event::schema_change>> drop_table_statement::announ
this->column_family());
} catch (const exceptions::configuration_exception& e) {
if (_if_exists) {
return ::shared_ptr<transport::event::schema_change>();
return ::shared_ptr<cql_transport::event::schema_change>();
}
throw e;
}

View File

@@ -58,7 +58,7 @@ public:
virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
};

View File

@@ -140,7 +140,7 @@ const sstring& drop_type_statement::keyspace() const
return _name.get_keyspace();
}
future<shared_ptr<transport::event::schema_change>> drop_type_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> drop_type_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
auto&& db = proxy.local().get_db().local();
@@ -152,11 +152,11 @@ future<shared_ptr<transport::event::schema_change>> drop_type_statement::announc
// Can happen with if_exists
if (to_drop == all_types.end()) {
return make_ready_future<::shared_ptr<transport::event::schema_change>>();
return make_ready_future<::shared_ptr<cql_transport::event::schema_change>>();
}
return service::get_local_migration_manager().announce_type_drop(to_drop->second, is_local_only).then([this] {
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(
event::schema_change::change_type::DROPPED,

View File

@@ -61,7 +61,7 @@ public:
virtual const sstring& keyspace() const override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
};

View File

@@ -63,7 +63,7 @@ void cql3::statements::drop_user_statement::validate(distributed<service::storag
}
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::drop_user_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return state.get_client_state().user()->is_super().then([this](bool is_super) {
if (!is_super) {
@@ -75,7 +75,7 @@ cql3::statements::drop_user_statement::execute(distributed<service::storage_prox
throw exceptions::invalid_request_exception(sprint("User %s doesn't exist", _username));
}
if (_if_exists && !exists) {
return make_ready_future<::shared_ptr<transport::messages::result_message>>();
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
}
// clean up permissions after the dropped user.
@@ -84,7 +84,7 @@ cql3::statements::drop_user_statement::execute(distributed<service::storage_prox
return auth::authenticator::get().drop(_username);
});
}).then([] {
return make_ready_future<::shared_ptr<transport::messages::result_message>>();
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
});
});
});

View File

@@ -56,7 +56,7 @@ public:
void validate(distributed<service::storage_proxy>&, const service::client_state&) override;
future<::shared_ptr<transport::messages::result_message>> execute(distributed<service::storage_proxy>&
future<::shared_ptr<cql_transport::messages::result_message>> execute(distributed<service::storage_proxy>&
, service::query_state&
, const query_options&) override;
};

View File

@@ -72,14 +72,14 @@ void drop_view_statement::validate(distributed<service::storage_proxy>&, const s
// validated in migration_manager::announce_view_drop()
}
future<shared_ptr<transport::event::schema_change>> drop_view_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
future<shared_ptr<cql_transport::event::schema_change>> drop_view_statement::announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only)
{
return make_ready_future<>().then([this, is_local_only] {
return service::get_local_migration_manager().announce_view_drop(keyspace(), column_family(), is_local_only);
}).then_wrapped([this] (auto&& f) {
try {
f.get();
using namespace transport;
using namespace cql_transport;
return make_shared<event::schema_change>(event::schema_change::change_type::DROPPED,
event::schema_change::target_type::TABLE,
@@ -87,7 +87,7 @@ future<shared_ptr<transport::event::schema_change>> drop_view_statement::announc
this->column_family());
} catch (const exceptions::configuration_exception& e) {
if (_if_exists) {
return ::shared_ptr<transport::event::schema_change>();
return ::shared_ptr<cql_transport::event::schema_change>();
}
throw e;
}

View File

@@ -63,7 +63,7 @@ public:
virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
virtual future<shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual future<shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) override;
virtual std::unique_ptr<prepared> prepare(database& db, cql_stats& stats) override;
};

View File

@@ -42,9 +42,9 @@
#include "grant_statement.hh"
#include "auth/authorizer.hh"
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::grant_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return auth::authorizer::get().grant(state.get_client_state().user(), _permissions, _resource, _username).then([] {
return make_ready_future<::shared_ptr<transport::messages::result_message>>();
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
});
}

View File

@@ -51,7 +51,7 @@ class grant_statement : public permission_altering_statement {
public:
using permission_altering_statement::permission_altering_statement;
future<::shared_ptr<transport::messages::result_message>> execute(distributed<service::storage_proxy>&
future<::shared_ptr<cql_transport::messages::result_message>> execute(distributed<service::storage_proxy>&
, service::query_state&
, const query_options&) override;
};

View File

@@ -81,7 +81,7 @@ future<> cql3::statements::list_permissions_statement::check_access(const servic
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::list_permissions_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
static auto make_column = [](sstring name) {
return ::make_shared<column_specification>(auth::auth::AUTH_KS, "permissions", ::make_shared<column_identifier>(std::move(name), true), utf8_type);
@@ -125,7 +125,7 @@ cql3::statements::list_permissions_statement::execute(distributed<service::stora
}
}
auto rows = ::make_shared<transport::messages::result_message::rows>(std::move(rs));
return ::shared_ptr<transport::messages::result_message>(std::move(rows));
auto rows = ::make_shared<cql_transport::messages::result_message::rows>(std::move(rs));
return ::shared_ptr<cql_transport::messages::result_message>(std::move(rows));
});
}

View File

@@ -63,7 +63,7 @@ public:
void validate(distributed<service::storage_proxy>&, const service::client_state&) override;
future<> check_access(const service::client_state&) override;
future<::shared_ptr<transport::messages::result_message>> execute(distributed<service::storage_proxy>&
future<::shared_ptr<cql_transport::messages::result_message>> execute(distributed<service::storage_proxy>&
, service::query_state&
, const query_options&) override;
};

View File

@@ -52,7 +52,7 @@ future<> cql3::statements::list_users_statement::check_access(const service::cli
return make_ready_future();
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::list_users_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
auto is = std::make_unique<service::query_state>(service::client_state::for_internal_calls());
auto io = std::make_unique<query_options>(db::consistency_level::QUORUM, std::vector<cql3::raw_value>{});

View File

@@ -52,7 +52,7 @@ class list_users_statement : public authentication_statement {
public:
void validate(distributed<service::storage_proxy>&, const service::client_state&) override;
future<> check_access(const service::client_state&) override;
future<::shared_ptr<transport::messages::result_message>> execute(distributed<service::storage_proxy>&
future<::shared_ptr<cql_transport::messages::result_message>> execute(distributed<service::storage_proxy>&
, service::query_state&
, const query_options&) override;
};

View File

@@ -353,12 +353,12 @@ struct modification_statement_executor {
};
static thread_local auto modify_stage = seastar::make_execution_stage("cql3_modification", modification_statement_executor::get());
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
modification_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& qs, const query_options& options) {
return modify_stage(this, seastar::ref(proxy), seastar::ref(qs), seastar::cref(options));
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
modification_statement::do_execute(distributed<service::storage_proxy>& proxy, service::query_state& qs, const query_options& options) {
if (has_conditions() && options.get_protocol_version() == 1) {
throw exceptions::invalid_request_exception("Conditional updates are not supported by the protocol version in use. You need to upgrade to a driver using the native protocol v2.");
@@ -373,8 +373,8 @@ modification_statement::do_execute(distributed<service::storage_proxy>& proxy, s
inc_cql_stats();
return execute_without_condition(proxy, qs, options).then([] {
return make_ready_future<::shared_ptr<transport::messages::result_message>>(
::shared_ptr<transport::messages::result_message>{});
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(
::shared_ptr<cql_transport::messages::result_message>{});
});
}
@@ -396,7 +396,7 @@ modification_statement::execute_without_condition(distributed<service::storage_p
});
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
modification_statement::execute_with_condition(distributed<service::storage_proxy>& proxy, service::query_state& qs, const query_options& options) {
fail(unimplemented::cause::LWT);
#if 0
@@ -424,7 +424,7 @@ modification_statement::execute_with_condition(distributed<service::storage_prox
#endif
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
modification_statement::execute_internal(distributed<service::storage_proxy>& proxy, service::query_state& qs, const query_options& options) {
if (has_conditions()) {
throw exceptions::unsupported_operation_exception();
@@ -439,8 +439,8 @@ modification_statement::execute_internal(distributed<service::storage_proxy>& pr
return proxy.local().mutate_locally(std::move(mutations));
}).then(
[] {
return make_ready_future<::shared_ptr<transport::messages::result_message>>(
::shared_ptr<transport::messages::result_message> {});
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(
::shared_ptr<cql_transport::messages::result_message> {});
});
}

View File

@@ -205,23 +205,23 @@ protected:
db::consistency_level cl,
tracing::trace_state_ptr trace_state);
private:
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
do_execute(distributed<service::storage_proxy>& proxy, service::query_state& qs, const query_options& options);
friend class modification_statement_executor;
public:
bool has_conditions();
virtual future<::shared_ptr<transport::messages::result_message>>
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(distributed<service::storage_proxy>& proxy, service::query_state& qs, const query_options& options) override;
virtual future<::shared_ptr<transport::messages::result_message>>
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute_internal(distributed<service::storage_proxy>& proxy, service::query_state& qs, const query_options& options) override;
private:
future<>
execute_without_condition(distributed<service::storage_proxy>& proxy, service::query_state& qs, const query_options& options);
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
execute_with_condition(distributed<service::storage_proxy>& proxy, service::query_state& qs, const query_options& options);
#if 0

View File

@@ -42,9 +42,9 @@
#include "revoke_statement.hh"
#include "auth/authorizer.hh"
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::revoke_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
return auth::authorizer::get().revoke(state.get_client_state().user(), _permissions, _resource, _username).then([] {
return make_ready_future<::shared_ptr<transport::messages::result_message>>();
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>();
});
}

View File

@@ -51,7 +51,7 @@ class revoke_statement : public permission_altering_statement {
public:
using permission_altering_statement::permission_altering_statement;
future<::shared_ptr<transport::messages::result_message>> execute(distributed<service::storage_proxy>&
future<::shared_ptr<cql_transport::messages::result_message>> execute(distributed<service::storage_proxy>&
, service::query_state&
, const query_options&) override;
};

View File

@@ -55,7 +55,7 @@ namespace cql3 {
namespace statements {
namespace messages = transport::messages;
namespace messages = cql_transport::messages;
/**
* Abstract class for statements that alter the schema.
@@ -81,7 +81,7 @@ protected:
virtual void prepare_keyspace(const service::client_state& state) override;
virtual future<::shared_ptr<transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) = 0;
virtual future<::shared_ptr<cql_transport::event::schema_change>> announce_migration(distributed<service::storage_proxy>& proxy, bool is_local_only) = 0;
virtual future<::shared_ptr<messages::result_message>>
execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) override;

View File

@@ -233,7 +233,7 @@ struct select_statement_executor {
};
static thread_local auto select_stage = seastar::make_execution_stage("cql3_select", select_statement_executor::get());
future<shared_ptr<transport::messages::result_message>>
future<shared_ptr<cql_transport::messages::result_message>>
select_statement::execute(distributed<service::storage_proxy>& proxy,
service::query_state& state,
const query_options& options)
@@ -241,7 +241,7 @@ select_statement::execute(distributed<service::storage_proxy>& proxy,
return select_stage(this, seastar::ref(proxy), seastar::ref(state), seastar::cref(options));
}
future<shared_ptr<transport::messages::result_message>>
future<shared_ptr<cql_transport::messages::result_message>>
select_statement::do_execute(distributed<service::storage_proxy>& proxy,
service::query_state& state,
const query_options& options)
@@ -293,8 +293,8 @@ select_statement::do_execute(distributed<service::storage_proxy>& proxy,
}
).then([&builder] {
auto rs = builder.build();
auto msg = ::make_shared<transport::messages::result_message::rows>(std::move(rs));
return make_ready_future<shared_ptr<transport::messages::result_message>>(std::move(msg));
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(std::move(rs));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
});
});
}
@@ -312,12 +312,12 @@ select_statement::do_execute(distributed<service::storage_proxy>& proxy,
rs->get_metadata().set_has_more_pages(p->state());
}
auto msg = ::make_shared<transport::messages::result_message::rows>(std::move(rs));
return make_ready_future<shared_ptr<transport::messages::result_message>>(std::move(msg));
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(std::move(rs));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
});
}
future<shared_ptr<transport::messages::result_message>>
future<shared_ptr<cql_transport::messages::result_message>>
select_statement::execute(distributed<service::storage_proxy>& proxy,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
@@ -349,7 +349,7 @@ select_statement::execute(distributed<service::storage_proxy>& proxy,
}
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
select_statement::execute_internal(distributed<service::storage_proxy>& proxy,
service::query_state& state,
const query_options& options)
@@ -383,7 +383,7 @@ select_statement::execute_internal(distributed<service::storage_proxy>& proxy,
}
}
shared_ptr<transport::messages::result_message>
shared_ptr<cql_transport::messages::result_message>
select_statement::process_results(foreign_ptr<lw_shared_ptr<query::result>> results,
lw_shared_ptr<query::read_command> cmd,
const query_options& options,
@@ -403,7 +403,7 @@ select_statement::process_results(foreign_ptr<lw_shared_ptr<query::result>> resu
}
rs->trim(cmd->row_limit);
}
return ::make_shared<transport::messages::result_message::rows>(std::move(rs));
return ::make_shared<cql_transport::messages::result_message::rows>(std::move(rs));
}
::shared_ptr<restrictions::statement_restrictions> select_statement::get_restrictions() const {

View File

@@ -91,7 +91,7 @@ private:
query::partition_slice::option_set _opts;
cql_stats& _stats;
private:
future<::shared_ptr<transport::messages::result_message>> do_execute(distributed<service::storage_proxy>& proxy,
future<::shared_ptr<cql_transport::messages::result_message>> do_execute(distributed<service::storage_proxy>& proxy,
service::query_state& state, const query_options& options);
friend class select_statement_executor;
public:
@@ -120,17 +120,17 @@ public:
virtual bool depends_on_keyspace(const sstring& ks_name) const;
virtual bool depends_on_column_family(const sstring& cf_name) const;
virtual future<::shared_ptr<transport::messages::result_message>> execute(distributed<service::storage_proxy>& proxy,
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute(distributed<service::storage_proxy>& proxy,
service::query_state& state, const query_options& options) override;
virtual future<::shared_ptr<transport::messages::result_message>> execute_internal(distributed<service::storage_proxy>& proxy,
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_internal(distributed<service::storage_proxy>& proxy,
service::query_state& state, const query_options& options) override;
future<::shared_ptr<transport::messages::result_message>> execute(distributed<service::storage_proxy>& proxy,
future<::shared_ptr<cql_transport::messages::result_message>> execute(distributed<service::storage_proxy>& proxy,
lw_shared_ptr<query::read_command> cmd, dht::partition_range_vector&& partition_ranges, service::query_state& state,
const query_options& options, gc_clock::time_point now);
shared_ptr<transport::messages::result_message> process_results(foreign_ptr<lw_shared_ptr<query::result>> results,
shared_ptr<cql_transport::messages::result_message> process_results(foreign_ptr<lw_shared_ptr<query::result>> results,
lw_shared_ptr<query::read_command> cmd, const query_options& options, gc_clock::time_point now);
#if 0
private ResultMessage.Rows pageAggregateQuery(QueryPager pager, QueryOptions options, int pageSize, long now)

View File

@@ -92,17 +92,17 @@ void truncate_statement::validate(distributed<service::storage_proxy>&, const se
#endif
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
truncate_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options)
{
return service::get_local_storage_proxy().truncate_blocking(keyspace(), column_family()).handle_exception([](auto ep) {
throw exceptions::truncate_exception(ep);
}).then([] {
return ::shared_ptr<transport::messages::result_message>{};
return ::shared_ptr<cql_transport::messages::result_message>{};
});
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
truncate_statement::execute_internal(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options)
{
throw std::runtime_error("unsupported operation");

View File

@@ -68,10 +68,10 @@ public:
virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
virtual future<::shared_ptr<transport::messages::result_message>>
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) override;
virtual future<::shared_ptr<transport::messages::result_message>>
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute_internal(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) override;
};

View File

@@ -97,14 +97,14 @@ void use_statement::validate(distributed<service::storage_proxy>&, const service
{
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
use_statement::execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
state.get_client_state().set_keyspace(proxy.local().get_db(), _keyspace);
auto result =::make_shared<transport::messages::result_message::set_keyspace>(_keyspace);
return make_ready_future<::shared_ptr<transport::messages::result_message>>(result);
auto result =::make_shared<cql_transport::messages::result_message::set_keyspace>(_keyspace);
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(result);
}
future<::shared_ptr<transport::messages::result_message>>
future<::shared_ptr<cql_transport::messages::result_message>>
use_statement::execute_internal(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) {
// Internal queries are exclusively on the system keyspace and 'use' is thus useless
throw std::runtime_error("unsupported operation");

View File

@@ -69,10 +69,10 @@ public:
virtual void validate(distributed<service::storage_proxy>&, const service::client_state& state) override;
virtual future<::shared_ptr<transport::messages::result_message>>
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) override;
virtual future<::shared_ptr<transport::messages::result_message>>
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute_internal(distributed<service::storage_proxy>& proxy, service::query_state& state, const query_options& options) override;
};

View File

@@ -67,7 +67,7 @@ bool cql3::untyped_result_set::row::has(const sstring& name) const {
return i != _data.end() && i->second;
}
using transport::messages::result_message;
using cql_transport::messages::result_message;
cql3::untyped_result_set::untyped_result_set(::shared_ptr<result_message> msg)
: _rows([msg]{

View File

@@ -142,7 +142,7 @@ public:
typedef std::vector<row> rows_type;
using const_iterator = rows_type::const_iterator;
untyped_result_set(::shared_ptr<transport::messages::result_message>);
untyped_result_set(::shared_ptr<cql_transport::messages::result_message>);
untyped_result_set(untyped_result_set&&) = default;
const_iterator begin() const {

View File

@@ -69,7 +69,7 @@
#include "idl/frozen_schema.dist.impl.hh"
#include "message/messaging_service.hh"
static logging::logger logger("batchlog_manager");
static logging::logger blogger("batchlog_manager");
const uint32_t db::batchlog_manager::replay_interval;
const uint32_t db::batchlog_manager::page_size;
@@ -94,11 +94,11 @@ future<> db::batchlog_manager::do_batch_log_replay() {
return bm._cpu++ % smp::count;
});
}).then([] (auto dest) {
logger.debug("Batchlog replay on shard {}: starts", dest);
blogger.debug("Batchlog replay on shard {}: starts", dest);
return get_batchlog_manager().invoke_on(dest, [] (auto& bm) {
return bm.replay_all_failed_batches();
}).then([dest] {
logger.debug("Batchlog replay on shard {}: done", dest);
blogger.debug("Batchlog replay on shard {}: done", dest);
});
}).finally([] {
return get_batchlog_manager().invoke_on(0, [] (auto& bm) {
@@ -117,7 +117,7 @@ future<> db::batchlog_manager::start() {
if (engine().cpu_id() == 0) {
_timer.set_callback([this] {
return do_batch_log_replay().handle_exception([] (auto ep) {
logger.error("Exception in batch replay: {}", ep);
blogger.error("Exception in batch replay: {}", ep);
}).finally([this] {
_timer.arm(lowres_clock::now() + std::chrono::milliseconds(replay_interval));
});
@@ -188,25 +188,25 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
auto timeout = get_batch_log_timeout();
if (db_clock::now() < written_at + timeout) {
logger.debug("Skipping replay of {}, too fresh", id);
blogger.debug("Skipping replay of {}, too fresh", id);
return make_ready_future<>();
}
// check version of serialization format
if (!row.has("version")) {
logger.warn("Skipping logged batch because of unknown version");
blogger.warn("Skipping logged batch because of unknown version");
return make_ready_future<>();
}
auto version = row.get_as<int32_t>("version");
if (version != net::messaging_service::current_version) {
logger.warn("Skipping logged batch because of incorrect version");
if (version != netw::messaging_service::current_version) {
blogger.warn("Skipping logged batch because of incorrect version");
return make_ready_future<>();
}
auto data = row.get_blob("data");
logger.debug("Replaying batch {}", id);
blogger.debug("Replaying batch {}", id);
auto fms = make_lw_shared<std::deque<canonical_mutation>>();
auto in = ser::as_input_stream(data);
@@ -292,7 +292,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
};
return seastar::with_gate(_gate, [this, batch = std::move(batch)] {
logger.debug("Started replayAllFailedBatches (cpu {})", engine().cpu_id());
blogger.debug("Started replayAllFailedBatches (cpu {})", engine().cpu_id());
typedef ::shared_ptr<cql3::untyped_result_set> page_ptr;
sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size);
@@ -332,7 +332,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
#endif
}).then([this] {
logger.debug("Finished replayAllFailedBatches");
blogger.debug("Finished replayAllFailedBatches");
});
});
}

View File

@@ -62,6 +62,8 @@
#include <seastar/core/sleep.hh>
#include <net/byteorder.hh>
#include "seastarx.hh"
#include "commitlog.hh"
#include "db/config.hh"
#include "utils/data_input.hh"
@@ -78,7 +80,7 @@
#include "checked-file-impl.hh"
#include "disk-error-handler.hh"
static logging::logger logger("commitlog");
static logging::logger clogger("commitlog");
using namespace std::chrono_literals;
@@ -226,7 +228,7 @@ public:
++totals.pending_flushes;
if (totals.pending_flushes >= cfg.max_active_flushes) {
++totals.flush_limit_exceeded;
logger.trace("Flush ops overflow: {}. Will block.", totals.pending_flushes);
clogger.trace("Flush ops overflow: {}. Will block.", totals.pending_flushes);
}
return _flush_semaphore.wait();
}
@@ -236,7 +238,7 @@ public:
}
segment_manager(config c);
~segment_manager() {
logger.trace("Commitlog {} disposed", cfg.commit_log_location);
clogger.trace("Commitlog {} disposed", cfg.commit_log_location);
}
uint64_t next_id() {
@@ -427,11 +429,11 @@ public:
clock_type::now()), _pending_ops(true) // want exception propagation
{
++_segment_manager->totals.segments_created;
logger.debug("Created new {} segment {}", active ? "active" : "reserve", *this);
clogger.debug("Created new {} segment {}", active ? "active" : "reserve", *this);
}
~segment() {
if (is_clean()) {
logger.debug("Segment {} is no longer active and will be deleted now", *this);
clogger.debug("Segment {} is no longer active and will be deleted now", *this);
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.total_size_on_disk -= size_on_disk();
_segment_manager->totals.total_size -= (size_on_disk() + _buffer.size());
@@ -439,10 +441,10 @@ public:
commit_io_check([] (const char* fname) { ::unlink(fname); },
_file_name.c_str());
} catch (...) {
logger.error("Could not delete segment {}: {}", *this, std::current_exception());
clogger.error("Could not delete segment {}: {}", *this, std::current_exception());
}
} else {
logger.warn("Segment {} is dirty and is left on disk.", *this);
clogger.warn("Segment {} is dirty and is left on disk.", *this);
}
}
@@ -464,7 +466,7 @@ public:
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - _sync_time).count();
if ((_segment_manager->cfg.commitlog_sync_period_in_ms * 2) < uint64_t(ms)) {
logger.debug("{} needs sync. {} ms elapsed", *this, ms);
clogger.debug("{} needs sync. {} ms elapsed", *this, ms);
return true;
}
return false;
@@ -514,7 +516,7 @@ public:
pos = _file_pos;
}
logger.trace("Syncing {} {} -> {}", *this, _flush_pos, pos);
clogger.trace("Syncing {} {} -> {}", *this, _flush_pos, pos);
// Only run the flush when all write ops at lower rp:s
// have completed.
@@ -531,7 +533,7 @@ public:
auto me = shared_from_this();
return begin_flush().then([this, pos]() {
if (pos <= _flush_pos) {
logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
clogger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
return make_ready_future<>();
}
return _file.flush().then_wrapped([this, pos](future<> f) {
@@ -541,9 +543,9 @@ public:
// we fast-fail the whole commit.
_flush_pos = std::max(pos, _flush_pos);
++_segment_manager->totals.flush_count;
logger.trace("{} synced to {}", *this, _flush_pos);
clogger.trace("{} synced to {}", *this, _flush_pos);
} catch (...) {
logger.error("Failed to flush commits to disk: {}", std::current_exception());
clogger.error("Failed to flush commits to disk: {}", std::current_exception());
throw;
}
});
@@ -573,10 +575,10 @@ public:
_buffer = _segment_manager->acquire_buffer(k);
break;
} catch (std::bad_alloc&) {
logger.warn("Could not allocate {} k bytes output buffer ({} k required)", k / 1024, a / 1024);
clogger.warn("Could not allocate {} k bytes output buffer ({} k required)", k / 1024, a / 1024);
if (k > a) {
k = std::max(a, k / 2);
logger.debug("Trying reduced size: {} k", k / 1024);
clogger.debug("Trying reduced size: {} k", k / 1024);
continue;
}
throw;
@@ -647,7 +649,7 @@ public:
replay_position rp(_desc.id, position_type(off));
logger.trace("Writing {} entries, {} k in {} -> {}", num, size, off, off + size);
clogger.trace("Writing {} entries, {} k in {} -> {}", num, size, off, off + size);
// The write will be allowed to start now, but flush (below) must wait for not only this,
// but all previous write/flush pairs.
@@ -668,13 +670,13 @@ public:
}
// gah, partial write. should always get here with dma chunk sized
// "bytes", but lets make sure...
logger.debug("Partial write {}: {}/{} bytes", *this, *written, size);
clogger.debug("Partial write {}: {}/{} bytes", *this, *written, size);
*written = align_down(*written, alignment);
return make_ready_future<stop_iteration>(stop_iteration::no);
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
// we fast-fail the whole commit.
} catch (...) {
logger.error("Failed to persist commits to disk for {}: {}", *this, std::current_exception());
clogger.error("Failed to persist commits to disk for {}: {}", *this, std::current_exception());
throw;
}
});
@@ -758,7 +760,7 @@ public:
});
} else {
cycle().discard_result().handle_exception([] (auto ex) {
logger.error("Failed to flush commits to disk: {}", ex);
clogger.error("Failed to flush commits to disk: {}", ex);
});
}
}
@@ -811,7 +813,7 @@ public:
// have to do it ourselves.
if ((_buf_pos >= (db::commitlog::segment::default_size))) {
cycle().discard_result().handle_exception([] (auto ex) {
logger.error("Failed to flush commits to disk: {}", ex);
clogger.error("Failed to flush commits to disk: {}", ex);
});
}
return make_ready_future<replay_position>(rp);
@@ -934,7 +936,7 @@ db::commitlog::segment_manager::segment_manager(config c)
{
assert(max_size > 0);
logger.trace("Commitlog {} maximum disk size: {} MB / cpu ({} cpus)",
clogger.trace("Commitlog {} maximum disk size: {} MB / cpu ({} cpus)",
cfg.commit_log_location, max_disk_size / (1024 * 1024),
smp::count);
@@ -955,12 +957,12 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
return this->allocate_segment(false).then([this](sseg_ptr s) {
auto ret = _reserve_segments.push(std::move(s));
if (!ret) {
logger.error("Segment reserve is full! Ignoring and trying to continue, but shouldn't happen");
clogger.error("Segment reserve is full! Ignoring and trying to continue, but shouldn't happen");
}
return make_ready_future<>();
});
}).handle_exception([](std::exception_ptr ep) {
logger.warn("Exception in segment reservation: {}", ep);
clogger.warn("Exception in segment reservation: {}", ep);
return sleep(100ms);
});
});
@@ -995,7 +997,7 @@ db::commitlog::segment_manager::list_descriptors(sstring dirname) {
try {
_result.emplace_back(de.name);
} catch (std::domain_error& e) {
logger.warn(e.what());
clogger.warn(e.what());
}
}
return make_ready_future<>();
@@ -1038,7 +1040,7 @@ future<> db::commitlog::segment_manager::init() {
// always run the timer now, since we need to handle segment pre-alloc etc as well.
_timer.set_callback(std::bind(&segment_manager::on_timer, this));
auto delay = engine().cpu_id() * std::ceil(double(cfg.commitlog_sync_period_in_ms) / smp::count);
logger.trace("Delaying timer loop {} ms", delay);
clogger.trace("Delaying timer loop {} ms", delay);
// We need to wait until we have scanned all other segments to actually start serving new
// segments. We are ready now
this->_reserve_replenisher = replenish_reserve();
@@ -1129,7 +1131,7 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
}
});
logger.debug("Flushing ({}) to {}", force, high);
clogger.debug("Flushing ({}) to {}", force, high);
// For each CF id: for each callback c: call c(id, high)
for (auto& f : callbacks) {
@@ -1137,7 +1139,7 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
try {
f(id, high);
} catch (...) {
logger.error("Exception during flush request {}/{}: {}", id, high, std::current_exception());
clogger.error("Exception during flush request {}/{}: {}", id, high, std::current_exception());
}
}
}
@@ -1165,7 +1167,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
if (_reserve_segments.empty() && (_reserve_segments.max_size() < cfg.max_reserve_segments)) {
_reserve_segments.set_max_size(_reserve_segments.max_size() + 1);
logger.debug("Increased segment reserve count to {}", _reserve_segments.max_size());
clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size());
}
return _reserve_segments.pop_eventually().then([this] (auto s) {
_segments.push_back(std::move(s));
@@ -1207,7 +1209,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
*/
void db::commitlog::segment_manager::discard_completed_segments(
const cf_id_type& id, const replay_position& pos) {
logger.debug("Discard completed segments for {}, table {}", pos, id);
clogger.debug("Discard completed segments for {}, table {}", pos, id);
for (auto&s : _segments) {
s->mark_clean(id, pos);
}
@@ -1231,19 +1233,19 @@ std::ostream& operator<<(std::ostream& out, const db::replay_position& p) {
}
void db::commitlog::segment_manager::discard_unused_segments() {
logger.trace("Checking for unused segments ({} active)", _segments.size());
clogger.trace("Checking for unused segments ({} active)", _segments.size());
auto i = std::remove_if(_segments.begin(), _segments.end(), [=](sseg_ptr s) {
if (s->can_delete()) {
logger.debug("Segment {} is unused", *s);
clogger.debug("Segment {} is unused", *s);
return true;
}
if (s->is_still_allocating()) {
logger.debug("Not safe to delete segment {}; still allocating.", s);
clogger.debug("Not safe to delete segment {}; still allocating.", s);
} else if (!s->is_clean()) {
logger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s});
clogger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s});
} else {
logger.debug("Not safe to delete segment {}; disk ops pending", s);
clogger.debug("Not safe to delete segment {}; disk ops pending", s);
}
return false;
});
@@ -1262,10 +1264,10 @@ future<> db::commitlog::segment_manager::clear_reserve_segments() {
}
future<> db::commitlog::segment_manager::sync_all_segments(bool shutdown) {
logger.debug("Issuing sync for all segments ({})", shutdown ? "shutdown" : "active");
clogger.debug("Issuing sync for all segments ({})", shutdown ? "shutdown" : "active");
return parallel_for_each(_segments, [this, shutdown](sseg_ptr s) {
return s->sync(shutdown).then([](sseg_ptr s) {
logger.debug("Synced segment {}", *s);
clogger.debug("Synced segment {}", *s);
});
});
}
@@ -1313,9 +1315,9 @@ future<> db::commitlog::segment_manager::orphan_all() {
* Only use from tests.
*/
future<> db::commitlog::segment_manager::clear() {
logger.debug("Clearing commitlog");
clogger.debug("Clearing commitlog");
return shutdown().then([this] {
logger.debug("Clearing all segments");
clogger.debug("Clearing all segments");
for (auto& s : _segments) {
s->mark_clean();
}
@@ -1346,7 +1348,7 @@ void db::commitlog::segment_manager::on_timer() {
auto cur = totals.total_size_on_disk;
if (max != 0 && cur >= max) {
_new_counter = 0;
logger.debug("Size on disk {} MB exceeds local maximum {} MB", cur / (1024 * 1024), max / (1024 * 1024));
clogger.debug("Size on disk {} MB exceeds local maximum {} MB", cur / (1024 * 1024), max / (1024 * 1024));
flush_segments();
}
}
@@ -1396,7 +1398,7 @@ db::commitlog::segment_manager::buffer_type db::commitlog::segment_manager::acqu
if (a == nullptr) {
throw std::bad_alloc();
}
logger.trace("Allocated {} k buffer", s / 1024);
clogger.trace("Allocated {} k buffer", s / 1024);
return buffer_type(reinterpret_cast<char *>(a), s, make_free_deleter(a));
}
@@ -1409,7 +1411,7 @@ void db::commitlog::segment_manager::release_buffer(buffer_type&& b) {
constexpr const size_t max_temp_buffers = 4;
if (_temp_buffers.size() > max_temp_buffers) {
logger.trace("Deleting {} buffers", _temp_buffers.size() - max_temp_buffers);
clogger.trace("Deleting {} buffers", _temp_buffers.size() - max_temp_buffers);
_temp_buffers.erase(_temp_buffers.begin() + max_temp_buffers, _temp_buffers.end());
}
totals.buffer_list_bytes = boost::accumulate(
@@ -1696,7 +1698,7 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
if (cs != checksum) {
// if a chunk header checksum is broken, we shall just assume that all
// remaining is as well. We cannot trust the "next" pointer, so...
logger.debug("Checksum error in segment chunk at {}.", pos);
clogger.debug("Checksum error in segment chunk at {}.", pos);
corrupt_size += (file_size - pos);
return stop();
}
@@ -1741,7 +1743,7 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
if (size < 3 * sizeof(uint32_t) || checksum != crc.checksum()) {
auto slack = next - pos;
if (size != 0) {
logger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
corrupt_size += slack;
}
// size == 0 -> special scylla case: zero padding due to dma blocks
@@ -1763,7 +1765,7 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
// If we're getting a checksum error here, most likely the rest of
// the file will be corrupt as well. But it does not hurt to retry.
// Just go to the next entry (since "size" in header seemed ok).
logger.debug("Segment entry at {} checksum error. Skipping {} bytes", rp, size);
clogger.debug("Segment entry at {} checksum error. Skipping {} bytes", rp, size);
corrupt_size += size;
return make_ready_future<>();
}

View File

@@ -50,7 +50,9 @@
#include "replay_position.hh"
#include "commitlog_entry.hh"
class file;
namespace seastar { class file; }
#include "seastarx.hh"
namespace db {

View File

@@ -26,8 +26,7 @@
#include "frozen_mutation.hh"
#include "schema.hh"
#include "utils/data_output.hh"
namespace stdx = std::experimental;
#include "stdx.hh"
class commitlog_entry {
stdx::optional<column_mapping> _mapping;

View File

@@ -59,7 +59,7 @@
#include "schema_registry.hh"
#include "commitlog_entry.hh"
static logging::logger logger("commitlog_replayer");
static logging::logger rlogger("commitlog_replayer");
class db::commitlog_replayer::impl {
struct column_mappings {
@@ -161,11 +161,11 @@ future<> db::commitlog_replayer::impl::init() {
for (auto& sst : *cfp.second->get_sstables()) {
try {
auto p = sst->get_stats_metadata().position;
logger.trace("sstable {} -> rp {}", sst->get_filename(), p);
rlogger.trace("sstable {} -> rp {}", sst->get_filename(), p);
auto& pp = map[p.shard_id()][uuid];
pp = std::max(pp, p);
} catch (...) {
logger.warn("Could not read sstable metadata {}", std::current_exception());
rlogger.warn("Could not read sstable metadata {}", std::current_exception());
}
}
// We do this on each cpu, for each CF, which technically is a little wasteful, but the values are
@@ -175,7 +175,7 @@ future<> db::commitlog_replayer::impl::init() {
// mark the CF as "needed".
return db::system_keyspace::get_truncated_position(uuid).then([&map, &uuid](std::vector<db::replay_position> tpps) {
for (auto& p : tpps) {
logger.trace("CF {} truncated at {}", uuid, p);
rlogger.trace("CF {} truncated at {}", uuid, p);
auto& pp = map[p.shard_id()][uuid];
pp = std::max(pp, p);
}
@@ -204,11 +204,11 @@ future<> db::commitlog_replayer::impl::init() {
}
}
for (auto&p : _min_pos) {
logger.debug("minimum position for shard {}: {}", p.first, p.second);
rlogger.debug("minimum position for shard {}: {}", p.first, p.second);
}
for (auto&p1 : _rpm) {
for (auto& p2 : p1.second) {
logger.debug("replay position for shard/uuid {}/{}: {}", p1.first, p2.first, p2.second);
rlogger.debug("replay position for shard/uuid {}/{}: {}", p1.first, p2.first, p2.second);
}
}
});
@@ -222,7 +222,7 @@ db::commitlog_replayer::impl::recover(sstring file) const {
auto gp = min_pos(rp.shard_id());
if (rp.id < gp.id) {
logger.debug("skipping replay of fully-flushed {}", file);
rlogger.debug("skipping replay of fully-flushed {}", file);
return make_ready_future<stats>();
}
position_type p = 0;
@@ -261,14 +261,14 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
if (!cer.get_column_mapping()) {
throw std::runtime_error(sprint("unknown schema version {}", fm.schema_version()));
}
logger.debug("new schema version {} in entry {}", fm.schema_version(), rp);
rlogger.debug("new schema version {} in entry {}", fm.schema_version(), rp);
cm_it = local_cm.emplace(fm.schema_version(), *cer.get_column_mapping()).first;
}
const column_mapping& src_cm = cm_it->second;
auto shard_id = rp.shard_id();
if (rp < min_pos(shard_id)) {
logger.trace("entry {} is less than global min position. skipping", rp);
rlogger.trace("entry {} is less than global min position. skipping", rp);
s->skipped_mutations++;
return make_ready_future<>();
}
@@ -276,7 +276,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
auto uuid = fm.column_family_id();
auto cf_rp = cf_min_pos(uuid, shard_id);
if (rp <= cf_rp) {
logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, cf_rp);
rlogger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, cf_rp);
s->skipped_mutations++;
return make_ready_future<>();
}
@@ -289,8 +289,8 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
// will not do this.
auto& cf = db.find_column_family(fm.column_family_id());
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("replaying at {} v={} {}:{} at {}", fm.column_family_id(), fm.schema_version(),
if (rlogger.is_enabled(logging::log_level::debug)) {
rlogger.debug("replaying at {} v={} {}:{} at {}", fm.column_family_id(), fm.schema_version(),
cf.schema()->ks_name(), cf.schema()->cf_name(), rp);
}
// Removed forwarding "new" RP. Instead give none/empty.
@@ -317,14 +317,14 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
}).handle_exception([s](auto ep) {
s->invalid_mutations++;
// TODO: write mutation to file like origin.
logger.warn("error replaying: {}", ep);
rlogger.warn("error replaying: {}", ep);
});
} catch (no_such_column_family&) {
// No such CF now? Origin just ignores this.
} catch (...) {
s->invalid_mutations++;
// TODO: write mutation to file like origin.
logger.warn("error replaying: {}", std::current_exception());
rlogger.warn("error replaying: {}", std::current_exception());
}
return make_ready_future<>();
@@ -353,7 +353,7 @@ future<db::commitlog_replayer> db::commitlog_replayer::create_replayer(seastar::
future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
typedef std::unordered_multimap<unsigned, sstring> shard_file_map;
logger.info("Replaying {}", join(", ", files));
rlogger.info("Replaying {}", join(", ", files));
// pre-compute work per shard already.
auto map = ::make_lw_shared<shard_file_map>();
@@ -373,12 +373,12 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
auto range = map->equal_range(id);
return do_for_each(range.first, range.second, [this, total](const std::pair<unsigned, sstring>& p) {
auto&f = p.second;
logger.debug("Replaying {}", f);
rlogger.debug("Replaying {}", f);
return _impl->recover(f).then([f, total](impl::stats stats) {
if (stats.corrupt_bytes != 0) {
logger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes);
rlogger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes);
}
logger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
rlogger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
, f
, stats.applied_mutations
, stats.invalid_mutations
@@ -391,7 +391,7 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
});
});
}, impl::stats(), std::plus<impl::stats>()).then([](impl::stats totals) {
logger.info("Log replay complete, {} replayed mutations ({} invalid, {} skipped)"
rlogger.info("Log replay complete, {} replayed mutations ({} invalid, {} skipped)"
, totals.applied_mutations
, totals.invalid_mutations
, totals.skipped_mutations

View File

@@ -34,7 +34,7 @@
#include "log.hh"
#include <boost/any.hpp>
static logging::logger logger("config");
static logging::logger clogger("config");
db::config::config()
:
@@ -364,32 +364,32 @@ void db::config::read_from_yaml(const char* yaml) {
auto label = node.first.as<sstring>();
auto i = values.find(label);
if (i == values.end()) {
logger.warn("Unknown option {} ignored.", label);
clogger.warn("Unknown option {} ignored.", label);
continue;
}
if (i->second->source() > config_source::SettingsFile) {
logger.debug("Option {} already set by commandline. ignored.", label);
clogger.debug("Option {} already set by commandline. ignored.", label);
continue;
}
switch (i->second->status()) {
case value_status::Invalid:
logger.warn("Option {} is not applicable. Ignoring.", label);
clogger.warn("Option {} is not applicable. Ignoring.", label);
continue;
case value_status::Unused:
logger.warn("Option {} is not (yet) used.", label);
clogger.warn("Option {} is not (yet) used.", label);
break;
default:
break;
}
if (node.second.IsNull()) {
logger.debug("Option {}, empty value. Skipping.", label);
clogger.debug("Option {}, empty value. Skipping.", label);
continue;
}
// Still, a syntax error is an error warning, not a fail
try {
(*i->second)(node.second);
} catch (...) {
logger.error("Option {}, exception while converting value.", label);
clogger.error("Option {}, exception while converting value.", label);
}
}
@@ -400,7 +400,7 @@ void db::config::read_from_yaml(const char* yaml) {
if (p.second->source() > config_source::None) {
continue;
}
logger.debug("Option {} not set", p.first);
clogger.debug("Option {} not set", p.first);
}
}

View File

@@ -27,8 +27,9 @@
#include <unordered_map>
#include "core/sstring.hh"
#include "core/future.hh"
#include "seastarx.hh"
class file;
namespace seastar { class file; }
namespace db {

View File

@@ -42,6 +42,7 @@
#pragma once
#include "core/sstring.hh"
#include "seastarx.hh"
namespace db {
namespace index {

View File

@@ -54,7 +54,7 @@
#include "cql3/query_processor.hh"
#include "utils/joinpoint.hh"
static seastar::logger logger("legacy_schema_migrator");
static seastar::logger mlogger("legacy_schema_migrator");
namespace db {
namespace legacy_schema_migrator {
@@ -377,7 +377,7 @@ public:
builder.set_compaction_strategy(sstables::compaction_strategy::type(strategy));
} catch (const exceptions::configuration_exception& e) {
// If compaction strategy class isn't supported, fallback to size tiered.
logger.warn("Falling back to size-tiered compaction strategy after the problem: {}", e.what());
mlogger.warn("Falling back to size-tiered compaction strategy after the problem: {}", e.what());
builder.set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
}
}
@@ -526,7 +526,7 @@ public:
}
future<> truncate_legacy_tables() {
logger.info("Truncating legacy schema tables");
mlogger.info("Truncating legacy schema tables");
return do_with(utils::make_joinpoint([] { return db_clock::now();}),[this](auto& tsf) {
return _qp.db().invoke_on_all([&tsf](database& db) {
return parallel_for_each(legacy_schema_tables, [&db, &tsf](const sstring& cfname) {
@@ -537,7 +537,7 @@ public:
}
future<> store_keyspaces_in_new_schema_tables() {
logger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
mlogger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
_keyspaces.size(), db::schema_tables::v3::NAME);
std::vector<mutation> mutations;
@@ -597,7 +597,7 @@ public:
.then(std::bind(&migrator::flush_schemas, this))
.then(std::bind(&migrator::truncate_legacy_tables, this))
.then(std::bind(&migrator::unload_legacy_tables, this))
.then([] { logger.info("Completed migration of legacy schema tables"); });
.then([] { mlogger.info("Completed migration of legacy schema tables"); });
});
}

View File

@@ -80,7 +80,7 @@ using namespace std::chrono_literals;
namespace db {
namespace schema_tables {
logging::logger logger("schema_tables");
logging::logger slogger("schema_tables");
struct push_back_and_return {
std::vector<mutation> muts;
@@ -808,16 +808,16 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
auto diff = difference(before, after, indirect_equal_to<lw_shared_ptr<query::result_set>>());
for (auto&& key : diff.entries_only_on_left) {
logger.info("Dropping keyspace {}", key);
slogger.info("Dropping keyspace {}", key);
dropped.emplace(key);
}
for (auto&& key : diff.entries_only_on_right) {
auto&& value = after[key];
logger.info("Creating keyspace {}", key);
slogger.info("Creating keyspace {}", key);
created.emplace_back(schema_result_value_type{key, std::move(value)});
}
for (auto&& key : diff.entries_differing) {
logger.info("Altering keyspace {}", key);
slogger.info("Altering keyspace {}", key);
altered.emplace_back(key);
}
return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) mutable {
@@ -867,17 +867,17 @@ static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy
auto diff = difference(before, after);
for (auto&& key : diff.entries_only_on_left) {
auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name);
logger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
slogger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.dropped.emplace_back(schema_diff::dropped_schema{s});
}
for (auto&& key : diff.entries_only_on_right) {
auto s = create_schema(std::move(after.at(key)));
logger.info("Creating {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
slogger.info("Creating {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.created.emplace_back(s);
}
for (auto&& key : diff.entries_differing) {
auto s = create_schema(std::move(after.at(key)));
logger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
slogger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.altered.emplace_back(s);
}
return d;
@@ -1760,7 +1760,7 @@ static void prepare_builder_from_table_row(schema_builder& builder, const query:
map.erase(i);
} catch (const exceptions::configuration_exception& e) {
// If compaction strategy class isn't supported, fallback to size tiered.
logger.warn("Falling back to size-tiered compaction strategy after the problem: {}", e.what());
slogger.warn("Falling back to size-tiered compaction strategy after the problem: {}", e.what());
builder.set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
}
}
@@ -2343,7 +2343,7 @@ future<std::vector<mutation>> make_drop_view_mutations(lw_shared_ptr<keyspace_me
}
catch (InvalidRequestException e)
{
logger.error(String.format("Cannot load function '%s' from schema: this function won't be available (on this node)", name), e);
slogger.error(String.format("Cannot load function '%s' from schema: this function won't be available (on this node)", name), e);
return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, language, body, e);
}
}

View File

@@ -80,7 +80,7 @@ std::unique_ptr<query_context> qctx = {};
namespace system_keyspace {
static logging::logger logger("system_keyspace");
static logging::logger slogger("system_keyspace");
static const api::timestamp_type creation_timestamp = api::new_timestamp();
api::timestamp_type schema_creation_timestamp() {
@@ -947,7 +947,7 @@ static future<> setup_version() {
sstring(dht::global_partitioner().name()),
a.addr(),
utils::fb_utilities::get_broadcast_address().addr(),
net::get_local_messaging_service().listen_address().addr(),
netw::get_local_messaging_service().listen_address().addr(),
service::storage_service::get_config_supported_features()
).discard_result();
});
@@ -1045,7 +1045,7 @@ future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp
}).then([] {
return db::schema_tables::save_system_keyspace_schema();
}).then([] {
return net::get_messaging_service().invoke_on_all([] (auto& ms){
return netw::get_messaging_service().invoke_on_all([] (auto& ms){
return ms.init_local_preferred_ip_cache();
});
});
@@ -1129,7 +1129,7 @@ static future<truncation_record> get_truncation_record(utils::UUID cf_id) {
if (buf.size() & 1) {
// new record.
if (buf[0] != current_version) {
logger.warn("Found truncation record of unknown version {}. Ignoring.", int(buf[0]));
slogger.warn("Found truncation record of unknown version {}. Ignoring.", int(buf[0]));
continue;
}
e = ser::deserialize_from_buffer(buf, boost::type<truncation_record>(), 1);
@@ -1143,7 +1143,7 @@ static future<truncation_record> get_truncation_record(utils::UUID cf_id) {
// struct (and official serial size) is 64+32.
data_input in(buf);
logger.debug("Reading old type record");
slogger.debug("Reading old type record");
while (in.avail() > sizeof(db_clock::rep)) {
auto id = in.read<uint64_t>();
auto pos = in.read<uint64_t>();
@@ -1161,7 +1161,7 @@ static future<truncation_record> get_truncation_record(utils::UUID cf_id) {
// This is useless to us, because the only usage for this
// data is commit log and batch replay, and we cannot replay
// either from origin anyway.
logger.warn("Error reading truncation record for {}. "
slogger.warn("Error reading truncation record for {}. "
"Most likely this is data from a cassandra instance."
"Make sure you have cleared commit and batch logs before upgrading.",
uuid
@@ -1427,7 +1427,7 @@ future<> force_blocking_flush(sstring cfname) {
* 3. files are present but you can't read them: bad
*/
future<> check_health() {
using namespace transport::messages;
using namespace cql_transport::messages;
sstring req = sprint("SELECT cluster_name FROM system.%s WHERE key=?", LOCAL);
return execute_cql(req, sstring(LOCAL)).then([] (::shared_ptr<cql3::untyped_result_set> msg) {
if (msg->empty() || !msg->one().has("cluster_name")) {
@@ -1589,7 +1589,7 @@ void make(database& db, bool durable, bool volatile_testing_only) {
}
future<utils::UUID> get_local_host_id() {
using namespace transport::messages;
using namespace cql_transport::messages;
sstring req = sprint("SELECT host_id FROM system.%s WHERE key=?", LOCAL);
return execute_cql(req, sstring(LOCAL)).then([] (::shared_ptr<cql3::untyped_result_set> msg) {
auto new_id = [] {
@@ -1738,7 +1738,7 @@ future<int> increment_and_get_generation() {
int stored_generation = rs->one().template get_as<int>("gossip_generation") + 1;
int now = service::get_generation_number();
if (stored_generation >= now) {
logger.warn("Using stored Gossip Generation {} as it is greater than current system time {}."
slogger.warn("Using stored Gossip Generation {} as it is greater than current system time {}."
"See CASSANDRA-3654 if you experience problems", stored_generation, now);
generation = stored_generation;
} else {

View File

@@ -55,7 +55,7 @@
#include "service/storage_service.hh"
#include "view_info.hh"
static logging::logger logger("view");
static logging::logger vlogger("view");
view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
: _schema(schema)
@@ -831,7 +831,7 @@ void mutate_MV(const dht::token& base_token,
// Note also that mutate_locally(mut) copies mut (in
// frozen from) so don't need to increase its lifetime.
service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) {
logger.error("Error applying local view update: {}", ep);
vlogger.error("Error applying local view update: {}", ep);
});
} else {
#if 0
@@ -849,7 +849,7 @@ void mutate_MV(const dht::token& base_token,
// Note we don't wait for the asynchronous operation to complete
// FIXME: need to extend mut's lifetime???
service::get_local_storage_proxy().send_to_endpoint(mut, *paired_endpoint, db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) {
logger.error("Error applying view update to {}: {}", *paired_endpoint, ep);
vlogger.error("Error applying view update to {}: {}", *paired_endpoint, ep);
});;
}
} else {
@@ -857,7 +857,7 @@ void mutate_MV(const dht::token& base_token,
//if there are no paired endpoints there are probably range movements going on,
//so we write to the local batchlog to replay later
if (pendingEndpoints.isEmpty())
logger.warn("Received base materialized view mutation for key {} that does not belong " +
vlogger.warn("Received base materialized view mutation for key {} that does not belong " +
"to this node. There is probably a range movement happening (move or decommission)," +
"but this node hasn't updated its ring metadata yet. Adding mutation to " +
"local batchlog to be replayed later.",

View File

@@ -42,12 +42,12 @@
#include "gms/failure_detector.hh"
#include "log.hh"
static logging::logger logger("boot_strapper");
static logging::logger blogger("boot_strapper");
namespace dht {
future<> boot_strapper::bootstrap() {
logger.debug("Beginning bootstrap process: sorted_tokens={}", _token_metadata.sorted_tokens());
blogger.debug("Beginning bootstrap process: sorted_tokens={}", _token_metadata.sorted_tokens());
auto streamer = make_lw_shared<range_streamer>(_db, _token_metadata, _tokens, _address, "Bootstrap");
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector()));
@@ -55,7 +55,7 @@ future<> boot_strapper::bootstrap() {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strategy = ks.get_replication_strategy();
dht::token_range_vector ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address);
logger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges);
blogger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges);
streamer->add_ranges(keyspace_name, ranges);
}
@@ -74,7 +74,7 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(token_metadata met
auto initial_tokens = db.get_initial_tokens();
// if user specified tokens, use those
if (initial_tokens.size() > 0) {
logger.debug("tokens manually specified as {}", initial_tokens);
blogger.debug("tokens manually specified as {}", initial_tokens);
std::unordered_set<token> tokens;
for (auto& token_string : initial_tokens) {
auto token = dht::global_partitioner().from_sstring(token_string);
@@ -83,7 +83,7 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(token_metadata met
}
tokens.insert(token);
}
logger.debug("Get manually specified bootstrap_tokens={}", tokens);
blogger.debug("Get manually specified bootstrap_tokens={}", tokens);
return tokens;
}
@@ -93,11 +93,11 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(token_metadata met
}
if (num_tokens == 1) {
logger.warn("Picking random token for a single vnode. You should probably add more vnodes; failing that, you should probably specify the token manually");
blogger.warn("Picking random token for a single vnode. You should probably add more vnodes; failing that, you should probably specify the token manually");
}
auto tokens = get_random_tokens(metadata, num_tokens);
logger.debug("Get random bootstrap_tokens={}", tokens);
blogger.debug("Get random bootstrap_tokens={}", tokens);
return tokens;
}

View File

@@ -27,6 +27,8 @@
#include "utils/exceptions.hh"
#include <core/future.hh>
#include "seastarx.hh"
namespace bs2 = boost::signals2;
using disk_error_signal_type = bs2::signal_type<void (), bs2::keywords::mutex_type<bs2::dummy_mutex>>::type;

View File

@@ -42,6 +42,7 @@
#include <seastar/core/sstring.hh>
#include <ostream>
#include <map>
#include "seastarx.hh"
namespace gms {

View File

@@ -72,7 +72,7 @@ constexpr int64_t gossiper::MAX_GENERATION_DIFFERENCE;
distributed<gossiper> _the_gossiper;
net::msg_addr gossiper::get_msg_addr(inet_address to) {
netw::msg_addr gossiper::get_msg_addr(inet_address to) {
return msg_addr{to, _default_cpuid};
}
@@ -293,7 +293,7 @@ void gossiper::init_messaging_service_handler() {
}
_ms_registered = true;
ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
auto from = net::messaging_service::get_source(cinfo);
auto from = netw::messaging_service::get_source(cinfo);
smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
return gossiper.handle_syn_msg(from, std::move(syn_msg));
@@ -303,7 +303,7 @@ void gossiper::init_messaging_service_handler() {
return messaging_service::no_wait();
});
ms().register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
auto from = net::messaging_service::get_source(cinfo);
auto from = netw::messaging_service::get_source(cinfo);
smp::submit_to(0, [from, msg = std::move(msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
return gossiper.handle_ack_msg(from, std::move(msg));
@@ -339,7 +339,7 @@ void gossiper::init_messaging_service_handler() {
}
void gossiper::uninit_messaging_service_handler() {
auto& ms = net::get_local_messaging_service();
auto& ms = netw::get_local_messaging_service();
ms.unregister_gossip_echo();
ms.unregister_gossip_shutdown();
ms.unregister_gossip_digest_syn();
@@ -1057,7 +1057,7 @@ std::unordered_map<inet_address, endpoint_state>& gms::gossiper::get_endpoint_st
}
bool gossiper::uses_host_id(inet_address endpoint) {
if (net::get_local_messaging_service().knows_version(endpoint)) {
if (netw::get_local_messaging_service().knows_version(endpoint)) {
return true;
} else if (get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::NET_VERSION)) {
return true;

View File

@@ -84,11 +84,11 @@ class gossiper : public i_failure_detection_event_listener, public seastar::asyn
public:
using clk = std::chrono::system_clock;
private:
using messaging_verb = net::messaging_verb;
using messaging_service = net::messaging_service;
using msg_addr = net::msg_addr;
net::messaging_service& ms() {
return net::get_local_messaging_service();
using messaging_verb = netw::messaging_verb;
using messaging_service = netw::messaging_service;
using msg_addr = netw::msg_addr;
netw::messaging_service& ms() {
return netw::get_local_messaging_service();
}
void init_messaging_service_handler();
void uninit_messaging_service_handler();

View File

@@ -54,7 +54,7 @@ constexpr const char* versioned_value::SHUTDOWN;
constexpr const char* versioned_value::REMOVAL_COORDINATOR;
versioned_value versioned_value::factory::network_version() {
return versioned_value(sprint("%s",net::messaging_service::current_version));
return versioned_value(sprint("%s",netw::messaging_service::current_version));
}
}

View File

@@ -26,6 +26,7 @@
#include <experimental/optional>
#include <seastar/core/byteorder.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
//
// This hashing differs from std::hash<> in that it decouples knowledge about

View File

@@ -57,8 +57,8 @@ void init_ms_fd_gossiper(sstring listen_address_in
{
const auto listen = gms::inet_address::lookup(listen_address_in).get0();
using encrypt_what = net::messaging_service::encrypt_what;
using compress_what = net::messaging_service::compress_what;
using encrypt_what = netw::messaging_service::encrypt_what;
using compress_what = netw::messaging_service::compress_what;
using namespace seastar::tls;
encrypt_what ew = encrypt_what::none;
@@ -102,10 +102,10 @@ void init_ms_fd_gossiper(sstring listen_address_in
// Init messaging_service
// Delay listening messaging_service until gossip message handlers are registered
bool listen_now = false;
net::get_messaging_service().start(listen, storage_port, ew, cw, ssl_storage_port, creds, sltba, listen_now).get();
netw::get_messaging_service().start(listen, storage_port, ew, cw, ssl_storage_port, creds, sltba, listen_now).get();
// #293 - do not stop anything
//engine().at_exit([] { return net::get_messaging_service().stop(); });
//engine().at_exit([] { return netw::get_messaging_service().stop(); });
// Init failure_detector
gms::get_failure_detector().start(std::move(phi)).get();
// #293 - do not stop anything

View File

@@ -25,6 +25,7 @@
#include <json/json.h>
namespace seastar { // FIXME: not ours
namespace json {
template<typename Map>
@@ -60,3 +61,5 @@ inline std::map<sstring, sstring> to_map(const sstring& raw) {
}
}
}

View File

@@ -4,7 +4,7 @@
#include "disk-error-handler.hh"
#include "checked-file-impl.hh"
static seastar::logger logger("lister");
static seastar::logger llogger("lister");
lister::lister(file f, dir_entry_types type, walker_type walker, filter_type filter, lister::path dir, lister::show_hidden do_show_hidden)
: _f(std::move(f))

View File

@@ -27,6 +27,8 @@
#include <seastar/core/enum.hh>
#include <boost/filesystem.hpp>
#include "seastarx.hh"
class lister final {
public:
using path = boost::filesystem::path;

Some files were not shown because too many files have changed in this diff Show More