Merge 'Add per-table tablet options in schema' from Benny Halevy

This series extends the table schema with per-table tablet options.
The options are used as hints for initial tablet allocation on table creation and later for resize (split or merge) decisions,
when the table size changes.

* New feature, no backport required

Closes scylladb/scylladb#22090

* github.com:scylladb/scylladb:
  tablets: resize_decision: get rid of initial_decision
  tablet_allocator: consider tablet options for resize decision
  tablet_allocator: load_balancer: table_size_desc: keep target_tablet_size as member
  network_topology_strategy: allocate_tablets_for_new_table: consider tablet options
  network_topology_strategy: calculate_initial_tablets_from_topology: precalculate shards per dc using for_each_token_owner
  network_topology_strategy: calculate_initial_tablets_from_topology: set default rf to 0
  cql3: data_dictionary: format keyspace_metadata: print "enabled":true when initial_tablets=0
  cql3/create_keyspace_statement: add deprecation warning for initial tablets
  test: cqlpy: test_tablets: add tests for per-table tablet options
  schema: add per-table tablet options
  feature_service: add TABLET_OPTIONS cluster schema feature
This commit is contained in:
Avi Kivity
2025-02-08 20:32:19 +02:00
32 changed files with 610 additions and 109 deletions

View File

@@ -1012,6 +1012,7 @@ scylla_core = (['message/messaging_service.cc',
'db/view/view_update_generator.cc',
'db/virtual_table.cc',
'db/virtual_tables.cc',
'db/tablet_options.cc',
'index/secondary_index_manager.cc',
'index/secondary_index.cc',
'utils/UUID_gen.cc',

View File

@@ -9,6 +9,7 @@
*/
#include "cql3/statements/cf_prop_defs.hh"
#include "cql3/statements/request_validations.hh"
#include "data_dictionary/data_dictionary.hh"
#include "db/extensions.hh"
#include "db/tags/extension.hh"
@@ -20,6 +21,7 @@
#include "tombstone_gc.hh"
#include "db/per_partition_rate_limit_extension.hh"
#include "db/per_partition_rate_limit_options.hh"
#include "db/tablet_options.hh"
#include "utils/bloom_calculations.hh"
#include <boost/algorithm/string/predicate.hpp>
@@ -52,6 +54,8 @@ const sstring cf_prop_defs::COMPACTION_STRATEGY_CLASS_KEY = "class";
const sstring cf_prop_defs::COMPACTION_ENABLED_KEY = "enabled";
const sstring cf_prop_defs::KW_TABLETS = "tablets";
schema::extensions_map cf_prop_defs::make_schema_extensions(const db::extensions& exts) const {
schema::extensions_map er;
for (auto& p : exts.schema_extensions()) {
@@ -68,6 +72,14 @@ schema::extensions_map cf_prop_defs::make_schema_extensions(const db::extensions
return er;
}
data_dictionary::keyspace cf_prop_defs::find_keyspace(const data_dictionary::database db, std::string_view ks_name) {
try {
return db.find_keyspace(ks_name);
} catch (const data_dictionary::no_such_keyspace& e) {
throw request_validations::invalid_request("{}", e.what());
}
}
void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name, const schema::extensions_map& schema_extensions) const {
// Skip validation if the comapction strategy class is already set as it means we've already
// prepared (and redoing it would set strategyClass back to null, which we don't want)
@@ -75,13 +87,15 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
return;
}
const auto& ks = find_keyspace(db, ks_name);
static std::set<sstring> keywords({
KW_COMMENT,
KW_GCGRACESECONDS, KW_CACHING, KW_DEFAULT_TIME_TO_LIVE,
KW_MIN_INDEX_INTERVAL, KW_MAX_INDEX_INTERVAL, KW_SPECULATIVE_RETRY,
KW_BF_FP_CHANCE, KW_MEMTABLE_FLUSH_PERIOD, KW_COMPACTION,
KW_COMPRESSION, KW_CRC_CHECK_CHANCE, KW_ID, KW_PAXOSGRACESECONDS,
KW_SYNCHRONOUS_UPDATES
KW_SYNCHRONOUS_UPDATES, KW_TABLETS,
});
static std::set<sstring> obsolete_keywords({
sstring("index_interval"),
@@ -162,6 +176,16 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
}
speculative_retry::from_sstring(get_string(KW_SPECULATIVE_RETRY, speculative_retry(speculative_retry::type::NONE, 0).to_sstring()));
if (auto tablet_options_map = get_tablet_options()) {
if (!ks.uses_tablets()) {
throw exceptions::configuration_exception("tablet options cannot be used when tablets are disabled for the keyspace");
}
if (!db.features().tablet_options) {
throw exceptions::configuration_exception("tablet options cannot be used until all nodes in the cluster enable this feature");
}
db::tablet_options::validate(*tablet_options_map);
}
}
std::map<sstring, sstring> cf_prop_defs::get_compaction_type_options() const {
@@ -252,6 +276,13 @@ const db::per_partition_rate_limit_options* cf_prop_defs::get_per_partition_rate
return &ext->get_options();
}
std::optional<db::tablet_options::map_type> cf_prop_defs::get_tablet_options() const {
if (auto tablet_options = get_map(KW_TABLETS)) {
return tablet_options.value();
}
return std::nullopt;
}
void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name) const {
if (has_property(KW_COMMENT)) {
builder.set_comment(get_string(KW_COMMENT, ""));
@@ -351,6 +382,10 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
}
if (auto tablet_options_opt = get_map(KW_TABLETS)) {
builder.set_tablet_options(std::move(*tablet_options_opt));
}
}
void cf_prop_defs::validate_minimum_int(const sstring& field, int32_t minimum_value, int32_t default_value) const

View File

@@ -18,12 +18,14 @@
namespace data_dictionary {
class database;
class keyspace;
}
class tombstone_gc_options;
namespace db {
class extensions;
class tablet_options;
}
namespace cdc {
class options;
@@ -60,6 +62,8 @@ public:
static const sstring COMPACTION_STRATEGY_CLASS_KEY;
static const sstring COMPACTION_ENABLED_KEY;
static const sstring KW_TABLETS;
// FIXME: In origin the following consts are in CFMetaData.
static constexpr int32_t DEFAULT_DEFAULT_TIME_TO_LIVE = 0;
static constexpr int32_t DEFAULT_MIN_INDEX_INTERVAL = 128;
@@ -70,6 +74,7 @@ public:
private:
mutable std::optional<sstables::compaction_strategy_type> _compaction_strategy_class;
static data_dictionary::keyspace find_keyspace(const data_dictionary::database db, std::string_view ks_name);
public:
std::optional<sstables::compaction_strategy_type> get_compaction_strategy_class() const;
@@ -103,6 +108,7 @@ public:
int32_t get_paxos_grace_seconds() const;
std::optional<table_id> get_id() const;
bool get_synchronous_updates_flag() const;
std::optional<db::tablet_options::map_type> get_tablet_options() const;
void apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name) const;
void validate_minimum_int(const sstring& field, int32_t minimum_value, int32_t default_value) const;

View File

@@ -115,6 +115,9 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector
"To use CDC, LWT or counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} "
"to the CREATE KEYSPACE statement.");
if (ksm->initial_tablets().value()) {
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
}
}
} catch (const exceptions::already_exists_exception& e) {
if (!_if_not_exists) {

View File

@@ -55,6 +55,11 @@ keyspace::is_internal() const {
return _ops->is_internal(*this);
}
bool
keyspace::uses_tablets() const {
return metadata()->uses_tablets();
}
const locator::abstract_replication_strategy&
keyspace::get_replication_strategy() const {
return _ops->get_replication_strategy(*this);
@@ -457,7 +462,11 @@ auto fmt::formatter<data_dictionary::keyspace_metadata>::format(const data_dicti
fmt::format_to(ctx.out(), "KSMetaData{{name={}, strategyClass={}, strategyOptions={}, cfMetaData={}, durable_writes={}, tablets=",
m.name(), m.strategy_name(), m.strategy_options(), m.cf_meta_data(), m.durable_writes());
if (m.initial_tablets()) {
fmt::format_to(ctx.out(), "{{\"initial\":{}}}", m.initial_tablets().value());
if (auto initial_tablets = m.initial_tablets().value()) {
fmt::format_to(ctx.out(), "{{\"initial\":{}}}", initial_tablets);
} else {
fmt::format_to(ctx.out(), "{{\"enabled\":true}}");
}
} else {
fmt::format_to(ctx.out(), "{{\"enabled\":false}}");
}

View File

@@ -87,6 +87,7 @@ private:
keyspace(const impl* ops, const void* keyspace);
public:
bool is_internal() const;
bool uses_tablets() const;
lw_shared_ptr<keyspace_metadata> metadata() const;
const user_types_metadata& user_types() const;
const locator::abstract_replication_strategy& get_replication_strategy() const;

View File

@@ -65,6 +65,9 @@ public:
std::optional<unsigned> initial_tablets() const {
return _initial_tablets;
}
bool uses_tablets() const noexcept {
return _initial_tablets.has_value();
}
const std::unordered_map<sstring, schema_ptr>& cf_meta_data() const {
return _cf_meta_data;
}

View File

@@ -38,7 +38,8 @@ target_sources(db
snapshot/backup_task.cc
rate_limiter.cc
per_partition_rate_limit_options.cc
row_cache.cc)
row_cache.cc,
tablet_options.cc)
target_include_directories(db
PUBLIC
${CMAKE_SOURCE_DIR})

View File

@@ -30,6 +30,9 @@ enum class schema_feature {
// Unused. Defined for backward compatibility only
IN_MEMORY_TABLES,
// Per-table tablet options
TABLET_OPTIONS,
};
using schema_features = enum_set<super_enum<schema_feature,
@@ -39,7 +42,8 @@ using schema_features = enum_set<super_enum<schema_feature,
schema_feature::SCYLLA_AGGREGATES,
schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY,
schema_feature::GROUP0_SCHEMA_VERSIONING,
schema_feature::IN_MEMORY_TABLES
schema_feature::IN_MEMORY_TABLES,
schema_feature::TABLET_OPTIONS
>>;
}

View File

@@ -334,6 +334,11 @@ schema_ptr scylla_tables(schema_features features) {
// In this case, for non-system tables, `version` is null and `schema::version()` will be a hash.
sb.with_column("committed_by_group0", boolean_type);
}
// It is safe to add the `tablets` column unconditionally,
// since it is written to only after the cluster feature is enabled.
sb.with_column("tablets", map_type_impl::get_instance(utf8_type, utf8_type, false));
sb.with_hash_version();
s = sb.build();
}
@@ -1731,6 +1736,19 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times
auto& cdef = *scylla_tables()->get_column_definition("partitioner");
m.set_clustered_cell(ckey, cdef, atomic_cell::make_dead(timestamp, gc_clock::now()));
}
// A table will have engaged tablet options
// only after they were set by CREATE TABLE or ALTER TABLE,
// Meaning the cluster feature is enabled, so it is safe to write
// to this columns.
if (table->has_tablet_options()) {
auto& map = table->raw_tablet_options();
auto& cdef = *scylla_tables()->get_column_definition("tablets");
if (map.empty()) {
m.set_clustered_cell(ckey, cdef, atomic_cell::make_dead(timestamp, gc_clock::now()));
} else {
m.set_clustered_cell(ckey, cdef, make_map_mutation(map, cdef, timestamp));
}
}
// In-memory tables are deprecated since scylla-2024.1.0
// FIXME: delete the column when there's no live version supporting it anymore.
// Writing it here breaks upgrade rollback to versions that do not support the in_memory schema_feature
@@ -2152,6 +2170,19 @@ static void prepare_builder_from_table_row(const schema_ctxt& ctxt, schema_build
}
}
static void prepare_builder_from_scylla_tables_row(const schema_ctxt& ctxt, schema_builder& builder, const query::result_set_row& table_row) {
auto in_mem = table_row.get<bool>("in_memory");
auto in_mem_enabled = in_mem.value_or(false);
if (in_mem_enabled) {
slogger.warn("Support for in_memory tables has been deprecated.");
}
builder.set_in_memory(in_mem_enabled);
if (auto opt_map = get_map<sstring, sstring>(table_row, "tablets")) {
auto tablet_options = db::tablet_options(*opt_map);
builder.set_tablet_options(tablet_options.to_map());
}
}
schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, std::optional<table_schema_version> version)
{
slogger.trace("create_table_from_mutations: version={}, {}", version, sm);
@@ -2206,13 +2237,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
if (sm.scylla_tables()) {
table_rs = query::result_set(*sm.scylla_tables());
if (!table_rs.empty()) {
query::result_set_row table_row = table_rs.row(0);
auto in_mem = table_row.get<bool>("in_memory");
auto in_mem_enabled = in_mem.value_or(false);
if (in_mem_enabled) {
slogger.warn("Support for in_memory tables has been deprecated.");
}
builder.set_in_memory(in_mem_enabled);
prepare_builder_from_scylla_tables_row(ctxt, builder, table_rs.row(0));
}
}
v3_columns columns(std::move(column_defs), is_dense, is_compound);
@@ -2443,6 +2468,13 @@ view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm
schema_builder builder{ks_name, cf_name, id};
prepare_builder_from_table_row(ctxt, builder, row);
if (sm.scylla_tables()) {
table_rs = query::result_set(*sm.scylla_tables());
if (!table_rs.empty()) {
prepare_builder_from_scylla_tables_row(ctxt, builder, table_rs.row(0));
}
}
auto computed_columns = get_computed_columns(sm);
auto column_defs = create_columns_from_column_rows(ctxt, query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns);
for (auto&& cdef : column_defs) {

96
db/tablet_options.cc Normal file
View File

@@ -0,0 +1,96 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <cstdlib>
#include "exceptions/exceptions.hh"
#include "db/tablet_options.hh"
#include "utils/log.hh"
extern logging::logger dblog;
namespace db {
tablet_options::tablet_options(const map_type& map) {
for (auto& [key, value_str] : map) {
switch (tablet_options::from_string(key)) {
case tablet_option_type::min_tablet_count:
if (auto value = std::atol(value_str.c_str())) {
min_tablet_count.emplace(value);
}
break;
case tablet_option_type::min_per_shard_tablet_count:
if (auto value = std::atof(value_str.c_str())) {
min_per_shard_tablet_count.emplace(value);
}
break;
case tablet_option_type::expected_data_size_in_gb:
if (auto value = std::atol(value_str.c_str())) {
expected_data_size_in_gb.emplace(value);
}
break;
}
}
}
sstring tablet_options::to_string(tablet_option_type hint) {
switch (hint) {
case tablet_option_type::min_tablet_count: return "min_tablet_count";
case tablet_option_type::min_per_shard_tablet_count: return "min_per_shard_tablet_count";
case tablet_option_type::expected_data_size_in_gb: return "expected_data_size_in_gb";
}
}
tablet_option_type tablet_options::from_string(sstring hint_desc) {
if (hint_desc == "min_tablet_count") {
return tablet_option_type::min_tablet_count;
} else if (hint_desc == "min_per_shard_tablet_count") {
return tablet_option_type::min_per_shard_tablet_count;
} else if (hint_desc == "expected_data_size_in_gb") {
return tablet_option_type::expected_data_size_in_gb;
} else {
throw exceptions::syntax_exception(fmt::format("Unknown tablet hint '{}'", hint_desc));
}
}
std::map<sstring, sstring> tablet_options::to_map() const {
std::map<sstring, sstring> res;
if (min_tablet_count) {
res[to_string(tablet_option_type::min_tablet_count)] = fmt::to_string(*min_tablet_count);
}
if (min_per_shard_tablet_count) {
res[to_string(tablet_option_type::min_per_shard_tablet_count)] = fmt::to_string(*min_per_shard_tablet_count);
}
if (expected_data_size_in_gb) {
res[to_string(tablet_option_type::expected_data_size_in_gb)] = fmt::to_string(*expected_data_size_in_gb);
}
return res;
}
void tablet_options::validate(const map_type& map) {
for (auto& [key, value_str] : map) {
switch (tablet_options::from_string(key)) {
case tablet_option_type::min_tablet_count:
if (auto value = std::atol(value_str.c_str()); value < 0) {
throw exceptions::configuration_exception(format("Invalid value '{}' for min_tablet_count", value));
}
break;
case tablet_option_type::min_per_shard_tablet_count:
if (auto value = std::atof(value_str.c_str()); value < 0) {
throw exceptions::configuration_exception(format("Invalid value '{}' for min_per_shard_tablet_count", value));
}
break;
case tablet_option_type::expected_data_size_in_gb:
if (auto value = std::atol(value_str.c_str()); value < 0) {
throw exceptions::configuration_exception(format("Invalid value '{}' for expected_data_size_in_gb", value));
}
break;
}
}
}
} // namespace db

46
db/tablet_options.hh Normal file
View File

@@ -0,0 +1,46 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <map>
#include <seastar/core/sstring.hh>
using namespace seastar;
namespace db {
// Per-table tablet options
enum class tablet_option_type {
min_tablet_count,
min_per_shard_tablet_count,
expected_data_size_in_gb,
};
struct tablet_options {
using map_type = std::map<sstring, sstring>;
std::optional<ssize_t> min_tablet_count;
std::optional<double> min_per_shard_tablet_count;
std::optional<ssize_t> expected_data_size_in_gb;
tablet_options() = default;
explicit tablet_options(const map_type& map);
operator bool() const noexcept {
return min_tablet_count || min_per_shard_tablet_count || expected_data_size_in_gb;
}
map_type to_map() const;
static sstring to_string(tablet_option_type hint);
static tablet_option_type from_string(sstring hint_desc);
static void validate(const map_type& map);
};
} // namespace db

View File

@@ -225,7 +225,7 @@ Options:
sub-option type description
===================================== ====== =============================================
``'enabled'`` bool Whether or not to enable tablets for a keyspace
``'initial'`` int The number of tablets to start with
``'initial'`` int The number of tablets to start with (deprecated)
===================================== ====== =============================================
.. scylladb_include_flag:: tablets-default.rst
@@ -246,6 +246,8 @@ An example that creates a keyspace with 2048 tablets per table::
'initial': 2048
};
Note that the ``initial`` tablets option was deprecated.
Please use :ref:`Per-table tablet options <cql-per-table-tablet-options>` instead.
See :doc:`Data Distribution with Tablets </architecture/tablets>` for more information about tablets.
@@ -344,9 +346,17 @@ Creating a new table uses the ``CREATE TABLE`` statement:
: | CLUSTERING ORDER BY '(' `clustering_order` ')' [ AND `table_options` ]
: | scylla_encryption_options: '=' '{'[`cipher_algorithm` : <hash>]','[`secret_key_strength` : <len>]','[`key_provider`: <provider>]'}'
: | caching '=' ' {'caching_options'}'
: | tablets '=' '{' `tablet_options` '}'
: | `options`
clustering_order: `column_name` (ASC | DESC) ( ',' `column_name` (ASC | DESC) )*
tablet_options: `tablet_option` [',' `tablet_option`]
: |
tablet_option: 'expected_data_size_in_gb' ':' <int>
: | 'min_per_shard_tablet_count' ':' <float>
: | 'min_tablet_count' ':' <int>
For instance::
@@ -713,6 +723,10 @@ A table supports the following options:
- map
- see below
- :ref:`CDC Options <cdc-options>`
* - ``tablet_options``
- map
- see below
- :ref:`Per-table tablet options <cql-per-table-tablet-options>`
.. _speculative-retry-options:
@@ -898,6 +912,65 @@ The following modes are available:
* - ``immediate``
- Tombstone GC is immediately performed. There is no wait time or repair requirement. This mode is useful for a table that uses the TWCS compaction strategy with no user deletes. After data is expired after TTL, ScyllaDB can perform compaction to drop the expired data immediately.
.. _cql-per-table-tablet-options:
Per-table tablet options
########################
By default, ScyllaDB will allocate the initial number of tablets automatically.
Then on, tables may be automatically split if their average size is greater than twice
the ``target_tablet_size_in_bytes``, or merged if the average tablet size is less than
half the ``target_tablet_size_in_bytes``.
Other considerations, like the total number of tablet replicas per-shard, may also affect the tablet count.
Since each tablet replica has a constant memory overhead, ScyllaDB may limit the number of tablets to prevent
shards from running out-of-memory, in the presence of many tables.
The following per-table ``tablets`` options can be used to tune the tablet allocation logic for the table
if its data size, or performance requirements are known in advance.
=============================== =============== ===================================================================================
Option Default Description
=============================== =============== ===================================================================================
``expected_data_size_in_gb`` 0 This option provides a hint for the anticipated table size, before replication.
ScyllaDB will generate a tablets topology that matches that expectation (see details below).
It can be set when the table is created to allocate more tablets for it,
as if it already occupies that size. This will prevent unnecessary tablet splits
and tablet migrations during data ingestion.
It can also be changed later in the table life cycle to induce tablet splits or merges
to fit the new expected size.
The minimum tablet count is calculated by dividing the expected data
size by the ``target_tablet_size_in_bytes`` config option.
``min_per_shard_tablet_count`` 0 Used for ensuring that the table workload is well balanced in the whole cluster in a
topology-independent way. A higher number of tablet replicas per shard may help balance
the table workload more evenly across shards and across nodes in the cluster.
For example, setting this to 10 means that shard overcommit is limited to 10%, regardless
of cluster size.
Note that ``min_per_shard_tablet_count`` supports floating point values and can be set to
a value less than 1. This is useful for clusters with large number of shards where the
average number of tablet replicas owned by each shard is less than 1.
``min_tablet_count`` 0 Determines the minimum number of tablets to allocate for the table.
The hint is based on the deprecated keyspace ``initial`` tablets option.
Note that the actual number of tablet replicas that are owned by each shard is a
function of the tablet count, the replication factor in the datacenter, and the number
of nodes and shards in the datacenter. It is recommended to use higher-level options
such as ``expected_data_size_in_gb`` or ``min_per_shard_tablet_count`` instead.
=============================== =============== ===================================================================================
When allocating tablets for a new table, ScyllaDB uses the maximum of the ``initial`` tablets configured for the keyspace
and the minimum tablet count calculated from the table's ``tablets`` options, if any.
If multiple tablet options are provided, ScyllaDB uses the maximum tablet count derived by each option individually.
If the keyspace ``initial`` tablets is set to zero and no ``tablets`` options are provided,
ScyllaDB automatically calculates the number of tablets so that each shard would own at least one tablet replica,
scaled up by the ``tablets_initial_scale_factor`` configuration option.
Unlike the ``initial`` tablet count configured for the keyspace, ScyllaDB will not merge tablets when their
average size drops below half the ``target_tablet_size_in_bytes`` if that would cause the table's tablet count
to go below the minimum tablet count, or the per-shard tablet-count as per the above options.
This is useful for tables that go through rapid growth and shrink cycles.
If the table is shrunk for the long term and there are no special performance needs for the tablet, it is recommended
to drop the tablet options or to adjust them respectively, to fit the new requirements.
Other considerations:
#####################

View File

@@ -147,6 +147,7 @@ CREATE TABLE ks.t_scylla_cdc_log (
AND comment = 'CDC log for ks.t'
AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': '60', 'compaction_window_unit': 'MINUTES', 'expired_sstable_check_frequency_seconds': '1800'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND tablets = {'expected_data_size_in_gb': '250', 'min_per_shard_tablet_count': '0.8', 'min_tablet_count': '1'}
AND crc_check_chance = 1
AND default_time_to_live = 0
AND gc_grace_seconds = 0

View File

@@ -237,6 +237,7 @@ db::schema_features feature_service::cluster_schema_features() const {
f.set_if<db::schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY>(table_digest_insensitive_to_expiry);
f.set_if<db::schema_feature::GROUP0_SCHEMA_VERSIONING>(group0_schema_versioning);
f.set_if<db::schema_feature::IN_MEMORY_TABLES>(bool(in_memory_tables));
f.set_if<db::schema_feature::TABLET_OPTIONS>(bool(tablet_options));
return f;
}

View File

@@ -163,6 +163,7 @@ public:
gms::feature workload_prioritization { *this, "WORKLOAD_PRIORITIZATION"sv };
gms::feature file_stream { *this, "FILE_STREAM"sv };
gms::feature compression_dicts { *this, "COMPRESSION_DICTS"sv };
gms::feature tablet_options { *this, "TABLET_OPTIONS"sv };
public:
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;

View File

@@ -298,40 +298,55 @@ effective_replication_map_ptr network_topology_strategy::make_replication_map(ta
//
// Try to use as many tablets initially, so that all shards in the current topology
// are covered with at least one tablet. In other words, the value is
// are covered with at least `min_per_shard_tablet_count` tablets. In other words, the value is
//
// initial_tablets = max(nr_shards_in(dc) / RF_in(dc) for dc in datacenters)
//
static unsigned calculate_initial_tablets_from_topology(const schema& s, token_metadata_ptr tm, const std::unordered_map<sstring, size_t>& rf) {
static unsigned calculate_initial_tablets_from_topology(const schema& s, token_metadata_ptr tm, const std::unordered_map<sstring, size_t>& rf, double min_per_shard_tablet_count = 0) {
unsigned initial_tablets = std::numeric_limits<unsigned>::min();
for (const auto& dc : tm->get_datacenter_token_owners()) {
unsigned shards_in_dc = 0;
unsigned rf_in_dc = 1;
for (const auto& ep : dc.second) {
const auto* node = tm->get_topology().find_node(ep);
if (node != nullptr && node->is_normal()) {
shards_in_dc += node->get_shard_count();
}
std::unordered_map<sstring, unsigned> shards_per_dc_map;
tm->for_each_token_owner([&] (const node& node) {
if (node.is_normal()) {
shards_per_dc_map[node.dc_rack().dc] += node.get_shard_count();
}
if (auto it = rf.find(dc.first); it != rf.end()) {
rf_in_dc = it->second;
});
for (const auto& [dc, rf_in_dc] : rf) {
if (!rf_in_dc) {
continue;
}
unsigned shards_in_dc = shards_per_dc_map[dc];
unsigned tablets_in_dc = (shards_in_dc + rf_in_dc - 1) / rf_in_dc;
if (min_per_shard_tablet_count) {
auto min_tablets_in_dc = std::ceil((double)(min_per_shard_tablet_count * shards_in_dc) / rf_in_dc);
tablets_in_dc = std::max<unsigned>(tablets_in_dc, min_tablets_in_dc);
}
unsigned tablets_in_dc = rf_in_dc > 0 ? (shards_in_dc + rf_in_dc - 1) / rf_in_dc : 0;
initial_tablets = std::max(initial_tablets, tablets_in_dc);
}
rslogger.debug("Estimated {} initial tablets for table {}.{}", initial_tablets, s.ks_name(), s.cf_name());
return initial_tablets;
}
future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(schema_ptr s, token_metadata_ptr tm, unsigned initial_scale) const {
auto tablet_count = get_initial_tablets();
if (tablet_count == 0) {
tablet_count = calculate_initial_tablets_from_topology(*s, tm, _dc_rep_factor) * initial_scale;
size_t network_topology_strategy::calculate_min_tablet_count(schema_ptr s, token_metadata_ptr tm, uint64_t target_tablet_size, std::optional<unsigned> initial_scale) const {
size_t tablet_count = get_initial_tablets();
const auto& tablet_options = s->tablet_options();
if (tablet_options.min_tablet_count) {
tablet_count = std::max<size_t>(tablet_count, tablet_options.min_tablet_count.value());
}
if (tablet_options.expected_data_size_in_gb) {
tablet_count = std::max<size_t>(tablet_count, (tablet_options.expected_data_size_in_gb.value() << 30) / target_tablet_size);
}
if (tablet_options.min_per_shard_tablet_count) {
tablet_count = std::max<size_t>(tablet_count, calculate_initial_tablets_from_topology(*s, tm, _dc_rep_factor, tablet_options.min_per_shard_tablet_count.value()));
}
if (tablet_count == 0) {
tablet_count = calculate_initial_tablets_from_topology(*s, tm, _dc_rep_factor) * initial_scale.value_or(1);
}
return tablet_count;
}
future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(schema_ptr s, token_metadata_ptr tm, uint64_t target_tablet_size, std::optional<unsigned> initial_scale) const {
size_t tablet_count = calculate_min_tablet_count(s, tm, target_tablet_size, initial_scale);
auto aligned_tablet_count = 1ul << log2ceil(tablet_count);
if (tablet_count != aligned_tablet_count) {
rslogger.info("Rounding up tablet count from {} to {} for table {}.{}", tablet_count, aligned_tablet_count, s->ks_name(), s->cf_name());

View File

@@ -46,7 +46,8 @@ public:
public: // tablet_aware_replication_strategy
virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const override;
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, unsigned initial_scale) const override;
virtual size_t calculate_min_tablet_count(schema_ptr s, token_metadata_ptr tm, uint64_t target_tablet_size, std::optional<unsigned> initial_scale) const override;
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, uint64_t target_tablet_size, std::optional<unsigned> initial_scale = std::nullopt) const override;
virtual future<tablet_map> reallocate_tablets(schema_ptr, token_metadata_ptr, tablet_map cur_tablets) const override;
protected:
/**

View File

@@ -24,7 +24,8 @@ namespace locator {
/// system's tablet_metadata.
class tablet_aware_replication_strategy : public per_table_replication_strategy {
private:
size_t _initial_tablets = 1;
size_t _initial_tablets = 0;
db::tablet_options _tablet_options;
protected:
void validate_tablet_options(const abstract_replication_strategy&, const gms::feature_service&, const replication_strategy_config_options&) const;
void process_tablet_options(abstract_replication_strategy&, replication_strategy_config_options&, replication_strategy_params);
@@ -35,9 +36,13 @@ protected:
size_t replication_factor) const;
public:
/// Calculate the minimum tablet_count for a table, given the target_tablet_size, the per-table hints,
/// the network topology, and the configured replication factors.
virtual size_t calculate_min_tablet_count(schema_ptr s, token_metadata_ptr tm, uint64_t target_tablet_size, std::optional<unsigned> initial_scale) const = 0;
/// Generates tablet_map for a new table.
/// Runs under group0 guard.
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, unsigned initial_scale) const = 0;
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr, uint64_t target_tablet_size, std::optional<unsigned> initial_scale = std::nullopt) const = 0;
/// Generates tablet_map for a new table or when increasing replication factor.
/// For a new table, cur_tablets is initialized with the tablet_count,

View File

@@ -650,10 +650,6 @@ resize_decision::seq_number_t resize_decision::next_sequence_number() const {
return (sequence_number == std::numeric_limits<seq_number_t>::max()) ? 0 : sequence_number + 1;
}
bool resize_decision::initial_decision() const {
return sequence_number == 0;
}
table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexcept {
size_in_bytes = size_in_bytes + s.size_in_bytes;
split_ready_seq_number = std::min(split_ready_seq_number, s.split_ready_seq_number);

View File

@@ -312,9 +312,15 @@ enum tablet_range_side {
// The decision of whether tablets of a given should be split, merged, or none, is made
// by the load balancer. This decision is recorded in the tablet_map and stored in group0.
struct resize_decision {
struct none {};
struct split {};
struct merge {};
struct none {
auto operator<=>(const none&) const = default;
};
struct split {
auto operator<=>(const split&) const = default;
};
struct merge {
auto operator<=>(const merge&) const = default;
};
using seq_number_t = int64_t;
@@ -327,14 +333,21 @@ struct resize_decision {
resize_decision() = default;
resize_decision(sstring decision, uint64_t seq_number);
bool is_none() const {
return std::holds_alternative<resize_decision::none>(way);
}
bool split_or_merge() const {
return !std::holds_alternative<resize_decision::none>(way);
return !is_none();
}
bool is_split() const {
return std::holds_alternative<resize_decision::split>(way);
}
bool is_merge() const {
return std::holds_alternative<resize_decision::merge>(way);
}
bool operator==(const resize_decision&) const;
sstring type_name() const;
seq_number_t next_sequence_number() const;
// Returns true if this is the initial decision, before split or merge was emitted.
bool initial_decision() const;
};
struct table_load_stats {

View File

@@ -9,6 +9,7 @@
#include <seastar/core/on_internal_error.hh>
#include <map>
#include "cql3/description.hh"
#include "db/tablet_options.hh"
#include "db/view/view.hh"
#include "timestamp.hh"
#include "utils/assert.hh"
@@ -588,6 +589,7 @@ bool operator==(const schema& x, const schema& y)
&& x._raw._indices_by_name == y._raw._indices_by_name
&& x._raw._is_counter == y._raw._is_counter
&& x._raw._in_memory == y._raw._in_memory
&& x._raw._tablet_options == y._raw._tablet_options
;
}
@@ -676,6 +678,8 @@ table_schema_version schema::calculate_digest(const schema::raw_schema& r) {
feed_hash(h, ext->options_to_string());
}
feed_hash(h, r._tablet_options);
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
}
@@ -844,6 +848,17 @@ auto fmt::formatter<schema>::format(const schema& s, fmt::format_context& ctx) c
out = fmt::format_to(out, ",minIndexInterval={}", s._raw._min_index_interval);
out = fmt::format_to(out, ",maxIndexInterval={}", s._raw._max_index_interval);
out = fmt::format_to(out, ",speculativeRetry={}", s._raw._speculative_retry.to_sstring());
out = fmt::format_to(out, ",tablets={{");
if (s._raw._tablet_options) {
n = 0;
for (auto& [k, v] : *s._raw._tablet_options) {
if (n++) {
out = fmt::format_to(out, ", ");
}
out = fmt::format_to(out, "{}={}", k, v);
}
}
out = fmt::format_to(out, "}}");
out = fmt::format_to(out, ",triggers=[]");
out = fmt::format_to(out, ",isDense={}", s._raw._is_dense);
out = fmt::format_to(out, ",in_memory={}", s._raw._in_memory);
@@ -1137,7 +1152,13 @@ std::ostream& schema::schema_properties(const schema_describe_helper& helper, st
os << "\n AND memtable_flush_period_in_ms = " << memtable_flush_period();
os << "\n AND min_index_interval = " << min_index_interval();
os << "\n AND speculative_retry = '" << speculative_retry().to_sstring() << "'";
if (has_tablet_options()) {
os << "\n AND tablets = {";
map_as_cql_param(os, tablet_options().to_map());
os << "}";
}
for (auto& [type, ext] : extensions()) {
os << "\n AND " << type << " = " << ext->options_to_string();
}
@@ -1622,6 +1643,22 @@ const ::tombstone_gc_options& schema::tombstone_gc_options() const {
return default_tombstone_gc_options;
}
bool schema::has_tablet_options() const noexcept {
return _raw._tablet_options.has_value();
}
db::tablet_options schema::tablet_options() const {
return db::tablet_options(raw_tablet_options());
}
const db::tablet_options::map_type& schema::raw_tablet_options() const noexcept {
if (!_raw._tablet_options) {
static db::tablet_options::map_type no_options;
return no_options;
}
return *_raw._tablet_options;
}
schema_builder& schema_builder::with_cdc_options(const cdc::options& opts) {
add_extension(cdc::cdc_extension::NAME, ::make_shared<cdc::cdc_extension>(opts));
return *this;
@@ -1642,6 +1679,11 @@ schema_builder& schema_builder::set_paxos_grace_seconds(int32_t seconds) {
return *this;
}
schema_builder& schema_builder::set_tablet_options(std::map<sstring, sstring>&& hints) {
_raw._tablet_options = std::move(hints);
return *this;
}
gc_clock::duration schema::paxos_grace_seconds() const {
return std::chrono::duration_cast<gc_clock::duration>(
std::chrono::seconds(

View File

@@ -30,6 +30,7 @@
#include "timestamp.hh"
#include "tombstone_gc_options.hh"
#include "db/per_partition_rate_limit_options.hh"
#include "db/tablet_options.hh"
#include "schema_fwd.hh"
namespace dht {
@@ -567,6 +568,7 @@ private:
// schema digest. It is also not set locally on a schema tables.
std::reference_wrapper<const dht::static_sharder> _sharder;
bool _in_memory = false;
std::optional<std::map<sstring, sstring>> _tablet_options;
std::optional<raw_view_info> _view_info;
};
raw_schema _raw;
@@ -742,6 +744,12 @@ public:
return _raw._caching_options;
}
// Returns true iff the _tablet_options are initialized.
// They may still be empty, e.g. after ALTER TABLE.
bool has_tablet_options() const noexcept;
db::tablet_options tablet_options() const;
const db::tablet_options::map_type& raw_tablet_options() const noexcept;
static void set_default_partitioner(const sstring& class_name, unsigned ignore_msb = 0);
const dht::i_partitioner& get_partitioner() const;

View File

@@ -227,6 +227,8 @@ public:
return *this;
}
schema_builder& set_tablet_options(std::map<sstring, sstring>&& hints);
class default_names {
public:
default_names(const schema_builder&);
@@ -283,7 +285,7 @@ public:
schema_builder& with_cdc_options(const cdc::options&);
schema_builder& with_tombstone_gc_options(const tombstone_gc_options& opts);
schema_builder& with_per_partition_rate_limit_options(const db::per_partition_rate_limit_options&);
default_names get_default_names() const {
return default_names(_raw);
}

View File

@@ -17,6 +17,7 @@
#include "utils/stall_free.hh"
#include "utils/overloaded_functor.hh"
#include "db/config.hh"
#include "db/tablet_options.hh"
#include "locator/load_sketch.hh"
#include "replica/database.hh"
#include "gms/feature_service.hh"
@@ -479,22 +480,20 @@ class load_balancer {
// due to the average size dropping below the merge threshold, as tablet count doubles.
const uint64_t _target_tablet_size = default_target_tablet_size;
static constexpr uint64_t target_max_tablet_size(uint64_t target_tablet_size) {
return target_tablet_size * 2;
}
static constexpr uint64_t target_min_tablet_size(uint64_t max_tablet_size) {
return double(max_tablet_size / 2) * 0.5;
}
struct table_size_desc {
uint64_t target_max_tablet_size;
uint64_t target_tablet_size;
uint64_t avg_tablet_size;
locator::resize_decision resize_decision;
size_t tablet_count;
size_t shard_count;
size_t min_tablet_count;
uint64_t target_max_tablet_size() const noexcept {
return target_tablet_size * 2;
}
uint64_t target_min_tablet_size() const noexcept {
return load_balancer::target_min_tablet_size(target_max_tablet_size);
return target_tablet_size / 2;
}
};
@@ -503,24 +502,36 @@ class load_balancer {
std::vector<table_id_and_size_desc> tables_need_resize;
std::vector<table_id_and_size_desc> tables_being_resized;
static bool table_needs_merge(const table_size_desc& d) {
// The initial_tablet_count is respected while the table is in "growing mode".
// We say that a table leaves this mode if it required a split above the initial
// tablet count. After that, we can rely purely on the average size to say that
// a table is shrinking and requires merge.
// FIXME: this is not perfect and we may want to leave the mode too if we detect
// average size is decreasing significantly, before any split happened.
bool left_growing_mode = !d.resize_decision.initial_decision();
lblogger.debug("table_needs_merge: tablet_count={}, avg_tablet_size={}, left_growing_mode={} (seq number: {})",
d.tablet_count, d.avg_tablet_size, left_growing_mode, d.resize_decision.sequence_number);
return left_growing_mode && d.tablet_count > 1 && d.avg_tablet_size < d.target_min_tablet_size();
}
static bool table_needs_split(const table_size_desc& d) {
return d.avg_tablet_size > d.target_max_tablet_size;
static locator::resize_decision to_resize_decision(const table_size_desc& d) {
locator::resize_decision decision;
auto target_tablet_count = d.tablet_count;
// Split based on min_tablet_count or avg_tablet_size, or
// if the current resize_decision is split, apply hysteresis,
// so it would get cancelled only when crossing back the half-way point.
if (d.tablet_count < d.min_tablet_count || d.avg_tablet_size > d.target_max_tablet_size() ||
(d.resize_decision.is_split() && d.avg_tablet_size >= d.target_tablet_size)) {
// TODO: extend to n-way split when needed
target_tablet_count *= 2;
decision.way = resize_decision::split{};
} else if (target_tablet_count / 2 >= d.min_tablet_count) {
// Consider merge, as long as it wouldn't violate min_tablet_count.
// If the current resize_decision is merge, apply hysteresis,
// so it would get cancelled only when crossing back the half-way point.
if (d.avg_tablet_size < d.target_min_tablet_size() ||
(d.resize_decision.is_merge() && d.avg_tablet_size <= d.target_tablet_size)) {
target_tablet_count /= 2;
decision.way = resize_decision::merge{};
}
}
lblogger.debug("to_resize_decision: tablet_count={}, avg_tablet_size={}, min_tablet_count={}, target_tablet_count={}: decision={}",
d.tablet_count, d.avg_tablet_size, d.min_tablet_count, target_tablet_count, decision.type_name());
return decision;
}
bool table_needs_resize(const table_size_desc& d) const {
return table_needs_merge(d) || table_needs_split(d);
return to_resize_decision(d).split_or_merge();
}
// Resize cancellation will account for possible oscillations caused by compaction, etc.
@@ -529,13 +540,7 @@ class load_balancer {
// If we cancel a split, that's because average size dropped so much a merge would be
// required post completion, and vice-versa.
bool table_needs_resize_cancellation(const table_size_desc& d) const {
auto& way = d.resize_decision.way;
if (std::holds_alternative<locator::resize_decision::split>(way)) {
return d.avg_tablet_size < d.target_max_tablet_size / 2;
} else if (std::holds_alternative<locator::resize_decision::merge>(way)) {
return d.avg_tablet_size > d.target_min_tablet_size() * 2;
}
return false;
return d.resize_decision.split_or_merge() && to_resize_decision(d).way != d.resize_decision.way;
}
void update(table_id id, table_size_desc d) {
@@ -560,22 +565,12 @@ class load_balancer {
return [] (const table_id_and_size_desc& a, const table_id_and_size_desc& b) {
auto urgency = [] (const table_size_desc& d) -> double {
// FIXME: only takes into account split today.
return double(d.avg_tablet_size) / d.target_max_tablet_size;
return double(d.avg_tablet_size) / d.target_max_tablet_size();
};
return urgency(a.second) < urgency(b.second);
};
}
static locator::resize_decision to_resize_decision(const table_size_desc& d) {
locator::resize_decision decision;
if (table_needs_split(d)) {
decision.way = locator::resize_decision::split{};
} else if (table_needs_merge(d)) {
decision.way = locator::resize_decision::merge{};
}
return decision;
}
// Resize decisions can be revoked with an empty (none) decision, so replicas
// will know they're no longer required to prepare storage for the execution of
// topology changes.
@@ -699,13 +694,13 @@ public:
, _skiplist(std::move(skiplist))
{ }
future<migration_plan> make_plan() {
future<migration_plan> make_plan(std::optional<unsigned> initial_scale, bool test_mode) {
const locator::topology& topo = _tm->get_topology();
migration_plan plan;
// Prepare plans for each DC separately and combine them to be executed in parallel.
for (auto&& dc : topo.get_datacenters()) {
auto dc_plan = co_await make_plan(dc);
auto dc_plan = co_await make_plan(dc, initial_scale, test_mode);
lblogger.info("Prepared {} migrations in DC {}", dc_plan.size(), dc);
plan.merge(std::move(dc_plan));
}
@@ -714,7 +709,7 @@ public:
plan.set_repair_plan(co_await make_repair_plan(plan));
// Merge table-wide resize decisions, may emit new decisions, revoke or finalize ongoing ones.
plan.merge_resize_plan(co_await make_resize_plan(plan));
plan.merge_resize_plan(co_await make_resize_plan(plan, initial_scale, test_mode));
lblogger.info("Prepared {} migration plans, out of which there were {} tablet migration(s) and {} resize decision(s) and {} tablet repair(s)",
plan.size(), plan.tablet_migration_count(), plan.resize_decision_count(), plan.tablet_repair_count());
@@ -1058,7 +1053,7 @@ public:
co_return std::move(plan);
}
future<table_resize_plan> make_resize_plan(const migration_plan& plan) {
future<table_resize_plan> make_resize_plan(const migration_plan& plan, std::optional<unsigned> initial_scale, bool test_mode) {
table_resize_plan resize_plan;
if (!_tm->tablets().balancing_enabled()) {
@@ -1083,13 +1078,33 @@ public:
});
table_size_desc size_desc {
.target_max_tablet_size = target_max_tablet_size(_target_tablet_size),
.target_tablet_size = _target_tablet_size,
.avg_tablet_size = avg_tablet_size,
.resize_decision = tmap.resize_decision(),
.tablet_count = tmap.tablet_count(),
.shard_count = shard_count
.shard_count = shard_count,
.min_tablet_count = 1,
};
// FIXME: the table or the replication_strategy might be missing in boost unit tests
auto t = _db.get_tables_metadata().get_table_if_exists(table);
if (t) {
const auto& erm = t->get_effective_replication_map();
if (const auto* rs = erm->get_replication_strategy().maybe_as_tablet_aware()) {
size_desc.min_tablet_count = std::max<size_t>(size_desc.min_tablet_count,
rs->calculate_min_tablet_count(t->schema(), erm->get_token_metadata_ptr(), size_desc.target_tablet_size, initial_scale));
} else {
auto msg = format("Table {}.{} has no tablet_aware_replication_strategy: uses_tablets={}", t->schema()->ks_name(), t->schema()->cf_name(), erm->get_replication_strategy().uses_tablets());
if (!test_mode) {
on_internal_error(lblogger, msg);
} else {
lblogger.debug("{}", msg);
}
}
} else if (!test_mode) {
on_internal_error(lblogger, format("Table {} does not exist", table));
}
resize_load.update(table, std::move(size_desc));
lblogger.info("Table {} with tablet_count={} has an average tablet size of {}", table, tmap.tablet_count(), avg_tablet_size);
co_await coroutine::maybe_yield();
@@ -2395,7 +2410,7 @@ public:
}
};
future<migration_plan> make_plan(dc_name dc) {
future<migration_plan> make_plan(dc_name dc, std::optional<unsigned> initial_scale, bool test_mode) {
migration_plan plan;
_dc = dc;
@@ -2708,10 +2723,10 @@ public:
_stopped = true;
}
future<migration_plan> balance_tablets(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, std::unordered_set<host_id> skiplist) {
future<migration_plan> balance_tablets(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, std::unordered_set<host_id> skiplist, bool test_mode) {
load_balancer lb(_db, tm, std::move(table_load_stats), _load_balancer_stats, _db.get_config().target_tablet_size_in_bytes(), std::move(skiplist));
lb.set_use_table_aware_balancing(_use_tablet_aware_balancing);
co_return co_await lb.make_plan();
co_return co_await lb.make_plan(_config.initial_tablets_scale, test_mode);
}
void set_use_tablet_aware_balancing(bool use_tablet_aware_balancing) {
@@ -2724,7 +2739,7 @@ public:
if (auto&& tablet_rs = rs->maybe_as_tablet_aware()) {
auto tm = _db.get_shared_token_metadata().get();
lblogger.debug("Creating tablets for {}.{} id={}", s.ks_name(), s.cf_name(), s.id());
auto map = tablet_rs->allocate_tablets_for_new_table(s.shared_from_this(), tm, _config.initial_tablets_scale).get();
auto map = tablet_rs->allocate_tablets_for_new_table(s.shared_from_this(), tm, _db.get_config().target_tablet_size_in_bytes(), _config.initial_tablets_scale).get();
muts.emplace_back(tablet_map_to_mutation(map, s.id(), s.ks_name(), s.cf_name(), ts, _db.features()).get());
}
}
@@ -2860,8 +2875,8 @@ future<> tablet_allocator::stop() {
return impl().stop();
}
future<migration_plan> tablet_allocator::balance_tablets(locator::token_metadata_ptr tm, locator::load_stats_ptr load_stats, std::unordered_set<host_id> skiplist) {
return impl().balance_tablets(std::move(tm), std::move(load_stats), std::move(skiplist));
future<migration_plan> tablet_allocator::balance_tablets(locator::token_metadata_ptr tm, locator::load_stats_ptr load_stats, std::unordered_set<host_id> skiplist, bool test_mode) {
return impl().balance_tablets(std::move(tm), std::move(load_stats), std::move(skiplist), test_mode);
}
void tablet_allocator::set_use_table_aware_balancing(bool use_tablet_aware_balancing) {

View File

@@ -229,7 +229,7 @@ public:
///
/// The algorithm takes care of limiting the streaming load on the system, also by taking active migrations into account.
///
future<migration_plan> balance_tablets(locator::token_metadata_ptr, locator::load_stats_ptr = {}, std::unordered_set<locator::host_id> = {});
future<migration_plan> balance_tablets(locator::token_metadata_ptr, locator::load_stats_ptr = {}, std::unordered_set<locator::host_id> = {}, bool test_mode = false);
load_balancer_stats_manager& stats();

View File

@@ -8,6 +8,7 @@
#include <boost/test/unit_test.hpp>
#include <fmt/ranges.h>
#include "db/tablet_options.hh"
#include "gms/inet_address.hh"
#include "inet_address_vectors.hh"
#include "locator/host_id.hh"
@@ -518,7 +519,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
"NetworkTopologyStrategy", params);
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tab_awr_ptr);
auto tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get(), 1).get();
auto tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get(), service::default_target_tablet_size).get();
full_ring_check(tmap, ars_ptr, stm.get());
// Test reallocate_tablets after randomizing a different set of options

View File

@@ -1158,7 +1158,7 @@ SEASTAR_TEST_CASE(test_system_schema_version_is_stable) {
// If you changed the schema of system.batchlog then this is expected to fail.
// Just replace expected version with the new version.
BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("3febbbce-8841-304a-abb9-170078ac173d")));
BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("1f504ac7-350f-37aa-8a9e-105b1325d8e3")));
});
}

View File

@@ -1439,7 +1439,7 @@ void rebalance_tablets(tablet_allocator& talloc,
auto max_iterations = 1 + get_tablet_count(stm.get()->tablets()) * 10;
for (size_t i = 0; i < max_iterations; ++i) {
auto plan = talloc.balance_tablets(stm.get(), load_stats, skiplist).get();
auto plan = talloc.balance_tablets(stm.get(), load_stats, skiplist, true).get();
if (plan.empty()) {
return;
}

View File

@@ -234,6 +234,31 @@ def test_desc_table(cql, test_keyspace, random_seed, has_tablets):
finally:
cql.execute(f"DROP TABLE {new_tbl}")
# This test compares the content of `system_schema.tables` and `system_schema.columns` tables
# when providing tablet options to CREATE TABLE.
def test_desc_table_with_tablet_options(cql, test_keyspace, random_seed, has_tablets):
if has_tablets: # issue #18180
global counter_table_chance
counter_table_chance = 0
tablet_options = {
'min_tablet_count': '100',
'min_per_shard_tablet_count': '0.8', # Verify that a floating point value works for this hint
'expected_data_size_in_gb': '50',
}
with new_random_table(cql, test_keyspace, tablet_options=tablet_options) as tbl:
desc = cql.execute(f"DESC TABLE {tbl}")
desc_create_stmt = desc.one().create_statement
try:
new_tbl = f"{test_keyspace}.{unique_name()}"
new_create_stmt = desc_create_stmt.replace(tbl, new_tbl)
cql.execute(new_create_stmt)
new_desc_stmt = cql.execute(f"DESC TABLE {new_tbl}")
new_desc_create_stmt = new_desc_stmt.one().create_statement
assert new_desc_create_stmt == new_create_stmt
finally:
cql.execute(f"DROP TABLE {new_tbl}")
# Test that `DESC TABLE {tbl}` contains appropriate create statement for table
# This test compares the content of `system_schema.scylla_tables` tables, thus the test
# is `scylla_only`.
@@ -1280,7 +1305,7 @@ def new_random_keyspace(cql):
# UDTs that can be used to create the table. The function uses `new_test_table`
# from util.py, so it can be used in a "with", as:
# with new_random_table(cql, test_keyspace) as table:
def new_random_table(cql, keyspace, udts=[]):
def new_random_table(cql, keyspace, udts=[], tablet_options={}):
pk_n = random.randrange(1, max_pk)
ck_n = random.randrange(max_ck)
regular_n = random.randrange(1, max_regular)
@@ -1348,6 +1373,8 @@ def new_random_table(cql, keyspace, udts=[]):
# Extra properties which ScyllaDB supports but Cassandra doesn't
extras["paxos_grace_seconds"] = random.randrange(1000, 100000)
extras["tombstone_gc"] = f"{{'mode': 'timeout', 'propagation_delay_in_seconds': '{random.randrange(100, 100000)}'}}"
if tablet_options:
extras["tablets"] = str(tablet_options)
extra_options = [f"{k} = {v}" for (k, v) in extras.items()]
extra_str = " AND ".join(extra_options)

View File

@@ -13,7 +13,7 @@
#############################################################################
import pytest
from .util import new_test_keyspace, new_test_table, unique_name, index_table_name
from .util import new_test_keyspace, new_test_table, new_materialized_view, unique_name, index_table_name
from cassandra.protocol import ConfigurationException, InvalidRequest
# A fixture similar to "test_keyspace", just creates a keyspace that enables
@@ -323,3 +323,66 @@ def test_alter_tablet_keyspace_rf(cql, this_dc):
change_dc_rf(10) # increase RF by 2+ should fail
with pytest.raises(InvalidRequest):
change_dc_rf(0) # decrease RF by 2+ should fail
def test_tablet_options(cql, skip_without_tablets):
def describe_table(cql, table):
return cql.execute(f"DESC TABLE {table}").one().create_statement
ksdef = "WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND TABLETS = {'enabled': true};"
with new_test_keyspace(cql, ksdef) as keyspace:
tablets = "tablets = {'min_tablet_count': '100'}"
with new_test_table(cql, keyspace, "pk int PRIMARY KEY, c int", extra=f" WITH {tablets}") as table:
assert tablets in describe_table(cql, table)
# ALTER TABLE for other options does not affect existing tablets
cql.execute(f"ALTER TABLE {table} WITH gc_grace_seconds = 42")
assert tablets in describe_table(cql, table)
# Resetting tablets to an empty map drops all hints
tablets = "tablets = {}"
cql.execute(f"ALTER TABLE {table} WITH {tablets}")
assert "tablets" not in describe_table(cql, table)
# New tablets can be added by ALTER TABLE
tablets = "tablets = {'expected_data_size_in_gb': '50', 'min_tablet_count': '100'}"
cql.execute(f"ALTER TABLE {table} WITH {tablets}")
assert tablets in describe_table(cql, table)
# tablets with zero values are dropped
tablets = "tablets = {'expected_data_size_in_gb': '0', 'min_tablet_count': '100'}"
cql.execute(f"ALTER TABLE {table} WITH {tablets}")
assert "tablets = {'min_tablet_count': '100'}" in describe_table(cql, table)
# tablets are set as a whole, replacing the previously set hints
# Also, verify that a floating point value works for min_per_shard_tablet_count
tablets = "tablets = {'min_per_shard_tablet_count': '3.14'}"
cql.execute(f"ALTER TABLE {table} WITH {tablets}")
assert tablets in describe_table(cql, table)
def test_tablet_options_with_vnodes_based_keyspace(cql, skip_without_tablets):
# Test that tablets are disallowed when tablets are disabled for the keyspace
ksdef = "WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND TABLETS = {'enabled': false};"
with new_test_keyspace(cql, ksdef) as keyspace:
table = f"{keyspace}.{unique_name()}"
schema = "pk int PRIMARY KEY, c int"
tablets = "tablets = {'min_tablet_count': '100'}"
expected_msg = "tablet options cannot be used when tablets are disabled for the keyspace"
with pytest.raises(ConfigurationException, match=expected_msg):
cql.execute(f"CREATE TABLE {table} ({schema}) WITH {tablets}")
cql.execute(f"CREATE TABLE {table} ({schema})")
try:
with pytest.raises(ConfigurationException, match=expected_msg):
cql.execute(f"ALTER TABLE {table} WITH {tablets}")
finally:
cql.execute(f"DROP TABLE {table}")
def test_tablet_options_with_view(cql, skip_without_tablets):
def describe_view(cql, table):
res = list(cql.execute(f"DESC TABLE {table}"))
assert len(res) == 2
return res[1].create_statement
ksdef = "WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND TABLETS = {'enabled': true};"
tablets = "tablets = {'min_tablet_count': '100'}"
with new_test_keyspace(cql, ksdef) as keyspace, \
new_test_table(cql, keyspace, "pk int PRIMARY KEY, c int") as table, \
new_materialized_view(cql, table, '*', 'c, pk', 'pk is not null and c is not null', extra=f" WITH {tablets}") as view:
assert tablets in describe_view(cql, table), f"{tablets} not found in {describe_view(cql, table)}"

View File

@@ -298,9 +298,9 @@ future<results> test_load_balancing_with_many_tables(params p, bool tablet_aware
auto allocate = [&] (schema_ptr s, int rf, std::optional<int> initial_tablets) {
replication_strategy_config_options opts;
opts[rack1.dc] = format("{}", rf);
network_topology_strategy tablet_rs(replication_strategy_params(opts, initial_tablets.value_or(0)));
network_topology_strategy tablet_rs(replication_strategy_params(opts, initial_tablets));
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto map = co_await tablet_rs.allocate_tablets_for_new_table(s, stm.get(), 1);
auto map = co_await tablet_rs.allocate_tablets_for_new_table(s, stm.get(), service::default_target_tablet_size);
tm.tablets().set_tablet_map(s->id(), std::move(map));
}).get();
};