Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
9e806cb3f7 Fix critical bugs and issues found in alternator code review
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-01-29 22:54:57 +00:00
copilot-swe-agent[bot]
f267af38bd Initial plan 2026-01-29 22:49:31 +00:00
3893 changed files with 12843 additions and 17805 deletions

View File

@@ -1,53 +0,0 @@
name: Backport with Jira Integration
on:
push:
branches:
- master
- next-*.*
- branch-*.*
pull_request_target:
types: [labeled, closed]
branches:
- master
- next
- next-*.*
- branch-*.*
jobs:
backport-on-push:
if: github.event_name == 'push'
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
with:
event_type: 'push'
base_branch: ${{ github.ref }}
commits: ${{ github.event.before }}..${{ github.sha }}
secrets:
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
backport-on-label:
if: github.event_name == 'pull_request_target' && github.event.action == 'labeled'
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
with:
event_type: 'labeled'
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
pull_request_number: ${{ github.event.pull_request.number }}
head_commit: ${{ github.event.pull_request.base.sha }}
label_name: ${{ github.event.label.name }}
pr_state: ${{ github.event.pull_request.state }}
secrets:
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
backport-chain:
if: github.event_name == 'pull_request_target' && github.event.action == 'closed' && github.event.pull_request.merged == true
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
with:
event_type: 'chain'
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
pull_request_number: ${{ github.event.pull_request.number }}
pr_body: ${{ github.event.pull_request.body }}
secrets:
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,22 +0,0 @@
name: Sync Jira Based on PR Milestone Events
on:
pull_request_target:
types: [milestoned, demilestoned]
permissions:
contents: read
pull-requests: read
jobs:
jira-sync-milestone-set:
if: github.event.action == 'milestoned'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_milestone_set.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-milestone-removed:
if: github.event.action == 'demilestoned'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_milestone_removed.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,4 +1,4 @@
name: Call Jira release creation for new milestone
name: Call Jira release creation for new milestone
on:
milestone:
@@ -9,6 +9,6 @@ jobs:
uses: scylladb/github-automation/.github/workflows/main_sync_milestone_to_jira_release.yml@main
with:
# Comma-separated list of Jira project keys
jira_project_keys: "SCYLLADB,CUSTOMER,SMI,RELENG"
jira_project_keys: "SCYLLADB,CUSTOMER"
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,62 +0,0 @@
name: Close issues created by Scylla associates
on:
issues:
types: [opened, reopened]
permissions:
issues: write
jobs:
comment-and-close:
runs-on: ubuntu-latest
steps:
- name: Comment and close if author email is scylladb.com
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
const issue = context.payload.issue;
const actor = context.actor;
// Get user data (only public email is available)
const { data: user } = await github.rest.users.getByUsername({
username: actor,
});
const email = user.email || "";
console.log(`Actor: ${actor}, public email: ${email || "<none>"}`);
// Only continue if email exists and ends with @scylladb.com
if (!email || !email.toLowerCase().endsWith("@scylladb.com")) {
console.log("User is not a scylladb.com email (or email not public); skipping.");
return;
}
const owner = context.repo.owner;
const repo = context.repo.repo;
const issue_number = issue.number;
const body = "Issues in this repository are closed automatically. Scylla associates should use Jira to manage issues.\nPlease move this issue to Jira https://scylladb.atlassian.net/jira/software/c/projects/SCYLLADB/list";
// Add the comment
await github.rest.issues.createComment({
owner,
repo,
issue_number,
body,
});
console.log(`Comment added to #${issue_number}`);
// Close the issue
await github.rest.issues.update({
owner,
repo,
issue_number,
state: "closed",
state_reason: "not_planned"
});
console.log(`Issue #${issue_number} closed.`);

View File

@@ -9,56 +9,16 @@ on:
jobs:
trigger-jenkins:
if: (github.event_name == 'issue_comment' && github.event.comment.user.login != 'scylladbbot') || github.event.label.name == 'conflicts'
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
runs-on: ubuntu-latest
steps:
- name: Verify Org Membership
id: verify_author
env:
EVENT_NAME: ${{ github.event_name }}
PR_AUTHOR: ${{ github.event.pull_request.user.login }}
PR_ASSOCIATION: ${{ github.event.pull_request.author_association }}
COMMENT_AUTHOR: ${{ github.event.comment.user.login }}
COMMENT_ASSOCIATION: ${{ github.event.comment.author_association }}
shell: bash
run: |
if [[ "$EVENT_NAME" == "pull_request_target" ]]; then
AUTHOR="$PR_AUTHOR"
ASSOCIATION="$PR_ASSOCIATION"
else
AUTHOR="$COMMENT_AUTHOR"
ASSOCIATION="$COMMENT_ASSOCIATION"
fi
if [[ "$ASSOCIATION" == "MEMBER" || "$ASSOCIATION" == "OWNER" ]]; then
echo "member=true" >> $GITHUB_OUTPUT
else
echo "::warning::${AUTHOR} is not a member of scylladb (association: ${ASSOCIATION}); skipping CI trigger."
echo "member=false" >> $GITHUB_OUTPUT
fi
- name: Validate Comment Trigger
if: github.event_name == 'issue_comment'
id: verify_comment
env:
COMMENT_BODY: ${{ github.event.comment.body }}
shell: bash
run: |
CLEAN_BODY=$(echo "$COMMENT_BODY" | grep -v '^[[:space:]]*>')
if echo "$CLEAN_BODY" | grep -qi '@scylladbbot' && echo "$CLEAN_BODY" | grep -qi 'trigger-ci'; then
echo "trigger=true" >> $GITHUB_OUTPUT
else
echo "trigger=false" >> $GITHUB_OUTPUT
fi
- name: Trigger Scylla-CI-Route Jenkins Job
if: steps.verify_author.outputs.member == 'true' && (github.event_name == 'pull_request_target' || steps.verify_comment.outputs.trigger == 'true')
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
PR_NUMBER: "${{ github.event.issue.number || github.event.pull_request.number }}"
PR_REPO_NAME: "${{ github.event.repository.full_name }}"
run: |
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
curl -X POST "$JENKINS_URL/job/releng/job/Scylla-CI-Route/buildWithParameters?PR_NUMBER=$PR_NUMBER&PR_REPO_NAME=$PR_REPO_NAME" \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v

View File

@@ -300,6 +300,7 @@ add_subdirectory(locator)
add_subdirectory(message)
add_subdirectory(mutation)
add_subdirectory(mutation_writer)
add_subdirectory(node_ops)
add_subdirectory(readers)
add_subdirectory(replica)
add_subdirectory(raft)

View File

@@ -43,7 +43,7 @@ For further information, please see:
[developer documentation]: HACKING.md
[build documentation]: docs/dev/building.md
[docker image build documentation]: dist/docker/redhat/README.md
[docker image build documentation]: dist/docker/debian/README.md
## Running Scylla

View File

@@ -244,7 +244,10 @@ static bool is_set_of(const rjson::value& type1, const rjson::value& type2) {
// Check if two JSON-encoded values match with the CONTAINS relation
bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query) {
if (!v1) {
if (!v1 || !v1->IsObject() || v1->MemberCount() == 0) {
return false;
}
if (!v2.IsObject() || v2.MemberCount() == 0) {
return false;
}
const auto& kv1 = *v1->MemberBegin();
@@ -618,7 +621,7 @@ conditional_operator_type get_conditional_operator(const rjson::value& req) {
// Check if the existing values of the item (previous_item) match the
// conditions given by the Expected and ConditionalOperator parameters
// (if they exist) in the request (an UpdateItem, PutItem or DeleteItem).
// This function can throw a ValidationException API error if there
// This function can throw an ValidationException API error if there
// are errors in the format of the condition itself.
bool verify_expected(const rjson::value& req, const rjson::value* previous_item) {
const rjson::value* expected = rjson::find(req, "Expected");

View File

@@ -53,7 +53,9 @@ void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjso
}
static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_bytes, bool is_quorum) {
uint64_t half_units = (total_bytes + unit_block_size -1) / unit_block_size; //divide by unit_block_size and round up
// Avoid potential integer overflow when total_bytes is close to UINT64_MAX
// by using division with modulo instead of addition before division
uint64_t half_units = total_bytes / unit_block_size + (total_bytes % unit_block_size != 0 ? 1 : 0);
if (is_quorum) {
half_units *= 2;

View File

@@ -63,7 +63,6 @@
#include "types/types.hh"
#include "db/system_keyspace.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "alternator/ttl_tag.hh"
using namespace std::chrono_literals;
@@ -165,7 +164,7 @@ static map_type attrs_type() {
static const column_definition& attrs_column(const schema& schema) {
const column_definition* cdef = schema.get_column_definition(bytes(executor::ATTRS_COLUMN_NAME));
throwing_assert(cdef);
SCYLLA_ASSERT(cdef);
return *cdef;
}
@@ -238,7 +237,7 @@ static void validate_is_object(const rjson::value& value, const char* caller) {
}
// This function assumes the given value is an object and returns requested member value.
// If it is not possible, an api_error::validation is thrown.
// If it is not possible an api_error::validation is thrown.
static const rjson::value& get_member(const rjson::value& obj, const char* member_name, const char* caller) {
validate_is_object(obj, caller);
const rjson::value* ret = rjson::find(obj, member_name);
@@ -250,7 +249,7 @@ static const rjson::value& get_member(const rjson::value& obj, const char* membe
// This function assumes the given value is an object with a single member, and returns this member.
// In case the requirements are not met, an api_error::validation is thrown.
// In case the requirements are not met an api_error::validation is thrown.
static const rjson::value::Member& get_single_member(const rjson::value& v, const char* caller) {
if (!v.IsObject() || v.MemberCount() != 1) {
throw api_error::validation(format("{}: expected an object with a single member.", caller));
@@ -683,7 +682,7 @@ static std::optional<int> get_int_attribute(const rjson::value& value, std::stri
}
// Sets a KeySchema object inside the given JSON parent describing the key
// attributes of the given schema as being either HASH or RANGE keys.
// attributes of the the given schema as being either HASH or RANGE keys.
// Additionally, adds to a given map mappings between the key attribute
// names and their type (as a DynamoDB type string).
void executor::describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>* attribute_types, const std::map<sstring, sstring> *tags) {
@@ -835,11 +834,13 @@ future<> executor::fill_table_size(rjson::value &table_description, schema_ptr s
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
// Note: we don't care when the notification of other shards will finish, as long as it will be done
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
// which is also fine, as the specification doesn't give precision guarantees of any kind.
// A race condition is possible: if a DescribeTable request arrives on a different shard before
// that shard receives the cached size, it will recalculate independently. This is acceptable because:
// 1. Both calculations will cache their results with an expiry time
// 2. Expiry times are unlikely to be identical, so eventually all shards converge to the most recent value
// 3. Even if expiry times match, different shards may briefly return different table sizes
// 4. This temporary inconsistency is acceptable per DynamoDB specification, which doesn't guarantee
// exact precision for DescribeTable size information
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
}
}
@@ -917,7 +918,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
sstring index_name = cf_name.substr(delim_it + 1);
rjson::add(view_entry, "IndexName", rjson::from_string(index_name));
rjson::add(view_entry, "IndexArn", generate_arn_for_index(*schema, index_name));
// Add index's KeySchema and collect types for AttributeDefinitions:
// Add indexes's KeySchema and collect types for AttributeDefinitions:
executor::describe_key_schema(view_entry, *vptr, key_attribute_types, db::get_tags_of_table(vptr));
// Add projection type
rjson::value projection = rjson::empty_object();
@@ -1650,7 +1651,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
}
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
throwing_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
// command. We can't inspect the current database schema at this point
@@ -2436,7 +2437,7 @@ std::unordered_map<bytes, std::string> si_key_attributes(data_dictionary::table
// case, this function simply won't be called for this attribute.)
//
// This function checks if the given attribute update is an update to some
// GSI's key, and if the value is unsuitable, an api_error::validation is
// GSI's key, and if the value is unsuitable, a api_error::validation is
// thrown. The checking here is similar to the checking done in
// get_key_from_typed_value() for the base table's key columns.
//
@@ -2838,12 +2839,14 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
}
} else if (_write_isolation != write_isolation::LWT_ALWAYS) {
std::optional<mutation> m = apply(nullptr, api::new_timestamp(), cdc_opts);
throwing_assert(m); // !needs_read_before_write, so apply() did not check a condition
SCYLLA_ASSERT(m); // !needs_read_before_write, so apply() did not check a condition
return proxy.mutate(utils::chunked_vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit), db::allow_per_partition_rate_limit::yes, false, std::move(cdc_opts)).then([this, &wcu_total] () mutable {
return rmw_operation_return(std::move(_return_attributes), _consumed_capacity, wcu_total);
});
}
throwing_assert(cas_shard);
if (!cas_shard) {
on_internal_error(elogger, "cas_shard is not set");
}
// If we're still here, we need to do this write using LWT:
global_stats.write_using_lwt++;
per_table_stats.write_using_lwt++;
@@ -3547,7 +3550,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
return true;
}
// Add a path to an attribute_path_map. Throws a validation error if the path
// Add a path to a attribute_path_map. Throws a validation error if the path
// "overlaps" with one already in the filter (one is a sub-path of the other)
// or "conflicts" with it (both a member and index is requested).
template<typename T>
@@ -5412,7 +5415,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
}
static dht::token token_for_segment(int segment, int total_segments) {
throwing_assert(total_segments > 1 && segment >= 0 && segment < total_segments);
SCYLLA_ASSERT(total_segments > 1 && segment >= 0 && segment < total_segments);
uint64_t delta = std::numeric_limits<uint64_t>::max() / total_segments;
return dht::token::from_int64(std::numeric_limits<int64_t>::min() + delta * segment);
}

View File

@@ -50,7 +50,7 @@ public:
_operators.emplace_back(i);
check_depth_limit();
}
void add_dot(std::string name) {
void add_dot(std::string(name)) {
_operators.emplace_back(std::move(name));
check_depth_limit();
}
@@ -85,7 +85,7 @@ struct constant {
}
};
// "value" is a value used in the right hand side of an assignment
// "value" is is a value used in the right hand side of an assignment
// expression, "SET a = ...". It can be a constant (a reference to a value
// included in the request, e.g., ":val"), a path to an attribute from the
// existing item (e.g., "a.b[3].c"), or a function of other such values.
@@ -205,7 +205,7 @@ public:
// The supported primitive conditions are:
// 1. Binary operators - v1 OP v2, where OP is =, <>, <, <=, >, or >= and
// v1 and v2 are values - from the item (an attribute path), the query
// (a ":val" reference), or a function of the above (only the size()
// (a ":val" reference), or a function of the the above (only the size()
// function is supported).
// 2. Ternary operator - v1 BETWEEN v2 and v3 (means v1 >= v2 AND v1 <= v3).
// 3. N-ary operator - v1 IN ( v2, v3, ... )

View File

@@ -55,7 +55,7 @@ partition_key pk_from_json(const rjson::value& item, schema_ptr schema);
clustering_key ck_from_json(const rjson::value& item, schema_ptr schema);
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema);
// If v encodes a number (i.e., it is a {"N": [...]}), returns an object representing it. Otherwise,
// If v encodes a number (i.e., it is a {"N": [...]}, returns an object representing it. Otherwise,
// raises ValidationException with diagnostic.
big_decimal unwrap_number(const rjson::value& v, std::string_view diagnostic);

View File

@@ -710,7 +710,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
++_executor._stats.requests_blocked_memory;
}
auto units = co_await std::move(units_fut);
throwing_assert(req->content_stream);
SCYLLA_ASSERT(req->content_stream);
chunked_content content = co_await read_entire_stream(*req->content_stream, request_content_length_limit);
// If the request had no Content-Length, we reserved too many units
// so need to return some

View File

@@ -46,7 +46,6 @@
#include "alternator/executor.hh"
#include "alternator/controller.hh"
#include "alternator/serialization.hh"
#include "alternator/ttl_tag.hh"
#include "dht/sharder.hh"
#include "db/config.hh"
#include "db/tags/utils.hh"
@@ -58,10 +57,19 @@ static logging::logger tlogger("alternator_ttl");
namespace alternator {
// We write the expiration-time attribute enabled on a table in a
// tag TTL_TAG_KEY.
// Currently, the *value* of this tag is simply the name of the attribute,
// and the expiration scanner interprets it as an Alternator attribute name -
// It can refer to a real column or if that doesn't exist, to a member of
// the ":attrs" map column. Although this is designed for Alternator, it may
// be good enough for CQL as well (there, the ":attrs" column won't exist).
extern const sstring TTL_TAG_KEY;
future<executor::request_return_type> executor::update_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
_stats.api_operations.update_time_to_live++;
if (!_proxy.features().alternator_ttl) {
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Upgrade all nodes to a version that supports it.");
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Experimental support is available if the 'alternator-ttl' experimental feature is enabled on all nodes.");
}
schema_ptr schema = get_table(_proxy, request);
@@ -133,7 +141,7 @@ future<executor::request_return_type> executor::describe_time_to_live(client_sta
// expiration_service is a sharded service responsible for cleaning up expired
// items in all tables with per-item expiration enabled. Currently, this means
// Alternator tables with TTL configured via an UpdateTimeToLive request.
// Alternator tables with TTL configured via a UpdateTimeToLive request.
//
// Here is a brief overview of how the expiration service works:
//
@@ -316,7 +324,9 @@ static future<std::vector<std::pair<dht::token_range, locator::host_id>>> get_se
const auto& tm = *erm->get_token_metadata_ptr();
const auto& sorted_tokens = tm.sorted_tokens();
std::vector<std::pair<dht::token_range, locator::host_id>> ret;
throwing_assert(!sorted_tokens.empty());
if (sorted_tokens.empty()) {
on_internal_error(tlogger, "Token metadata is empty");
}
auto prev_tok = sorted_tokens.back();
for (const auto& tok : sorted_tokens) {
co_await coroutine::maybe_yield();
@@ -553,7 +563,7 @@ static future<> scan_table_ranges(
expiration_service::stats& expiration_stats)
{
const schema_ptr& s = scan_ctx.s;
throwing_assert(partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
SCYLLA_ASSERT (partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
auto p = service::pager::query_pagers::pager(proxy, s, scan_ctx.selection, *scan_ctx.query_state_ptr,
*scan_ctx.query_options, scan_ctx.command, std::move(partition_ranges), nullptr);
while (!p->is_exhausted()) {
@@ -583,7 +593,7 @@ static future<> scan_table_ranges(
if (retries >= 10) {
// Don't get stuck forever asking the same page, maybe there's
// a bug or a real problem in several replicas. Give up on
// this scan and retry the scan from a random position later,
// this scan an retry the scan from a random position later,
// in the next scan period.
throw runtime_exception("scanner thread failed after too many timeouts for the same page");
}
@@ -630,38 +640,13 @@ static future<> scan_table_ranges(
}
} else {
// For a real column to contain an expiration time, it
// must be a numeric type. We currently support decimal
// (used by Alternator TTL) as well as bigint, int and
// timestamp (used by CQL per-row TTL).
switch (meta[*expiration_column]->type->get_kind()) {
case abstract_type::kind::decimal:
// Used by Alternator TTL for key columns not stored
// in the map. The value is in seconds, fractional
// part is ignored.
expired = is_expired(value_cast<big_decimal>(v), now);
break;
case abstract_type::kind::long_kind:
// Used by CQL per-row TTL. The value is in seconds.
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int64_t>(v))), now);
break;
case abstract_type::kind::int32:
// Used by CQL per-row TTL. The value is in seconds.
// Using int type is not recommended because it will
// overflow in 2038, but we support it to allow users
// to use existing int columns for expiration.
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int32_t>(v))), now);
break;
case abstract_type::kind::timestamp:
// Used by CQL per-row TTL. The value is in milliseconds
// but we truncate it to gc_clock's precision (whole seconds).
expired = is_expired(gc_clock::time_point(std::chrono::duration_cast<gc_clock::duration>(value_cast<db_clock::time_point>(v).time_since_epoch())), now);
break;
default:
// Should never happen - we verified the column's type
// before starting the scan.
[[unlikely]]
on_internal_error(tlogger, format("expiration scanner value of unsupported type {} in column {}", meta[*expiration_column]->type->cql3_type_name(), scan_ctx.column_name) );
}
// must be a numeric type.
// FIXME: Currently we only support decimal_type (which is
// what Alternator uses), but other numeric types can be
// supported as well to make this feature more useful in CQL.
// Note that kind::decimal is also checked above.
big_decimal n = value_cast<big_decimal>(v);
expired = is_expired(n, now);
}
if (expired) {
expiration_stats.items_deleted++;
@@ -723,12 +708,16 @@ static future<bool> scan_table(
co_return false;
}
// attribute_name may be one of the schema's columns (in Alternator, this
// means a key column, in CQL it's a regular column), or an element in
// Alternator's attrs map encoded in Alternator's JSON encoding (which we
// decode). If attribute_name is a real column, in Alternator it will have
// the type decimal, counting seconds since the UNIX epoch, while in CQL
// it will one of the types bigint or int (counting seconds) or timestamp
// (counting milliseconds).
// means it's a key column), or an element in Alternator's attrs map
// encoded in Alternator's JSON encoding.
// FIXME: To make this less Alternators-specific, we should encode in the
// single key's value three things:
// 1. The name of a column
// 2. Optionally if column is a map, a member in the map
// 3. The deserializer for the value: CQL or Alternator (JSON).
// The deserializer can be guessed: If the given column or map item is
// numeric, it can be used directly. If it is a "bytes" type, it needs to
// be deserialized using Alternator's deserializer.
bytes column_name = to_bytes(*attribute_name);
const column_definition *cd = s->get_column_definition(column_name);
std::optional<std::string> member;
@@ -747,14 +736,11 @@ static future<bool> scan_table(
data_type column_type = cd->type;
// Verify that the column has the right type: If "member" exists
// the column must be a map, and if it doesn't, the column must
// be decimal_type (Alternator), bigint, int or timestamp (CQL).
// If the column has the wrong type nothing can get expired in
// this table, and it's pointless to scan it.
// (currently) be a decimal_type. If the column has the wrong type
// nothing can get expired in this table, and it's pointless to
// scan it.
if ((member && column_type->get_kind() != abstract_type::kind::map) ||
(!member && column_type->get_kind() != abstract_type::kind::decimal &&
column_type->get_kind() != abstract_type::kind::long_kind &&
column_type->get_kind() != abstract_type::kind::int32 &&
column_type->get_kind() != abstract_type::kind::timestamp)) {
(!member && column_type->get_kind() != abstract_type::kind::decimal)) {
tlogger.info("table {} TTL column has unsupported type, not scanning", s->cf_name());
co_return false;
}
@@ -781,7 +767,7 @@ static future<bool> scan_table(
// by tasking another node to take over scanning of the dead node's primary
// ranges. What we do here is that this node will also check expiration
// on its *secondary* ranges - but only those whose primary owner is down.
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet, erm->get_topology()); // throws if no secondary replica
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet); // throws if no secondary replica
if (tablet_secondary_replica.host == my_host_id && tablet_secondary_replica.shard == this_shard_id()) {
if (!gossiper.is_alive(tablet_primary_replica.host)) {
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);
@@ -892,10 +878,12 @@ future<> expiration_service::run() {
future<> expiration_service::start() {
// Called by main() on each shard to start the expiration-service
// thread. Just runs run() in the background and allows stop().
if (!shutting_down()) {
_end = run().handle_exception([] (std::exception_ptr ep) {
tlogger.error("expiration_service failed: {}", ep);
});
if (_db.features().alternator_ttl) {
if (!shutting_down()) {
_end = run().handle_exception([] (std::exception_ptr ep) {
tlogger.error("expiration_service failed: {}", ep);
});
}
}
return make_ready_future<>();
}

View File

@@ -30,7 +30,7 @@ namespace alternator {
// expiration_service is a sharded service responsible for cleaning up expired
// items in all tables with per-item expiration enabled. Currently, this means
// Alternator tables with TTL configured via an UpdateTimeToLive request.
// Alternator tables with TTL configured via a UpdateTimeToLeave request.
class expiration_service final : public seastar::peering_sharded_service<expiration_service> {
public:
// Object holding per-shard statistics related to the expiration service.
@@ -52,7 +52,7 @@ private:
data_dictionary::database _db;
service::storage_proxy& _proxy;
gms::gossiper& _gossiper;
// _end is set by start(), and resolves when the background service
// _end is set by start(), and resolves when the the background service
// started by it ends. To ask the background service to end, _abort_source
// should be triggered. stop() below uses both _abort_source and _end.
std::optional<future<>> _end;

View File

@@ -1,26 +0,0 @@
/*
* Copyright 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "seastarx.hh"
#include <seastar/core/sstring.hh>
namespace alternator {
// We use the table tag TTL_TAG_KEY ("system:ttl_attribute") to remember
// which attribute was chosen as the expiration-time attribute for
// Alternator's TTL and CQL's per-row TTL features.
// Currently, the *value* of this tag is simply the name of the attribute:
// It can refer to a real column or if that doesn't exist, to a member of
// the ":attrs" map column (which Alternator uses).
extern const sstring TTL_TAG_KEY;
} // namespace alternator
// let users use TTL_TAG_KEY without the "alternator::" prefix,
// to make it easier to move it to a different namespace later.
using alternator::TTL_TAG_KEY;

View File

@@ -12,7 +12,7 @@
"operations":[
{
"method":"POST",
"summary":"Resets authorized prepared statements cache",
"summary":"Reset cache",
"type":"void",
"nickname":"authorization_cache_reset",
"produces":[

View File

@@ -3085,48 +3085,6 @@
}
]
},
{
"path":"/storage_service/tablets/snapshots",
"operations":[
{
"method":"POST",
"summary":"Takes the snapshot for the given keyspaces/tables. A snapshot name must be specified.",
"type":"void",
"nickname":"take_cluster_snapshot",
"produces":[
"application/json"
],
"parameters":[
{
"name":"tag",
"description":"the tag given to the snapshot",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"keyspace",
"description":"Keyspace(s) to snapshot. Multiple keyspaces can be provided using a comma-separated list. If omitted, snapshot all keyspaces.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"table",
"description":"Table(s) to snapshot. Multiple tables (in a single keyspace) can be provided using a comma-separated list. If omitted, snapshot all tables in the given keyspace(s).",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/quiesce_topology",
"operations":[

View File

@@ -23,6 +23,31 @@
namespace api {
template<class T>
std::vector<T> map_to_key_value(const std::map<sstring, sstring>& map) {
std::vector<T> res;
res.reserve(map.size());
for (const auto& [key, value] : map) {
res.push_back(T());
res.back().key = key;
res.back().value = value;
}
return res;
}
template<class T, class MAP>
std::vector<T>& map_to_key_value(const MAP& map, std::vector<T>& res) {
res.reserve(res.size() + std::size(map));
for (const auto& [key, value] : map) {
T val;
val.key = fmt::to_string(key);
val.value = fmt::to_string(value);
res.push_back(val);
}
return res;
}
template <typename T, typename S = T>
T map_sum(T&& dest, const S& src) {
for (const auto& i : src) {

View File

@@ -515,15 +515,6 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
auto sstables = parsed.GetArray() |
std::views::transform([] (const auto& s) { return sstring(rjson::to_string_view(s)); }) |
std::ranges::to<std::vector>();
apilog.info("Restore invoked with following parameters: keyspace={}, table={}, endpoint={}, bucket={}, prefix={}, sstables_count={}, scope={}, primary_replica_only={}",
keyspace,
table,
endpoint,
bucket,
prefix,
sstables.size(),
scope,
primary_replica_only);
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, prefix, std::move(sstables), endpoint, bucket, scope, primary_replica_only);
co_return json::json_return_type(fmt::to_string(task_id));
});
@@ -536,15 +527,13 @@ void unset_sstables_loader(http_context& ctx, routes& r) {
}
void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_builder>& vb, sharded<gms::gossiper>& g) {
ss::view_build_statuses.set(r, [&ctx, &vb, &g] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
ss::view_build_statuses.set(r, [&ctx, &vb, &g] (std::unique_ptr<http::request> req) {
auto keyspace = validate_keyspace(ctx, req);
auto view = req->get_path_param("view");
co_return json::json_return_type(stream_range_as_array(co_await vb.local().view_build_statuses(std::move(keyspace), std::move(view), g.local()), [] (const auto& i) {
storage_service_json::mapper res;
res.key = i.first;
res.value = i.second;
return res;
}));
return vb.local().view_build_statuses(std::move(keyspace), std::move(view), g.local()).then([] (std::unordered_map<sstring, sstring> status) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(std::move(status), res));
});
});
cf::get_built_indexes.set(r, [&vb](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
@@ -582,16 +571,6 @@ static future<json::json_return_type> describe_ring_as_json_for_table(const shar
co_return json::json_return_type(stream_range_as_array(co_await ss.local().describe_ring_for_table(keyspace, table), token_range_endpoints_to_json));
}
namespace {
template <typename Key, typename Value>
storage_service_json::mapper map_to_json(const std::pair<Key, Value>& i) {
storage_service_json::mapper val;
val.key = fmt::to_string(i.first);
val.value = fmt::to_string(i.second);
return val;
}
}
static
future<json::json_return_type>
rest_get_token_endpoint(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
@@ -609,7 +588,12 @@ rest_get_token_endpoint(http_context& ctx, sharded<service::storage_service>& ss
throw bad_param_exception("Either provide both keyspace and table (for tablet table) or neither (for vnodes)");
}
co_return json::json_return_type(stream_range_as_array(token_endpoints, &map_to_json<dht::token, gms::inet_address>));
co_return json::json_return_type(stream_range_as_array(token_endpoints, [](const auto& i) {
storage_service_json::mapper val;
val.key = fmt::to_string(i.first);
val.value = fmt::to_string(i.second);
return val;
}));
}
static
@@ -693,6 +677,7 @@ rest_get_range_to_endpoint_map(http_context& ctx, sharded<service::storage_servi
table_id = validate_table(ctx.db.local(), keyspace, table);
}
std::vector<ss::maplist_mapper> res;
co_return stream_range_as_array(co_await ss.local().get_range_to_address_map(keyspace, table_id),
[](const std::pair<dht::token_range, inet_address_vector_replica_set>& entry){
ss::maplist_mapper m;
@@ -783,13 +768,17 @@ rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::
apilog.info("cleanup_all global={}", global);
if (global) {
co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<> {
co_return co_await ss.do_clusterwide_vnodes_cleanup();
});
auto done = !global ? false : co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
if (!ss.is_topology_coordinator_enabled()) {
co_return false;
}
co_await ss.do_clusterwide_vnodes_cleanup();
co_return true;
});
if (done) {
co_return json::json_return_type(0);
}
// fall back to the local cleanup if local cleanup is requested
// fall back to the local cleanup if topology coordinator is not enabled or local cleanup is requested
auto& db = ctx.db;
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::global_cleanup_compaction_task_impl>({}, db);
@@ -797,7 +786,9 @@ rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::
// Mark this node as clean
co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<> {
co_await ss.reset_cleanup_needed();
if (ss.is_topology_coordinator_enabled()) {
co_await ss.reset_cleanup_needed();
}
});
co_return json::json_return_type(0);
@@ -808,6 +799,9 @@ future<json::json_return_type>
rest_reset_cleanup_needed(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("reset_cleanup_needed");
co_await ss.invoke_on(0, [] (service::storage_service& ss) {
if (!ss.is_topology_coordinator_enabled()) {
throw std::runtime_error("mark_node_as_clean is only supported when topology over raft is enabled");
}
return ss.reset_cleanup_needed();
});
co_return json_void();
@@ -1314,7 +1308,10 @@ rest_get_ownership(http_context& ctx, sharded<service::storage_service>& ss, std
throw httpd::bad_param_exception("storage_service/ownership cannot be used when a keyspace uses tablets");
}
co_return json::json_return_type(stream_range_as_array(co_await ss.local().get_ownership(), &map_to_json<gms::inet_address, float>));
return ss.local().get_ownership().then([] (auto&& ownership) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
}
static
@@ -1331,7 +1328,10 @@ rest_get_effective_ownership(http_context& ctx, sharded<service::storage_service
}
}
co_return json::json_return_type(stream_range_as_array(co_await ss.local().effective_ownership(keyspace_name, table_name), &map_to_json<gms::inet_address, float>));
return ss.local().effective_ownership(keyspace_name, table_name).then([] (auto&& ownership) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
}
static
@@ -1341,7 +1341,7 @@ rest_estimate_compression_ratios(http_context& ctx, sharded<service::storage_ser
apilog.warn("estimate_compression_ratios: called before the cluster feature was enabled");
throw std::runtime_error("estimate_compression_ratios requires all nodes to support the SSTABLE_COMPRESSION_DICTS cluster feature");
}
auto ticket = co_await get_units(ss.local().get_do_sample_sstables_concurrency_limiter(), 1);
auto ticket = get_units(ss.local().get_do_sample_sstables_concurrency_limiter(), 1);
auto ks = api::req_param<sstring>(*req, "keyspace", {}).value;
auto cf = api::req_param<sstring>(*req, "cf", {}).value;
apilog.debug("estimate_compression_ratios: called with ks={} cf={}", ks, cf);
@@ -1407,7 +1407,7 @@ rest_retrain_dict(http_context& ctx, sharded<service::storage_service>& ss, serv
apilog.warn("retrain_dict: called before the cluster feature was enabled");
throw std::runtime_error("retrain_dict requires all nodes to support the SSTABLE_COMPRESSION_DICTS cluster feature");
}
auto ticket = co_await get_units(ss.local().get_do_sample_sstables_concurrency_limiter(), 1);
auto ticket = get_units(ss.local().get_do_sample_sstables_concurrency_limiter(), 1);
auto ks = api::req_param<sstring>(*req, "keyspace", {}).value;
auto cf = api::req_param<sstring>(*req, "cf", {}).value;
apilog.debug("retrain_dict: called with ks={} cf={}", ks, cf);
@@ -1565,7 +1565,16 @@ rest_reload_raft_topology_state(sharded<service::storage_service>& ss, service::
static
future<json::json_return_type>
rest_upgrade_to_raft_topology(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("Requested to schedule upgrade to raft topology, but this version does not need it since it uses raft topology by default.");
apilog.info("Requested to schedule upgrade to raft topology");
try {
co_await ss.invoke_on(0, [] (auto& ss) {
return ss.start_upgrade_to_raft_topology();
});
} catch (...) {
auto ex = std::current_exception();
apilog.error("Failed to schedule upgrade to raft topology: {}", ex);
std::rethrow_exception(std::move(ex));
}
co_return json_void();
}
@@ -2007,8 +2016,6 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("cf"), ",");
auto sfopt = req->get_query_param("sf");
auto tcopt = req->get_query_param("tc");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
};
@@ -2033,27 +2040,6 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
}
});
ss::take_cluster_snapshot.set(r, [&snap_ctl](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
apilog.info("take_cluster_snapshot: {}", req->get_query_params());
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("table"), ",");
// Note: not published/active. Retain as internal option, but...
auto sfopt = req->get_query_param("skip_flush");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
};
std::vector<sstring> keynames = split(req->get_query_param("keyspace"), ",");
try {
co_await snap_ctl.local().take_cluster_column_family_snapshot(keynames, column_families, tag, opts);
co_return json_void();
} catch (...) {
apilog.error("take_cluster_snapshot failed: {}", std::current_exception());
throw;
}
});
ss::del_snapshot.set(r, [&snap_ctl](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
apilog.info("del_snapshot: {}", req->get_query_params());
auto tag = req->get_query_param("tag");

View File

@@ -17,6 +17,7 @@ target_sources(scylla_auth
password_authenticator.cc
passwords.cc
permission.cc
permissions_cache.cc
resource.cc
role_or_anonymous.cc
roles-metadata.cc
@@ -25,7 +26,6 @@ target_sources(scylla_auth
service.cc
standard_role_manager.cc
transitional.cc
maintenance_socket_authenticator.cc
maintenance_socket_role_manager.cc)
target_include_directories(scylla_auth
PUBLIC

View File

@@ -9,9 +9,19 @@
#include "auth/allow_all_authenticator.hh"
#include "service/migration_manager.hh"
#include "utils/class_registrator.hh"
namespace auth {
constexpr std::string_view allow_all_authenticator_name("org.apache.cassandra.auth.AllowAllAuthenticator");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authenticator,
allow_all_authenticator,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -9,9 +9,18 @@
#include "auth/allow_all_authorizer.hh"
#include "auth/common.hh"
#include "utils/class_registrator.hh"
namespace auth {
constexpr std::string_view allow_all_authorizer_name("org.apache.cassandra.auth.AllowAllAuthorizer");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authorizer,
allow_all_authorizer,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthorizer");
}

View File

@@ -8,7 +8,6 @@
#include "auth/cache.hh"
#include "auth/common.hh"
#include "auth/role_or_anonymous.hh"
#include "auth/roles-metadata.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
@@ -19,8 +18,6 @@
#include <seastar/core/abort_source.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/do_with.hh>
namespace auth {
@@ -30,21 +27,7 @@ cache::cache(cql3::query_processor& qp, abort_source& as) noexcept
: _current_version(0)
, _qp(qp)
, _loading_sem(1)
, _as(as)
, _permission_loader(nullptr)
, _permission_loader_sem(8) {
namespace sm = seastar::metrics;
_metrics.add_group("auth_cache", {
sm::make_gauge("roles", [this] { return _roles.size(); },
sm::description("Number of roles currently cached")),
sm::make_gauge("permissions", [this] {
return _cached_permissions_count;
}, sm::description("Total number of permission sets currently cached across all roles"))
});
}
void cache::set_permission_loader(permission_loader_func loader) {
_permission_loader = std::move(loader);
, _as(as) {
}
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
@@ -55,83 +38,6 @@ lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) cons
return it->second;
}
future<permission_set> cache::get_permissions(const role_or_anonymous& role, const resource& r) {
std::unordered_map<resource, permission_set>* perms_cache;
lw_shared_ptr<role_record> role_ptr;
if (is_anonymous(role)) {
perms_cache = &_anonymous_permissions;
} else {
const auto& role_name = *role.name;
auto role_it = _roles.find(role_name);
if (role_it == _roles.end()) {
// Role might have been deleted but there are some connections
// left which reference it. They should no longer have access to anything.
return make_ready_future<permission_set>(permissions::NONE);
}
role_ptr = role_it->second;
perms_cache = &role_ptr->cached_permissions;
}
if (auto it = perms_cache->find(r); it != perms_cache->end()) {
return make_ready_future<permission_set>(it->second);
}
// keep alive role_ptr as it holds perms_cache (except anonymous)
return do_with(std::move(role_ptr), [this, &role, &r, perms_cache] (auto& role_ptr) {
return load_permissions(role, r, perms_cache);
});
}
future<permission_set> cache::load_permissions(const role_or_anonymous& role, const resource& r, std::unordered_map<resource, permission_set>* perms_cache) {
SCYLLA_ASSERT(_permission_loader);
auto units = co_await get_units(_permission_loader_sem, 1, _as);
// Check again, perhaps we were blocked and other call loaded
// the permissions already. This is a protection against misses storm.
if (auto it = perms_cache->find(r); it != perms_cache->end()) {
co_return it->second;
}
auto perms = co_await _permission_loader(role, r);
add_permissions(*perms_cache, r, perms);
co_return perms;
}
future<> cache::prune(const resource& r) {
auto units = co_await get_units(_loading_sem, 1, _as);
_anonymous_permissions.erase(r);
for (auto& it : _roles) {
// Prunning can run concurrently with other functions but it
// can only cause cached_permissions extra reload via get_permissions.
remove_permissions(it.second->cached_permissions, r);
co_await coroutine::maybe_yield();
}
}
future<> cache::reload_all_permissions() noexcept {
SCYLLA_ASSERT(_permission_loader);
auto units = co_await get_units(_loading_sem, 1, _as);
auto copy_keys = [] (const std::unordered_map<resource, permission_set>& m) {
std::vector<resource> keys;
keys.reserve(m.size());
for (const auto& [res, _] : m) {
keys.push_back(res);
}
return keys;
};
const role_or_anonymous anon;
for (const auto& res : copy_keys(_anonymous_permissions)) {
_anonymous_permissions[res] = co_await _permission_loader(anon, res);
}
for (auto& [role, entry] : _roles) {
auto& perms_cache = entry->cached_permissions;
auto r = role_or_anonymous(role);
for (const auto& res : copy_keys(perms_cache)) {
perms_cache[res] = co_await _permission_loader(r, res);
}
}
logger.debug("Reloaded auth cache with {} entries", _roles.size());
}
future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& role) const {
auto rec = make_lw_shared<role_record>();
rec->version = _current_version;
@@ -199,7 +105,7 @@ future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& r
future<> cache::prune_all() noexcept {
for (auto it = _roles.begin(); it != _roles.end(); ) {
if (it->second->version != _current_version) {
remove_role(it++);
_roles.erase(it++);
co_await coroutine::maybe_yield();
} else {
++it;
@@ -223,7 +129,7 @@ future<> cache::load_all() {
const auto name = r.get_as<sstring>("role");
auto role = co_await fetch_role(name);
if (role) {
add_role(name, role);
_roles[name] = role;
}
co_return stop_iteration::no;
};
@@ -236,32 +142,11 @@ future<> cache::load_all() {
co_await distribute_role(name, role);
}
co_await container().invoke_on_others([this](cache& c) -> future<> {
auto units = co_await get_units(c._loading_sem, 1, c._as);
c._current_version = _current_version;
co_await c.prune_all();
});
}
future<> cache::gather_inheriting_roles(std::unordered_set<role_name_t>& roles, lw_shared_ptr<cache::role_record> role, const role_name_t& name) {
if (!role) {
// Role might have been removed or not yet added, either way
// their members will be handled by another top call to this function.
co_return;
}
for (const auto& member_name : role->members) {
bool is_new = roles.insert(member_name).second;
if (!is_new) {
continue;
}
lw_shared_ptr<cache::role_record> member_role;
auto r = _roles.find(member_name);
if (r != _roles.end()) {
member_role = r->second;
}
co_await gather_inheriting_roles(roles, member_role, member_name);
}
}
future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
if (legacy_mode(_qp)) {
co_return;
@@ -269,41 +154,27 @@ future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
SCYLLA_ASSERT(this_shard_id() == 0);
auto units = co_await get_units(_loading_sem, 1, _as);
std::unordered_set<role_name_t> roles_to_clear_perms;
for (const auto& name : roles) {
logger.info("Loading role {}", name);
auto role = co_await fetch_role(name);
if (role) {
add_role(name, role);
co_await gather_inheriting_roles(roles_to_clear_perms, role, name);
_roles[name] = role;
} else {
if (auto it = _roles.find(name); it != _roles.end()) {
auto old_role = it->second;
remove_role(it);
co_await gather_inheriting_roles(roles_to_clear_perms, old_role, name);
}
_roles.erase(name);
}
co_await distribute_role(name, role);
}
co_await container().invoke_on_all([&roles_to_clear_perms] (cache& c) -> future<> {
for (const auto& name : roles_to_clear_perms) {
c.clear_role_permissions(name);
co_await coroutine::maybe_yield();
}
});
}
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
auto role_ptr = role.get();
co_await container().invoke_on_others([&name, role_ptr](cache& c) -> future<> {
auto units = co_await get_units(c._loading_sem, 1, c._as);
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
if (!role_ptr) {
c.remove_role(name);
co_return;
c._roles.erase(name);
return;
}
auto role_copy = make_lw_shared<role_record>(*role_ptr);
c.add_role(name, std::move(role_copy));
c._roles[name] = std::move(role_copy);
});
}
@@ -314,40 +185,4 @@ bool cache::includes_table(const table_id& id) noexcept {
|| id == db::system_keyspace::role_permissions()->id();
}
void cache::add_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
if (auto it = _roles.find(name); it != _roles.end()) {
_cached_permissions_count -= it->second->cached_permissions.size();
}
_cached_permissions_count += role->cached_permissions.size();
_roles[name] = std::move(role);
}
void cache::remove_role(const role_name_t& name) {
if (auto it = _roles.find(name); it != _roles.end()) {
remove_role(it);
}
}
void cache::remove_role(roles_map::iterator it) {
_cached_permissions_count -= it->second->cached_permissions.size();
_roles.erase(it);
}
void cache::clear_role_permissions(const role_name_t& name) {
if (auto it = _roles.find(name); it != _roles.end()) {
_cached_permissions_count -= it->second->cached_permissions.size();
it->second->cached_permissions.clear();
}
}
void cache::add_permissions(std::unordered_map<resource, permission_set>& cache, const resource& r, permission_set perms) {
if (cache.emplace(r, perms).second) {
++_cached_permissions_count;
}
}
void cache::remove_permissions(std::unordered_map<resource, permission_set>& cache, const resource& r) {
_cached_permissions_count -= cache.erase(r);
}
} // namespace auth

View File

@@ -17,14 +17,11 @@
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/metrics_registration.hh>
#include <absl/container/flat_hash_map.h>
#include "auth/permission.hh"
#include "auth/common.hh"
#include "auth/resource.hh"
#include "auth/role_or_anonymous.hh"
namespace cql3 { class query_processor; }
@@ -34,7 +31,6 @@ class cache : public peering_sharded_service<cache> {
public:
using role_name_t = sstring;
using version_tag_t = char;
using permission_loader_func = std::function<future<permission_set>(const role_or_anonymous&, const resource&)>;
struct role_record {
bool can_login = false;
@@ -44,19 +40,11 @@ public:
sstring salted_hash;
std::unordered_map<sstring, sstring> attributes;
std::unordered_map<sstring, permission_set> permissions;
private:
friend cache;
// cached permissions include effects of role's inheritance
std::unordered_map<resource, permission_set> cached_permissions;
version_tag_t version; // used for seamless cache reloads
};
explicit cache(cql3::query_processor& qp, abort_source& as) noexcept;
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
void set_permission_loader(permission_loader_func loader);
future<permission_set> get_permissions(const role_or_anonymous& role, const resource& r);
future<> prune(const resource& r);
future<> reload_all_permissions() noexcept;
future<> load_all();
future<> load_roles(std::unordered_set<role_name_t> roles);
static bool includes_table(const table_id&) noexcept;
@@ -64,31 +52,14 @@ public:
private:
using roles_map = absl::flat_hash_map<role_name_t, lw_shared_ptr<role_record>>;
roles_map _roles;
// anonymous permissions map exists mainly due to compatibility with
// higher layers which use role_or_anonymous to get permissions.
std::unordered_map<resource, permission_set> _anonymous_permissions;
version_tag_t _current_version;
cql3::query_processor& _qp;
semaphore _loading_sem; // protects iteration of _roles map
semaphore _loading_sem;
abort_source& _as;
permission_loader_func _permission_loader;
semaphore _permission_loader_sem; // protects against reload storms on a single role change
metrics::metric_groups _metrics;
size_t _cached_permissions_count = 0;
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
future<> prune_all() noexcept;
future<> distribute_role(const role_name_t& name, const lw_shared_ptr<role_record> role);
future<> gather_inheriting_roles(std::unordered_set<role_name_t>& roles, lw_shared_ptr<cache::role_record> role, const role_name_t& name);
void add_role(const role_name_t& name, lw_shared_ptr<role_record> role);
void remove_role(const role_name_t& name);
void remove_role(roles_map::iterator it);
void clear_role_permissions(const role_name_t& name);
void add_permissions(std::unordered_map<resource, permission_set>& cache, const resource& r, permission_set perms);
void remove_permissions(std::unordered_map<resource, permission_set>& cache, const resource& r);
future<permission_set> load_permissions(const role_or_anonymous& role, const resource& r, std::unordered_map<resource, permission_set>* perms_cache);
};
} // namespace auth

View File

@@ -13,11 +13,14 @@
#include <boost/regex.hpp>
#include <fmt/ranges.h>
#include "utils/class_registrator.hh"
#include "utils/to_string.hh"
#include "data_dictionary/data_dictionary.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
static const auto CERT_AUTH_NAME = "com.scylladb.auth.CertificateAuthenticator";
const std::string_view auth::certificate_authenticator_name(CERT_AUTH_NAME);
static logging::logger clogger("certificate_authenticator");
@@ -27,6 +30,13 @@ static const std::string cfg_query_attr = "query";
static const std::string cfg_source_subject = "SUBJECT";
static const std::string cfg_source_altname = "ALTNAME";
static const class_registrator<auth::authenticator
, auth::certificate_authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, auth::cache&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
@@ -89,7 +99,7 @@ future<> auth::certificate_authenticator::stop() {
}
std::string_view auth::certificate_authenticator::qualified_java_name() const {
return "com.scylladb.auth.CertificateAuthenticator";
return certificate_authenticator_name;
}
bool auth::certificate_authenticator::require_authentication() const {

View File

@@ -27,6 +27,8 @@ namespace auth {
class cache;
extern const std::string_view certificate_authenticator_name;
class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;

View File

@@ -26,6 +26,7 @@ extern "C" {
#include "cql3/untyped_result_set.hh"
#include "exceptions/exceptions.hh"
#include "utils/log.hh"
#include "utils/class_registrator.hh"
namespace auth {
@@ -39,6 +40,14 @@ static constexpr std::string_view PERMISSIONS_NAME = "permissions";
static logging::logger alogger("default_authorizer");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authorizer,
default_authorizer,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&> password_auth_reg("org.apache.cassandra.auth.CassandraAuthorizer");
default_authorizer::default_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: _qp(qp)
, _migration_manager(mm) {

View File

@@ -24,6 +24,7 @@
#include "exceptions/exceptions.hh"
#include "seastarx.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
#include "db/config.hh"
#include "utils/exponential_backoff_retry.hh"
@@ -71,22 +72,26 @@ std::vector<sstring> get_attr_values(LDAP* ld, LDAPMessage* res, const char* att
return values;
}
const char* ldap_role_manager_full_name = "com.scylladb.auth.LDAPRoleManager";
} // anonymous namespace
namespace auth {
static const class_registrator<
role_manager,
ldap_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(ldap_role_manager_full_name);
ldap_role_manager::ldap_role_manager(
std::string_view query_template, std::string_view target_attr, std::string_view bind_name, std::string_view bind_password,
uint32_t permissions_update_interval_in_ms,
utils::observer<uint32_t> permissions_update_interval_in_ms_observer,
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
: _std_mgr(qp, rg0c, mm, cache), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
, _bind_password(bind_password)
, _permissions_update_interval_in_ms(permissions_update_interval_in_ms)
, _permissions_update_interval_in_ms_observer(std::move(permissions_update_interval_in_ms_observer))
, _connection_factory(bind(std::mem_fn(&ldap_role_manager::reconnect), std::ref(*this)))
, _cache(cache)
, _cache_pruner(make_ready_future<>()) {
, _connection_factory(bind(std::mem_fn(&ldap_role_manager::reconnect), std::ref(*this))) {
}
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
@@ -95,8 +100,6 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
qp.db().get_config().ldap_attr_role(),
qp.db().get_config().ldap_bind_dn(),
qp.db().get_config().ldap_bind_passwd(),
qp.db().get_config().permissions_update_interval_in_ms(),
qp.db().get_config().permissions_update_interval_in_ms.observe([this] (const uint32_t& v) { _permissions_update_interval_in_ms = v; }),
qp,
rg0c,
mm,
@@ -104,7 +107,7 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
}
std::string_view ldap_role_manager::qualified_java_name() const noexcept {
return "com.scylladb.auth.LDAPRoleManager";
return ldap_role_manager_full_name;
}
const resource_set& ldap_role_manager::protected_resources() const {
@@ -116,22 +119,6 @@ future<> ldap_role_manager::start() {
return make_exception_future(
std::runtime_error(fmt::format("error getting LDAP server address from template {}", _query_template)));
}
_cache_pruner = futurize_invoke([this] () -> future<> {
while (true) {
try {
co_await seastar::sleep_abortable(std::chrono::milliseconds(_permissions_update_interval_in_ms), _as);
} catch (const seastar::sleep_aborted&) {
co_return; // ignore
}
co_await _cache.container().invoke_on_all([] (cache& c) -> future<> {
try {
co_await c.reload_all_permissions();
} catch (...) {
mylog.warn("Cache reload all permissions failed: {}", std::current_exception());
}
});
}
});
return _std_mgr.start();
}
@@ -188,11 +175,7 @@ future<conn_ptr> ldap_role_manager::reconnect() {
future<> ldap_role_manager::stop() {
_as.request_abort();
return std::move(_cache_pruner).then([this] {
return _std_mgr.stop();
}).then([this] {
return _connection_factory.stop();
});
return _std_mgr.stop().then([this] { return _connection_factory.stop(); });
}
future<> ldap_role_manager::create(std::string_view name, const role_config& config, ::service::group0_batch& mc) {

View File

@@ -10,7 +10,6 @@
#pragma once
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <stdexcept>
#include "ent/ldap/ldap_connection.hh"
@@ -35,29 +34,22 @@ class ldap_role_manager : public role_manager {
seastar::sstring _target_attr; ///< LDAP entry attribute containing the Scylla role name.
seastar::sstring _bind_name; ///< Username for LDAP simple bind.
seastar::sstring _bind_password; ///< Password for LDAP simple bind.
uint32_t _permissions_update_interval_in_ms;
utils::observer<uint32_t> _permissions_update_interval_in_ms_observer;
mutable ldap_reuser _connection_factory; // Potentially modified by query_granted().
seastar::abort_source _as;
cache& _cache;
seastar::future<> _cache_pruner;
public:
ldap_role_manager(
std::string_view query_template, ///< LDAP query template as described in Scylla documentation.
std::string_view target_attr, ///< LDAP entry attribute containing the Scylla role name.
std::string_view bind_name, ///< LDAP bind credentials.
std::string_view bind_password, ///< LDAP bind credentials.
uint32_t permissions_update_interval_in_ms,
utils::observer<uint32_t> permissions_update_interval_in_ms_observer,
cql3::query_processor& qp, ///< Passed to standard_role_manager.
::service::raft_group0_client& rg0c, ///< Passed to standard_role_manager.
::service::migration_manager& mm, ///< Passed to standard_role_manager.
cache& cache ///< Passed to standard_role_manager.
);
/// Retrieves LDAP configuration entries from qp and invokes the other constructor.
/// Retrieves LDAP configuration entries from qp and invokes the other constructor. Required by
/// class_registrator<role_manager>.
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache);
/// Thrown when query-template parsing fails.

View File

@@ -1,31 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "auth/maintenance_socket_authenticator.hh"
namespace auth {
maintenance_socket_authenticator::~maintenance_socket_authenticator() {
}
future<> maintenance_socket_authenticator::start() {
return make_ready_future<>();
}
future<> maintenance_socket_authenticator::ensure_superuser_is_created() const {
return make_ready_future<>();
}
bool maintenance_socket_authenticator::require_authentication() const {
return false;
}
} // namespace auth

View File

@@ -1,36 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include <seastar/core/shared_future.hh>
#include "password_authenticator.hh"
namespace auth {
// maintenance_socket_authenticator is used for clients connecting to the
// maintenance socket. It does not require authentication,
// while still allowing the managing of roles and their credentials.
class maintenance_socket_authenticator : public password_authenticator {
public:
using password_authenticator::password_authenticator;
virtual ~maintenance_socket_authenticator();
virtual future<> start() override;
virtual future<> ensure_superuser_is_created() const override;
bool require_authentication() const override;
};
} // namespace auth

View File

@@ -13,48 +13,23 @@
#include <string_view>
#include "auth/cache.hh"
#include "cql3/description.hh"
#include "utils/log.hh"
#include "utils/on_internal_error.hh"
#include "utils/class_registrator.hh"
namespace auth {
static logging::logger log("maintenance_socket_role_manager");
constexpr std::string_view maintenance_socket_role_manager_name = "com.scylladb.auth.MaintenanceSocketRoleManager";
future<> maintenance_socket_role_manager::ensure_role_operations_are_enabled() {
if (_is_maintenance_mode) {
on_internal_error(log, "enabling role operations not allowed in maintenance mode");
}
static const class_registrator<
role_manager,
maintenance_socket_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(sstring{maintenance_socket_role_manager_name});
if (_std_mgr.has_value()) {
on_internal_error(log, "role operations are already enabled");
}
_std_mgr.emplace(_qp, _group0_client, _migration_manager, _cache);
return _std_mgr->start();
}
void maintenance_socket_role_manager::set_maintenance_mode() {
if (_std_mgr.has_value()) {
on_internal_error(log, "cannot enter maintenance mode after role operations have been enabled");
}
_is_maintenance_mode = true;
}
maintenance_socket_role_manager::maintenance_socket_role_manager(
cql3::query_processor& qp,
::service::raft_group0_client& rg0c,
::service::migration_manager& mm,
cache& c)
: _qp(qp)
, _group0_client(rg0c)
, _migration_manager(mm)
, _cache(c)
, _std_mgr(std::nullopt)
, _is_maintenance_mode(false) {
}
std::string_view maintenance_socket_role_manager::qualified_java_name() const noexcept {
return "com.scylladb.auth.MaintenanceSocketRoleManager";
return maintenance_socket_role_manager_name;
}
const resource_set& maintenance_socket_role_manager::protected_resources() const {
@@ -68,161 +43,81 @@ future<> maintenance_socket_role_manager::start() {
}
future<> maintenance_socket_role_manager::stop() {
return _std_mgr ? _std_mgr->stop() : make_ready_future<>();
}
future<> maintenance_socket_role_manager::ensure_superuser_is_created() {
return _std_mgr ? _std_mgr->ensure_superuser_is_created() : make_ready_future<>();
}
template<typename T = void>
future<T> operation_not_available_in_maintenance_mode_exception(std::string_view operation) {
return make_exception_future<T>(
std::runtime_error(fmt::format("role manager: {} operation not available through maintenance socket in maintenance mode", operation)));
}
template<typename T = void>
future<T> manager_not_ready_exception(std::string_view operation) {
return make_exception_future<T>(
std::runtime_error(fmt::format("role manager: {} operation not available because manager not ready yet (role operations not enabled)", operation)));
}
future<> maintenance_socket_role_manager::validate_operation(std::string_view name) const {
if (_is_maintenance_mode) {
return operation_not_available_in_maintenance_mode_exception(name);
}
if (!_std_mgr) {
return manager_not_ready_exception(name);
}
return make_ready_future<>();
}
future<> maintenance_socket_role_manager::create(std::string_view role_name, const role_config& c, ::service::group0_batch& mc) {
auto f = validate_operation("CREATE");
if (f.failed()) {
return f;
}
return _std_mgr->create(role_name, c, mc);
future<> maintenance_socket_role_manager::ensure_superuser_is_created() {
return make_ready_future<>();
}
template<typename T = void>
future<T> operation_not_supported_exception(std::string_view operation) {
return make_exception_future<T>(
std::runtime_error(fmt::format("role manager: {} operation not supported through maintenance socket", operation)));
}
future<> maintenance_socket_role_manager::create(std::string_view role_name, const role_config&, ::service::group0_batch&) {
return operation_not_supported_exception("CREATE");
}
future<> maintenance_socket_role_manager::drop(std::string_view role_name, ::service::group0_batch& mc) {
auto f = validate_operation("DROP");
if (f.failed()) {
return f;
}
return _std_mgr->drop(role_name, mc);
return operation_not_supported_exception("DROP");
}
future<> maintenance_socket_role_manager::alter(std::string_view role_name, const role_config_update& u, ::service::group0_batch& mc) {
auto f = validate_operation("ALTER");
if (f.failed()) {
return f;
}
return _std_mgr->alter(role_name, u, mc);
future<> maintenance_socket_role_manager::alter(std::string_view role_name, const role_config_update&, ::service::group0_batch&) {
return operation_not_supported_exception("ALTER");
}
future<> maintenance_socket_role_manager::grant(std::string_view grantee_name, std::string_view role_name, ::service::group0_batch& mc) {
auto f = validate_operation("GRANT");
if (f.failed()) {
return f;
}
return _std_mgr->grant(grantee_name, role_name, mc);
return operation_not_supported_exception("GRANT");
}
future<> maintenance_socket_role_manager::revoke(std::string_view revokee_name, std::string_view role_name, ::service::group0_batch& mc) {
auto f = validate_operation("REVOKE");
if (f.failed()) {
return f;
}
return _std_mgr->revoke(revokee_name, role_name, mc);
return operation_not_supported_exception("REVOKE");
}
future<role_set> maintenance_socket_role_manager::query_granted(std::string_view grantee_name, recursive_role_query m) {
auto f = validate_operation("QUERY GRANTED");
if (f.failed()) {
return make_exception_future<role_set>(f.get_exception());
}
return _std_mgr->query_granted(grantee_name, m);
future<role_set> maintenance_socket_role_manager::query_granted(std::string_view grantee_name, recursive_role_query) {
return operation_not_supported_exception<role_set>("QUERY GRANTED");
}
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state& qs) {
auto f = validate_operation("QUERY ALL DIRECTLY GRANTED");
if (f.failed()) {
return make_exception_future<role_to_directly_granted_map>(f.get_exception());
}
return _std_mgr->query_all_directly_granted(qs);
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state&) {
return operation_not_supported_exception<role_to_directly_granted_map>("QUERY ALL DIRECTLY GRANTED");
}
future<role_set> maintenance_socket_role_manager::query_all(::service::query_state& qs) {
auto f = validate_operation("QUERY ALL");
if (f.failed()) {
return make_exception_future<role_set>(f.get_exception());
}
return _std_mgr->query_all(qs);
future<role_set> maintenance_socket_role_manager::query_all(::service::query_state&) {
return operation_not_supported_exception<role_set>("QUERY ALL");
}
future<bool> maintenance_socket_role_manager::exists(std::string_view role_name) {
auto f = validate_operation("EXISTS");
if (f.failed()) {
return make_exception_future<bool>(f.get_exception());
}
return _std_mgr->exists(role_name);
return operation_not_supported_exception<bool>("EXISTS");
}
future<bool> maintenance_socket_role_manager::is_superuser(std::string_view role_name) {
auto f = validate_operation("IS SUPERUSER");
if (f.failed()) {
return make_exception_future<bool>(f.get_exception());
}
return _std_mgr->is_superuser(role_name);
return make_ready_future<bool>(true);
}
future<bool> maintenance_socket_role_manager::can_login(std::string_view role_name) {
auto f = validate_operation("CAN LOGIN");
if (f.failed()) {
return make_exception_future<bool>(f.get_exception());
}
return _std_mgr->can_login(role_name);
return make_ready_future<bool>(true);
}
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
auto f = validate_operation("GET ATTRIBUTE");
if (f.failed()) {
return make_exception_future<std::optional<sstring>>(f.get_exception());
}
return _std_mgr->get_attribute(role_name, attribute_name, qs);
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) {
return operation_not_supported_exception<std::optional<sstring>>("GET ATTRIBUTE");
}
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) {
auto f = validate_operation("QUERY ATTRIBUTE FOR ALL");
if (f.failed()) {
return make_exception_future<role_manager::attribute_vals>(f.get_exception());
}
return _std_mgr->query_attribute_for_all(attribute_name, qs);
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) {
return operation_not_supported_exception<role_manager::attribute_vals>("QUERY ATTRIBUTE");
}
future<> maintenance_socket_role_manager::set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) {
auto f = validate_operation("SET ATTRIBUTE");
if (f.failed()) {
return f;
}
return _std_mgr->set_attribute(role_name, attribute_name, attribute_value, mc);
return operation_not_supported_exception("SET ATTRIBUTE");
}
future<> maintenance_socket_role_manager::remove_attribute(std::string_view role_name, std::string_view attribute_name, ::service::group0_batch& mc) {
auto f = validate_operation("REMOVE ATTRIBUTE");
if (f.failed()) {
return f;
}
return _std_mgr->remove_attribute(role_name, attribute_name, mc);
return operation_not_supported_exception("REMOVE ATTRIBUTE");
}
future<std::vector<cql3::description>> maintenance_socket_role_manager::describe_role_grants() {
auto f = validate_operation("DESCRIBE ROLE GRANTS");
if (f.failed()) {
return make_exception_future<std::vector<cql3::description>>(f.get_exception());
}
return _std_mgr->describe_role_grants();
return operation_not_supported_exception<std::vector<cql3::description>>("DESCRIBE SCHEMA WITH INTERNALS");
}
} // namespace auth

View File

@@ -11,7 +11,6 @@
#include "auth/cache.hh"
#include "auth/resource.hh"
#include "auth/role_manager.hh"
#include "auth/standard_role_manager.hh"
#include <seastar/core/future.hh>
namespace cql3 {
@@ -25,26 +24,13 @@ class raft_group0_client;
namespace auth {
// This role manager is used by the maintenance socket. It has disabled all role management operations
// in maintenance mode. In normal mode it delegates all operations to a standard_role_manager,
// which is created on demand when the node joins the cluster.
extern const std::string_view maintenance_socket_role_manager_name;
// This role manager is used by the maintenance socket. It has disabled all role management operations to not depend on
// system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
class maintenance_socket_role_manager final : public role_manager {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
std::optional<standard_role_manager> _std_mgr;
bool _is_maintenance_mode;
public:
void set_maintenance_mode() override;
// Ensures role management operations are enabled.
// It must be called once the node has joined the cluster.
// In the meantime all role management operations will fail.
future<> ensure_role_operations_are_enabled() override;
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {}
virtual std::string_view qualified_java_name() const noexcept override;
@@ -56,21 +42,21 @@ public:
virtual future<> ensure_superuser_is_created() override;
virtual future<> create(std::string_view role_name, const role_config& c, ::service::group0_batch& mc) override;
virtual future<> create(std::string_view role_name, const role_config&, ::service::group0_batch&) override;
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override;
virtual future<> alter(std::string_view role_name, const role_config_update& u, ::service::group0_batch& mc) override;
virtual future<> alter(std::string_view role_name, const role_config_update&, ::service::group0_batch&) override;
virtual future<> grant(std::string_view grantee_name, std::string_view role_name, ::service::group0_batch& mc) override;
virtual future<> revoke(std::string_view revokee_name, std::string_view role_name, ::service::group0_batch& mc) override;
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query m) override;
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state& qs) override;
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
virtual future<role_set> query_all(::service::query_state& qs) override;
virtual future<role_set> query_all(::service::query_state&) override;
virtual future<bool> exists(std::string_view role_name) override;
@@ -78,19 +64,15 @@ public:
virtual future<bool> can_login(std::string_view role_name) override;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) override;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) override;
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;
virtual future<> remove_attribute(std::string_view role_name, std::string_view attribute_name, ::service::group0_batch& mc) override;
virtual future<std::vector<cql3::description>> describe_role_grants() override;
private:
future<> validate_operation(std::string_view name) const;
};
}

View File

@@ -26,6 +26,7 @@
#include "cql3/untyped_result_set.hh"
#include "utils/log.hh"
#include "service/migration_manager.hh"
#include "utils/class_registrator.hh"
#include "replica/database.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
@@ -36,18 +37,27 @@ constexpr std::string_view password_authenticator_name("org.apache.cassandra.aut
// name of the hash column.
static constexpr std::string_view SALTED_HASH = "salted_hash";
static constexpr std::string_view DEFAULT_USER_NAME = meta::DEFAULT_SUPERUSER_NAME;
static const sstring DEFAULT_USER_PASSWORD = sstring(meta::DEFAULT_SUPERUSER_NAME);
static logging::logger plogger("password_authenticator");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authenticator,
password_authenticator,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
std::string password_authenticator::default_superuser(cql3::query_processor& qp) {
if (legacy_mode(qp)) {
return std::string(meta::DEFAULT_SUPERUSER_NAME);
}
return qp.db().get_config().auth_superuser_name();
static std::string_view get_config_value(std::string_view value, std::string_view def) {
return value.empty() ? def : value;
}
std::string password_authenticator::default_superuser(const db::config& cfg) {
return std::string(get_config_value(cfg.auth_superuser_name(), DEFAULT_USER_NAME));
}
password_authenticator::~password_authenticator() {
@@ -59,6 +69,7 @@ password_authenticator::password_authenticator(cql3::query_processor& qp, ::serv
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
{}
static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
@@ -112,14 +123,11 @@ future<> password_authenticator::migrate_legacy_metadata() const {
}
future<> password_authenticator::legacy_create_default_if_missing() {
if (_superuser.empty()) {
on_internal_error(plogger, "Legacy auth default superuser name is empty");
}
const auto exists = co_await legacy::default_role_row_satisfies(_qp, &has_salted_hash, _superuser);
if (exists) {
co_return;
}
std::string salted_pwd(_qp.db().get_config().auth_superuser_salted_password());
std::string salted_pwd(get_config_value(_qp.db().get_config().auth_superuser_salted_password(), ""));
if (salted_pwd.empty()) {
salted_pwd = passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt, _scheme);
}
@@ -139,9 +147,6 @@ future<> password_authenticator::legacy_create_default_if_missing() {
future<> password_authenticator::maybe_create_default_password() {
auto needs_password = [this] () -> future<bool> {
if (_superuser.empty()) {
co_return false;
}
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE is_superuser = true ALLOW FILTERING", get_auth_ks_name(_qp), meta::roles_table::name);
auto results = co_await _qp.execute_internal(query,
db::consistency_level::LOCAL_ONE,
@@ -173,9 +178,9 @@ future<> password_authenticator::maybe_create_default_password() {
co_return;
}
// Set default superuser's password.
std::string salted_pwd(_qp.db().get_config().auth_superuser_salted_password());
std::string salted_pwd(get_config_value(_qp.db().get_config().auth_superuser_salted_password(), ""));
if (salted_pwd.empty()) {
co_return;
salted_pwd = passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt, _scheme);
}
const auto update_query = update_row_query();
co_await collect_mutations(_qp, batch, update_query, {salted_pwd, _superuser});
@@ -205,8 +210,6 @@ future<> password_authenticator::maybe_create_default_password_with_retries() {
future<> password_authenticator::start() {
return once_among_shards([this] {
_superuser = default_superuser(_qp);
// Verify that at least one hashing scheme is supported.
passwords::detail::verify_scheme(_scheme);
plogger.info("Using password hashing scheme: {}", passwords::detail::prefix_for_scheme(_scheme));
@@ -214,9 +217,6 @@ future<> password_authenticator::start() {
_stopped = do_after_system_ready(_as, [this] {
return async([this] {
if (legacy_mode(_qp)) {
if (_superuser.empty()) {
on_internal_error(plogger, "Legacy auth default superuser name is empty");
}
if (!_superuser_created_promise.available()) {
// Counterintuitively, we mark promise as ready before any startup work
// because wait_for_schema_agreement() below will block indefinitely
@@ -251,9 +251,6 @@ future<> password_authenticator::start() {
});
if (legacy_mode(_qp)) {
if (_superuser.empty()) {
on_internal_error(plogger, "Legacy auth default superuser name is empty");
}
static const sstring create_roles_query = fmt::format(
"CREATE TABLE {}.{} ("
" {} text PRIMARY KEY,"
@@ -283,7 +280,7 @@ future<> password_authenticator::stop() {
db::consistency_level password_authenticator::consistency_for_user(std::string_view role_name) {
// TODO: this is plain dung. Why treat hardcoded default special, but for example a user-created
// super user uses plain LOCAL_ONE?
if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
if (role_name == DEFAULT_USER_NAME) {
return db::consistency_level::QUORUM;
}
return db::consistency_level::LOCAL_ONE;

View File

@@ -51,7 +51,7 @@ class password_authenticator : public authenticator {
public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(cql3::query_processor& qp);
static std::string default_superuser(const db::config&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);

38
auth/permissions_cache.cc Normal file
View File

@@ -0,0 +1,38 @@
/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "auth/permissions_cache.hh"
#include <fmt/ranges.h>
#include "auth/authorizer.hh"
#include "auth/service.hh"
namespace auth {
permissions_cache::permissions_cache(const utils::loading_cache_config& c, service& ser, logging::logger& log)
: _cache(c, log, [&ser, &log](const key_type& k) {
log.debug("Refreshing permissions for {}", k.first);
return ser.get_uncached_permissions(k.first, k.second);
}) {
}
bool permissions_cache::update_config(utils::loading_cache_config c) {
return _cache.update_config(std::move(c));
}
void permissions_cache::reset() {
_cache.reset();
}
future<permission_set> permissions_cache::get(const role_or_anonymous& maybe_role, const resource& r) {
return do_with(key_type(maybe_role, r), [this](const auto& k) {
return _cache.get(k);
});
}
}

66
auth/permissions_cache.hh Normal file
View File

@@ -0,0 +1,66 @@
/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <iostream>
#include <utility>
#include <fmt/core.h>
#include <seastar/core/future.hh>
#include "auth/permission.hh"
#include "auth/resource.hh"
#include "auth/role_or_anonymous.hh"
#include "utils/log.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
namespace std {
inline std::ostream& operator<<(std::ostream& os, const pair<auth::role_or_anonymous, auth::resource>& p) {
fmt::print(os, "{{role: {}, resource: {}}}", p.first, p.second);
return os;
}
}
namespace db {
class config;
}
namespace auth {
class service;
class permissions_cache final {
using cache_type = utils::loading_cache<
std::pair<role_or_anonymous, resource>,
permission_set,
1,
utils::loading_cache_reload_enabled::yes,
utils::simple_entry_size<permission_set>,
utils::tuple_hash>;
using key_type = typename cache_type::key_type;
cache_type _cache;
public:
explicit permissions_cache(const utils::loading_cache_config&, service&, logging::logger&);
future <> stop() {
return _cache.stop();
}
bool update_config(utils::loading_cache_config);
void reset();
future<permission_set> get(const role_or_anonymous&, const resource&);
};
}

View File

@@ -112,11 +112,6 @@ public:
virtual future<> stop() = 0;
///
/// Notify that the maintenance mode is starting.
///
virtual void set_maintenance_mode() {}
///
/// Ensure that superuser role exists.
///
@@ -124,11 +119,6 @@ public:
///
virtual future<> ensure_superuser_is_created() = 0;
///
/// Ensure role management operations are enabled. Some role managers may defer initialization.
///
virtual future<> ensure_role_operations_are_enabled() { return make_ready_future<>(); }
///
/// \returns an exceptional future with \ref role_already_exists for a role that has previously been created.
///

View File

@@ -22,11 +22,21 @@
#include "db/config.hh"
#include "utils/log.hh"
#include "seastarx.hh"
#include "utils/class_registrator.hh"
namespace auth {
static logging::logger mylog("saslauthd_authenticator");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authenticator,
saslauthd_authenticator,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&)
: _socket_path(qp.db().get_config().saslauthd_socket_path())
{}

View File

@@ -16,8 +16,6 @@
#include <algorithm>
#include <chrono>
#include <boost/algorithm/string.hpp>
#include <seastar/core/future-util.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
@@ -25,17 +23,8 @@
#include "auth/allow_all_authenticator.hh"
#include "auth/allow_all_authorizer.hh"
#include "auth/certificate_authenticator.hh"
#include "auth/common.hh"
#include "auth/default_authorizer.hh"
#include "auth/ldap_role_manager.hh"
#include "auth/maintenance_socket_authenticator.hh"
#include "auth/maintenance_socket_role_manager.hh"
#include "auth/password_authenticator.hh"
#include "auth/role_or_anonymous.hh"
#include "auth/saslauthd_authenticator.hh"
#include "auth/standard_role_manager.hh"
#include "auth/transitional.hh"
#include "cql3/functions/functions.hh"
#include "cql3/query_processor.hh"
#include "cql3/description.hh"
@@ -54,6 +43,7 @@
#include "service/raft/raft_group0_client.hh"
#include "mutation/timestamp.hh"
#include "utils/assert.hh"
#include "utils/class_registrator.hh"
#include "locator/abstract_replication_strategy.hh"
#include "data_dictionary/keyspace_metadata.hh"
#include "service/storage_service.hh"
@@ -74,11 +64,11 @@ static const sstring superuser_col_name("super");
static logging::logger log("auth_service");
class auth_migration_listener final : public ::service::migration_listener {
service& _service;
authorizer& _authorizer;
cql3::query_processor& _qp;
public:
explicit auth_migration_listener(service& s, cql3::query_processor& qp) : _service(s), _qp(qp) {
explicit auth_migration_listener(authorizer& a, cql3::query_processor& qp) : _authorizer(a), _qp(qp) {
}
private:
@@ -102,14 +92,14 @@ private:
return;
}
// Do it in the background.
(void)do_with(auth::make_data_resource(ks_name), ::service::group0_batch::unused(), [this] (auto& r, auto& mc) mutable {
return _service.revoke_all(r, mc);
(void)do_with(::service::group0_batch::unused(), [this, &ks_name] (auto& mc) mutable {
return _authorizer.revoke_all(auth::make_data_resource(ks_name), mc);
}).handle_exception([] (std::exception_ptr e) {
log.error("Unexpected exception while revoking all permissions on dropped keyspace: {}", e);
});
(void)do_with(auth::make_functions_resource(ks_name), ::service::group0_batch::unused(), [this] (auto& r, auto& mc) mutable {
return _service.revoke_all(r, mc);
(void)do_with(::service::group0_batch::unused(), [this, &ks_name] (auto& mc) mutable {
return _authorizer.revoke_all(auth::make_functions_resource(ks_name), mc);
}).handle_exception([] (std::exception_ptr e) {
log.error("Unexpected exception while revoking all permissions on functions in dropped keyspace: {}", e);
});
@@ -121,8 +111,9 @@ private:
return;
}
// Do it in the background.
(void)do_with(auth::make_data_resource(ks_name, cf_name), ::service::group0_batch::unused(), [this] (auto& r, auto& mc) mutable {
return _service.revoke_all(r, mc);
(void)do_with(::service::group0_batch::unused(), [this, &ks_name, &cf_name] (auto& mc) mutable {
return _authorizer.revoke_all(
auth::make_data_resource(ks_name, cf_name), mc);
}).handle_exception([] (std::exception_ptr e) {
log.error("Unexpected exception while revoking all permissions on dropped table: {}", e);
});
@@ -135,8 +126,9 @@ private:
return;
}
// Do it in the background.
(void)do_with(auth::make_functions_resource(ks_name, function_name), ::service::group0_batch::unused(), [this] (auto& r, auto& mc) mutable {
return _service.revoke_all(r, mc);
(void)do_with(::service::group0_batch::unused(), [this, &ks_name, &function_name] (auto& mc) mutable {
return _authorizer.revoke_all(
auth::make_functions_resource(ks_name, function_name), mc);
}).handle_exception([] (std::exception_ptr e) {
log.error("Unexpected exception while revoking all permissions on dropped function: {}", e);
});
@@ -146,8 +138,9 @@ private:
// in non legacy path revoke is part of schema change statement execution
return;
}
(void)do_with(auth::make_functions_resource(ks_name, aggregate_name), ::service::group0_batch::unused(), [this] (auto& r, auto& mc) mutable {
return _service.revoke_all(r, mc);
(void)do_with(::service::group0_batch::unused(), [this, &ks_name, &aggregate_name] (auto& mc) mutable {
return _authorizer.revoke_all(
auth::make_functions_resource(ks_name, aggregate_name), mc);
}).handle_exception([] (std::exception_ptr e) {
log.error("Unexpected exception while revoking all permissions on dropped aggregate: {}", e);
});
@@ -164,6 +157,7 @@ static future<> validate_role_exists(const service& ser, std::string_view role_n
}
service::service(
utils::loading_cache_config c,
cache& cache,
cql3::query_processor& qp,
::service::raft_group0_client& g0,
@@ -172,33 +166,41 @@ service::service(
std::unique_ptr<authenticator> a,
std::unique_ptr<role_manager> r,
maintenance_socket_enabled used_by_maintenance_socket)
: _cache(cache)
: _loading_cache_config(std::move(c))
, _permissions_cache(nullptr)
, _cache(cache)
, _qp(qp)
, _group0_client(g0)
, _mnotifier(mn)
, _authorizer(std::move(z))
, _authenticator(std::move(a))
, _role_manager(std::move(r))
, _migration_listener(std::make_unique<auth_migration_listener>(*this, qp))
, _migration_listener(std::make_unique<auth_migration_listener>(*_authorizer, qp))
, _permissions_cache_cfg_cb([this] (uint32_t) { (void) _permissions_cache_config_action.trigger_later(); })
, _permissions_cache_config_action([this] { update_cache_config(); return make_ready_future<>(); })
, _permissions_cache_max_entries_observer(_qp.db().get_config().permissions_cache_max_entries.observe(_permissions_cache_cfg_cb))
, _permissions_cache_update_interval_in_ms_observer(_qp.db().get_config().permissions_update_interval_in_ms.observe(_permissions_cache_cfg_cb))
, _permissions_cache_validity_in_ms_observer(_qp.db().get_config().permissions_validity_in_ms.observe(_permissions_cache_cfg_cb))
, _used_by_maintenance_socket(used_by_maintenance_socket) {}
service::service(
utils::loading_cache_config c,
cql3::query_processor& qp,
::service::raft_group0_client& g0,
::service::migration_notifier& mn,
authorizer_factory authorizer_factory,
authenticator_factory authenticator_factory,
role_manager_factory role_manager_factory,
::service::migration_manager& mm,
const service_config& sc,
maintenance_socket_enabled used_by_maintenance_socket,
cache& cache)
: service(
std::move(c),
cache,
qp,
g0,
mn,
authorizer_factory(),
authenticator_factory(),
role_manager_factory(),
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
used_by_maintenance_socket) {
}
@@ -255,14 +257,7 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
co_await _role_manager->ensure_superuser_is_created();
}
co_await when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
if (!_used_by_maintenance_socket) {
// Maintenance socket mode can't cache permissions because it has
// different authorizer. We can't mix cached permissions, they could be
// different in normal mode.
_cache.set_permission_loader(std::bind(
&service::get_uncached_permissions,
this, std::placeholders::_1, std::placeholders::_2));
}
_permissions_cache = std::make_unique<permissions_cache>(_loading_cache_config, *this, log);
co_await once_among_shards([this] {
_mnotifier.register_listener(_migration_listener.get());
return make_ready_future<>();
@@ -274,7 +269,9 @@ future<> service::stop() {
// Only one of the shards has the listener registered, but let's try to
// unregister on each one just to make sure.
return _mnotifier.unregister_listener(_migration_listener.get()).then([this] {
_cache.set_permission_loader(nullptr);
if (_permissions_cache) {
return _permissions_cache->stop();
}
return make_ready_future<>();
}).then([this] {
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
@@ -286,8 +283,21 @@ future<> service::ensure_superuser_is_created() {
co_await _authenticator->ensure_superuser_is_created();
}
void service::update_cache_config() {
auto db = _qp.db();
utils::loading_cache_config perm_cache_config;
perm_cache_config.max_size = db.get_config().permissions_cache_max_entries();
perm_cache_config.expiry = std::chrono::milliseconds(db.get_config().permissions_validity_in_ms());
perm_cache_config.refresh = std::chrono::milliseconds(db.get_config().permissions_update_interval_in_ms());
if (!_permissions_cache->update_config(std::move(perm_cache_config))) {
log.error("Failed to apply permissions cache changes. Please read the documentation of these parameters");
}
}
void service::reset_authorization_cache() {
_permissions_cache->reset();
_qp.reset_cache();
}
@@ -312,14 +322,7 @@ service::get_uncached_permissions(const role_or_anonymous& maybe_role, const res
}
future<permission_set> service::get_permissions(const role_or_anonymous& maybe_role, const resource& r) const {
if (legacy_mode(_qp) || _used_by_maintenance_socket) {
return get_uncached_permissions(maybe_role, r);
}
return _cache.get_permissions(maybe_role, r);
}
void service::set_maintenance_mode() {
_role_manager->set_maintenance_mode();
return _permissions_cache->get(maybe_role, r);
}
future<bool> service::has_superuser(std::string_view role_name, const role_set& roles) const {
@@ -357,10 +360,6 @@ static void validate_authentication_options_are_supported(
}
}
future<> service::ensure_role_operations_are_enabled() {
return _role_manager->ensure_role_operations_are_enabled();
}
future<> service::create_role(std::string_view name,
const role_config& config,
const authentication_options& options,
@@ -448,11 +447,6 @@ future<bool> service::exists(const resource& r) const {
return make_ready_future<bool>(false);
}
future<> service::revoke_all(const resource& r, ::service::group0_batch& mc) const {
co_await _authorizer->revoke_all(r, mc);
co_await _cache.prune(r);
}
future<std::vector<cql3::description>> service::describe_roles(bool with_hashed_passwords) {
std::vector<cql3::description> result{};
@@ -678,10 +672,6 @@ future<std::vector<cql3::description>> service::describe_auth(bool with_hashed_p
// Free functions.
//
void set_maintenance_mode(service& ser) {
ser.set_maintenance_mode();
}
future<bool> has_superuser(const service& ser, const authenticated_user& u) {
if (is_anonymous(u)) {
return make_ready_future<bool>(false);
@@ -690,10 +680,6 @@ future<bool> has_superuser(const service& ser, const authenticated_user& u) {
return ser.has_superuser(*u.name);
}
future<> ensure_role_operations_are_enabled(service& ser) {
return ser.underlying_role_manager().ensure_role_operations_are_enabled();
}
future<role_set> get_roles(const service& ser, const authenticated_user& u) {
if (is_anonymous(u)) {
return make_ready_future<role_set>();
@@ -815,7 +801,7 @@ future<> revoke_permissions(
}
future<> revoke_all(const service& ser, const resource& r, ::service::group0_batch& mc) {
return ser.revoke_all(r, mc);
return ser.underlying_authorizer().revoke_all(r, mc);
}
future<std::vector<permission_details>> list_filtered_permissions(
@@ -955,111 +941,4 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
std::nullopt);
}
namespace {
std::string_view get_short_name(std::string_view name) {
auto pos = name.find_last_of('.');
if (pos == std::string_view::npos) {
return name;
}
return name.substr(pos + 1);
}
} // anonymous namespace
authorizer_factory make_authorizer_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm) {
std::string_view short_name = get_short_name(name);
if (boost::iequals(short_name, "AllowAllAuthorizer")) {
return [&qp, &g0, &mm] {
return std::make_unique<allow_all_authorizer>(qp.local(), g0, mm.local());
};
} else if (boost::iequals(short_name, "CassandraAuthorizer")) {
return [&qp, &g0, &mm] {
return std::make_unique<default_authorizer>(qp.local(), g0, mm.local());
};
} else if (boost::iequals(short_name, "TransitionalAuthorizer")) {
return [&qp, &g0, &mm] {
return std::make_unique<transitional_authorizer>(qp.local(), g0, mm.local());
};
}
throw std::invalid_argument(fmt::format("Unknown authorizer: {}", name));
}
authenticator_factory make_authenticator_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& auth_cache) {
std::string_view short_name = get_short_name(name);
if (boost::iequals(short_name, "AllowAllAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<allow_all_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "PasswordAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<password_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "CertificateAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<certificate_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "SaslauthdAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<saslauthd_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "TransitionalAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<transitional_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
}
throw std::invalid_argument(fmt::format("Unknown authenticator: {}", name));
}
role_manager_factory make_role_manager_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& auth_cache) {
std::string_view short_name = get_short_name(name);
if (boost::iequals(short_name, "CassandraRoleManager")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<standard_role_manager>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "LDAPRoleManager")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<ldap_role_manager>(qp.local(), g0, mm.local(), auth_cache.local());
};
}
throw std::invalid_argument(fmt::format("Unknown role manager: {}", name));
}
authenticator_factory make_maintenance_socket_authenticator_factory(
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& auth_cache) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<maintenance_socket_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
}
role_manager_factory make_maintenance_socket_role_manager_factory(
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& auth_cache) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<maintenance_socket_role_manager>(qp.local(), g0, mm.local(), auth_cache.local());
};
}
}

View File

@@ -20,6 +20,7 @@
#include "auth/authenticator.hh"
#include "auth/authorizer.hh"
#include "auth/permission.hh"
#include "auth/permissions_cache.hh"
#include "auth/cache.hh"
#include "auth/role_manager.hh"
#include "auth/common.hh"
@@ -44,10 +45,11 @@ namespace auth {
class role_or_anonymous;
/// Factory function types for creating auth module instances on each shard.
using authorizer_factory = std::function<std::unique_ptr<authorizer>()>;
using authenticator_factory = std::function<std::unique_ptr<authenticator>()>;
using role_manager_factory = std::function<std::unique_ptr<role_manager>()>;
struct service_config final {
sstring authorizer_java_name;
sstring authenticator_java_name;
sstring role_manager_java_name;
};
///
/// Due to poor (in this author's opinion) decisions of Apache Cassandra, certain choices of one role-manager,
@@ -73,6 +75,8 @@ public:
/// peering_sharded_service inheritance is needed to be able to access shard local authentication service
/// given an object from another shard. Used for bouncing lwt requests to correct shard.
class service final : public seastar::peering_sharded_service<service> {
utils::loading_cache_config _loading_cache_config;
std::unique_ptr<permissions_cache> _permissions_cache;
cache& _cache;
cql3::query_processor& _qp;
@@ -90,12 +94,20 @@ class service final : public seastar::peering_sharded_service<service> {
// Only one of these should be registered, so we end up with some unused instances. Not the end of the world.
std::unique_ptr<::service::migration_listener> _migration_listener;
std::function<void(uint32_t)> _permissions_cache_cfg_cb;
serialized_action _permissions_cache_config_action;
utils::observer<uint32_t> _permissions_cache_max_entries_observer;
utils::observer<uint32_t> _permissions_cache_update_interval_in_ms_observer;
utils::observer<uint32_t> _permissions_cache_validity_in_ms_observer;
maintenance_socket_enabled _used_by_maintenance_socket;
abort_source _as;
public:
service(
utils::loading_cache_config,
cache& cache,
cql3::query_processor&,
::service::raft_group0_client&,
@@ -107,16 +119,16 @@ public:
///
/// This constructor is intended to be used when the class is sharded via \ref seastar::sharded. In that case, the
/// arguments must be copyable, which is why we delay construction with instance-construction factories instead
/// arguments must be copyable, which is why we delay construction with instance-construction instructions instead
/// of the instances themselves.
///
service(
utils::loading_cache_config,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_notifier&,
authorizer_factory,
authenticator_factory,
role_manager_factory,
::service::migration_manager&,
const service_config&,
maintenance_socket_enabled,
cache&);
@@ -126,6 +138,8 @@ public:
future<> ensure_superuser_is_created();
void update_cache_config();
void reset_authorization_cache();
///
@@ -138,11 +152,6 @@ public:
///
future<permission_set> get_uncached_permissions(const role_or_anonymous&, const resource&) const;
///
/// Notify the service that the node is entering maintenance mode.
///
void set_maintenance_mode();
///
/// Query whether the named role has been granted a role that is a superuser.
///
@@ -152,11 +161,6 @@ public:
///
future<bool> has_superuser(std::string_view role_name) const;
///
/// Ensure that the role operations are enabled. Some role managers defer initialization.
///
future<> ensure_role_operations_are_enabled();
///
/// Create a role with optional authentication information.
///
@@ -177,13 +181,6 @@ public:
future<bool> exists(const resource&) const;
///
/// Revoke all permissions granted to any role for a particular resource.
///
/// \throws \ref unsupported_authorization_operation if revoking permissions is not supported.
///
future<> revoke_all(const resource&, ::service::group0_batch&) const;
///
/// Produces descriptions that can be used to restore the state of auth. That encompasses
/// roles, role grants, and permission grants.
@@ -218,12 +215,8 @@ private:
future<std::vector<cql3::description>> describe_permissions() const;
};
void set_maintenance_mode(service&);
future<bool> has_superuser(const service&, const authenticated_user&);
future<> ensure_role_operations_are_enabled(service&);
future<role_set> get_roles(const service&, const authenticated_user&);
future<permission_set> get_permissions(const service&, const authenticated_user&, const resource&);
@@ -410,52 +403,4 @@ future<> commit_mutations(service& ser, ::service::group0_batch&& mc);
// Migrates data from old keyspace to new one which supports linearizable writes via raft.
future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_client& g0, start_operation_func_t start_operation_func, abort_source& as);
///
/// Factory helper functions for creating auth module instances.
/// These are intended for use with sharded<service>::start() where copyable arguments are required.
/// The returned factories capture the sharded references and call .local() when invoked on each shard.
///
/// Creates an authorizer factory for config-selectable authorizer types.
/// @param name The authorizer class name (e.g., "CassandraAuthorizer", "AllowAllAuthorizer")
authorizer_factory make_authorizer_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm);
/// Creates an authenticator factory for config-selectable authenticator types.
/// @param name The authenticator class name (e.g., "PasswordAuthenticator", "AllowAllAuthenticator")
authenticator_factory make_authenticator_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& cache);
/// Creates a role_manager factory for config-selectable role manager types.
/// @param name The role manager class name (e.g., "CassandraRoleManager")
role_manager_factory make_role_manager_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& cache);
/// Creates a factory for the maintenance socket authenticator.
/// This authenticator is not config-selectable and is only used for the maintenance socket.
authenticator_factory make_maintenance_socket_authenticator_factory(
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& cache);
/// Creates a factory for the maintenance socket role manager.
/// This role manager is not config-selectable and is only used for the maintenance socket.
role_manager_factory make_maintenance_socket_role_manager_factory(
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& cache);
}

View File

@@ -34,6 +34,7 @@
#include <seastar/core/loop.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
#include "service/migration_manager.hh"
#include "password_authenticator.hh"
#include "utils/managed_string.hh"
@@ -43,6 +44,14 @@ namespace auth {
static logging::logger log("standard_role_manager");
static const class_registrator<
role_manager,
standard_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
static db::consistency_level consistency_for_role(std::string_view role_name) noexcept {
if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
return db::consistency_level::QUORUM;
@@ -114,6 +123,7 @@ standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::servic
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(password_authenticator::default_superuser(qp.db().get_config()))
{}
std::string_view standard_role_manager::qualified_java_name() const noexcept {
@@ -176,9 +186,6 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
}
future<> standard_role_manager::legacy_create_default_role_if_missing() {
if (_superuser.empty()) {
on_internal_error(log, "Legacy auth default superuser name is empty");
}
try {
const auto exists = co_await legacy::default_role_row_satisfies(_qp, &has_can_login, _superuser);
if (exists) {
@@ -202,9 +209,6 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
}
future<> standard_role_manager::maybe_create_default_role() {
if (_superuser.empty()) {
co_return;
}
auto has_superuser = [this] () -> future<bool> {
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE is_superuser = true ALLOW FILTERING", get_auth_ks_name(_qp), meta::roles_table::name);
auto results = co_await _qp.execute_internal(query, db::consistency_level::LOCAL_ONE,
@@ -296,8 +300,6 @@ future<> standard_role_manager::migrate_legacy_metadata() {
future<> standard_role_manager::start() {
return once_among_shards([this] () -> future<> {
_superuser = password_authenticator::default_superuser(_qp);
if (legacy_mode(_qp)) {
co_await create_legacy_metadata_tables_if_missing();
}

View File

@@ -8,200 +8,244 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "auth/transitional.hh"
#include "auth/authenticated_user.hh"
#include "auth/authenticator.hh"
#include "auth/authorizer.hh"
#include "auth/default_authorizer.hh"
#include "auth/password_authenticator.hh"
#include "auth/cache.hh"
#include "auth/permission.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
namespace auth {
transitional_authenticator::transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
static const sstring PACKAGE_NAME("com.scylladb.auth.");
static const sstring& transitional_authenticator_name() {
static const sstring name = PACKAGE_NAME + "TransitionalAuthenticator";
return name;
}
transitional_authenticator::transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
static const sstring& transitional_authorizer_name() {
static const sstring name = PACKAGE_NAME + "TransitionalAuthorizer";
return name;
}
future<> transitional_authenticator::start() {
return _authenticator->start();
}
class transitional_authenticator : public authenticator {
std::unique_ptr<authenticator> _authenticator;
future<> transitional_authenticator::stop() {
return _authenticator->stop();
}
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
std::string_view transitional_authenticator::qualified_java_name() const {
return "com.scylladb.auth.TransitionalAuthenticator";
}
bool transitional_authenticator::require_authentication() const {
return true;
}
authentication_option_set transitional_authenticator::supported_options() const {
return _authenticator->supported_options();
}
authentication_option_set transitional_authenticator::alterable_options() const {
return _authenticator->alterable_options();
}
future<authenticated_user> transitional_authenticator::authenticate(const credentials_map& credentials) const {
auto i = credentials.find(authenticator::USERNAME_KEY);
if ((i == credentials.end() || i->second.empty())
&& (!credentials.contains(PASSWORD_KEY) || credentials.at(PASSWORD_KEY).empty())) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
}
return make_ready_future().then([this, &credentials] {
return _authenticator->authenticate(credentials);
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
}
virtual future<> start() override {
return _authenticator->start();
}
virtual future<> stop() override {
return _authenticator->stop();
}
virtual std::string_view qualified_java_name() const override {
return transitional_authenticator_name();
}
virtual bool require_authentication() const override {
return true;
}
virtual authentication_option_set supported_options() const override {
return _authenticator->supported_options();
}
virtual authentication_option_set alterable_options() const override {
return _authenticator->alterable_options();
}
virtual future<authenticated_user> authenticate(const credentials_map& credentials) const override {
auto i = credentials.find(authenticator::USERNAME_KEY);
if ((i == credentials.end() || i->second.empty())
&& (!credentials.contains(PASSWORD_KEY) || credentials.at(PASSWORD_KEY).empty())) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
});
}
future<> transitional_authenticator::create(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) {
return _authenticator->create(role_name, options, mc);
}
future<> transitional_authenticator::alter(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) {
return _authenticator->alter(role_name, options, mc);
}
future<> transitional_authenticator::drop(std::string_view role_name, ::service::group0_batch& mc) {
return _authenticator->drop(role_name, mc);
}
future<custom_options> transitional_authenticator::query_custom_options(std::string_view role_name) const {
return _authenticator->query_custom_options(role_name);
}
bool transitional_authenticator::uses_password_hashes() const {
return _authenticator->uses_password_hashes();
}
future<std::optional<sstring>> transitional_authenticator::get_password_hash(std::string_view role_name) const {
return _authenticator->get_password_hash(role_name);
}
const resource_set& transitional_authenticator::protected_resources() const {
return _authenticator->protected_resources();
}
::shared_ptr<sasl_challenge> transitional_authenticator::new_sasl_challenge() const {
class sasl_wrapper : public sasl_challenge {
public:
sasl_wrapper(::shared_ptr<sasl_challenge> sasl)
: _sasl(std::move(sasl)) {
}
virtual bytes evaluate_response(bytes_view client_response) override {
return make_ready_future().then([this, &credentials] {
return _authenticator->authenticate(credentials);
}).handle_exception([](auto ep) {
try {
return _sasl->evaluate_response(client_response);
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
_complete = true;
return {};
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
}
});
}
virtual bool is_complete() const override {
return _complete || _sasl->is_complete();
}
virtual future<> create(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override {
return _authenticator->create(role_name, options, mc);
}
virtual future<authenticated_user> get_authenticated_user() const override {
return futurize_invoke([this] {
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
virtual future<> alter(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override {
return _authenticator->alter(role_name, options, mc);
}
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override {
return _authenticator->drop(role_name, mc);
}
virtual future<custom_options> query_custom_options(std::string_view role_name) const override {
return _authenticator->query_custom_options(role_name);
}
virtual bool uses_password_hashes() const override {
return _authenticator->uses_password_hashes();
}
virtual future<std::optional<sstring>> get_password_hash(std::string_view role_name) const override {
return _authenticator->get_password_hash(role_name);
}
virtual const resource_set& protected_resources() const override {
return _authenticator->protected_resources();
}
virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override {
class sasl_wrapper : public sasl_challenge {
public:
sasl_wrapper(::shared_ptr<sasl_challenge> sasl)
: _sasl(std::move(sasl)) {
}
virtual bytes evaluate_response(bytes_view client_response) override {
try {
return _sasl->evaluate_response(client_response);
} catch (const exceptions::authentication_exception&) {
_complete = true;
return {};
}
}
virtual bool is_complete() const override {
return _complete || _sasl->is_complete();
}
virtual future<authenticated_user> get_authenticated_user() const override {
return futurize_invoke([this] {
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
});
});
});
}
}
const sstring& get_username() const override {
return _sasl->get_username();
}
const sstring& get_username() const override {
return _sasl->get_username();
}
private:
::shared_ptr<sasl_challenge> _sasl;
private:
::shared_ptr<sasl_challenge> _sasl;
bool _complete = false;
};
return ::make_shared<sasl_wrapper>(_authenticator->new_sasl_challenge());
}
bool _complete = false;
};
return ::make_shared<sasl_wrapper>(_authenticator->new_sasl_challenge());
}
future<> transitional_authenticator::ensure_superuser_is_created() const {
return _authenticator->ensure_superuser_is_created();
}
virtual future<> ensure_superuser_is_created() const override {
return _authenticator->ensure_superuser_is_created();
}
};
transitional_authorizer::transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: transitional_authorizer(std::make_unique<default_authorizer>(qp, g0, mm)) {
}
class transitional_authorizer : public authorizer {
std::unique_ptr<authorizer> _authorizer;
transitional_authorizer::transitional_authorizer(std::unique_ptr<authorizer> a)
: _authorizer(std::move(a)) {
}
public:
transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: transitional_authorizer(std::make_unique<default_authorizer>(qp, g0, mm)) {
}
transitional_authorizer(std::unique_ptr<authorizer> a)
: _authorizer(std::move(a)) {
}
transitional_authorizer::~transitional_authorizer() {
}
~transitional_authorizer() {
}
future<> transitional_authorizer::start() {
return _authorizer->start();
}
virtual future<> start() override {
return _authorizer->start();
}
future<> transitional_authorizer::stop() {
return _authorizer->stop();
}
virtual future<> stop() override {
return _authorizer->stop();
}
std::string_view transitional_authorizer::qualified_java_name() const {
return "com.scylladb.auth.TransitionalAuthorizer";
}
virtual std::string_view qualified_java_name() const override {
return transitional_authorizer_name();
}
future<permission_set> transitional_authorizer::authorize(const role_or_anonymous&, const resource&) const {
static const permission_set transitional_permissions =
permission_set::of<
permission::CREATE,
permission::ALTER,
permission::DROP,
permission::SELECT,
permission::MODIFY>();
virtual future<permission_set> authorize(const role_or_anonymous&, const resource&) const override {
static const permission_set transitional_permissions =
permission_set::of<
permission::CREATE,
permission::ALTER,
permission::DROP,
permission::SELECT,
permission::MODIFY>();
return make_ready_future<permission_set>(transitional_permissions);
}
return make_ready_future<permission_set>(transitional_permissions);
}
future<> transitional_authorizer::grant(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) {
return _authorizer->grant(s, std::move(ps), r, mc);
}
virtual future<> grant(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override {
return _authorizer->grant(s, std::move(ps), r, mc);
}
future<> transitional_authorizer::revoke(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) {
return _authorizer->revoke(s, std::move(ps), r, mc);
}
virtual future<> revoke(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override {
return _authorizer->revoke(s, std::move(ps), r, mc);
}
future<std::vector<permission_details>> transitional_authorizer::list_all() const {
return _authorizer->list_all();
}
virtual future<std::vector<permission_details>> list_all() const override {
return _authorizer->list_all();
}
future<> transitional_authorizer::revoke_all(std::string_view s, ::service::group0_batch& mc) {
return _authorizer->revoke_all(s, mc);
}
virtual future<> revoke_all(std::string_view s, ::service::group0_batch& mc) override {
return _authorizer->revoke_all(s, mc);
}
future<> transitional_authorizer::revoke_all(const resource& r, ::service::group0_batch& mc) {
return _authorizer->revoke_all(r, mc);
}
virtual future<> revoke_all(const resource& r, ::service::group0_batch& mc) override {
return _authorizer->revoke_all(r, mc);
}
const resource_set& transitional_authorizer::protected_resources() const {
return _authorizer->protected_resources();
}
virtual const resource_set& protected_resources() const override {
return _authorizer->protected_resources();
}
};
}
//
// To ensure correct initialization order, we unfortunately need to use string literals.
//
static const class_registrator<
auth::authenticator,
auth::transitional_authenticator,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
auth::cache&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<
auth::authorizer,
auth::transitional_authorizer,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&> transitional_authorizer_reg(auth::PACKAGE_NAME + "TransitionalAuthorizer");

View File

@@ -1,81 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "auth/authenticator.hh"
#include "auth/authorizer.hh"
#include "auth/cache.hh"
namespace cql3 {
class query_processor;
}
namespace service {
class raft_group0_client;
class migration_manager;
}
namespace auth {
///
/// Transitional authenticator that allows anonymous access when credentials are not provided
/// or authentication fails. Used for migration scenarios.
///
class transitional_authenticator : public authenticator {
std::unique_ptr<authenticator> _authenticator;
public:
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache);
transitional_authenticator(std::unique_ptr<authenticator> a);
virtual future<> start() override;
virtual future<> stop() override;
virtual std::string_view qualified_java_name() const override;
virtual bool require_authentication() const override;
virtual authentication_option_set supported_options() const override;
virtual authentication_option_set alterable_options() const override;
virtual future<authenticated_user> authenticate(const credentials_map& credentials) const override;
virtual future<> create(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override;
virtual future<> alter(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override;
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override;
virtual future<custom_options> query_custom_options(std::string_view role_name) const override;
virtual bool uses_password_hashes() const override;
virtual future<std::optional<sstring>> get_password_hash(std::string_view role_name) const override;
virtual const resource_set& protected_resources() const override;
virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override;
virtual future<> ensure_superuser_is_created() const override;
};
///
/// Transitional authorizer that grants a fixed set of permissions to all users.
/// Used for migration scenarios.
///
class transitional_authorizer : public authorizer {
std::unique_ptr<authorizer> _authorizer;
public:
transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm);
transitional_authorizer(std::unique_ptr<authorizer> a);
~transitional_authorizer();
virtual future<> start() override;
virtual future<> stop() override;
virtual std::string_view qualified_java_name() const override;
virtual future<permission_set> authorize(const role_or_anonymous&, const resource&) const override;
virtual future<> grant(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override;
virtual future<> revoke(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override;
virtual future<std::vector<permission_details>> list_all() const override;
virtual future<> revoke_all(std::string_view s, ::service::group0_batch& mc) override;
virtual future<> revoke_all(const resource& r, ::service::group0_batch& mc) override;
virtual const resource_set& protected_resources() const override;
};
} // namespace auth

View File

@@ -814,7 +814,8 @@ generation_service::generation_service(
config cfg, gms::gossiper& g, sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
abort_source& abort_src, const locator::shared_token_metadata& stm, gms::feature_service& f,
replica::database& db)
replica::database& db,
std::function<bool()> raft_topology_change_enabled)
: _cfg(std::move(cfg))
, _gossiper(g)
, _sys_dist_ks(sys_dist_ks)
@@ -823,6 +824,7 @@ generation_service::generation_service(
, _token_metadata(stm)
, _feature_service(f)
, _db(db)
, _raft_topology_change_enabled(std::move(raft_topology_change_enabled))
{
}
@@ -876,7 +878,16 @@ future<> generation_service::on_join(gms::inet_address ep, locator::host_id id,
future<> generation_service::on_change(gms::inet_address ep, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) {
assert_shard_zero(__PRETTY_FUNCTION__);
return make_ready_future<>();
if (_raft_topology_change_enabled()) {
return make_ready_future<>();
}
return on_application_state_change(ep, id, states, gms::application_state::CDC_GENERATION_ID, pid, [this] (gms::inet_address ep, locator::host_id id, const gms::versioned_value& v, gms::permit_id) {
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value());
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id);
return legacy_handle_cdc_generation(gen_id);
});
}
future<> generation_service::check_and_repair_cdc_streams() {

View File

@@ -79,12 +79,17 @@ private:
std::optional<cdc::generation_id> _gen_id;
future<> _cdc_streams_rewrite_complete = make_ready_future<>();
/* Returns true if raft topology changes are enabled.
* Can only be called from shard 0.
*/
std::function<bool()> _raft_topology_change_enabled;
public:
generation_service(config cfg, gms::gossiper&,
sharded<db::system_distributed_keyspace>&,
sharded<db::system_keyspace>& sys_ks,
abort_source&, const locator::shared_token_metadata&,
gms::feature_service&, replica::database& db);
gms::feature_service&, replica::database& db,
std::function<bool()> raft_topology_change_enabled);
future<> stop();
~generation_service();

View File

@@ -618,7 +618,7 @@ static void set_default_properties_log_table(schema_builder& b, const schema& s,
b.set_caching_options(caching_options::get_disabled_caching_options());
auto rs = generate_replication_strategy(ksm, db.get_token_metadata().get_topology());
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, false));
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, db.get_token_metadata(), false));
b.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
}

View File

@@ -598,7 +598,8 @@ protected:
// Garbage collected sstables that were added to SSTable set and should be eventually removed from it.
std::vector<sstables::shared_sstable> _used_garbage_collected_sstables;
utils::observable<> _stop_request_observable;
tombstone_gc_state _tombstone_gc_state;
// optional tombstone_gc_state that is used when gc has to check only the compacting sstables to collect tombstones.
std::optional<tombstone_gc_state> _tombstone_gc_state_with_commitlog_check_disabled;
int64_t _output_repaired_at = 0;
private:
// Keeps track of monitors for input sstable.
@@ -648,12 +649,9 @@ protected:
, _owned_ranges(std::move(descriptor.owned_ranges))
, _sharder(descriptor.sharder)
, _owned_ranges_checker(_owned_ranges ? std::optional<dht::incremental_owned_ranges_checker>(*_owned_ranges) : std::nullopt)
, _tombstone_gc_state(_table_s.get_tombstone_gc_state())
, _tombstone_gc_state_with_commitlog_check_disabled(descriptor.gc_check_only_compacting_sstables ? std::make_optional(_table_s.get_tombstone_gc_state().with_commitlog_check_disabled()) : std::nullopt)
, _progress_monitor(progress_monitor)
{
if (descriptor.gc_check_only_compacting_sstables) {
_tombstone_gc_state = _tombstone_gc_state.with_commitlog_check_disabled();
}
std::unordered_set<sstables::run_id> ssts_run_ids;
_contains_multi_fragment_runs = std::any_of(_sstables.begin(), _sstables.end(), [&ssts_run_ids] (sstables::shared_sstable& sst) {
return !ssts_run_ids.insert(sst->run_identifier()).second;
@@ -851,8 +849,8 @@ private:
return _table_s.get_compaction_strategy().make_sstable_set(_table_s);
}
tombstone_gc_state get_tombstone_gc_state() const {
return _tombstone_gc_state;
const tombstone_gc_state& get_tombstone_gc_state() const {
return _tombstone_gc_state_with_commitlog_check_disabled ? _tombstone_gc_state_with_commitlog_check_disabled.value() : _table_s.get_tombstone_gc_state();
}
future<> setup() {
@@ -1052,7 +1050,7 @@ private:
return can_never_purge;
}
return [this] (const dht::decorated_key& dk, is_shadowable is_shadowable) {
return get_max_purgeable_timestamp(_table_s, *_selector, _compacting_for_max_purgeable_func, dk, _bloom_filter_checks, _compacting_max_timestamp, !_tombstone_gc_state.is_commitlog_check_enabled(), is_shadowable);
return get_max_purgeable_timestamp(_table_s, *_selector, _compacting_for_max_purgeable_func, dk, _bloom_filter_checks, _compacting_max_timestamp, _tombstone_gc_state_with_commitlog_check_disabled.has_value(), is_shadowable);
};
}

View File

@@ -54,7 +54,7 @@ public:
virtual future<> on_compaction_completion(compaction_completion_desc desc, sstables::offstrategy offstrategy) = 0;
virtual bool is_auto_compaction_disabled_by_user() const noexcept = 0;
virtual bool tombstone_gc_enabled() const noexcept = 0;
virtual tombstone_gc_state get_tombstone_gc_state() const noexcept = 0;
virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept = 0;
virtual compaction_backlog_tracker& get_backlog_tracker() = 0;
virtual const std::string get_group_id() const noexcept = 0;
virtual seastar::condition_variable& get_staging_done_condition() noexcept = 0;

View File

@@ -778,7 +778,6 @@ compaction_manager::get_incremental_repair_read_lock(compaction::compaction_grou
cmlog.debug("Get get_incremental_repair_read_lock for {} started", reason);
}
compaction::compaction_state& cs = get_compaction_state(&t);
auto gh = cs.gate.hold();
auto ret = co_await cs.incremental_repair_lock.hold_read_lock();
if (!reason.empty()) {
cmlog.debug("Get get_incremental_repair_read_lock for {} done", reason);
@@ -792,7 +791,6 @@ compaction_manager::get_incremental_repair_write_lock(compaction::compaction_gro
cmlog.debug("Get get_incremental_repair_write_lock for {} started", reason);
}
compaction::compaction_state& cs = get_compaction_state(&t);
auto gh = cs.gate.hold();
auto ret = co_await cs.incremental_repair_lock.hold_write_lock();
if (!reason.empty()) {
cmlog.debug("Get get_incremental_repair_write_lock for {} done", reason);
@@ -1042,7 +1040,7 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task
_compaction_controller.set_max_shares(max_shares);
}))
, _strategy_control(std::make_unique<strategy_control>(*this))
{
, _tombstone_gc_state(_shared_tombstone_gc_state) {
tm.register_module(_task_manager_module->get_name(), _task_manager_module);
register_metrics();
// Bandwidth throttling is node-wide, updater is needed on single shard
@@ -1066,7 +1064,7 @@ compaction_manager::compaction_manager(tasks::task_manager& tm)
, _compaction_static_shares_observer(_cfg.static_shares.observe(_update_compaction_static_shares_action.make_observer()))
, _compaction_max_shares_observer(_cfg.max_shares.observe([] (const float& max_shares) {}))
, _strategy_control(std::make_unique<strategy_control>(*this))
{
, _tombstone_gc_state(_shared_tombstone_gc_state) {
tm.register_module(_task_manager_module->get_name(), _task_manager_module);
// No metric registration because this constructor is supposed to be used only by the testing
// infrastructure.
@@ -1521,9 +1519,7 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
| std::views::transform(std::mem_fn(&sstables::sstable::run_identifier))
| std::ranges::to<std::unordered_set>());
};
const auto injected_threshold = utils::get_local_injector().inject_parameter<size_t>("set_sstable_count_reduction_threshold");
const auto threshold = injected_threshold.value_or(size_t(std::max(schema->max_compaction_threshold(), 32)));
const auto threshold = size_t(std::max(schema->max_compaction_threshold(), 32));
auto count = co_await num_runs_for_compaction();
if (count <= threshold) {
cmlog.trace("No need to wait for sstable count reduction in {}: {} <= {}",
@@ -1538,7 +1534,9 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
auto& cstate = get_compaction_state(&t);
try {
while (can_perform_regular_compaction(t) && co_await num_runs_for_compaction() > threshold) {
co_await cstate.compaction_done.when();
co_await cstate.compaction_done.wait([this, &t] {
return !can_perform_regular_compaction(t);
});
}
} catch (const broken_condition_variable&) {
co_return;
@@ -2389,8 +2387,6 @@ future<> compaction_manager::remove(compaction_group_view& t, sstring reason) no
if (!c_state.gate.is_closed()) {
auto close_gate = c_state.gate.close();
co_await stop_ongoing_compactions(reason, &t);
// Wait for users of incremental repair lock (can be either repair itself or maintenance compactions).
co_await c_state.incremental_repair_lock.write_lock();
co_await std::move(close_gate);
}

View File

@@ -167,6 +167,10 @@ private:
std::unique_ptr<strategy_control> _strategy_control;
shared_tombstone_gc_state _shared_tombstone_gc_state;
// TODO: tombstone_gc_state should now have value semantics, but the code
// still uses it with reference semantics (inconsistently though).
// Drop this member, once the code is converted into using value semantics.
tombstone_gc_state _tombstone_gc_state;
utils::disk_space_monitor::subscription _out_of_space_subscription;
private:
@@ -452,6 +456,10 @@ public:
compaction::strategy_control& get_strategy_control() const noexcept;
const tombstone_gc_state& get_tombstone_gc_state() const noexcept {
return _tombstone_gc_state;
};
shared_tombstone_gc_state& get_shared_tombstone_gc_state() noexcept {
return _shared_tombstone_gc_state;
};

View File

@@ -299,11 +299,13 @@ batch_size_fail_threshold_in_kb: 1024
# max_hint_window_in_ms: 10800000 # 3 hours
# Validity period for authorized statements cache. Defaults to 10000, set to 0 to disable.
# Validity period for permissions cache (fetching permissions can be an
# expensive operation depending on the authorizer, CassandraAuthorizer is
# one example). Defaults to 10000, set to 0 to disable.
# Will be disabled automatically for AllowAllAuthorizer.
# permissions_validity_in_ms: 10000
# Refresh interval for authorized statements cache.
# Refresh interval for permissions cache (if enabled).
# After this interval, cache entries become eligible for refresh. Upon next
# access, an async reload is scheduled and the old value returned until it
# completes. If permissions_validity_in_ms is non-zero, then this also must have
@@ -564,16 +566,15 @@ commitlog_total_space_in_mb: -1
# prometheus_address: 1.2.3.4
# audit settings
# Table audit is enabled by default.
# By default, Scylla does not audit anything.
# 'audit' config option controls if and where to output audited events:
# - "none": auditing is disabled
# - "table": save audited events in audit.audit_log column family (default)
# - "none": auditing is disabled (default)
# - "table": save audited events in audit.audit_log column family
# - "syslog": send audited events via syslog (depends on OS, but usually to /dev/log)
audit: "table"
#
# List of statement categories that should be audited.
# Possible categories are: QUERY, DML, DCL, DDL, AUTH, ADMIN
audit_categories: "DCL,AUTH,ADMIN"
audit_categories: "DCL,DDL,AUTH,ADMIN"
#
# List of tables that should be audited.
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"
@@ -639,7 +640,7 @@ strict_is_not_null_in_views: true
# * workdir: the node will open the maintenance socket on the path <scylla's workdir>/cql.m,
# where <scylla's workdir> is a path defined by the workdir configuration option,
# * <socket path>: the node will open the maintenance socket on the path <socket path>.
maintenance_socket: workdir
maintenance_socket: ignore
# If set to true, configuration parameters defined with LiveUpdate option can be updated in runtime with CQL
# by updating system.config virtual table. If we don't want any configuration parameter to be changed in runtime
@@ -648,9 +649,10 @@ maintenance_socket: workdir
# e.g. for cloud users, for whom scylla's configuration should be changed only by support engineers.
# live_updatable_config_params_changeable_via_cql: true
#
# Guardrails options
#
# ****************
# * GUARDRAILS *
# ****************
# Guardrails to warn or fail when Replication Factor is smaller/greater than the threshold.
# Please note that the value of 0 is always allowed,
# which means that having no replication at all, i.e. RF = 0, is always valid.
@@ -660,27 +662,6 @@ maintenance_socket: workdir
# minimum_replication_factor_warn_threshold: 3
# maximum_replication_factor_warn_threshold: -1
# maximum_replication_factor_fail_threshold: -1
#
# Guardrails to warn about or disallow creating a keyspace with specific replication strategy.
# Each of these 2 settings is a list storing replication strategies considered harmful.
# The replication strategies to choose from are:
# 1) SimpleStrategy,
# 2) NetworkTopologyStrategy,
# 3) LocalStrategy,
# 4) EverywhereStrategy
#
# replication_strategy_warn_list:
# - SimpleStrategy
# replication_strategy_fail_list:
#
# Guardrail to enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE.
# enable_create_table_with_compact_storage: false
#
# Guardrails to limit usage of selected consistency levels for writes.
# Adding a warning to a CQL query response can significantly increase network
# traffic and decrease overall throughput.
# write_consistency_levels_warned: []
# write_consistency_levels_disallowed: []
#
# System information encryption settings
@@ -858,6 +839,21 @@ maintenance_socket: workdir
# key_namespace: <kmip key namespace> (optional)
#
# Guardrails to warn about or disallow creating a keyspace with specific replication strategy.
# Each of these 2 settings is a list storing replication strategies considered harmful.
# The replication strategies to choose from are:
# 1) SimpleStrategy,
# 2) NetworkTopologyStrategy,
# 3) LocalStrategy,
# 4) EverywhereStrategy
#
# replication_strategy_warn_list:
# - SimpleStrategy
# replication_strategy_fail_list:
# Guardrail to enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE.
# enable_create_table_with_compact_storage: false
# Control tablets for new keyspaces.
# Can be set to: disabled|enabled|enforced
#
@@ -879,16 +875,7 @@ maintenance_socket: workdir
# The `tablets` option cannot be changed using `ALTER KEYSPACE`.
tablets_mode_for_new_keyspaces: enabled
# Require every tablet-enabled keyspace to be RF-rack-valid.
#
# A tablet-enabled keyspace is RF-rack-valid when, for each data center,
# its replication factor (RF) is 0, 1, or exactly equal to the number of
# racks in that data center. Setting the RF to the number of racks ensures
# that a single rack failure never results in data unavailability.
#
# When set to true, CREATE KEYSPACE and ALTER KEYSPACE statements that
# would produce an RF-rack-invalid keyspace are rejected.
# When set to false, such statements are allowed but emit a warning.
# Enforce RF-rack-valid keyspaces.
rf_rack_valid_keyspaces: false
#

View File

@@ -730,6 +730,28 @@ vector_search_tests = set([
'test/vector_search/rescoring_test'
])
vector_search_validator_bin = 'vector-search-validator/bin/vector-search-validator'
vector_search_validator_deps = set([
'test/vector_search_validator/build-validator',
'test/vector_search_validator/Cargo.toml',
'test/vector_search_validator/crates/validator/Cargo.toml',
'test/vector_search_validator/crates/validator/src/main.rs',
'test/vector_search_validator/crates/validator-scylla/Cargo.toml',
'test/vector_search_validator/crates/validator-scylla/src/lib.rs',
'test/vector_search_validator/crates/validator-scylla/src/cql.rs',
])
vector_store_bin = 'vector-search-validator/bin/vector-store'
vector_store_deps = set([
'test/vector_search_validator/build-env',
'test/vector_search_validator/build-vector-store',
])
vector_search_validator_bins = set([
vector_search_validator_bin,
vector_store_bin,
])
wasms = set([
'wasm/return_input.wat',
'wasm/test_complex_null_values.wat',
@@ -763,7 +785,7 @@ other = set([
'iotune',
])
all_artifacts = apps | cpp_apps | tests | other | wasms
all_artifacts = apps | cpp_apps | tests | other | wasms | vector_search_validator_bins
arg_parser = argparse.ArgumentParser('Configure scylla', add_help=False, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('--out', dest='buildfile', action='store', default='build.ninja',
@@ -1174,7 +1196,6 @@ scylla_core = (['message/messaging_service.cc',
'utils/gz/crc_combine.cc',
'utils/gz/crc_combine_table.cc',
'utils/http.cc',
'utils/http_client_error_processing.cc',
'utils/rest/client.cc',
'utils/s3/aws_error.cc',
'utils/s3/client.cc',
@@ -1192,7 +1213,6 @@ scylla_core = (['message/messaging_service.cc',
'utils/azure/identity/default_credentials.cc',
'utils/gcp/gcp_credentials.cc',
'utils/gcp/object_storage.cc',
'utils/gcp/object_storage_retry_strategy.cc',
'gms/version_generator.cc',
'gms/versioned_value.cc',
'gms/gossiper.cc',
@@ -1204,7 +1224,6 @@ scylla_core = (['message/messaging_service.cc',
'gms/application_state.cc',
'gms/inet_address.cc',
'dht/i_partitioner.cc',
'dht/fixed_shard.cc',
'dht/token.cc',
'dht/murmur3_partitioner.cc',
'dht/boot_strapper.cc',
@@ -1276,9 +1295,9 @@ scylla_core = (['message/messaging_service.cc',
'auth/resource.cc',
'auth/roles-metadata.cc',
'auth/passwords.cc',
'auth/maintenance_socket_authenticator.cc',
'auth/password_authenticator.cc',
'auth/permission.cc',
'auth/permissions_cache.cc',
'auth/service.cc',
'auth/standard_role_manager.cc',
'auth/ldap_role_manager.cc',
@@ -1342,7 +1361,6 @@ scylla_core = (['message/messaging_service.cc',
'service/strong_consistency/groups_manager.cc',
'service/strong_consistency/coordinator.cc',
'service/strong_consistency/state_machine.cc',
'service/strong_consistency/raft_groups_storage.cc',
'service/raft/group0_state_id_handler.cc',
'service/raft/group0_state_machine.cc',
'service/raft/group0_state_machine_merger.cc',
@@ -1364,6 +1382,7 @@ scylla_core = (['message/messaging_service.cc',
'service/topology_state_machine.cc',
'service/topology_mutation.cc',
'service/topology_coordinator.cc',
'node_ops/node_ops_ctl.cc',
'node_ops/task_manager_module.cc',
'reader_concurrency_semaphore_group.cc',
'utils/disk_space_monitor.cc',
@@ -1648,7 +1667,6 @@ for t in sorted(perf_tests):
deps['test/boost/combined_tests'] += [
'test/boost/aggregate_fcts_test.cc',
'test/boost/auth_cache_test.cc',
'test/boost/auth_test.cc',
'test/boost/batchlog_manager_test.cc',
'test/boost/cache_algorithm_test.cc',
@@ -2567,10 +2585,11 @@ def write_build_file(f,
description = RUST_LIB $out
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, rustc_wrapper=rustc_wrapper, **modeval))
f.write(
'build {mode}-build: phony {artifacts} {wasms}\n'.format(
'build {mode}-build: phony {artifacts} {wasms} {vector_search_validator_bins}\n'.format(
mode=mode,
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms)]),
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms - vector_search_validator_bins)]),
wasms = str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & wasms)]),
vector_search_validator_bins=str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & vector_search_validator_bins)]),
)
)
if profile_recipe := modes[mode].get('profile_recipe'):
@@ -2600,7 +2619,7 @@ def write_build_file(f,
continue
profile_dep = modes[mode].get('profile_target', "")
if binary in other or binary in wasms:
if binary in other or binary in wasms or binary in vector_search_validator_bins:
continue
srcs = deps[binary]
# 'scylla'
@@ -2711,10 +2730,11 @@ def write_build_file(f,
)
f.write(
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms}\n'.format(
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms} {vector_search_validator_bins} \n'.format(
mode=mode,
test_executables=' '.join(['$builddir/{}/{}'.format(mode, binary) for binary in sorted(tests)]),
wasms=' '.join([f'$builddir/{binary}' for binary in sorted(wasms)]),
vector_search_validator_bins=' '.join([f'$builddir/{binary}' for binary in sorted(vector_search_validator_bins)]),
)
)
f.write(
@@ -2882,6 +2902,19 @@ def write_build_file(f,
'build compiler-training: phony {}\n'.format(' '.join(['{mode}-compiler-training'.format(mode=mode) for mode in default_modes]))
)
f.write(textwrap.dedent(f'''\
rule build-vector-search-validator
command = test/vector_search_validator/build-validator $builddir
rule build-vector-store
command = test/vector_search_validator/build-vector-store $builddir
'''))
f.write(
'build $builddir/{vector_search_validator_bin}: build-vector-search-validator {}\n'.format(' '.join([dep for dep in sorted(vector_search_validator_deps)]), vector_search_validator_bin=vector_search_validator_bin)
)
f.write(
'build $builddir/{vector_store_bin}: build-vector-store {}\n'.format(' '.join([dep for dep in sorted(vector_store_deps)]), vector_store_bin=vector_store_bin)
)
f.write(textwrap.dedent(f'''\
build dist-unified-tar: phony {' '.join([f'$builddir/{mode}/dist/tar/{scylla_product}-unified-{scylla_version}-{scylla_release}.{arch}.tar.gz' for mode in default_modes])}
build dist-unified: phony dist-unified-tar

View File

@@ -389,10 +389,8 @@ selectStatement returns [std::unique_ptr<raw::select_statement> expr]
bool is_ann_ordering = false;
}
: K_SELECT (
( (K_JSON K_DISTINCT)=> K_JSON { statement_subtype = raw::select_statement::parameters::statement_subtype::JSON; }
| (K_JSON selectClause K_FROM)=> K_JSON { statement_subtype = raw::select_statement::parameters::statement_subtype::JSON; }
)?
( (K_DISTINCT selectClause K_FROM)=> K_DISTINCT { is_distinct = true; } )?
( K_JSON { statement_subtype = raw::select_statement::parameters::statement_subtype::JSON; } )?
( K_DISTINCT { is_distinct = true; } )?
sclause=selectClause
)
K_FROM (
@@ -427,13 +425,13 @@ selector returns [shared_ptr<raw_selector> s]
unaliasedSelector returns [uexpression tmp]
: ( c=cident { tmp = unresolved_identifier{std::move(c)}; }
| v=value { tmp = std::move(v); }
| K_COUNT '(' countArgument ')' { tmp = make_count_rows_function_expression(); }
| K_WRITETIME '(' c=cident ')' { tmp = column_mutation_attribute{column_mutation_attribute::attribute_kind::writetime,
unresolved_identifier{std::move(c)}}; }
| K_TTL '(' c=cident ')' { tmp = column_mutation_attribute{column_mutation_attribute::attribute_kind::ttl,
unresolved_identifier{std::move(c)}}; }
| f=functionName args=selectionFunctionArgs { tmp = function_call{std::move(f), std::move(args)}; }
| f=similarityFunctionName args=vectorSimilarityArgs { tmp = function_call{std::move(f), std::move(args)}; }
| K_CAST '(' arg=unaliasedSelector K_AS t=native_type ')' { tmp = cast{.style = cast::cast_style::sql, .arg = std::move(arg), .type = std::move(t)}; }
)
( '.' fi=cident { tmp = field_selection{std::move(tmp), std::move(fi)}; }
@@ -448,9 +446,23 @@ selectionFunctionArgs returns [std::vector<expression> a]
')'
;
vectorSimilarityArgs returns [std::vector<expression> a]
: '(' ')'
| '(' v1=vectorSimilarityArg { a.push_back(std::move(v1)); }
( ',' vn=vectorSimilarityArg { a.push_back(std::move(vn)); } )*
')'
;
vectorSimilarityArg returns [uexpression a]
: s=unaliasedSelector { a = std::move(s); }
| v=value { a = std::move(v); }
;
countArgument
: '*'
/* COUNT(1) is also allowed, it is recognized via the general function(args) path */
| i=INTEGER { if (i->getText() != "1") {
add_recognition_error("Only COUNT(1) is supported, got COUNT(" + i->getText() + ")");
} }
;
whereClause returns [uexpression clause]
@@ -874,8 +886,8 @@ cfamDefinition[cql3::statements::create_table_statement::raw_statement& expr]
;
cfamColumns[cql3::statements::create_table_statement::raw_statement& expr]
@init { bool is_static=false, is_ttl=false; }
: k=ident v=comparatorType (K_TTL {is_ttl = true;})? (K_STATIC {is_static = true;})? { $expr.add_definition(k, v, is_static, is_ttl); }
@init { bool is_static=false; }
: k=ident v=comparatorType (K_STATIC {is_static = true;})? { $expr.add_definition(k, v, is_static); }
(K_PRIMARY K_KEY { $expr.add_key_aliases(std::vector<shared_ptr<cql3::column_identifier>>{k}); })?
| K_PRIMARY K_KEY '(' pkDef[expr] (',' c=ident { $expr.add_column_alias(c); } )* ')'
;
@@ -1042,7 +1054,6 @@ alterTableStatement returns [std::unique_ptr<alter_table_statement::raw_statemen
std::vector<alter_table_statement::column_change> column_changes;
std::vector<std::pair<shared_ptr<cql3::column_identifier::raw>, shared_ptr<cql3::column_identifier::raw>>> renames;
auto attrs = std::make_unique<cql3::attributes::raw>();
shared_ptr<cql3::column_identifier::raw> ttl_change;
}
: K_ALTER K_COLUMNFAMILY cf=columnFamilyName
( K_ALTER id=cident K_TYPE v=comparatorType { type = alter_table_statement::type::alter; column_changes.emplace_back(alter_table_statement::column_change{id, v}); }
@@ -1061,11 +1072,9 @@ alterTableStatement returns [std::unique_ptr<alter_table_statement::raw_statemen
| K_RENAME { type = alter_table_statement::type::rename; }
id1=cident K_TO toId1=cident { renames.emplace_back(id1, toId1); }
( K_AND idn=cident K_TO toIdn=cident { renames.emplace_back(idn, toIdn); } )*
| K_TTL { type = alter_table_statement::type::ttl; }
( id=cident { ttl_change = id; } | K_NULL )
)
{
$expr = std::make_unique<alter_table_statement::raw_statement>(std::move(cf), type, std::move(column_changes), std::move(props), std::move(renames), std::move(attrs), std::move(ttl_change));
$expr = std::make_unique<alter_table_statement::raw_statement>(std::move(cf), type, std::move(column_changes), std::move(props), std::move(renames), std::move(attrs));
}
;
@@ -1697,6 +1706,10 @@ functionName returns [cql3::functions::function_name s]
: (ks=keyspaceName '.')? f=allowedFunctionName { $s.keyspace = std::move(ks); $s.name = std::move(f); }
;
similarityFunctionName returns [cql3::functions::function_name s]
: f=allowedSimilarityFunctionName { $s = cql3::functions::function_name::native_function(std::move(f)); }
;
allowedFunctionName returns [sstring s]
: f=IDENT { $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
| f=QUOTED_NAME { $s = $f.text; }
@@ -1705,6 +1718,11 @@ allowedFunctionName returns [sstring s]
| K_COUNT { $s = "count"; }
;
allowedSimilarityFunctionName returns [sstring s]
: f=(K_SIMILARITY_COSINE | K_SIMILARITY_EUCLIDEAN | K_SIMILARITY_DOT_PRODUCT)
{ $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
;
functionArgs returns [std::vector<expression> a]
: '(' ')'
| '(' t1=term { a.push_back(std::move(t1)); }
@@ -2074,21 +2092,7 @@ vector_type returns [shared_ptr<cql3::cql3_type::raw> pt]
{
if ($d.text[0] == '-')
throw exceptions::invalid_request_exception("Vectors must have a dimension greater than 0");
unsigned long parsed_dimension;
try {
parsed_dimension = std::stoul($d.text);
} catch (const std::exception& e) {
throw exceptions::invalid_request_exception(format("Invalid vector dimension: {}", $d.text));
}
static_assert(sizeof(unsigned long) >= sizeof(vector_dimension_t));
if (parsed_dimension == 0) {
throw exceptions::invalid_request_exception("Vectors must have a dimension greater than 0");
}
if (parsed_dimension > cql3::cql3_type::MAX_VECTOR_DIMENSION) {
throw exceptions::invalid_request_exception(
format("Vectors must have a dimension less than or equal to {}", cql3::cql3_type::MAX_VECTOR_DIMENSION));
}
$pt = cql3::cql3_type::raw::vector(t, static_cast<vector_dimension_t>(parsed_dimension));
$pt = cql3::cql3_type::raw::vector(t, std::stoul($d.text));
}
;
@@ -2415,6 +2419,10 @@ K_MUTATION_FRAGMENTS: M U T A T I O N '_' F R A G M E N T S;
K_VECTOR_SEARCH_INDEXING: V E C T O R '_' S E A R C H '_' I N D E X I N G;
K_SIMILARITY_EUCLIDEAN: S I M I L A R I T Y '_' E U C L I D E A N;
K_SIMILARITY_COSINE: S I M I L A R I T Y '_' C O S I N E;
K_SIMILARITY_DOT_PRODUCT: S I M I L A R I T Y '_' D O T '_' P R O D U C T;
// Case-insensitive alpha characters
fragment A: ('a'|'A');
fragment B: ('b'|'B');

View File

@@ -27,7 +27,7 @@ public:
struct vector_test_result {
test_result result;
std::optional<vector_dimension_t> dimension_opt;
std::optional<size_t> dimension_opt;
};
static bool is_assignable(test_result tr) {

View File

@@ -307,14 +307,17 @@ public:
class cql3_type::raw_vector : public raw {
shared_ptr<raw> _type;
vector_dimension_t _dimension;
size_t _dimension;
// This limitation is acquired from the maximum number of dimensions in OpenSearch.
static constexpr size_t MAX_VECTOR_DIMENSION = 16000;
virtual sstring to_string() const override {
return seastar::format("vector<{}, {}>", _type, _dimension);
}
public:
raw_vector(shared_ptr<raw> type, vector_dimension_t dimension)
raw_vector(shared_ptr<raw> type, size_t dimension)
: _type(std::move(type)), _dimension(dimension) {
}
@@ -414,7 +417,7 @@ cql3_type::raw::tuple(std::vector<shared_ptr<raw>> ts) {
}
shared_ptr<cql3_type::raw>
cql3_type::raw::vector(shared_ptr<raw> t, vector_dimension_t dimension) {
cql3_type::raw::vector(shared_ptr<raw> t, size_t dimension) {
return ::make_shared<raw_vector>(std::move(t), dimension);
}

View File

@@ -39,9 +39,6 @@ public:
data_type get_type() const { return _type; }
const sstring& to_string() const { return _type->cql3_type_name(); }
// This limitation is acquired from the maximum number of dimensions in OpenSearch.
static constexpr vector_dimension_t MAX_VECTOR_DIMENSION = 16000;
// For UserTypes, we need to know the current keyspace to resolve the
// actual type used, so Raw is a "not yet prepared" CQL3Type.
class raw {
@@ -67,7 +64,7 @@ public:
static shared_ptr<raw> list(shared_ptr<raw> t);
static shared_ptr<raw> set(shared_ptr<raw> t);
static shared_ptr<raw> tuple(std::vector<shared_ptr<raw>> ts);
static shared_ptr<raw> vector(shared_ptr<raw> t, vector_dimension_t dimension);
static shared_ptr<raw> vector(shared_ptr<raw> t, size_t dimension);
static shared_ptr<raw> frozen(shared_ptr<raw> t);
friend sstring format_as(const raw& r) {
return r.to_string();

View File

@@ -10,7 +10,6 @@
#include "expr-utils.hh"
#include "evaluate.hh"
#include "cql3/functions/functions.hh"
#include "cql3/functions/aggregate_fcts.hh"
#include "cql3/functions/castas_fcts.hh"
#include "cql3/functions/scalar_function.hh"
#include "cql3/column_identifier.hh"
@@ -502,8 +501,8 @@ vector_validate_assignable_to(const collection_constructor& c, data_dictionary::
throw exceptions::invalid_request_exception(format("Invalid vector type literal for {} of type {}", *receiver.name, receiver.type->as_cql3_type()));
}
vector_dimension_t expected_size = vt->get_dimension();
if (expected_size == 0) {
size_t expected_size = vt->get_dimension();
if (!expected_size) {
throw exceptions::invalid_request_exception(format("Invalid vector type literal for {}: type {} expects at least one element",
*receiver.name, receiver.type->as_cql3_type()));
}
@@ -1048,47 +1047,8 @@ prepare_function_args_for_type_inference(std::span<const expression> args, data_
return partially_prepared_args;
}
// Special case for count(1) - recognize it as the countRows() function. Note it is quite
// artificial and we might relax it to the more general count(expression) later.
static
std::optional<expression>
try_prepare_count_rows(const expr::function_call& fc, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
return std::visit(overloaded_functor{
[&] (const functions::function_name& name) -> std::optional<expression> {
auto native_name = name;
if (!native_name.has_keyspace()) {
native_name = name.as_native_function();
}
// Collapse count(1) into countRows()
if (native_name == functions::function_name::native_function("count")) {
if (fc.args.size() == 1) {
if (auto uc_arg = expr::as_if<expr::untyped_constant>(&fc.args[0])) {
if (uc_arg->partial_type == expr::untyped_constant::type_class::integer
&& uc_arg->raw_text == "1") {
return expr::function_call{
.func = functions::aggregate_fcts::make_count_rows_function(),
.args = {},
};
} else {
throw exceptions::invalid_request_exception(format("count() expects a column or the literal 1 as an argument", fc.args[0]));
}
}
}
}
return std::nullopt;
},
[] (const shared_ptr<functions::function>&) -> std::optional<expression> {
// Already prepared, nothing to do
return std::nullopt;
},
}, fc.func);
}
std::optional<expression>
prepare_function_call(const expr::function_call& fc, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
if (auto prepared = try_prepare_count_rows(fc, db, keyspace, schema_opt, receiver)) {
return prepared;
}
// Try to extract a column family name from the available information.
// Most functions can be prepared without information about the column family, usually just the keyspace is enough.
// One exception is the token() function - in order to prepare system.token() we have to know the partition key of the table,

View File

@@ -10,38 +10,9 @@
#include "types/types.hh"
#include "types/vector.hh"
#include "exceptions/exceptions.hh"
#include <bit>
#include <span>
#include <seastar/core/byteorder.hh>
namespace cql3 {
namespace functions {
namespace detail {
std::vector<float> extract_float_vector(const bytes_opt& param, vector_dimension_t dimension) {
if (!param) {
throw exceptions::invalid_request_exception("Cannot extract float vector from null parameter");
}
const size_t expected_size = dimension * sizeof(float);
if (param->size() != expected_size) {
throw exceptions::invalid_request_exception(
fmt::format("Invalid vector size: expected {} bytes for {} floats, got {} bytes",
expected_size, dimension, param->size()));
}
std::vector<float> result(dimension);
const char* p = reinterpret_cast<const char*>(param->data());
for (size_t i = 0; i < dimension; ++i) {
result[i] = std::bit_cast<float>(consume_be<uint32_t>(p));
}
return result;
}
} // namespace detail
namespace {
// The computations of similarity scores match the exact formulas of Cassandra's (jVector's) implementation to ensure compatibility.
@@ -51,15 +22,14 @@ namespace {
// You should only use this function if you need to preserve the original vectors and cannot normalize
// them in advance.
float compute_cosine_similarity(std::span<const float> v1, std::span<const float> v2) {
#pragma clang fp contract(fast) reassociate(on) // Allow the compiler to optimize the loop.
float dot_product = 0.0;
float squared_norm_a = 0.0;
float squared_norm_b = 0.0;
float compute_cosine_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
double dot_product = 0.0;
double squared_norm_a = 0.0;
double squared_norm_b = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
float a = v1[i];
float b = v2[i];
double a = value_cast<float>(v1[i]);
double b = value_cast<float>(v2[i]);
dot_product += a * b;
squared_norm_a += a * a;
@@ -67,7 +37,7 @@ float compute_cosine_similarity(std::span<const float> v1, std::span<const float
}
if (squared_norm_a == 0 || squared_norm_b == 0) {
return std::numeric_limits<float>::quiet_NaN();
throw exceptions::invalid_request_exception("Function system.similarity_cosine doesn't support all-zero vectors");
}
// The cosine similarity is in the range [-1, 1].
@@ -76,15 +46,14 @@ float compute_cosine_similarity(std::span<const float> v1, std::span<const float
return (1 + (dot_product / (std::sqrt(squared_norm_a * squared_norm_b)))) / 2;
}
float compute_euclidean_similarity(std::span<const float> v1, std::span<const float> v2) {
#pragma clang fp contract(fast) reassociate(on) // Allow the compiler to optimize the loop.
float sum = 0.0;
float compute_euclidean_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
double sum = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
float a = v1[i];
float b = v2[i];
double a = value_cast<float>(v1[i]);
double b = value_cast<float>(v2[i]);
float diff = a - b;
double diff = a - b;
sum += diff * diff;
}
@@ -96,13 +65,12 @@ float compute_euclidean_similarity(std::span<const float> v1, std::span<const fl
// Assumes that both vectors are L2-normalized.
// This similarity is intended as an optimized way to perform cosine similarity calculation.
float compute_dot_product_similarity(std::span<const float> v1, std::span<const float> v2) {
#pragma clang fp contract(fast) reassociate(on) // Allow the compiler to optimize the loop.
float dot_product = 0.0;
float compute_dot_product_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
double dot_product = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
float a = v1[i];
float b = v2[i];
double a = value_cast<float>(v1[i]);
double b = value_cast<float>(v2[i]);
dot_product += a * b;
}
@@ -156,7 +124,7 @@ std::vector<data_type> retrieve_vector_arg_types(const function_name& name, cons
}
}
vector_dimension_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
size_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
auto type = vector_type_impl::get_instance(float_type, dimension);
return {type, type};
}
@@ -168,15 +136,13 @@ bytes_opt vector_similarity_fct::execute(std::span<const bytes_opt> parameters)
return std::nullopt;
}
// Extract dimension from the vector type
const auto& type = static_cast<const vector_type_impl&>(*arg_types()[0]);
vector_dimension_t dimension = type.get_dimension();
const auto& type = arg_types()[0];
data_value v1 = type->deserialize(*parameters[0]);
data_value v2 = type->deserialize(*parameters[1]);
const auto& v1_elements = value_cast<std::vector<data_value>>(v1);
const auto& v2_elements = value_cast<std::vector<data_value>>(v2);
// Optimized path: extract floats directly from bytes, bypassing data_value overhead
std::vector<float> v1 = detail::extract_float_vector(parameters[0], dimension);
std::vector<float> v2 = detail::extract_float_vector(parameters[1], dimension);
float result = SIMILARITY_FUNCTIONS.at(_name)(v1, v2);
float result = SIMILARITY_FUNCTIONS.at(_name)(v1_elements, v2_elements);
return float_type->decompose(result);
}

View File

@@ -11,7 +11,6 @@
#include "native_scalar_function.hh"
#include "cql3/assignment_testable.hh"
#include "cql3/functions/function_name.hh"
#include <span>
namespace cql3 {
namespace functions {
@@ -20,7 +19,7 @@ static const function_name SIMILARITY_COSINE_FUNCTION_NAME = function_name::nati
static const function_name SIMILARITY_EUCLIDEAN_FUNCTION_NAME = function_name::native_function("similarity_euclidean");
static const function_name SIMILARITY_DOT_PRODUCT_FUNCTION_NAME = function_name::native_function("similarity_dot_product");
using similarity_function_t = float (*)(std::span<const float>, std::span<const float>);
using similarity_function_t = float (*)(const std::vector<data_value>&, const std::vector<data_value>&);
extern thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS;
std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args);
@@ -34,14 +33,5 @@ public:
virtual bytes_opt execute(std::span<const bytes_opt> parameters) override;
};
namespace detail {
// Extract float vector directly from serialized bytes, bypassing data_value overhead.
// This is an internal API exposed for testing purposes.
// Vector<float, N> wire format: N floats as big-endian uint32_t values, 4 bytes each.
std::vector<float> extract_float_vector(const bytes_opt& param, vector_dimension_t dimension);
} // namespace detail
} // namespace functions
} // namespace cql3

View File

@@ -91,11 +91,7 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
, _authorized_prepared_cache_validity_in_ms_observer(_db.get_config().permissions_validity_in_ms.observe(_auth_prepared_cache_cfg_cb))
, _lang_manager(langm)
, _write_consistency_levels_warned_observer(_db.get_config().write_consistency_levels_warned.observe([this](const auto& v) { _write_consistency_levels_warned = to_consistency_level_set(v); }))
, _write_consistency_levels_disallowed_observer(_db.get_config().write_consistency_levels_disallowed.observe([this](const auto& v) { _write_consistency_levels_disallowed = to_consistency_level_set(v); }))
{
_write_consistency_levels_warned = to_consistency_level_set(_db.get_config().write_consistency_levels_warned());
_write_consistency_levels_disallowed = to_consistency_level_set(_db.get_config().write_consistency_levels_disallowed());
namespace sm = seastar::metrics;
namespace stm = statements;
using clevel = db::consistency_level;
@@ -512,32 +508,6 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
"i.e. attempts to set a forbidden replication strategy in a keyspace via CREATE/ALTER KEYSPACE.")).set_skip_when_empty(),
});
std::vector<sm::metric_definition> cql_cl_group;
for (auto cl = size_t(clevel::MIN_VALUE); cl <= size_t(clevel::MAX_VALUE); ++cl) {
cql_cl_group.push_back(
sm::make_counter(
"writes_per_consistency_level",
_cql_stats.writes_per_consistency_level[cl],
sm::description("Counts the number of writes for each consistency level."),
{cl_label(clevel(cl)), basic_level}).set_skip_when_empty());
}
_metrics.add_group("cql", cql_cl_group);
_metrics.add_group("cql", {
sm::make_counter(
"write_consistency_levels_disallowed_violations",
_cql_stats.write_consistency_levels_disallowed_violations,
sm::description("Counts the number of write_consistency_levels_disallowed guardrail violations, "
"i.e. attempts to write with a forbidden consistency level."),
{basic_level}),
sm::make_counter(
"write_consistency_levels_warned_violations",
_cql_stats.write_consistency_levels_warned_violations,
sm::description("Counts the number of write_consistency_levels_warned guardrail violations, "
"i.e. attempts to write with a discouraged consistency level."),
{basic_level}),
});
_mnotifier.register_listener(_migration_subscriber.get());
}
@@ -1263,14 +1233,6 @@ shared_ptr<cql_transport::messages::result_message> query_processor::bounce_to_s
return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(shard, std::move(cached_fn_calls));
}
query_processor::consistency_level_set query_processor::to_consistency_level_set(const query_processor::cl_option_list& levels) {
query_processor::consistency_level_set result;
for (const auto& opt : levels) {
result.set(static_cast<db::consistency_level>(opt));
}
return result;
}
void query_processor::update_authorized_prepared_cache_config() {
utils::loading_cache_config cfg;
cfg.max_size = _mcfg.authorized_prepared_cache_size;

View File

@@ -34,9 +34,6 @@
#include "service/raft/raft_group0_client.hh"
#include "types/types.hh"
#include "db/auth_version.hh"
#include "db/consistency_level_type.hh"
#include "db/config.hh"
#include "utils/enum_option.hh"
#include "service/storage_proxy_fwd.hh"
@@ -145,30 +142,6 @@ private:
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;
lang::manager& _lang_manager;
using cl_option_list = std::vector<enum_option<db::consistency_level_restriction_t>>;
/// Efficient bitmask-based set of consistency levels.
using consistency_level_set = enum_set<super_enum<db::consistency_level,
db::consistency_level::ANY,
db::consistency_level::ONE,
db::consistency_level::TWO,
db::consistency_level::THREE,
db::consistency_level::QUORUM,
db::consistency_level::ALL,
db::consistency_level::LOCAL_QUORUM,
db::consistency_level::EACH_QUORUM,
db::consistency_level::SERIAL,
db::consistency_level::LOCAL_SERIAL,
db::consistency_level::LOCAL_ONE>>;
consistency_level_set _write_consistency_levels_warned;
consistency_level_set _write_consistency_levels_disallowed;
utils::observer<cl_option_list> _write_consistency_levels_warned_observer;
utils::observer<cl_option_list> _write_consistency_levels_disallowed_observer;
static consistency_level_set to_consistency_level_set(const cl_option_list& levels);
public:
static const sstring CQL_VERSION;
@@ -520,21 +493,6 @@ public:
int32_t page_size = -1,
service::node_local_only node_local_only = service::node_local_only::no) const;
enum class write_consistency_guardrail_state { NONE, WARN, FAIL };
inline write_consistency_guardrail_state check_write_consistency_levels_guardrail(db::consistency_level cl) {
_cql_stats.writes_per_consistency_level[size_t(cl)]++;
if (_write_consistency_levels_disallowed.contains(cl)) [[unlikely]] {
_cql_stats.write_consistency_levels_disallowed_violations++;
return write_consistency_guardrail_state::FAIL;
}
if (_write_consistency_levels_warned.contains(cl)) [[unlikely]] {
_cql_stats.write_consistency_levels_warned_violations++;
return write_consistency_guardrail_state::WARN;
}
return write_consistency_guardrail_state::NONE;
}
private:
// Keep the holder until you stop using the `remote` services.
std::pair<std::reference_wrapper<remote>, gate::holder> remote();

View File

@@ -1,20 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <ostream>
namespace cql3 {
class result;
void print_query_results_text(std::ostream& os, const result& result);
void print_query_results_json(std::ostream& os, const result& result);
} // namespace cql3

View File

@@ -9,10 +9,8 @@
*/
#include <cstdint>
#include "types/json_utils.hh"
#include "utils/assert.hh"
#include "utils/hashers.hh"
#include "utils/rjson.hh"
#include "cql3/result_set.hh"
namespace cql3 {
@@ -197,85 +195,4 @@ make_empty_metadata() {
return empty_metadata_cache;
}
void print_query_results_text(std::ostream& os, const cql3::result& result) {
const auto& metadata = result.get_metadata();
const auto& column_metadata = metadata.get_names();
struct column_values {
size_t max_size{0};
sstring header_format;
sstring row_format;
std::vector<sstring> values;
void add(sstring value) {
max_size = std::max(max_size, value.size());
values.push_back(std::move(value));
}
};
std::vector<column_values> columns;
columns.resize(column_metadata.size());
for (size_t i = 0; i < column_metadata.size(); ++i) {
columns[i].add(column_metadata[i]->name->text());
}
for (const auto& row : result.result_set().rows()) {
for (size_t i = 0; i < row.size(); ++i) {
if (row[i]) {
columns[i].add(column_metadata[i]->type->to_string(linearized(managed_bytes_view(*row[i]))));
} else {
columns[i].add("");
}
}
}
std::vector<sstring> separators(columns.size(), sstring());
for (size_t i = 0; i < columns.size(); ++i) {
auto& col_values = columns[i];
col_values.header_format = seastar::format(" {{:<{}}} ", col_values.max_size);
col_values.row_format = seastar::format(" {{:>{}}} ", col_values.max_size);
for (size_t c = 0; c < col_values.max_size; ++c) {
separators[i] += "-";
}
}
for (size_t r = 0; r < result.result_set().rows().size() + 1; ++r) {
std::vector<sstring> row;
row.reserve(columns.size());
for (size_t i = 0; i < columns.size(); ++i) {
const auto& format = r == 0 ? columns[i].header_format : columns[i].row_format;
row.push_back(fmt::format(fmt::runtime(std::string_view(format)), columns[i].values[r]));
}
fmt::print(os, "{}\n", fmt::join(row, "|"));
if (!r) {
fmt::print(os, "-{}-\n", fmt::join(separators, "-+-"));
}
}
}
void print_query_results_json(std::ostream& os, const cql3::result& result) {
const auto& metadata = result.get_metadata();
const auto& column_metadata = metadata.get_names();
rjson::streaming_writer writer(os);
writer.StartArray();
for (const auto& row : result.result_set().rows()) {
writer.StartObject();
for (size_t i = 0; i < row.size(); ++i) {
writer.Key(column_metadata[i]->name->text());
if (!row[i] || row[i]->empty()) {
writer.Null();
continue;
}
const auto value = to_json_string(*column_metadata[i]->type, *row[i]);
const auto type = to_json_type(*column_metadata[i]->type, *row[i]);
writer.RawValue(value, type);
}
writer.EndObject();
}
writer.EndArray();
}
}

View File

@@ -212,20 +212,11 @@ public:
}
virtual uint32_t add_column_for_post_processing(const column_definition& c) override {
auto it = std::find_if(_selectors.begin(), _selectors.end(), [&c](const expr::expression& e) {
auto col = expr::as_if<expr::column_value>(&e);
return col && col->col == &c;
});
if (it != _selectors.end()) {
return std::distance(_selectors.begin(), it);
}
add_column(c);
get_result_metadata()->add_non_serialized_column(c.column_specification);
uint32_t index = selection::add_column_for_post_processing(c);
_selectors.push_back(expr::column_value(&c));
if (_inner_loop.empty()) {
// Simple case: no aggregation
return _selectors.size() - 1;
return index;
} else {
// Complex case: aggregation, must pass through temporary
auto first_func = cql3::functions::aggregate_fcts::make_first_function(c.type);
@@ -479,21 +470,10 @@ std::vector<const column_definition*> selection::wildcard_columns(schema_ptr sch
return simple_selection::make(schema, std::move(columns), false);
}
selection::add_column_result selection::add_column(const column_definition& c) {
auto index = index_of(c);
if (index != -1) {
return {index, false};
}
_columns.push_back(&c);
return {_columns.size() - 1, true};
}
uint32_t selection::add_column_for_post_processing(const column_definition& c) {
auto col = add_column(c);
if (col.added) {
_metadata->add_non_serialized_column(c.column_specification);
}
return col.index;
_columns.push_back(&c);
_metadata->add_non_serialized_column(c.column_specification);
return _columns.size() - 1;
}
::shared_ptr<selection> selection::from_selectors(data_dictionary::database db, schema_ptr schema, const sstring& ks, const std::vector<prepared_selector>& prepared_selectors) {

View File

@@ -130,14 +130,6 @@ public:
virtual std::vector<shared_ptr<functions::function>> used_functions() const { return {}; }
query::partition_slice::option_set get_query_options();
protected:
// Result of add_column: index in _columns and whether it was added now (or existed already).
struct add_column_result {
uint32_t index;
bool added;
};
// Adds a column to the _columns if not already present, returns add_column_result.
add_column_result add_column(const column_definition& c);
private:
static bool processes_selection(const std::vector<prepared_selector>& prepared_selectors);

View File

@@ -10,7 +10,6 @@
#include "cdc/log.hh"
#include "index/vector_index.hh"
#include "types/types.hh"
#include "utils/assert.hh"
#include <seastar/core/coroutine.hh>
#include "cql3/query_options.hh"
@@ -31,9 +30,6 @@
#include "cql3/query_processor.hh"
#include "cdc/cdc_extension.hh"
#include "cdc/cdc_partitioner.hh"
#include "db/tags/extension.hh"
#include "db/tags/utils.hh"
#include "alternator/ttl_tag.hh"
namespace cql3 {
@@ -47,8 +43,7 @@ alter_table_statement::alter_table_statement(uint32_t bound_terms,
std::vector<column_change> column_changes,
std::optional<cf_prop_defs> properties,
renames_type renames,
std::unique_ptr<attributes> attrs,
shared_ptr<column_identifier::raw> ttl_change)
std::unique_ptr<attributes> attrs)
: schema_altering_statement(std::move(name))
, _bound_terms(bound_terms)
, _type(t)
@@ -56,7 +51,6 @@ alter_table_statement::alter_table_statement(uint32_t bound_terms,
, _properties(std::move(properties))
, _renames(std::move(renames))
, _attrs(std::move(attrs))
, _ttl_change(std::move(ttl_change))
{
}
@@ -386,21 +380,6 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
throw exceptions::invalid_request_exception("Cannot drop columns from a non-CQL3 table");
}
invoke_column_change_fn(std::mem_fn(&alter_table_statement::drop_column));
// If we dropped the column used for per-row TTL, we need to remove the tag.
if (std::optional<std::string> ttl_column = db::find_tag(*s, TTL_TAG_KEY)) {
for (auto& [raw_name, raw_validator, is_static] : _column_changes) {
if (*ttl_column == raw_name->text()) {
const std::map<sstring, sstring>* tags_ptr = db::get_tags_of_table(s);
if (tags_ptr) {
std::map<sstring, sstring> tags_map = *tags_ptr;
tags_map.erase(TTL_TAG_KEY);
cfm.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
}
break;
}
}
}
break;
case alter_table_statement::type::opts:
@@ -455,7 +434,6 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
break;
case alter_table_statement::type::rename:
{
for (auto&& entry : _renames) {
auto from = entry.first->prepare_column_identifier(*s);
auto to = entry.second->prepare_column_identifier(*s);
@@ -492,53 +470,6 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
}
return make_pair(std::move(new_base_schema), std::move(view_updates));
}
case alter_table_statement::type::ttl:
if (!db.features().cql_row_ttl) {
throw exceptions::invalid_request_exception("The CQL per-row TTL feature is not yet supported by this cluster. Upgrade all nodes to use it.");
}
if (_ttl_change) {
// Enable per-row TTL with chosen column for expiration time
const column_definition *cdef =
s->get_column_definition(to_bytes(_ttl_change->text()));
if (!cdef) {
throw exceptions::invalid_request_exception(fmt::format("Column '{}' does not exist in table {}.{}", _ttl_change->text(), keyspace(), column_family()));
}
if (cdef->type != timestamp_type && cdef->type != long_type && cdef->type != int32_type) {
throw exceptions::invalid_request_exception(fmt::format("TTL column {} must be of type timestamp, bigint or int, can't be {}", _ttl_change->text(), cdef->type->as_cql3_type().to_string()));
}
if (cdef->is_primary_key()) {
throw exceptions::invalid_request_exception(fmt::format("Cannot use a primary key column {} as a TTL column", _ttl_change->text()));
}
if (cdef->is_static()) {
throw exceptions::invalid_request_exception(fmt::format("Cannot use a static column {} as a TTL column", _ttl_change->text()));
}
std::optional<std::string> old_ttl_column = db::find_tag(*s, TTL_TAG_KEY);
if (old_ttl_column) {
throw exceptions::invalid_request_exception(fmt::format("Cannot set TTL column, table {}.{} already has a TTL column defined: {}", keyspace(), column_family(), *old_ttl_column));
}
const std::map<sstring, sstring>* old_tags_ptr = db::get_tags_of_table(s);
std::map<sstring, sstring> tags_map;
if (old_tags_ptr) {
// tags_ptr is a constant pointer to schema data. To modify
// it, we must make a copy.
tags_map = *old_tags_ptr;
}
tags_map[TTL_TAG_KEY] = _ttl_change->text();
cfm.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
} else {
// Disable per-row TTL
const std::map<sstring, sstring>* tags_ptr = db::get_tags_of_table(s);
if (!tags_ptr || tags_ptr->find(TTL_TAG_KEY) == tags_ptr->end()) {
throw exceptions::invalid_request_exception(fmt::format("Cannot unset TTL column, table {}.{} does not have a TTL column set", keyspace(), column_family()));
}
// tags_ptr is a constant pointer to schema data. To modify it, we
// must make a copy.
std::map<sstring, sstring> tags_map = *tags_ptr;
tags_map.erase(TTL_TAG_KEY);
cfm.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
}
break;
}
return make_pair(cfm.build(), std::move(view_updates));
}
@@ -577,15 +508,13 @@ alter_table_statement::raw_statement::raw_statement(cf_name name,
std::vector<column_change> column_changes,
std::optional<cf_prop_defs> properties,
renames_type renames,
std::unique_ptr<attributes::raw> attrs,
shared_ptr<column_identifier::raw> ttl_change)
std::unique_ptr<attributes::raw> attrs)
: cf_statement(std::move(name))
, _type(t)
, _column_changes(std::move(column_changes))
, _properties(std::move(properties))
, _renames(std::move(renames))
, _attrs(std::move(attrs))
, _ttl_change(std::move(ttl_change))
{}
std::unique_ptr<cql3::statements::prepared_statement>
@@ -610,8 +539,7 @@ alter_table_statement::raw_statement::prepare(data_dictionary::database db, cql_
_column_changes,
_properties,
_renames,
std::move(prepared_attrs),
_ttl_change
std::move(prepared_attrs)
),
ctx,
// since alter table is `cql_statement_no_metadata` (it doesn't return any metadata when preparing)

View File

@@ -32,7 +32,6 @@ public:
drop,
opts,
rename,
ttl,
};
using renames_type = std::vector<std::pair<shared_ptr<column_identifier::raw>,
shared_ptr<column_identifier::raw>>>;
@@ -51,7 +50,6 @@ private:
const std::optional<cf_prop_defs> _properties;
const renames_type _renames;
const std::unique_ptr<attributes> _attrs;
shared_ptr<column_identifier::raw> _ttl_change;
public:
alter_table_statement(uint32_t bound_terms,
cf_name name,
@@ -59,8 +57,7 @@ public:
std::vector<column_change> column_changes,
std::optional<cf_prop_defs> properties,
renames_type renames,
std::unique_ptr<attributes> attrs,
shared_ptr<column_identifier::raw> ttl_change);
std::unique_ptr<attributes> attrs);
virtual uint32_t get_bound_terms() const override;
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
@@ -81,7 +78,6 @@ class alter_table_statement::raw_statement : public raw::cf_statement {
const std::optional<cf_prop_defs> _properties;
const alter_table_statement::renames_type _renames;
const std::unique_ptr<attributes::raw> _attrs;
shared_ptr<column_identifier::raw> _ttl_change;
public:
raw_statement(cf_name name,
@@ -89,8 +85,7 @@ public:
std::vector<column_change> column_changes,
std::optional<cf_prop_defs> properties,
renames_type renames,
std::unique_ptr<attributes::raw> attrs,
shared_ptr<column_identifier::raw> ttl_change);
std::unique_ptr<attributes::raw> attrs);
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;

View File

@@ -259,15 +259,6 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
if (options.getSerialConsistency() == null)
throw new InvalidRequestException("Invalid empty serial consistency level");
#endif
const auto cl = options.get_consistency();
const query_processor::write_consistency_guardrail_state guardrail_state = qp.check_write_consistency_levels_guardrail(cl);
if (guardrail_state == query_processor::write_consistency_guardrail_state::FAIL) {
return make_exception_future<shared_ptr<cql_transport::messages::result_message>>(
exceptions::invalid_request_exception(
format("Consistency level {} is not allowed for write operations", cl)));
}
for (size_t i = 0; i < _statements.size(); ++i) {
_statements[i].statement->restrictions().validate_primary_key(options.for_statement(i));
}
@@ -275,31 +266,23 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
if (_has_conditions) {
++_stats.cas_batches;
_stats.statements_in_cas_batches += _statements.size();
return execute_with_conditions(qp, options, query_state).then([guardrail_state, cl] (auto result) {
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
}
return result;
});
return execute_with_conditions(qp, options, query_state);
}
++_stats.batches;
_stats.statements_in_batches += _statements.size();
auto timeout = db::timeout_clock::now() + get_timeout(query_state.get_client_state(), options);
return get_mutations(qp, options, timeout, local, now, query_state).then([this, &qp, cl, timeout, tr_state = query_state.get_trace_state(),
return get_mutations(qp, options, timeout, local, now, query_state).then([this, &qp, &options, timeout, tr_state = query_state.get_trace_state(),
permit = query_state.get_permit()] (utils::chunked_vector<mutation> ms) mutable {
return execute_without_conditions(qp, std::move(ms), cl, timeout, std::move(tr_state), std::move(permit));
}).then([guardrail_state, cl] (coordinator_result<> res) {
return execute_without_conditions(qp, std::move(ms), options.get_consistency(), timeout, std::move(tr_state), std::move(permit));
}).then([] (coordinator_result<> res) {
if (!res) {
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
seastar::make_shared<cql_transport::messages::result_message::exception>(std::move(res).assume_error()));
}
auto result = make_shared<cql_transport::messages::result_message::void_message>();
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
}
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(result));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
make_shared<cql_transport::messages::result_message::void_message>());
});
}

View File

@@ -30,9 +30,6 @@
#include "service/storage_proxy.hh"
#include "db/config.hh"
#include "compaction/time_window_compaction_strategy.hh"
#include "db/tags/extension.hh"
#include "db/tags/utils.hh"
#include "alternator/ttl_tag.hh"
namespace cql3 {
@@ -44,12 +41,10 @@ create_table_statement::create_table_statement(cf_name name,
::shared_ptr<cf_prop_defs> properties,
bool if_not_exists,
column_set_type static_columns,
::shared_ptr<column_identifier> ttl_column,
const std::optional<table_id>& id)
: schema_altering_statement{name}
, _use_compact_storage(false)
, _static_columns{static_columns}
, _ttl_column{ttl_column}
, _properties{properties}
, _if_not_exists{if_not_exists}
, _id(id)
@@ -128,13 +123,6 @@ void create_table_statement::apply_properties_to(schema_builder& builder, const
#endif
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace(), true);
// Remembering which column was designated as the TTL column for row-based
// TTL column is done using a "tag" extension. If there is no TTL column,
// we don't need this extension at all.
if (_ttl_column) {
std::map<sstring, sstring> tags_map = {{TTL_TAG_KEY, _ttl_column->text()}};
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(std::move(tags_map)));
}
}
void create_table_statement::add_column_metadata_from_aliases(schema_builder& builder, std::vector<bytes> aliases, const std::vector<data_type>& types, column_kind kind) const
@@ -210,7 +198,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
}
const bool has_default_ttl = _properties.properties()->get_default_time_to_live() > 0;
auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _ttl_column, _properties.properties()->get_id());
auto stmt = ::make_shared<create_table_statement>(*_cf_name, _properties.properties(), _if_not_exists, _static_columns, _properties.properties()->get_id());
bool ks_uses_tablets;
try {
@@ -415,27 +403,6 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
}
}
// If a TTL column is defined, it must be a regular column - not a static
// column or part of the primary key.
if (_ttl_column) {
if (!db.features().cql_row_ttl) {
throw exceptions::invalid_request_exception("The CQL per-row TTL feature is not yet supported by this cluster. Upgrade all nodes to use it.");
}
for (const auto& alias : key_aliases) {
if (alias->text() == _ttl_column->text()) {
throw exceptions::invalid_request_exception(format("TTL column {} cannot be part of the PRIMARY KEY", alias->text()));
}
}
for (const auto& alias : _column_aliases) {
if (alias->text() == _ttl_column->text()) {
throw exceptions::invalid_request_exception(format("TTL column {} cannot be part of the PRIMARY KEY", alias->text()));
}
}
if (_static_columns.contains(_ttl_column)) {
throw exceptions::invalid_request_exception(format("TTL column {} cannot be a static column", _ttl_column->text()));
}
}
return std::make_unique<prepared_statement>(audit_info(), stmt, std::move(stmt_warnings));
}
@@ -458,23 +425,12 @@ data_type create_table_statement::raw_statement::get_type_and_remove(column_map_
return _properties.get_reversable_type(*t, type);
}
void create_table_statement::raw_statement::add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static, bool is_ttl) {
void create_table_statement::raw_statement::add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static) {
_defined_names.emplace(def);
_definitions.emplace(def, type);
if (is_static) {
_static_columns.emplace(def);
}
if (is_ttl) {
if (_ttl_column) {
throw exceptions::invalid_request_exception(fmt::format("Cannot have more than one TTL column in a table. Saw {} and {}", _ttl_column->text(), def->text()));
}
// FIXME: find a way to check cql3_type::raw without fmt::format
auto type_name = fmt::format("{}", type);
if (type_name != "timestamp" && type_name != "bigint" && type_name != "int") {
throw exceptions::invalid_request_exception(fmt::format("TTL column '{}' must be of type timestamp, bigint or int, can't be {}", def->text(), type_name));
}
_ttl_column = def;
}
}
void create_table_statement::raw_statement::add_key_aliases(const std::vector<::shared_ptr<column_identifier>> aliases) {

View File

@@ -57,7 +57,6 @@ class create_table_statement : public schema_altering_statement {
shared_ptr_equal_by_value<column_identifier>>;
column_map_type _columns;
column_set_type _static_columns;
::shared_ptr<column_identifier> _ttl_column; // for row-based TTL
const ::shared_ptr<cf_prop_defs> _properties;
const bool _if_not_exists;
std::optional<table_id> _id;
@@ -66,7 +65,6 @@ public:
::shared_ptr<cf_prop_defs> properties,
bool if_not_exists,
column_set_type static_columns,
::shared_ptr<column_identifier> ttl_column,
const std::optional<table_id>& id);
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
@@ -102,7 +100,6 @@ private:
std::vector<std::vector<::shared_ptr<column_identifier>>> _key_aliases;
std::vector<::shared_ptr<column_identifier>> _column_aliases;
create_table_statement::column_set_type _static_columns;
::shared_ptr<column_identifier> _ttl_column; // for row-based TTL
std::multiset<::shared_ptr<column_identifier>,
indirect_less<::shared_ptr<column_identifier>, column_identifier::text_comparator>> _defined_names;
@@ -119,7 +116,7 @@ public:
data_type get_type_and_remove(column_map_type& columns, ::shared_ptr<column_identifier> t);
void add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static, bool is_ttl);
void add_definition(::shared_ptr<column_identifier> def, ::shared_ptr<cql3_type::raw> type, bool is_static);
void add_key_aliases(const std::vector<::shared_ptr<column_identifier>> aliases);

View File

@@ -23,7 +23,6 @@
#include "index/vector_index.hh"
#include "schema/schema.hh"
#include "service/client_state.hh"
#include "service/paxos/paxos_state.hh"
#include "types/types.hh"
#include "cql3/query_processor.hh"
#include "cql3/cql_statement.hh"
@@ -330,19 +329,6 @@ future<std::vector<description>> table(const data_dictionary::database& db, cons
"*/",
*table_desc.create_statement);
table_desc.create_statement = std::move(os).to_managed_string();
} else if (service::paxos::paxos_store::try_get_base_table(name)) {
// Paxos state table is internally managed by Scylla and it shouldn't be exposed to the user.
// The table is allowed to be described as a comment to ease administrative work but it's hidden from all listings.
fragmented_ostringstream os{};
fmt::format_to(os.to_iter(),
"/* Do NOT execute this statement! It's only for informational purposes.\n"
" A paxos state table is created automatically when enabling LWT on a base table.\n"
"\n{}\n"
"*/",
*table_desc.create_statement);
table_desc.create_statement = std::move(os).to_managed_string();
}
result.push_back(std::move(table_desc));
@@ -378,7 +364,7 @@ future<std::vector<description>> table(const data_dictionary::database& db, cons
future<std::vector<description>> tables(const data_dictionary::database& db, const lw_shared_ptr<keyspace_metadata>& ks, std::optional<bool> with_internals = std::nullopt) {
auto& replica_db = db.real_database();
auto tables = ks->tables() | std::views::filter([&replica_db] (const schema_ptr& s) {
return !cdc::is_log_for_some_table(replica_db, s->ks_name(), s->cf_name()) && !service::paxos::paxos_store::try_get_base_table(s->cf_name());
return !cdc::is_log_for_some_table(replica_db, s->ks_name(), s->cf_name());
}) | std::ranges::to<std::vector<schema_ptr>>();
std::ranges::sort(tables, std::ranges::less(), std::mem_fn(&schema::cf_name));
@@ -659,7 +645,8 @@ future<std::vector<std::vector<managed_bytes_opt>>> schema_describe_statement::d
auto& auth_service = *client_state.get_auth_service();
if (config.with_hashed_passwords) {
if (!co_await client_state.has_superuser()) {
const auto maybe_user = client_state.user();
if (!maybe_user || !co_await auth::has_superuser(auth_service, *maybe_user)) {
co_await coroutine::return_exception(exceptions::unauthorized_exception(
"DESCRIBE SCHEMA WITH INTERNALS AND PASSWORDS can only be issued by a superuser"));
}

View File

@@ -49,7 +49,7 @@ future<> cql3::statements::list_permissions_statement::check_access(query_proces
const auto& as = *state.get_auth_service();
const auto user = state.user();
return state.has_superuser().then([this, &as, user](bool has_super) {
return auth::has_superuser(as, *user).then([this, &as, user](bool has_super) {
if (has_super) {
return make_ready_future<>();
}

View File

@@ -74,7 +74,7 @@ cql3::statements::list_users_statement::execute(query_processor& qp, service::qu
const auto& cs = state.get_client_state();
const auto& as = *cs.get_auth_service();
return cs.has_superuser().then([&cs, &as, make_results = std::move(make_results)](bool has_superuser) mutable {
return auth::has_superuser(as, *cs.user()).then([&cs, &as, make_results = std::move(make_results)](bool has_superuser) mutable {
if (has_superuser) {
return as.underlying_role_manager().query_all().then([&as, make_results = std::move(make_results)](std::unordered_set<sstring> roles) mutable {
return make_results(as, std::move(roles));

View File

@@ -268,22 +268,10 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
inc_cql_stats(qs.get_client_state().is_internal());
const auto cl = options.get_consistency();
const query_processor::write_consistency_guardrail_state guardrail_state = qp.check_write_consistency_levels_guardrail(cl);
if (guardrail_state == query_processor::write_consistency_guardrail_state::FAIL) {
co_return coroutine::exception(
std::make_exception_ptr(exceptions::invalid_request_exception(
format("Consistency level {} is not allowed for write operations", cl))));
}
_restrictions->validate_primary_key(options);
if (has_conditions()) {
auto result = co_await execute_with_condition(qp, qs, options);
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
}
co_return result;
co_return co_await execute_with_condition(qp, qs, options);
}
json_cache_opt json_cache = maybe_prepare_json_cache(options);
@@ -302,9 +290,6 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
}
auto result = seastar::make_shared<cql_transport::messages::result_message::void_message>();
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
}
if (keys_size_one) {
auto&& table = s->table();
if (_may_use_token_aware_routing && table.uses_tablets() && qs.get_client_state().is_protocol_extension_set(cql_transport::cql_protocol_extension::TABLETS_ROUTING_V1)) {

View File

@@ -94,7 +94,7 @@ future<> create_role_statement::check_access(query_processor& qp, const service:
return;
}
const bool has_superuser = state.has_superuser().get();
const bool has_superuser = auth::has_superuser(*state.get_auth_service(), *state.user()).get();
if (_options.hashed_password && !has_superuser) {
throw exceptions::unauthorized_exception("Only superusers can create a role with a hashed password.");
@@ -213,7 +213,7 @@ future<> alter_role_statement::check_access(query_processor& qp, const service::
auto& as = *state.get_auth_service();
const auto& user = *state.user();
const bool user_is_superuser = state.has_superuser().get();
const bool user_is_superuser = auth::has_superuser(as, user).get();
if (_options.is_superuser) {
if (!user_is_superuser) {
@@ -306,7 +306,7 @@ future<> drop_role_statement::check_access(query_processor& qp, const service::c
auto& as = *state.get_auth_service();
const bool user_is_superuser = state.has_superuser().get();
const bool user_is_superuser = auth::has_superuser(as, *state.user()).get();
const bool role_has_superuser = [this, &as] {
try {
@@ -442,7 +442,7 @@ list_roles_statement::execute(query_processor& qp, service::query_state& state,
const auto& cs = state.get_client_state();
const auto& as = *cs.get_auth_service();
return cs.has_superuser().then([this, &cs, &as, make_results = std::move(make_results)](bool super) mutable {
return auth::has_superuser(as, *cs.user()).then([this, &cs, &as, make_results = std::move(make_results)](bool super) mutable {
auto& rm = as.underlying_role_manager();
const auto& a = as.underlying_authenticator();
const auto query_mode = _recursive ? auth::recursive_role_query::yes : auth::recursive_role_query::no;

View File

@@ -259,9 +259,11 @@ uint32_t select_statement::get_bound_terms() const {
future<> select_statement::check_access(query_processor& qp, const service::client_state& state) const {
try {
auto cdc = qp.db().get_cdc_base_table(*_schema);
auto& cf_name = _schema->is_view()
? _schema->view_info()->base_name()
const data_dictionary::database db = qp.db();
auto&& s = db.find_schema(keyspace(), column_family());
auto cdc = db.get_cdc_base_table(*s);
auto& cf_name = s->is_view()
? s->view_info()->base_name()
: (cdc ? cdc->cf_name() : column_family());
const schema_ptr& base_schema = cdc ? cdc : _schema;
bool is_vector_indexed = secondary_index::vector_index::has_vector_index(*base_schema);
@@ -2004,7 +2006,9 @@ static std::optional<ann_ordering_info> get_ann_ordering_info(
auto indexes = sim.list_indexes();
auto it = std::find_if(indexes.begin(), indexes.end(), [&prepared_ann_ordering](const auto& ind) {
return secondary_index::vector_index::is_vector_index_on_column(ind.metadata(), prepared_ann_ordering.first->name_as_text());
return (ind.metadata().options().contains(db::index::secondary_index::custom_class_option_name) &&
ind.metadata().options().at(db::index::secondary_index::custom_class_option_name) == ANN_CUSTOM_INDEX_OPTION) &&
(ind.target_column() == prepared_ann_ordering.first->name_as_text());
});
if (it == indexes.end()) {
@@ -2757,7 +2761,11 @@ select_statement::ordering_comparator_type select_statement::get_ordering_compar
// even if we don't
// ultimately ship them to the client (CASSANDRA-4911).
for (auto&& [column_def, is_descending] : orderings) {
auto index = selection.add_column_for_post_processing(*column_def);
auto index = selection.index_of(*column_def);
if (index < 0) {
index = selection.add_column_for_post_processing(*column_def);
}
sorters.emplace_back(index, column_def->type);
}
@@ -2860,7 +2868,9 @@ void select_statement::ensure_filtering_columns_retrieval(data_dictionary::datab
selection::selection& selection,
const restrictions::statement_restrictions& restrictions) {
for (auto&& cdef : restrictions.get_column_defs_for_filtering(db)) {
selection.add_column_for_post_processing(*cdef);
if (!selection.has_column(*cdef)) {
selection.add_column_for_post_processing(*cdef);
}
}
}

View File

@@ -11,7 +11,6 @@
#pragma once
#include "cql3/statements/statement_type.hh"
#include "db/consistency_level_type.hh"
#include <cstdint>
@@ -88,9 +87,6 @@ struct cql_stats {
uint64_t replication_strategy_warn_list_violations = 0;
uint64_t replication_strategy_fail_list_violations = 0;
uint64_t writes_per_consistency_level[size_t(db::consistency_level::MAX_VALUE) + 1] = {};
uint64_t write_consistency_levels_disallowed_violations = 0;
uint64_t write_consistency_levels_warned_violations = 0;
private:
uint64_t _unpaged_select_queries[(size_t)ks_selector::SIZE] = {0ul};

View File

@@ -55,21 +55,8 @@ int32_t batchlog_shard_of(db_clock::time_point written_at) {
return hash & ((1ULL << batchlog_shard_bits) - 1);
}
bool is_batchlog_v1(const schema& schema) {
return schema.cf_name() == system_keyspace::BATCHLOG;
}
std::pair<partition_key, clustering_key>
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, int32_t batchlog_shard, db_clock::time_point written_at, std::optional<utils::UUID> id) {
if (is_batchlog_v1(schema)) {
if (!id) {
on_internal_error(blogger, "get_batchlog_key(): key for batchlog v1 requires batchlog id");
}
auto pkey = partition_key::from_single_value(schema, {serialized(*id)});
auto ckey = clustering_key::make_empty();
return std::pair(std::move(pkey), std::move(ckey));
}
auto pkey = partition_key::from_exploded(schema, {serialized(version), serialized(int8_t(stage)), serialized(batchlog_shard)});
std::vector<bytes> ckey_components;
@@ -98,14 +85,6 @@ mutation get_batchlog_mutation_for(schema_ptr schema, managed_bytes data, int32_
auto cdef_data = schema->get_column_definition(to_bytes("data"));
m.set_cell(ckey, *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data)));
if (is_batchlog_v1(*schema)) {
auto cdef_version = schema->get_column_definition(to_bytes("version"));
m.set_cell(ckey, *cdef_version, atomic_cell::make_live(*cdef_version->type, timestamp, serialized(version)));
auto cdef_written_at = schema->get_column_definition(to_bytes("written_at"));
m.set_cell(ckey, *cdef_written_at, atomic_cell::make_live(*cdef_written_at->type, timestamp, serialized(now)));
}
return m;
}
@@ -143,10 +122,9 @@ mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clo
const std::chrono::seconds db::batchlog_manager::replay_interval;
const uint32_t db::batchlog_manager::page_size;
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, gms::feature_service& fs, batchlog_manager_config config)
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, batchlog_manager_config config)
: _qp(qp)
, _sys_ks(sys_ks)
, _fs(fs)
, _replay_timeout(config.replay_timeout)
, _replay_rate(config.replay_rate)
, _delay(config.delay)
@@ -322,206 +300,149 @@ future<> db::batchlog_manager::maybe_migrate_v1_to_v2() {
});
}
namespace {
using clock_type = db_clock::rep;
struct replay_stats {
std::optional<db_clock::time_point> min_too_fresh;
bool need_cleanup = false;
};
} // anonymous namespace
static future<db::all_batches_replayed> process_batch(
cql3::query_processor& qp,
db::batchlog_manager::stats& stats,
db::batchlog_manager::post_replay_cleanup cleanup,
utils::rate_limiter& limiter,
schema_ptr schema,
std::unordered_map<int32_t, replay_stats>& replay_stats_per_shard,
const db_clock::time_point now,
db_clock::duration replay_timeout,
std::chrono::seconds write_timeout,
const cql3::untyped_result_set::row& row) {
const bool is_v1 = db::is_batchlog_v1(*schema);
const auto stage = is_v1 ? db::batchlog_stage::initial : static_cast<db::batchlog_stage>(row.get_as<int8_t>("stage"));
const auto batch_shard = is_v1 ? 0 : row.get_as<int32_t>("shard");
auto written_at = row.get_as<db_clock::time_point>("written_at");
auto id = row.get_as<utils::UUID>("id");
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
auto timeout = replay_timeout;
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
co_return db::all_batches_replayed::no;
}
auto data = row.get_blob_unfragmented("data");
blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);
utils::chunked_vector<mutation> mutations;
bool send_failed = false;
auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;
try {
utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
auto in = ser::as_input_stream(data);
while (in.size()) {
auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
const auto tbl = qp.db().try_find_table(fm.column_family_id());
if (!tbl) {
continue;
}
if (written_at <= tbl->get_truncation_time()) {
continue;
}
schema_ptr s = tbl->schema();
if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
}
fms.emplace_back(std::move(fm), std::move(s));
}
if (now < written_at + timeout) {
blogger.debug("Skipping replay of {}, too fresh", id);
shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);
co_return db::all_batches_replayed::no;
}
auto size = data.size();
for (const auto& [fm, s] : fms) {
mutations.emplace_back(fm.to_mutation(s));
co_await coroutine::maybe_yield();
}
if (!mutations.empty()) {
const auto ttl = [written_at]() -> clock_type {
/*
* Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
* This ensures that deletes aren't "undone" by an old batch replay.
*/
auto unadjusted_ttl = std::numeric_limits<gc_clock::rep>::max();
warn(unimplemented::cause::HINT);
#if 0
for (auto& m : *mutations) {
unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
}
#endif
return unadjusted_ttl - std::chrono::duration_cast<gc_clock::duration>(db_clock::now() - written_at).count();
}();
if (ttl > 0) {
// Origin does the send manually, however I can't see a super great reason to do so.
// Our normal write path does not add much redundancy to the dispatch, and rate is handled after send
// in both cases.
// FIXME: verify that the above is reasonably true.
co_await limiter.reserve(size);
stats.write_attempts += mutations.size();
auto timeout = db::timeout_clock::now() + write_timeout;
if (cleanup) {
co_await qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
} else {
co_await qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
}
}
}
} catch (data_dictionary::no_such_keyspace& ex) {
// should probably ignore and drop the batch
} catch (const data_dictionary::no_such_column_family&) {
// As above -- we should drop the batch if the table doesn't exist anymore.
} catch (...) {
blogger.warn("Replay failed (will retry): {}", std::current_exception());
// timeout, overload etc.
// Do _not_ remove the batch, assuning we got a node write error.
// Since we don't have hints (which origin is satisfied with),
// we have to resort to keeping this batch to next lap.
if (is_v1 || !cleanup || stage == db::batchlog_stage::failed_replay) {
co_return db::all_batches_replayed::no;
}
send_failed = true;
}
auto& sp = qp.proxy();
if (send_failed) {
blogger.debug("Moving batch {} to stage failed_replay", id);
auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, db::batchlog_stage::failed_replay, written_at, id);
co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
}
// delete batch
auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
co_await qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
shard_written_at.need_cleanup = true;
co_return db::all_batches_replayed(!send_failed);
}
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches_v1(post_replay_cleanup) {
db::all_batches_replayed all_replayed = all_batches_replayed::yes;
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
// max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
utils::rate_limiter limiter(throttle);
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
// Use a stable `now` across all batches, so skip/replay decisions are the
// same across a while prefix of written_at (across all ids).
const auto now = db_clock::now();
auto batch = [this, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
all_replayed = all_replayed && co_await process_batch(_qp, _stats, post_replay_cleanup::no, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row);
co_return stop_iteration::no;
};
co_await with_gate(_gate, [this, &all_replayed, batch = std::move(batch)] () mutable -> future<> {
blogger.debug("Started replayAllFailedBatches");
co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000));
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
co_await _qp.query_internal(
format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
db::consistency_level::ONE,
{},
page_size,
batch);
blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed);
});
co_return all_replayed;
}
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches_v2(post_replay_cleanup cleanup) {
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
co_await maybe_migrate_v1_to_v2();
typedef db_clock::rep clock_type;
db::all_batches_replayed all_replayed = all_batches_replayed::yes;
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
// max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
utils::rate_limiter limiter(throttle);
auto limiter = make_lw_shared<utils::rate_limiter>(throttle);
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
struct replay_stats {
std::optional<db_clock::time_point> min_too_fresh;
bool need_cleanup = false;
};
std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
// Use a stable `now` across all batches, so skip/replay decisions are the
// same across a while prefix of written_at (across all ids).
const auto now = db_clock::now();
auto batch = [this, cleanup, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
all_replayed = all_replayed && co_await process_batch(_qp, _stats, cleanup, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row);
auto batch = [this, cleanup, limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
const auto stage = static_cast<batchlog_stage>(row.get_as<int8_t>("stage"));
const auto batch_shard = row.get_as<int32_t>("shard");
auto written_at = row.get_as<db_clock::time_point>("written_at");
auto id = row.get_as<utils::UUID>("id");
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
auto timeout = _replay_timeout;
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
all_replayed = all_batches_replayed::no;
co_return stop_iteration::no;
}
auto data = row.get_blob_unfragmented("data");
blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);
utils::chunked_vector<mutation> mutations;
bool send_failed = false;
auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;
try {
utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
auto in = ser::as_input_stream(data);
while (in.size()) {
auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
const auto tbl = _qp.db().try_find_table(fm.column_family_id());
if (!tbl) {
continue;
}
if (written_at <= tbl->get_truncation_time()) {
continue;
}
schema_ptr s = tbl->schema();
if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
}
fms.emplace_back(std::move(fm), std::move(s));
}
if (now < written_at + timeout) {
blogger.debug("Skipping replay of {}, too fresh", id);
shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);
co_return stop_iteration::no;
}
auto size = data.size();
for (const auto& [fm, s] : fms) {
mutations.emplace_back(fm.to_mutation(s));
co_await coroutine::maybe_yield();
}
if (!mutations.empty()) {
const auto ttl = [written_at]() -> clock_type {
/*
* Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
* This ensures that deletes aren't "undone" by an old batch replay.
*/
auto unadjusted_ttl = std::numeric_limits<gc_clock::rep>::max();
warn(unimplemented::cause::HINT);
#if 0
for (auto& m : *mutations) {
unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
}
#endif
return unadjusted_ttl - std::chrono::duration_cast<gc_clock::duration>(db_clock::now() - written_at).count();
}();
if (ttl > 0) {
// Origin does the send manually, however I can't see a super great reason to do so.
// Our normal write path does not add much redundancy to the dispatch, and rate is handled after send
// in both cases.
// FIXME: verify that the above is reasonably true.
co_await limiter->reserve(size);
_stats.write_attempts += mutations.size();
auto timeout = db::timeout_clock::now() + write_timeout;
if (cleanup) {
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
} else {
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
}
}
}
} catch (data_dictionary::no_such_keyspace& ex) {
// should probably ignore and drop the batch
} catch (const data_dictionary::no_such_column_family&) {
// As above -- we should drop the batch if the table doesn't exist anymore.
} catch (...) {
blogger.warn("Replay failed (will retry): {}", std::current_exception());
all_replayed = all_batches_replayed::no;
// timeout, overload etc.
// Do _not_ remove the batch, assuning we got a node write error.
// Since we don't have hints (which origin is satisfied with),
// we have to resort to keeping this batch to next lap.
if (!cleanup || stage == batchlog_stage::failed_replay) {
co_return stop_iteration::no;
}
send_failed = true;
}
auto& sp = _qp.proxy();
if (send_failed) {
blogger.debug("Moving batch {} to stage failed_replay", id);
auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, batchlog_stage::failed_replay, written_at, id);
co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
}
// delete batch
auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
shard_written_at.need_cleanup = true;
co_return stop_iteration::no;
};
@@ -580,10 +501,3 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
co_return all_replayed;
}
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
if (_fs.batchlog_v2) {
return replay_all_failed_batches_v2(cleanup);
}
return replay_all_failed_batches_v1(cleanup);
}

View File

@@ -27,12 +27,6 @@ class query_processor;
} // namespace cql3
namespace gms {
class feature_service;
} // namespace gms
namespace db {
class system_keyspace;
@@ -55,11 +49,6 @@ class batchlog_manager : public peering_sharded_service<batchlog_manager> {
public:
using post_replay_cleanup = bool_class<class post_replay_cleanup_tag>;
struct stats {
uint64_t write_attempts = 0;
};
private:
static constexpr std::chrono::seconds replay_interval = std::chrono::seconds(60);
static constexpr uint32_t page_size = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
@@ -67,13 +56,14 @@ private:
using clock_type = lowres_clock;
stats _stats;
struct stats {
uint64_t write_attempts = 0;
} _stats;
seastar::metrics::metric_groups _metrics;
cql3::query_processor& _qp;
db::system_keyspace& _sys_ks;
gms::feature_service& _fs;
db_clock::duration _replay_timeout;
uint64_t _replay_rate;
std::chrono::milliseconds _delay;
@@ -94,14 +84,12 @@ private:
future<> maybe_migrate_v1_to_v2();
future<all_batches_replayed> replay_all_failed_batches_v1(post_replay_cleanup cleanup);
future<all_batches_replayed> replay_all_failed_batches_v2(post_replay_cleanup cleanup);
future<all_batches_replayed> replay_all_failed_batches(post_replay_cleanup cleanup);
public:
// Takes a QP, not a distributes. Because this object is supposed
// to be per shard and does no dispatching beyond delegating the the
// shard qp (which is what you feed here).
batchlog_manager(cql3::query_processor&, db::system_keyspace& sys_ks, gms::feature_service& fs, batchlog_manager_config config);
batchlog_manager(cql3::query_processor&, db::system_keyspace& sys_ks, batchlog_manager_config config);
// abort the replay loop and return its future.
future<> drain();
@@ -114,7 +102,7 @@ public:
return _last_replay;
}
const stats& get_stats() const {
const stats& stats() const {
return _stats;
}
private:

View File

@@ -199,9 +199,18 @@ class cache_mutation_reader final : public mutation_reader::impl {
return *_snp->schema();
}
gc_clock::time_point get_read_time() {
return _read_context.tombstone_gc_state() ? gc_clock::now() : gc_clock::time_point::min();
}
gc_clock::time_point get_gc_before() {
if (!_gc_before.has_value()) {
_gc_before = _read_context.tombstone_gc_state().with_commitlog_check_disabled().get_gc_before_for_key(_schema, _dk, _read_time);
auto gc_state = _read_context.tombstone_gc_state();
if (gc_state) {
_gc_before = gc_state->with_commitlog_check_disabled().get_gc_before_for_key(_schema, _dk, _read_time);
} else {
_gc_before = gc_clock::time_point::min();
}
}
return *_gc_before;
}
@@ -233,7 +242,7 @@ public:
, _read_context_holder()
, _read_context(ctx) // ctx is owned by the caller, who's responsible for closing it.
, _next_row(*_schema, *_snp, false, _read_context.is_reversed())
, _read_time(gc_clock::now())
, _read_time(get_read_time())
{
clogger.trace("csm {}: table={}.{}, dk={}, reversed={}, snap={}",
fmt::ptr(this),
@@ -792,7 +801,7 @@ void cache_mutation_reader::copy_from_cache_to_buffer() {
if (_next_row_in_range) {
bool remove_row = false;
if (_read_context.tombstone_gc_state().is_gc_enabled() // do not compact rows when set to no_gc() (used in some unit tests)
if (_read_context.tombstone_gc_state() // do not compact rows when tombstone_gc_state is not set (used in some unit tests)
&& !_next_row.dummy()
&& _snp->at_latest_version()
&& _snp->at_oldest_version()) {

View File

@@ -1986,13 +1986,13 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
}
continue;
} catch (shutdown_marker&) {
_reserve_segments.abort(std::current_exception());
break;
} catch (...) {
clogger.warn("Exception in segment reservation: {}", std::current_exception());
}
co_await sleep(100ms);
}
_reserve_segments.abort(std::make_exception_ptr(shutdown_marker()));
}
future<std::vector<db::commitlog::descriptor>>

View File

@@ -266,13 +266,6 @@ const config_type& config_type_for<std::vector<enum_option<db::replication_strat
return ct;
}
template <>
const config_type& config_type_for<std::vector<enum_option<db::consistency_level_restriction_t>>>() {
static config_type ct(
"consistency level list", printable_vector_to_json<enum_option<db::consistency_level_restriction_t>>);
return ct;
}
template <>
const config_type& config_type_for<enum_option<db::tri_mode_restriction_t>>() {
static config_type ct(
@@ -422,23 +415,6 @@ public:
}
};
template <>
class convert<enum_option<db::consistency_level_restriction_t>> {
public:
static bool decode(const Node& node, enum_option<db::consistency_level_restriction_t>& rhs) {
std::string name;
if (!convert<std::string>::decode(node, name)) {
return false;
}
try {
std::istringstream(name) >> rhs;
} catch (boost::program_options::invalid_option_value&) {
return false;
}
return true;
}
};
template <>
class convert<enum_option<db::tri_mode_restriction_t>> {
public:
@@ -1090,7 +1066,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Enable or disable the native transport server. Uses the same address as the rpc_address, but the port is different from the rpc_port. See native_transport_port.")
, native_transport_port(this, "native_transport_port", "cql_port", value_status::Used, 9042,
"Port on which the CQL native transport listens for clients.")
, maintenance_socket(this, "maintenance_socket", value_status::Used, "workdir",
, maintenance_socket(this, "maintenance_socket", value_status::Used, "ignore",
"The Unix Domain Socket the node uses for maintenance socket.\n"
"The possible options are:\n"
"\tignore the node will not open the maintenance socket.\n"
@@ -1225,13 +1201,13 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"* org.apache.cassandra.auth.CassandraRoleManager: Stores role data in the system_auth keyspace;\n"
"* com.scylladb.auth.LDAPRoleManager: Fetches role data from an LDAP server.")
, permissions_validity_in_ms(this, "permissions_validity_in_ms", liveness::LiveUpdate, value_status::Used, 10000,
"How long authorized statements cache entries remain valid. The cached value is considered valid as long as both its value is not older than the permissions_validity_in_ms "
"How long permissions in cache remain valid. Depending on the authorizer, such as CassandraAuthorizer, fetching permissions can be resource intensive. Permissions caching is disabled when this property is set to 0 or when AllowAllAuthorizer is used. The cached value is considered valid as long as both its value is not older than the permissions_validity_in_ms "
"and the cached value has been read at least once during the permissions_validity_in_ms time frame. If any of these two conditions doesn't hold the cached value is going to be evicted from the cache.\n"
"\n"
"Related information: Object permissions")
, permissions_update_interval_in_ms(this, "permissions_update_interval_in_ms", liveness::LiveUpdate, value_status::Used, 2000,
"Refresh interval for authorized statements cache. After this interval, cache entries become eligible for refresh. An async reload is scheduled every permissions_update_interval_in_ms time period and the old value is returned until it completes. If permissions_validity_in_ms has a non-zero value, then this property must also have a non-zero value. It's recommended to set this value to be at least 3 times smaller than the permissions_validity_in_ms. This option additionally controls the permissions refresh interval for LDAP.")
, permissions_cache_max_entries(this, "permissions_cache_max_entries", liveness::LiveUpdate, value_status::Unused, 1000,
"Refresh interval for permissions cache (if enabled). After this interval, cache entries become eligible for refresh. An async reload is scheduled every permissions_update_interval_in_ms time period and the old value is returned until it completes. If permissions_validity_in_ms has a non-zero value, then this property must also have a non-zero value. It's recommended to set this value to be at least 3 times smaller than the permissions_validity_in_ms.")
, permissions_cache_max_entries(this, "permissions_cache_max_entries", liveness::LiveUpdate, value_status::Used, 1000,
"Maximum cached permission entries. Must have a non-zero value if permissions caching is enabled (see a permissions_validity_in_ms description).")
, server_encryption_options(this, "server_encryption_options", value_status::Used, {/*none*/},
"Enable or disable inter-node encryption. You must also generate keys and provide the appropriate key and trust store locations and passwords. The available options are:\n"
@@ -1296,7 +1272,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ignore for replace operation using a comma-separated list of host IDs. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e")
, override_decommission(this, "override_decommission", value_status::Deprecated, false, "Set true to force a decommissioned node to join the cluster (cannot be set if consistent-cluster-management is enabled).")
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based.")
, allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace,removenode,rebuild", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild.")
, allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace,removenode,rebuild,bootstrap,decommission", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild.")
, enable_compacting_data_for_streaming_and_repair(this, "enable_compacting_data_for_streaming_and_repair", liveness::LiveUpdate, value_status::Used, true, "Enable the compacting reader, which compacts the data for streaming and repair (load'n'stream included) before sending it to, or synchronizing it with peers. Can reduce the amount of data to be processed by removing dead data, but adds CPU overhead.")
, enable_tombstone_gc_for_streaming_and_repair(this, "enable_tombstone_gc_for_streaming_and_repair", liveness::LiveUpdate, value_status::Used, false,
"If the compacting reader is enabled for streaming and repair (see enable_compacting_data_for_streaming_and_repair), allow it to garbage-collect tombstones."
@@ -1316,7 +1292,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, fd_initial_value_ms(this, "fd_initial_value_ms", value_status::Used, 2 * 1000, "The initial failure_detector interval time in milliseconds.")
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
, developer_mode(this, "developer_mode", value_status::Used, DEVELOPER_MODE_DEFAULT, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Deprecated, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user.")
, experimental_features(this, "experimental_features", value_status::Used, {}, experimental_features_help_string())
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step.")
@@ -1522,7 +1498,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, index_cache_fraction(this, "index_cache_fraction", liveness::LiveUpdate, value_status::Used, 0.2,
"The maximum fraction of cache memory permitted for use by index cache. Clamped to the [0.0; 1.0] range. Must be small enough to not deprive the row cache of memory, but should be big enough to fit a large fraction of the index. The default value 0.2 means that at least 80\% of cache memory is reserved for the row cache, while at most 20\% is usable by the index cache.")
, consistent_cluster_management(this, "consistent_cluster_management", value_status::Deprecated, true, "Use RAFT for cluster management and DDL.")
, force_gossip_topology_changes(this, "force_gossip_topology_changes", value_status::Deprecated, false, "Force gossip-based topology operations in a fresh cluster. Only the first node in the cluster must use it. The rest will fall back to gossip-based operations anyway. This option should be used only for testing. Note: gossip topology changes are incompatible with tablets.")
, force_gossip_topology_changes(this, "force_gossip_topology_changes", value_status::Used, false, "Force gossip-based topology operations in a fresh cluster. Only the first node in the cluster must use it. The rest will fall back to gossip-based operations anyway. This option should be used only for testing. Note: gossip topology changes are incompatible with tablets.")
, recovery_leader(this, "recovery_leader", liveness::LiveUpdate, value_status::Used, utils::null_uuid(), "Host ID of the node restarted first while performing the Manual Raft-based Recovery Procedure. Warning: this option disables some guardrails for the needs of the Manual Raft-based Recovery Procedure. Make sure you unset it at the end of the procedure.")
, wasm_cache_memory_fraction(this, "wasm_cache_memory_fraction", value_status::Used, 0.01, "Maximum total size of all WASM instances stored in the cache as fraction of total shard memory.")
, wasm_cache_timeout_in_ms(this, "wasm_cache_timeout_in_ms", value_status::Used, 5000, "Time after which an instance is evicted from the cache.")
@@ -1539,15 +1515,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Ignored if authentication tables already contain a super user password.")
, auth_certificate_role_queries(this, "auth_certificate_role_queries", value_status::Used, { { { "source", "SUBJECT" }, {"query", "CN=([^,]+)" } } },
"Regular expression used by CertificateAuthenticator to extract role name from an accepted transport authentication certificate subject info.")
, enable_create_table_with_compact_storage(this, "enable_create_table_with_compact_storage", liveness::LiveUpdate, value_status::Used, false, "Enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE. This feature will eventually be removed in a future version.")
, minimum_replication_factor_fail_threshold(this, "minimum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
, minimum_replication_factor_warn_threshold(this, "minimum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, 3, "")
, maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
, maximum_replication_factor_warn_threshold(this, "maximum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
, replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
, replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
, write_consistency_levels_disallowed(this, "write_consistency_levels_disallowed", liveness::LiveUpdate, value_status::Used, {}, "A list of consistency levels that are not allowed for write operations. Requests using these levels will fail.")
, write_consistency_levels_warned(this, "write_consistency_levels_warned", liveness::LiveUpdate, value_status::Used, {}, "A list of consistency levels that will trigger a warning when used in write operations. Requests using these levels will contain a warning in the query response.")
, maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
, tablets_initial_scale_factor(this, "tablets_initial_scale_factor", liveness::LiveUpdate, value_status::Used, 10,
"Minimum average number of tablet replicas per shard per table. Suppressed by tablet options in table's schema: min_per_shard_tablet_count and min_tablet_count")
, tablets_per_shard_goal(this, "tablets_per_shard_goal", liveness::LiveUpdate, value_status::Used, 100,
@@ -1556,19 +1527,17 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Allows target tablet size to be configured. Defaults to 5G (in bytes). Maintaining tablets at reasonable sizes is important to be able to " \
"redistribute load. A higher value means tablet migration throughput can be reduced. A lower value may cause number of tablets to increase significantly, " \
"potentially resulting in performance drawbacks.")
, tablet_streaming_read_concurrency_per_shard(this, "tablet_streaming_read_concurrency_per_shard", liveness::LiveUpdate, value_status::Used, 2,
"Maximum number of tablets which may be leaving a shard at the same time. Effecting only on topology coordinator. Set to the same value on all nodes.")
, tablet_streaming_write_concurrency_per_shard(this, "tablet_streaming_write_concurrency_per_shard", liveness::LiveUpdate, value_status::Used, 2,
"Maximum number of tablets which may be pending on a shard at the same time. Effecting only on topology coordinator. Set to the same value on all nodes.")
, replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
, replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
, service_levels_interval(this, "service_levels_interval_ms", liveness::LiveUpdate, value_status::Used, 10000, "Controls how often service levels module polls configuration table")
, audit(this, "audit", value_status::Used, "table",
, audit(this, "audit", value_status::Used, "none",
"Controls the audit feature:\n"
"\n"
"\tnone : No auditing enabled.\n"
"\tsyslog : Audit messages sent to Syslog.\n"
"\ttable : Audit messages written to column family named audit.audit_log.\n")
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,AUTH,ADMIN", "Comma separated list of operation categories that should be audited.")
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,DDL,AUTH", "Comma separated list of operation categories that should be audited.")
, audit_tables(this, "audit_tables", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of table names (<keyspace>.<table>) that will be audited.")
, audit_keyspaces(this, "audit_keyspaces", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of keyspaces that will be audited. All tables in those keyspaces will be audited")
, audit_unix_socket_path(this, "audit_unix_socket_path", value_status::Used, "/dev/log", "The path to the unix socket used for writing to syslog. Only applicable when audit is set to syslog.")
@@ -1597,6 +1566,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, disk_space_monitor_high_polling_interval_in_seconds(this, "disk_space_monitor_high_polling_interval_in_seconds", value_status::Used, 1, "Disk-space polling interval at or above polling threshold")
, disk_space_monitor_polling_interval_threshold(this, "disk_space_monitor_polling_interval_threshold", value_status::Used, 0.9, "Disk-space polling threshold. Polling interval is increased when disk utilization is greater than or equal to this threshold")
, critical_disk_utilization_level(this, "critical_disk_utilization_level", liveness::LiveUpdate, value_status::Used, 0.98, "Disk utilization level above which mechanisms preventing a node getting out of space are activated")
, enable_create_table_with_compact_storage(this, "enable_create_table_with_compact_storage", liveness::LiveUpdate, value_status::Used, false, "Enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE. This feature will eventually be removed in a future version.")
, rf_rack_valid_keyspaces(this, "rf_rack_valid_keyspaces", liveness::MustRestart, value_status::Used, false,
"Enforce RF-rack-valid keyspaces. Additionally, if there are existing RF-rack-invalid "
"keyspaces, attempting to start a node with this option ON will fail. "
@@ -1869,30 +1839,6 @@ std::unordered_map<sstring, locator::replication_strategy_type> db::replication_
{"EverywhereStrategy", locator::replication_strategy_type::everywhere_topology}};
}
std::unordered_map<sstring, db::consistency_level> db::consistency_level_restriction_t::map() {
using cl = db::consistency_level;
std::unordered_map<sstring, cl> result = {
{"ANY", cl::ANY},
{"ONE", cl::ONE},
{"TWO", cl::TWO},
{"THREE", cl::THREE},
{"QUORUM", cl::QUORUM},
{"ALL", cl::ALL},
{"LOCAL_QUORUM", cl::LOCAL_QUORUM},
{"EACH_QUORUM", cl::EACH_QUORUM},
{"SERIAL", cl::SERIAL},
{"LOCAL_SERIAL", cl::LOCAL_SERIAL},
{"LOCAL_ONE", cl::LOCAL_ONE},
};
constexpr auto expected_size = static_cast<size_t>(cl::MAX_VALUE) - static_cast<size_t>(cl::MIN_VALUE) + 1;
if (result.size() != expected_size) {
on_internal_error_noexcept(dblog, format("consistency_level_option::map() has {} entries but expected {}", result.size(), expected_size));
}
return result;
}
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
std::vector<enum_option<db::experimental_features_t>> ret;
for (const auto& f : db::experimental_features_t::map()) {

View File

@@ -24,7 +24,6 @@
#include "utils/error_injection.hh"
#include "message/dict_trainer.hh"
#include "message/advanced_rpc_compressor.hh"
#include "db/consistency_level_type.hh"
#include "db/tri_mode_restriction.hh"
#include "sstables/compressor.hh"
@@ -127,10 +126,6 @@ struct replication_strategy_restriction_t {
static std::unordered_map<sstring, locator::replication_strategy_type> map(); // for enum_option<>
};
struct consistency_level_restriction_t {
static std::unordered_map<sstring, db::consistency_level> map(); // for enum_option<>
};
constexpr unsigned default_murmur3_partitioner_ignore_msb_bits = 12;
struct tablets_mode_t {
@@ -539,22 +534,17 @@ public:
named_value<std::vector<std::unordered_map<sstring, sstring>>> auth_certificate_role_queries;
// guardrails options
named_value<bool> enable_create_table_with_compact_storage;
named_value<int> minimum_replication_factor_fail_threshold;
named_value<int> minimum_replication_factor_warn_threshold;
named_value<int> maximum_replication_factor_fail_threshold;
named_value<int> maximum_replication_factor_warn_threshold;
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_fail_list;
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_warn_list;
named_value<std::vector<enum_option<consistency_level_restriction_t>>> write_consistency_levels_disallowed;
named_value<std::vector<enum_option<consistency_level_restriction_t>>> write_consistency_levels_warned;
named_value<int> maximum_replication_factor_fail_threshold;
named_value<double> tablets_initial_scale_factor;
named_value<unsigned> tablets_per_shard_goal;
named_value<uint64_t> target_tablet_size_in_bytes;
named_value<unsigned> tablet_streaming_read_concurrency_per_shard;
named_value<unsigned> tablet_streaming_write_concurrency_per_shard;
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_warn_list;
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_fail_list;
named_value<uint32_t> service_levels_interval;
@@ -606,6 +596,8 @@ public:
named_value<float> disk_space_monitor_polling_interval_threshold;
named_value<float> critical_disk_utilization_level;
named_value<bool> enable_create_table_with_compact_storage;
named_value<bool> rf_rack_valid_keyspaces;
named_value<bool> enforce_rack_list;

View File

@@ -154,10 +154,7 @@ hint_sender::~hint_sender() {
future<> hint_sender::stop(drain should_drain) noexcept {
seastar::thread_attributes attr;
attr.sched_group = _hints_cpu_sched_group;
return seastar::async(std::move(attr), [this, should_drain] {
return seastar::async([this, should_drain] {
set_stopping();
_stop_as.request_abort();
_stopped.get();

View File

@@ -16,7 +16,6 @@
#include <string>
#include <tuple>
#include "cql3/cql3_type.hh"
#include "types/user.hh"
#include "types/map.hh"
#include "types/list.hh"
@@ -114,7 +113,7 @@ std::vector<data_type> type_parser::get_type_parameters(bool multicell)
throw parse_exception(_str, _idx, "unexpected end of string");
}
std::tuple<data_type, vector_dimension_t> type_parser::get_vector_parameters()
std::tuple<data_type, size_t> type_parser::get_vector_parameters()
{
if (is_eos() || _str[_idx] != '(') {
throw std::logic_error("internal error");
@@ -129,7 +128,7 @@ std::tuple<data_type, vector_dimension_t> type_parser::get_vector_parameters()
}
data_type type = do_parse(true);
vector_dimension_t size = 0;
size_t size = 0;
if (_str[_idx] == ',') {
++_idx;
skip_blank();
@@ -143,20 +142,7 @@ std::tuple<data_type, vector_dimension_t> type_parser::get_vector_parameters()
throw parse_exception(_str, _idx, "expected digit or ')'");
}
unsigned long parsed_size;
try {
parsed_size = std::stoul(_str.substr(i, _idx - i));
} catch (const std::exception& e) {
throw parse_exception(_str, i, format("Invalid vector dimension: {}", e.what()));
}
static_assert(sizeof(unsigned long) >= sizeof(vector_dimension_t));
if (parsed_size == 0) {
throw parse_exception(_str, _idx, "Vectors must have a dimension greater than 0");
}
if (parsed_size > cql3::cql3_type::MAX_VECTOR_DIMENSION) {
throw parse_exception(_str, _idx, format("Vectors must have a dimension less than or equal to {}", cql3::cql3_type::MAX_VECTOR_DIMENSION));
}
size = static_cast<vector_dimension_t>(parsed_size);
size = std::stoul(_str.substr(i, _idx - i));
++_idx; // skipping ')'
return std::make_tuple(type, size);

View File

@@ -97,7 +97,7 @@ public:
}
#endif
std::vector<data_type> get_type_parameters(bool multicell=true);
std::tuple<data_type, vector_dimension_t> get_vector_parameters();
std::tuple<data_type, size_t> get_vector_parameters();
std::tuple<sstring, bytes, std::vector<bytes>, std::vector<data_type>> get_user_type_parameters();
data_type do_parse(bool multicell = true);

View File

@@ -125,7 +125,7 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
tracing::trace_state_ptr _trace_state;
mutation_reader::forwarding _fwd_mr;
bool _range_query;
tombstone_gc_state _tombstone_gc_state;
const tombstone_gc_state* _tombstone_gc_state;
max_purgeable_fn _get_max_purgeable;
// When reader enters a partition, it must be set up for reading that
// partition from the underlying mutation source (_underlying) in one of two ways:
@@ -149,7 +149,7 @@ public:
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tombstone_gc_state gc_state,
const tombstone_gc_state* gc_state,
max_purgeable_fn get_max_purgeable,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
@@ -161,7 +161,7 @@ public:
, _trace_state(std::move(trace_state))
, _fwd_mr(fwd_mr)
, _range_query(!query::is_single_partition(range))
, _tombstone_gc_state(std::move(gc_state))
, _tombstone_gc_state(gc_state)
, _get_max_purgeable(std::move(get_max_purgeable))
, _underlying(_cache, *this)
{
@@ -197,7 +197,7 @@ public:
bool partition_exists() const { return _partition_exists; }
void on_underlying_created() { ++_underlying_created; }
bool digest_requested() const { return _slice.options.contains<query::partition_slice::option::with_digest>(); }
const tombstone_gc_state& tombstone_gc_state() const { return _tombstone_gc_state; }
const tombstone_gc_state* tombstone_gc_state() const { return _tombstone_gc_state; }
max_purgeable get_max_purgeable(const dht::decorated_key& dk, is_shadowable is) const { return _get_max_purgeable(dk, is); }
public:
future<> ensure_underlying() {

View File

@@ -775,7 +775,7 @@ row_cache::make_reader_opt(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tombstone_gc_state gc_state,
const tombstone_gc_state* gc_state,
max_purgeable_fn get_max_purgeable,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,

View File

@@ -373,7 +373,7 @@ public:
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no,
tombstone_gc_state gc_state = tombstone_gc_state::no_gc(),
const tombstone_gc_state* gc_state = nullptr,
max_purgeable_fn get_max_purgeable = can_never_purge) {
if (auto reader_opt = make_reader_opt(s, permit, range, slice, gc_state, std::move(get_max_purgeable), std::move(trace_state), fwd, fwd_mr)) {
return std::move(*reader_opt);
@@ -386,7 +386,7 @@ public:
reader_permit permit,
const dht::partition_range&,
const query::partition_slice&,
tombstone_gc_state,
const tombstone_gc_state*,
max_purgeable_fn get_max_purgeable,
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
@@ -395,7 +395,7 @@ public:
mutation_reader make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range = query::full_partition_range,
tombstone_gc_state gc_state = tombstone_gc_state::no_gc(),
const tombstone_gc_state* gc_state = nullptr,
max_purgeable_fn get_max_purgeable = can_never_purge) {
auto& full_slice = s->full_slice();
return make_reader(std::move(s), std::move(permit), range, full_slice, nullptr,

View File

@@ -1139,17 +1139,14 @@ future<> schema_applier::finalize_tables_and_views() {
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
for (auto& dropped_view : diff.tables_and_views.local().views.dropped) {
auto s = dropped_view.get();
co_await _ss.local().on_cleanup_for_drop_table(s->id());
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
}
for (auto& dropped_table : diff.tables_and_views.local().tables.dropped) {
auto s = dropped_table.get();
co_await _ss.local().on_cleanup_for_drop_table(s->id());
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
}
for (auto& dropped_cdc : diff.tables_and_views.local().cdc.dropped) {
auto s = dropped_cdc.get();
co_await _ss.local().on_cleanup_for_drop_table(s->id());
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
}

View File

@@ -105,7 +105,7 @@ namespace {
schema_builder::register_schema_initializer([](schema_builder& builder) {
if (builder.ks_name() == schema_tables::NAME) {
// all schema tables are group0 tables
builder.set_is_group0_table();
builder.set_is_group0_table(true);
}
});
}
@@ -2840,15 +2840,20 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks.query_processor().execute_internal(
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
{table_id.uuid(), version.uuid()},
cql3::query_processor::cache_internal::no
);
if (results->empty()) {
co_return std::nullopt;
// If we don't have a stored column_mapping for an obsolete schema version
// then it means it's way too old and been cleaned up already.
// Fail the whole learn stage in this case.
co_await coroutine::return_exception(std::runtime_error(
format("Failed to look up column mapping for schema version {}",
version)));
}
std::vector<column_definition> static_columns, regular_columns;
for (const auto& row : *results) {
@@ -2876,18 +2881,6 @@ future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_ke
co_return std::move(cm);
}
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
auto cm_opt = co_await schema_tables::get_column_mapping_if_exists(sys_ks, table_id, version);
if (!cm_opt) {
// If we don't have a stored column_mapping for an obsolete schema version
// then it means it's way too old and been cleaned up already.
co_await coroutine::return_exception(std::runtime_error(
format("Failed to look up column mapping for schema version {}",
version)));
}
co_return std::move(*cm_opt);
}
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
GET_COLUMN_MAPPING_QUERY,

View File

@@ -320,8 +320,6 @@ std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const ss
future<> store_column_mapping(sharded<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
/// Query column mapping for a given version of the table locally.
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Returns the same result as `get_column_mapping()` wrapped in optional and returns nullopt if the mapping doesn't exist.
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Check that column mapping exists for a given version of the table
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Delete matching column mapping entries from the `system.scylla_table_schema_history` table

View File

@@ -21,16 +21,14 @@
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "sstables/sstables_manager.hh"
#include "service/storage_proxy.hh"
logging::logger snap_log("snapshots");
namespace db {
snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::storage_proxy>& sp, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg)
snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg)
: _config(std::move(cfg))
, _db(db)
, _sp(sp)
, _ops("snapshot_ctl")
, _task_manager_module(make_shared<snapshot::task_manager_module>(tm))
, _storage_manager(sstm)
@@ -106,45 +104,6 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
});
}
future<> snapshot_ctl::take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
if (tag.empty()) {
throw std::invalid_argument("You must supply a snapshot name.");
}
if (ks_names.size() != 1 && !tables.empty()) {
throw std::invalid_argument("Cannot name tables when doing multiple keyspaces snapshot");
}
if (ks_names.empty()) {
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(ks_names));
}
return run_snapshot_modify_operation([this, ks_names = std::move(ks_names), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
return do_take_cluster_column_family_snapshot(std::move(ks_names), std::move(tables), std::move(tag), opts);
});
}
future<> snapshot_ctl::do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
if (tables.empty()) {
co_await coroutine::parallel_for_each(ks_names, [tag, this] (const auto& ks_name) {
return check_snapshot_not_exist(ks_name, tag);
});
co_await _sp.local().snapshot_keyspace(
ks_names | std::views::transform([&](auto& ks) { return std::make_pair(ks, sstring{}); })
| std::ranges::to<std::unordered_multimap>(),
tag, opts
);
co_return;
};
auto ks = ks_names[0];
co_await check_snapshot_not_exist(ks, tag, tables);
co_await _sp.local().snapshot_keyspace(
tables | std::views::transform([&](auto& cf) { return std::make_pair(ks, cf); })
| std::ranges::to<std::unordered_multimap>(),
tag, opts
);
}
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
co_await check_snapshot_not_exist(ks_name, tag, tables);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
@@ -226,4 +185,4 @@ future<int64_t> snapshot_ctl::true_snapshots_size(sstring ks, sstring cf) {
}));
}
}
}

View File

@@ -24,7 +24,6 @@
using namespace seastar;
namespace sstables { class storage_manager; }
namespace service { class storage_proxy; }
namespace db {
@@ -64,7 +63,7 @@ public:
using db_snapshot_details = std::vector<table_snapshot_details_ext>;
snapshot_ctl(sharded<replica::database>& db, sharded<service::storage_proxy>&, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg);
snapshot_ctl(sharded<replica::database>& db, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg);
future<> stop();
@@ -96,17 +95,6 @@ public:
*/
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
/**
* Takes the snapshot of multiple tables or a whole keyspace, or all keyspaces,
* using global, clusterwide topology coordinated op.
* A snapshot name must be specified.
*
* @param ks_names the keyspaces to snapshot
* @param tables optional - a vector of tables names to snapshot
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
/**
* Remove the snapshot with the given name from the given keyspaces.
* If no tag is specified we will remove all snapshots.
@@ -123,7 +111,6 @@ public:
private:
config _config;
sharded<replica::database>& _db;
sharded<service::storage_proxy>& _sp;
seastar::rwlock _lock;
seastar::named_gate _ops;
shared_ptr<snapshot::task_manager_module> _task_manager_module;
@@ -146,7 +133,6 @@ private:
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
future<> do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
};
}
}

View File

@@ -770,6 +770,13 @@ system_distributed_keyspace::get_cdc_desc_v1_timestamps(context ctx) {
co_return res;
}
bool system_distributed_keyspace::workload_prioritization_tables_exists() {
auto wp_table = get_updated_service_levels(_qp.db(), true);
auto table = _qp.db().try_find_table(NAME, wp_table->cf_name());
return table && table->schema()->equal_columns(*wp_table);
}
future<qos::service_levels_info> system_distributed_keyspace::get_service_levels(qos::query_context ctx) const {
return qos::get_service_levels(_qp, NAME, SERVICE_LEVELS, db::consistency_level::ONE, ctx);
}

View File

@@ -117,6 +117,7 @@ public:
future<qos::service_levels_info> get_service_level(sstring service_level_name) const;
future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const;
future<> drop_service_level(sstring service_level_name) const;
bool workload_prioritization_tables_exists();
private:
future<> create_tables(std::vector<schema_ptr> tables);

View File

@@ -87,15 +87,31 @@ namespace {
static const std::unordered_set<sstring> tables = {
schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY,
system_keyspace::BROADCAST_KV_STORE,
system_keyspace::CDC_GENERATIONS_V3,
system_keyspace::RAFT,
system_keyspace::RAFT_SNAPSHOTS,
system_keyspace::RAFT_SNAPSHOT_CONFIG,
system_keyspace::GROUP0_HISTORY,
system_keyspace::DISCOVERY,
system_keyspace::TABLETS,
system_keyspace::TOPOLOGY,
system_keyspace::TOPOLOGY_REQUESTS,
system_keyspace::LOCAL,
system_keyspace::PEERS,
system_keyspace::SCYLLA_LOCAL,
system_keyspace::COMMITLOG_CLEANUPS,
system_keyspace::SERVICE_LEVELS_V2,
system_keyspace::VIEW_BUILD_STATUS_V2,
system_keyspace::CDC_STREAMS_STATE,
system_keyspace::CDC_STREAMS_HISTORY,
system_keyspace::ROLES,
system_keyspace::ROLE_MEMBERS,
system_keyspace::ROLE_ATTRIBUTES,
system_keyspace::ROLE_PERMISSIONS,
system_keyspace::CDC_LOCAL,
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
system_keyspace::CLIENT_ROUTES,
};
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
builder.enable_schema_commitlog();
@@ -127,7 +143,7 @@ namespace {
system_keyspace::REPAIR_TASKS,
};
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
builder.set_is_group0_table();
builder.set_is_group0_table(true);
}
});
}
@@ -319,10 +335,6 @@ schema_ptr system_keyspace::topology_requests() {
.with_column("truncate_table_id", uuid_type)
.with_column("new_keyspace_rf_change_ks_name", utf8_type)
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false))
.with_column("snapshot_table_ids", set_type_impl::get_instance(uuid_type, false))
.with_column("snapshot_tag", utf8_type)
.with_column("snapshot_expiry", timestamp_type)
.with_column("snapshot_skip_flush", boolean_type)
.set_comment("Topology request tracking")
.with_hash_version()
.build();
@@ -400,7 +412,26 @@ schema_ptr system_keyspace::cdc_streams_history() {
}
schema_ptr system_keyspace::raft() {
static thread_local auto schema = replica::make_raft_schema(db::system_keyspace::RAFT, true);
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, RAFT);
return schema_builder(NAME, RAFT, std::optional(id))
.with_column("group_id", timeuuid_type, column_kind::partition_key)
// raft log part
.with_column("index", long_type, column_kind::clustering_key)
.with_column("term", long_type)
.with_column("data", bytes_type) // decltype(raft::log_entry::data) - serialized variant
// persisted term and vote
.with_column("vote_term", long_type, column_kind::static_column)
.with_column("vote", uuid_type, column_kind::static_column)
// id of the most recent persisted snapshot
.with_column("snapshot_id", uuid_type, column_kind::static_column)
.with_column("commit_idx", long_type, column_kind::static_column)
.set_comment("Persisted RAFT log, votes and snapshot info")
.with_hash_version()
.set_caching_options(caching_options::get_disabled_caching_options())
.build();
}();
return schema;
}
@@ -408,32 +439,35 @@ schema_ptr system_keyspace::raft() {
// on user-provided state machine and could be stored anywhere else in any other form.
// This should be seen as a snapshot descriptor, instead.
schema_ptr system_keyspace::raft_snapshots() {
static thread_local auto schema = replica::make_raft_snapshots_schema(db::system_keyspace::RAFT_SNAPSHOTS, true);
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, RAFT_SNAPSHOTS);
return schema_builder(NAME, RAFT_SNAPSHOTS, std::optional(id))
.with_column("group_id", timeuuid_type, column_kind::partition_key)
.with_column("snapshot_id", uuid_type)
// Index and term of last entry in the snapshot
.with_column("idx", long_type)
.with_column("term", long_type)
.set_comment("Persisted RAFT snapshot descriptors info")
.with_hash_version()
.build();
}();
return schema;
}
schema_ptr system_keyspace::raft_snapshot_config() {
static thread_local auto schema = replica::make_raft_snapshot_config_schema(db::system_keyspace::RAFT_SNAPSHOT_CONFIG, true);
return schema;
}
static thread_local auto schema = [] {
auto id = generate_legacy_id(system_keyspace::NAME, RAFT_SNAPSHOT_CONFIG);
return schema_builder(system_keyspace::NAME, RAFT_SNAPSHOT_CONFIG, std::optional(id))
.with_column("group_id", timeuuid_type, column_kind::partition_key)
.with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS'
.with_column("server_id", uuid_type, column_kind::clustering_key)
.with_column("can_vote", boolean_type)
// Raft tables for strongly consistent tablets.
// These tables have partition keys of the form (shard, group_id), allowing the data
// to be co-located with the tablet replica that owns the raft group.
// The raft_groups_partitioner creates tokens that map to the specified shard.
schema_ptr system_keyspace::raft_groups() {
static thread_local auto schema = replica::make_raft_schema(db::system_keyspace::RAFT_GROUPS, false);
return schema;
}
schema_ptr system_keyspace::raft_groups_snapshots() {
static thread_local auto schema = replica::make_raft_snapshots_schema(db::system_keyspace::RAFT_GROUPS_SNAPSHOTS, false);
return schema;
}
schema_ptr system_keyspace::raft_groups_snapshot_config() {
static thread_local auto schema = replica::make_raft_snapshot_config_schema(db::system_keyspace::RAFT_GROUPS_SNAPSHOT_CONFIG, false);
.set_comment("RAFT configuration for the latest snapshot descriptor")
.with_hash_version()
.build();
}();
return schema;
}
@@ -1680,9 +1714,7 @@ std::unordered_set<dht::token> decode_tokens(const set_type_impl::native_type& t
std::unordered_set<dht::token> tset;
for (auto& t: tokens) {
auto str = value_cast<sstring>(t);
if (str != dht::token::from_sstring(str).to_sstring()) {
on_internal_error(slogger, format("decode_tokens: invalid token string '{}'", str));
}
SCYLLA_ASSERT(str == dht::token::from_sstring(str).to_sstring());
tset.insert(dht::token::from_sstring(str));
}
return tset;
@@ -2278,29 +2310,21 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
r.insert(r.end(), {sstables_registry()});
}
if (cfg.check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES)) {
r.insert(r.end(), {raft_groups(), raft_groups_snapshots(), raft_groups_snapshot_config()});
}
return r;
}
static bool maybe_write_in_user_memory(schema_ptr s, replica::database& db) {
bool strongly_consistent = db.get_config().check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES);
static bool maybe_write_in_user_memory(schema_ptr s) {
return (s.get() == system_keyspace::batchlog().get())
|| (s.get() == system_keyspace::batchlog_v2().get())
|| (s.get() == system_keyspace::paxos().get())
|| s == system_keyspace::scylla_views_builds_in_progress()
|| (strongly_consistent && s == system_keyspace::raft_groups())
|| (strongly_consistent && s == system_keyspace::raft_groups_snapshots())
|| (strongly_consistent && s == system_keyspace::raft_groups_snapshot_config());
|| s == system_keyspace::scylla_views_builds_in_progress();
}
future<> system_keyspace::make(
locator::effective_replication_map_factory& erm_factory,
replica::database& db) {
for (auto&& table : system_keyspace::all_tables(db.get_config())) {
co_await db.create_local_system_table(table, maybe_write_in_user_memory(table, db), erm_factory);
co_await db.create_local_system_table(table, maybe_write_in_user_memory(table), erm_factory);
co_await db.find_column_family(table).init_storage();
}
@@ -3167,7 +3191,7 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
};
}
} else if (must_have_tokens(nstate)) {
on_internal_error(slogger, format(
on_fatal_internal_error(slogger, format(
"load_topology_state: node {} in {} state but missing ring slice", host_id, nstate));
}
}
@@ -3249,7 +3273,7 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
// Currently, at most one node at a time can be in transitioning state.
if (!map->empty()) {
const auto& [other_id, other_rs] = *map->begin();
on_internal_error(slogger, format(
on_fatal_internal_error(slogger, format(
"load_topology_state: found two nodes in transitioning state: {} in {} state and {} in {} state",
other_id, other_rs.state, host_id, nstate));
}
@@ -3307,7 +3331,8 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
format("SELECT count(range_end) as cnt FROM {}.{} WHERE key = '{}' AND id = ?",
NAME, CDC_GENERATIONS_V3, cdc::CDC_GENERATIONS_V3_KEY),
gen_id.id);
if (!gen_rows || gen_rows->empty()) {
SCYLLA_ASSERT(gen_rows);
if (gen_rows->empty()) {
on_internal_error(slogger, format(
"load_topology_state: last committed CDC generation time UUID ({}) present, but data missing", gen_id.id));
}
@@ -3555,18 +3580,6 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
entry.new_keyspace_rf_change_ks_name = row.get_as<sstring>("new_keyspace_rf_change_ks_name");
entry.new_keyspace_rf_change_data = row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
}
if (row.has("snapshot_table_ids")) {
entry.snapshot_tag = row.get_as<sstring>("snapshot_tag");
entry.snapshot_skip_flush = row.get_as<bool>("snapshot_skip_flush");
entry.snapshot_table_ids = row.get_set<utils::UUID>("snapshot_table_ids")
| std::views::transform([](auto& uuid) { return table_id(uuid); })
| std::ranges::to<std::unordered_set>()
;
;
if (row.has("snapshot_expiry")) {
entry.snapshot_expiry = row.get_as<db_clock::time_point>("snapshot_expiry");
}
}
return entry;
}

View File

@@ -191,9 +191,6 @@ public:
static constexpr auto RAFT = "raft";
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
static constexpr auto RAFT_GROUPS = "raft_groups";
static constexpr auto RAFT_GROUPS_SNAPSHOTS = "raft_groups_snapshots";
static constexpr auto RAFT_GROUPS_SNAPSHOT_CONFIG = "raft_groups_snapshot_config";
static constexpr auto REPAIR_HISTORY = "repair_history";
static constexpr auto REPAIR_TASKS = "repair_tasks";
static constexpr auto GROUP0_HISTORY = "group0_history";
@@ -218,8 +215,6 @@ public:
static constexpr auto BUILT_VIEWS = "built_views";
static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress";
static constexpr auto CDC_LOCAL = "cdc_local";
static constexpr auto CDC_TIMESTAMPS = "cdc_timestamps";
static constexpr auto CDC_STREAMS = "cdc_streams";
// auth
static constexpr auto ROLES = "roles";
@@ -247,9 +242,6 @@ public:
static schema_ptr scylla_local();
static schema_ptr raft();
static schema_ptr raft_snapshots();
static schema_ptr raft_groups();
static schema_ptr raft_groups_snapshots();
static schema_ptr raft_groups_snapshot_config();
static schema_ptr repair_history();
static schema_ptr repair_tasks();
static schema_ptr group0_history();
@@ -423,10 +415,6 @@ public:
std::optional<sstring> new_keyspace_rf_change_ks_name;
// The KS options to be used when executing the scheduled ALTER KS statement
std::optional<std::unordered_map<sstring, sstring>> new_keyspace_rf_change_data;
std::optional<std::unordered_set<table_id>> snapshot_table_ids;
std::optional<sstring> snapshot_tag;
std::optional<db_clock::time_point> snapshot_expiry;
bool snapshot_skip_flush;
};
using topology_requests_entries = std::unordered_map<utils::UUID, system_keyspace::topology_requests_entry>;

Some files were not shown because too many files have changed in this diff Show More