cql3: Convert classes from org.cassandra.cql3.selection package

This commit is contained in:
Tomasz Grabiec
2015-03-04 19:02:36 +01:00
parent 335f69070a
commit 8912417dd0
17 changed files with 1128 additions and 1004 deletions

View File

@@ -280,6 +280,9 @@ urchin_core = (['database.cc',
'cql3/query_options.cc',
'cql3/single_column_relation.cc',
'cql3/column_condition.cc',
'cql3/selection/simple_selector.cc',
'cql3/selection/selector_factories.cc',
'cql3/selection/selection.cc',
'db/db.cc',
'db/system_keyspace.cc',
'db/legacy_schema_tables.cc',

View File

@@ -3,6 +3,8 @@
*/
#include "cql3/column_identifier.hh"
#include "exceptions/exceptions.hh"
#include "cql3/selection/simple_selector.hh"
namespace cql3 {
@@ -28,4 +30,14 @@ std::ostream& operator<<(std::ostream& out, const column_identifier::raw& id) {
return out << id._text;
}
::shared_ptr<selection::selector::factory>
column_identifier::new_selector_factory(schema_ptr schema, std::vector<const column_definition*>& defs) {
auto def = get_column_definition(schema, *this);
if (!def) {
throw exceptions::invalid_request_exception(sprint("Undefined name %s in selection clause", _text));
}
return selection::simple_selector::new_factory(def->name_as_text(), add_and_get_index(*def, defs), def->type);
}
}

View File

@@ -125,17 +125,11 @@ public:
{
return new ColumnIdentifier(allocator.clone(bytes), text);
}
public Selector.Factory newSelectorFactory(CFMetaData cfm, List<ColumnDefinition> defs) throws InvalidRequestException
{
ColumnDefinition def = cfm.getColumnDefinition(this);
if (def == null)
throw new InvalidRequestException(String.format("Undefined name %s in selection clause", this));
return SimpleSelector.newFactory(def.name.toString(), addAndGetIndex(def, defs), def.type);
}
#endif
virtual ::shared_ptr<selection::selector::factory> new_selector_factory(schema_ptr schema,
std::vector<const column_definition*>& defs) override;
/**
* Because Thrift-created tables may have a non-text comparator, we cannot determine the proper 'key' until
* we know the comparator. ColumnIdentifier.Raw is a placeholder that can be converted to a real ColumnIdentifier

View File

@@ -1,512 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.selection;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.ResultSet;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.CounterCell;
import org.apache.cassandra.db.ExpiringCell;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.ByteBufferUtil;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
public abstract class Selection
{
/**
* A predicate that returns <code>true</code> for static columns.
*/
private static final Predicate<ColumnDefinition> STATIC_COLUMN_FILTER = new Predicate<ColumnDefinition>()
{
public boolean apply(ColumnDefinition def)
{
return def.isStatic();
}
};
private final CFMetaData cfm;
private final Collection<ColumnDefinition> columns;
private final ResultSet.Metadata metadata;
private final boolean collectTimestamps;
private final boolean collectTTLs;
protected Selection(CFMetaData cfm,
Collection<ColumnDefinition> columns,
List<ColumnSpecification> metadata,
boolean collectTimestamps,
boolean collectTTLs)
{
this.cfm = cfm;
this.columns = columns;
this.metadata = new ResultSet.Metadata(metadata);
this.collectTimestamps = collectTimestamps;
this.collectTTLs = collectTTLs;
}
// Overriden by SimpleSelection when appropriate.
public boolean isWildcard()
{
return false;
}
/**
* Checks if this selection contains static columns.
* @return <code>true</code> if this selection contains static columns, <code>false</code> otherwise;
*/
public boolean containsStaticColumns()
{
if (!cfm.hasStaticColumns())
return false;
if (isWildcard())
return true;
return !Iterables.isEmpty(Iterables.filter(columns, STATIC_COLUMN_FILTER));
}
/**
* Checks if this selection contains only static columns.
* @return <code>true</code> if this selection contains only static columns, <code>false</code> otherwise;
*/
public boolean containsOnlyStaticColumns()
{
if (!containsStaticColumns())
return false;
if (isWildcard())
return false;
for (ColumnDefinition def : getColumns())
{
if (!def.isPartitionKey() && !def.isStatic())
return false;
}
return true;
}
/**
* Checks if this selection contains a collection.
*
* @return <code>true</code> if this selection contains a collection, <code>false</code> otherwise.
*/
public boolean containsACollection()
{
if (!cfm.comparator.hasCollections())
return false;
for (ColumnDefinition def : getColumns())
if (def.type.isCollection() && def.type.isMultiCell())
return true;
return false;
}
/**
* Returns the index of the specified column.
*
* @param def the column definition
* @return the index of the specified column
*/
public int indexOf(final ColumnDefinition def)
{
return Iterators.indexOf(getColumns().iterator(), new Predicate<ColumnDefinition>()
{
public boolean apply(ColumnDefinition n)
{
return def.name.equals(n.name);
}
});
}
public ResultSet.Metadata getResultMetadata()
{
return metadata;
}
public static Selection wildcard(CFMetaData cfm)
{
List<ColumnDefinition> all = new ArrayList<ColumnDefinition>(cfm.allColumns().size());
Iterators.addAll(all, cfm.allColumnsInSelectOrder());
return new SimpleSelection(cfm, all, true);
}
public static Selection forColumns(CFMetaData cfm, Collection<ColumnDefinition> columns)
{
return new SimpleSelection(cfm, columns, false);
}
public int addColumnForOrdering(ColumnDefinition c)
{
columns.add(c);
metadata.addNonSerializedColumn(c);
return columns.size() - 1;
}
public boolean usesFunction(String ksName, String functionName)
{
return false;
}
private static boolean processesSelection(List<RawSelector> rawSelectors)
{
for (RawSelector rawSelector : rawSelectors)
{
if (rawSelector.processesSelection())
return true;
}
return false;
}
public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> rawSelectors) throws InvalidRequestException
{
List<ColumnDefinition> defs = new ArrayList<ColumnDefinition>();
SelectorFactories factories =
SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors, cfm), cfm, defs);
List<ColumnSpecification> metadata = collectMetadata(cfm, rawSelectors, factories);
return processesSelection(rawSelectors) ? new SelectionWithProcessing(cfm, defs, metadata, factories)
: new SimpleSelection(cfm, defs, metadata, false);
}
private static List<ColumnSpecification> collectMetadata(CFMetaData cfm,
List<RawSelector> rawSelectors,
SelectorFactories factories)
{
List<ColumnSpecification> metadata = new ArrayList<ColumnSpecification>(rawSelectors.size());
Iterator<RawSelector> iter = rawSelectors.iterator();
for (Selector.Factory factory : factories)
{
ColumnSpecification colSpec = factory.getColumnSpecification(cfm);
ColumnIdentifier alias = iter.next().alias;
metadata.add(alias == null ? colSpec : colSpec.withAlias(alias));
}
return metadata;
}
protected abstract Selectors newSelectors() throws InvalidRequestException;
/**
* @return the list of CQL3 columns value this SelectionClause needs.
*/
public Collection<ColumnDefinition> getColumns()
{
return columns;
}
public ResultSetBuilder resultSetBuilder(long now) throws InvalidRequestException
{
return new ResultSetBuilder(now);
}
public abstract boolean isAggregate();
/**
* Checks that selectors are either all aggregates or that none of them is.
*
* @param selectors the selectors to test.
* @param messageTemplate the error message template
* @param messageArgs the error message arguments
* @throws InvalidRequestException if some of the selectors are aggregate but not all of them
*/
static void validateSelectors(List<Selector> selectors, String messageTemplate, Object... messageArgs)
throws InvalidRequestException
{
int aggregates = 0;
for (Selector s : selectors)
if (s.isAggregate())
++aggregates;
if (aggregates != 0 && aggregates != selectors.size())
throw new InvalidRequestException(String.format(messageTemplate, messageArgs));
}
public class ResultSetBuilder
{
private final ResultSet resultSet;
/**
* As multiple thread can access a <code>Selection</code> instance each <code>ResultSetBuilder</code> will use
* its own <code>Selectors</code> instance.
*/
private final Selectors selectors;
/*
* We'll build CQL3 row one by one.
* The currentRow is the values for the (CQL3) columns we've fetched.
* We also collect timestamps and ttls for the case where the writetime and
* ttl functions are used. Note that we might collect timestamp and/or ttls
* we don't care about, but since the array below are allocated just once,
* it doesn't matter performance wise.
*/
List<ByteBuffer> current;
final long[] timestamps;
final int[] ttls;
final long now;
private ResultSetBuilder(long now) throws InvalidRequestException
{
this.resultSet = new ResultSet(getResultMetadata().copy(), new ArrayList<List<ByteBuffer>>());
this.selectors = newSelectors();
this.timestamps = collectTimestamps ? new long[columns.size()] : null;
this.ttls = collectTTLs ? new int[columns.size()] : null;
this.now = now;
}
public void add(ByteBuffer v)
{
current.add(v);
}
public void add(Cell c)
{
current.add(isDead(c) ? null : value(c));
if (timestamps != null)
{
timestamps[current.size() - 1] = isDead(c) ? Long.MIN_VALUE : c.timestamp();
}
if (ttls != null)
{
int ttl = -1;
if (!isDead(c) && c instanceof ExpiringCell)
ttl = c.getLocalDeletionTime() - (int) (now / 1000);
ttls[current.size() - 1] = ttl;
}
}
private boolean isDead(Cell c)
{
return c == null || !c.isLive(now);
}
public void newRow(int protocolVersion) throws InvalidRequestException
{
if (current != null)
{
selectors.addInputRow(protocolVersion, this);
if (!selectors.isAggregate())
{
resultSet.addRow(selectors.getOutputRow(protocolVersion));
selectors.reset();
}
}
current = new ArrayList<ByteBuffer>(columns.size());
}
public ResultSet build(int protocolVersion) throws InvalidRequestException
{
if (current != null)
{
selectors.addInputRow(protocolVersion, this);
resultSet.addRow(selectors.getOutputRow(protocolVersion));
selectors.reset();
current = null;
}
if (resultSet.isEmpty() && selectors.isAggregate())
{
resultSet.addRow(selectors.getOutputRow(protocolVersion));
}
return resultSet;
}
private ByteBuffer value(Cell c)
{
return (c instanceof CounterCell)
? ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
: c.value();
}
}
private static interface Selectors
{
public boolean isAggregate();
/**
* Adds the current row of the specified <code>ResultSetBuilder</code>.
*
* @param rs the <code>ResultSetBuilder</code>
* @throws InvalidRequestException
*/
public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException;
public void reset();
}
// Special cased selection for when no function is used (this save some allocations).
private static class SimpleSelection extends Selection
{
private final boolean isWildcard;
public SimpleSelection(CFMetaData cfm, Collection<ColumnDefinition> columns, boolean isWildcard)
{
this(cfm, columns, new ArrayList<ColumnSpecification>(columns), isWildcard);
}
public SimpleSelection(CFMetaData cfm,
Collection<ColumnDefinition> columns,
List<ColumnSpecification> metadata,
boolean isWildcard)
{
/*
* In theory, even a simple selection could have multiple time the same column, so we
* could filter those duplicate out of columns. But since we're very unlikely to
* get much duplicate in practice, it's more efficient not to bother.
*/
super(cfm, columns, metadata, false, false);
this.isWildcard = isWildcard;
}
@Override
public boolean isWildcard()
{
return isWildcard;
}
public boolean isAggregate()
{
return false;
}
protected Selectors newSelectors()
{
return new Selectors()
{
private List<ByteBuffer> current;
public void reset()
{
current = null;
}
public List<ByteBuffer> getOutputRow(int protocolVersion)
{
return current;
}
public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
current = rs.current;
}
public boolean isAggregate()
{
return false;
}
};
}
}
private static class SelectionWithProcessing extends Selection
{
private final SelectorFactories factories;
public SelectionWithProcessing(CFMetaData cfm,
Collection<ColumnDefinition> columns,
List<ColumnSpecification> metadata,
SelectorFactories factories) throws InvalidRequestException
{
super(cfm,
columns,
metadata,
factories.containsWritetimeSelectorFactory(),
factories.containsTTLSelectorFactory());
this.factories = factories;
if (factories.doesAggregation() && !factories.containsOnlyAggregateFunctions())
throw new InvalidRequestException("the select clause must either contains only aggregates or none");
}
@Override
public boolean usesFunction(String ksName, String functionName)
{
return factories.usesFunction(ksName, functionName);
}
@Override
public int addColumnForOrdering(ColumnDefinition c)
{
int index = super.addColumnForOrdering(c);
factories.addSelectorForOrdering(c, index);
return index;
}
public boolean isAggregate()
{
return factories.containsOnlyAggregateFunctions();
}
protected Selectors newSelectors() throws InvalidRequestException
{
return new Selectors()
{
private final List<Selector> selectors = factories.newInstances();
public void reset()
{
for (int i = 0, m = selectors.size(); i < m; i++)
{
selectors.get(i).reset();
}
}
public boolean isAggregate()
{
return factories.containsOnlyAggregateFunctions();
}
public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException
{
List<ByteBuffer> outputRow = new ArrayList<>(selectors.size());
for (int i = 0, m = selectors.size(); i < m; i++)
{
outputRow.add(selectors.get(i).getOutput(protocolVersion));
}
return outputRow;
}
public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
for (int i = 0, m = selectors.size(); i < m; i++)
{
selectors.get(i).addInput(protocolVersion, rs);
}
}
};
}
}
}

View File

@@ -1,170 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.selection;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.AssignmentTestable;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
/**
* A <code>Selector</code> is used to convert the data returned by the storage engine into the data requested by the
* user. They correspond to the &lt;selector&gt; elements from the select clause.
* <p>Since the introduction of aggregation, <code>Selector</code>s cannot be called anymore by multiple threads
* as they have an internal state.</p>
*/
public abstract class Selector implements AssignmentTestable
{
/**
* A factory for <code>Selector</code> instances.
*/
public static abstract class Factory
{
public boolean usesFunction(String ksName, String functionName)
{
return false;
}
/**
* Returns the column specification corresponding to the output value of the selector instances created by
* this factory.
*
* @param cfm the column family meta data
* @return a column specification
*/
public final ColumnSpecification getColumnSpecification(CFMetaData cfm)
{
return new ColumnSpecification(cfm.ksName,
cfm.cfName,
new ColumnIdentifier(getColumnName(), true),
getReturnType());
}
/**
* Creates a new <code>Selector</code> instance.
*
* @return a new <code>Selector</code> instance
*/
public abstract Selector newInstance() throws InvalidRequestException;
/**
* Checks if this factory creates selectors instances that creates aggregates.
*
* @return <code>true</code> if this factory creates selectors instances that creates aggregates,
* <code>false</code> otherwise
*/
public boolean isAggregateSelectorFactory()
{
return false;
}
/**
* Checks if this factory creates <code>writetime</code> selectors instances.
*
* @return <code>true</code> if this factory creates <code>writetime</code> selectors instances,
* <code>false</code> otherwise
*/
public boolean isWritetimeSelectorFactory()
{
return false;
}
/**
* Checks if this factory creates <code>TTL</code> selectors instances.
*
* @return <code>true</code> if this factory creates <code>TTL</code> selectors instances,
* <code>false</code> otherwise
*/
public boolean isTTLSelectorFactory()
{
return false;
}
/**
* Returns the name of the column corresponding to the output value of the selector instances created by
* this factory.
*
* @return a column name
*/
protected abstract String getColumnName();
/**
* Returns the type of the values returned by the selector instances created by this factory.
*
* @return the selector output type
*/
protected abstract AbstractType<?> getReturnType();
}
/**
* Add the current value from the specified <code>ResultSetBuilder</code>.
*
* @param protocolVersion protocol version used for serialization
* @param rs the <code>ResultSetBuilder</code>
* @throws InvalidRequestException if a problem occurs while add the input value
*/
public abstract void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException;
/**
* Returns the selector output.
*
* @param protocolVersion protocol version used for serialization
* @return the selector output
* @throws InvalidRequestException if a problem occurs while computing the output value
*/
public abstract ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException;
/**
* Returns the <code>Selector</code> output type.
*
* @return the <code>Selector</code> output type.
*/
public abstract AbstractType<?> getType();
/**
* Checks if this <code>Selector</code> is creating aggregates.
*
* @return <code>true</code> if this <code>Selector</code> is creating aggregates <code>false</code>
* otherwise.
*/
public boolean isAggregate()
{
return false;
}
/**
* Reset the internal state of this <code>Selector</code>.
*/
public abstract void reset();
public AssignmentTestable.TestResult testAssignment(String keyspace, ColumnSpecification receiver)
{
if (receiver.type.equals(getType()))
return AssignmentTestable.TestResult.EXACT_MATCH;
else if (receiver.type.isValueCompatibleWith(getType()))
return AssignmentTestable.TestResult.WEAKLY_ASSIGNABLE;
else
return AssignmentTestable.TestResult.NOT_ASSIGNABLE;
}
}

View File

@@ -1,189 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.selection;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.selection.Selector.Factory;
import org.apache.cassandra.exceptions.InvalidRequestException;
/**
* A set of <code>Selector</code> factories.
*/
final class SelectorFactories implements Iterable<Selector.Factory>
{
/**
* The <code>Selector</code> factories.
*/
private final List<Selector.Factory> factories;
/**
* <code>true</code> if one of the factory creates writetime selectors.
*/
private boolean containsWritetimeFactory;
/**
* <code>true</code> if one of the factory creates TTL selectors.
*/
private boolean containsTTLFactory;
/**
* The number of factories creating aggregates.
*/
private int numberOfAggregateFactories;
/**
* Creates a new <code>SelectorFactories</code> instance and collect the column definitions.
*
* @param selectables the <code>Selectable</code>s for which the factories must be created
* @param cfm the Column Family Definition
* @param defs the collector parameter for the column definitions
* @return a new <code>SelectorFactories</code> instance
* @throws InvalidRequestException if a problem occurs while creating the factories
*/
public static SelectorFactories createFactoriesAndCollectColumnDefinitions(List<Selectable> selectables,
CFMetaData cfm,
List<ColumnDefinition> defs)
throws InvalidRequestException
{
return new SelectorFactories(selectables, cfm, defs);
}
private SelectorFactories(List<Selectable> selectables,
CFMetaData cfm,
List<ColumnDefinition> defs)
throws InvalidRequestException
{
factories = new ArrayList<>(selectables.size());
for (Selectable selectable : selectables)
{
Factory factory = selectable.newSelectorFactory(cfm, defs);
containsWritetimeFactory |= factory.isWritetimeSelectorFactory();
containsTTLFactory |= factory.isTTLSelectorFactory();
if (factory.isAggregateSelectorFactory())
++numberOfAggregateFactories;
factories.add(factory);
}
}
public boolean usesFunction(String ksName, String functionName)
{
for (Factory factory : factories)
if (factory != null && factory.usesFunction(ksName, functionName))
return true;
return false;
}
/**
* Adds a new <code>Selector.Factory</code> for a column that is needed only for ORDER BY purposes.
* @param def the column that is needed for ordering
* @param index the index of the column definition in the Selection's list of columns
*/
public void addSelectorForOrdering(ColumnDefinition def, int index)
{
factories.add(SimpleSelector.newFactory(def.name.toString(), index, def.type));
}
/**
* Checks if this <code>SelectorFactories</code> contains only factories for aggregates.
*
* @return <code>true</code> if this <code>SelectorFactories</code> contains only factories for aggregates,
* <code>false</code> otherwise.
*/
public boolean containsOnlyAggregateFunctions()
{
int size = factories.size();
return size != 0 && numberOfAggregateFactories == size;
}
/**
* Whether the selector built by this factory does aggregation or not (either directly or in a sub-selector).
*
* @return <code>true</code> if the selector built by this factor does aggregation, <code>false</code> otherwise.
*/
public boolean doesAggregation()
{
return numberOfAggregateFactories > 0;
}
/**
* Checks if this <code>SelectorFactories</code> contains at least one factory for writetime selectors.
*
* @return <code>true</code> if this <code>SelectorFactories</code> contains at least one factory for writetime
* selectors, <code>false</code> otherwise.
*/
public boolean containsWritetimeSelectorFactory()
{
return containsWritetimeFactory;
}
/**
* Checks if this <code>SelectorFactories</code> contains at least one factory for TTL selectors.
*
* @return <code>true</code> if this <code>SelectorFactories</code> contains at least one factory for TTL
* selectors, <code>false</code> otherwise.
*/
public boolean containsTTLSelectorFactory()
{
return containsTTLFactory;
}
/**
* Creates a list of new <code>Selector</code> instances.
* @return a list of new <code>Selector</code> instances.
*/
public List<Selector> newInstances() throws InvalidRequestException
{
List<Selector> selectors = new ArrayList<>(factories.size());
for (Selector.Factory factory : factories)
{
selectors.add(factory.newInstance());
}
return selectors;
}
public Iterator<Factory> iterator()
{
return factories.iterator();
}
/**
* Returns the names of the columns corresponding to the output values of the selector instances created by
* these factories.
*
* @return a list of column names
*/
public List<String> getColumnNames()
{
return Lists.transform(factories, new Function<Selector.Factory, String>()
{
public String apply(Selector.Factory factory)
{
return factory.getColumnName();
}
});
}
}

View File

@@ -1,93 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.cql3.selection;
import java.nio.ByteBuffer;
import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
public final class SimpleSelector extends Selector
{
private final String columnName;
private final int idx;
private final AbstractType<?> type;
private ByteBuffer current;
public static Factory newFactory(final String columnName, final int idx, final AbstractType<?> type)
{
return new Factory()
{
@Override
protected String getColumnName()
{
return columnName;
}
@Override
protected AbstractType<?> getReturnType()
{
return type;
}
@Override
public Selector newInstance()
{
return new SimpleSelector(columnName, idx, type);
}
};
}
@Override
public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException
{
current = rs.current.get(idx);
}
@Override
public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException
{
return current;
}
@Override
public void reset()
{
current = null;
}
@Override
public AbstractType<?> getType()
{
return type;
}
@Override
public String toString()
{
return columnName;
}
private SimpleSelector(String columnName, int idx, AbstractType<?> type)
{
this.columnName = columnName;
this.idx = idx;
this.type = type;
}
}

View File

@@ -34,37 +34,33 @@ namespace selection {
class raw_selector {
public:
const shared_ptr<selectable::raw> selectable_;
const shared_ptr<column_identifier> alias;
const ::shared_ptr<selectable::raw> selectable_;
const ::shared_ptr<column_identifier> alias;
raw_selector(shared_ptr<selectable::raw> selectable__, shared_ptr<column_identifier> alias_)
: selectable_{selectable__}
, alias{alias_}
{ }
#if 0
/**
* Converts the specified list of <code>RawSelector</code>s into a list of <code>Selectable</code>s.
*
* @param raws the <code>RawSelector</code>s to converts.
* @return a list of <code>Selectable</code>s
*/
public static List<Selectable> toSelectables(List<RawSelector> raws, final CFMetaData cfm)
{
return Lists.transform(raws, new Function<RawSelector, Selectable>()
{
public Selectable apply(RawSelector raw)
{
return raw.selectable.prepare(cfm);
}
});
static std::vector<::shared_ptr<selectable>> to_selectables(const std::vector<::shared_ptr<raw_selector>>& raws,
schema_ptr schema) {
std::vector<::shared_ptr<selectable>> r;
r.reserve(raws.size());
for (auto&& raw : raws) {
r.emplace_back(raw->selectable_->prepare(schema));
}
return r;
}
public boolean processesSelection()
{
return selectable.processesSelection();
bool processes_selection() const {
return selectable_->processes_selection();
}
#endif
};
}

View File

@@ -26,8 +26,9 @@
#ifndef CQL3_SELECTION_SELECTABLE_HH
#define CQL3_SELECTION_SELECTABLE_HH
#include "database.hh"
#include "schema.hh"
#include "core/shared_ptr.hh"
#include "cql3/selection/selector.hh"
namespace cql3 {
@@ -51,24 +52,22 @@ import org.apache.commons.lang3.text.StrBuilder;
class selectable {
public:
#if 0
public abstract Selector.Factory newSelectorFactory(CFMetaData cfm, List<ColumnDefinition> defs)
throws InvalidRequestException;
protected static int addAndGetIndex(ColumnDefinition def, List<ColumnDefinition> l)
{
int idx = l.indexOf(def);
if (idx < 0)
{
idx = l.size();
l.add(def);
virtual ~selectable() {}
virtual ::shared_ptr<selector::factory> new_selector_factory(schema_ptr schema, std::vector<const column_definition*>& defs) = 0;
protected:
static size_t add_and_get_index(const column_definition& def, std::vector<const column_definition*>& defs) {
auto i = std::find(defs.begin(), defs.end(), &def);
if (i != defs.end()) {
return std::distance(defs.begin(), i);
}
return idx;
defs.push_back(&def);
return defs.size() - 1;
}
#endif
public:
class raw {
public:
virtual ~raw() {}
virtual ::shared_ptr<selectable> prepare(schema_ptr s) = 0;
/**

216
cql3/selection/selection.cc Normal file
View File

@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#include "cql3/selection/selection.hh"
#include "cql3/selection/selector_factories.hh"
namespace cql3 {
namespace selection {
// Special cased selection for when no function is used (this save some allocations).
class simple_selection : public selection {
private:
const bool _is_wildcard;
public:
static ::shared_ptr<simple_selection> make(schema_ptr schema, std::vector<const column_definition*> columns, bool is_wildcard) {
std::vector<::shared_ptr<column_specification>> metadata;
metadata.reserve(columns.size());
for (auto&& col : columns) {
metadata.emplace_back(col->column_specification);
}
return ::make_shared<simple_selection>(schema, std::move(columns), std::move(metadata), is_wildcard);
}
/*
* In theory, even a simple selection could have multiple time the same column, so we
* could filter those duplicate out of columns. But since we're very unlikely to
* get much duplicate in practice, it's more efficient not to bother.
*/
simple_selection(schema_ptr schema, std::vector<const column_definition*> columns,
std::vector<::shared_ptr<column_specification>> metadata, bool is_wildcard)
: selection(schema, std::move(columns), std::move(metadata), false, false)
, _is_wildcard(is_wildcard)
{ }
virtual bool is_wildcard() const override { return _is_wildcard; }
virtual bool is_aggregate() const override { return false; }
protected:
class simple_selectors : public selectors {
private:
std::vector<bytes_opt> _current;
public:
virtual void reset() override {
_current.clear();
}
virtual std::vector<bytes_opt> get_output_row(int32_t protocol_version) override {
return std::move(_current);
}
virtual void add_input_row(int32_t protocol_version, result_set_builder& rs) override {
_current = std::move(rs.current);
}
virtual bool is_aggregate() {
return false;
}
};
std::unique_ptr<selectors> new_selectors() {
return std::make_unique<simple_selectors>();
}
};
class selection_with_processing : public selection {
private:
::shared_ptr<selector_factories> _factories;
public:
selection_with_processing(schema_ptr schema, std::vector<const column_definition*> columns,
std::vector<::shared_ptr<column_specification>> metadata, ::shared_ptr<selector_factories> factories)
: selection(schema, std::move(columns), std::move(metadata),
factories->contains_write_time_selector_factory(),
factories->contains_ttl_selector_factory())
, _factories(std::move(factories))
{
if (_factories->does_aggregation() && !_factories->contains_only_aggregate_functions()) {
throw exceptions::invalid_request_exception("the select clause must either contains only aggregates or none");
}
}
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const override {
return _factories->uses_function(ks_name, function_name);
}
virtual uint32_t add_column_for_ordering(const column_definition& c) override {
uint32_t index = selection::add_column_for_ordering(c);
_factories->add_selector_for_ordering(c, index);
return index;
}
virtual bool is_aggregate() const override {
return _factories->contains_only_aggregate_functions();
}
protected:
class selectors_with_processing : public selectors {
private:
::shared_ptr<selector_factories> _factories;
std::vector<::shared_ptr<selector>> _selectors;
public:
selectors_with_processing(::shared_ptr<selector_factories> factories)
: _factories(std::move(factories))
, _selectors(_factories->new_instances())
{ }
virtual void reset() override {
for (auto&& s : _selectors) {
s->reset();
}
}
virtual bool is_aggregate() override {
return _factories->contains_only_aggregate_functions();
}
virtual std::vector<bytes_opt> get_output_row(int32_t protocol_version) override {
std::vector<bytes_opt> output_row;
output_row.reserve(_selectors.size());
for (auto&& s : _selectors) {
output_row.emplace_back(s->get_output(protocol_version));
}
return output_row;
}
virtual void add_input_row(int32_t protocol_version, result_set_builder& rs) {
for (auto&& s : _selectors) {
s->add_input(protocol_version, rs);
}
}
};
std::unique_ptr<selectors> new_selectors() {
return std::make_unique<selectors_with_processing>(_factories);
}
};
::shared_ptr<selection> selection::wildcard(schema_ptr schema) {
return simple_selection::make(schema, column_definition::vectorize(schema->all_columns_in_select_order()), true);
}
::shared_ptr<selection> selection::for_columns(schema_ptr schema, std::vector<const column_definition*> columns) {
return simple_selection::make(schema, std::move(columns), false);
}
uint32_t selection::add_column_for_ordering(const column_definition& c) {
_columns.push_back(&c);
_metadata->add_non_serialized_column(c.column_specification);
return _columns.size() - 1;
}
::shared_ptr<selection> selection::from_selectors(schema_ptr schema, const std::vector<::shared_ptr<raw_selector>>& raw_selectors) {
std::vector<const column_definition*> defs;
::shared_ptr<selector_factories> factories =
selector_factories::create_factories_and_collect_column_definitions(
raw_selector::to_selectables(raw_selectors, schema), schema, defs);
auto metadata = collect_metadata(schema, raw_selectors, *factories);
if (processes_selection(raw_selectors)) {
return ::make_shared<selection_with_processing>(schema, std::move(defs), std::move(metadata), std::move(factories));
} else {
return ::make_shared<simple_selection>(schema, std::move(defs), std::move(metadata), false);
}
}
std::vector<::shared_ptr<column_specification>>
selection::collect_metadata(schema_ptr schema, const std::vector<::shared_ptr<raw_selector>>& raw_selectors,
const selector_factories& factories) {
std::vector<::shared_ptr<column_specification>> r;
r.reserve(raw_selectors.size());
auto i = raw_selectors.begin();
for (auto&& factory : factories) {
::shared_ptr<column_specification> col_spec = factory->get_column_specification(schema);
::shared_ptr<column_identifier> alias = (*i++)->alias;
r.push_back(alias ? col_spec->with_alias(alias) : col_spec);
}
return r;
}
result_set_builder::result_set_builder(selection& s, db_clock::time_point now, int32_t protocol_version)
: _result_set(std::make_unique<result_set>(::make_shared<metadata>(*(s.get_result_metadata()))))
, _selectors(s.new_selectors())
, _now(now)
, _protocol_version(protocol_version)
{
if (s._collect_timestamps) {
_timestamps.resize(s._columns.size(), 0);
}
if (s._collect_TTLs) {
_ttls.resize(s._columns.size(), 0);
}
}
}
}

304
cql3/selection/selection.hh Normal file
View File

@@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include "database.hh"
#include "schema.hh"
#include "cql3/column_specification.hh"
#include "exceptions/exceptions.hh"
#include "cql3/result_set.hh"
#include "cql3/selection/raw_selector.hh"
#include "cql3/selection/selector_factories.hh"
#include "unimplemented.hh"
namespace cql3 {
namespace selection {
class selectors {
public:
virtual ~selectors() {}
virtual bool is_aggregate() = 0;
/**
* Adds the current row of the specified <code>ResultSetBuilder</code>.
*
* @param rs the <code>ResultSetBuilder</code>
* @throws InvalidRequestException
*/
virtual void add_input_row(int32_t protocol_version, result_set_builder& rs) = 0;
virtual std::vector<bytes_opt> get_output_row(int32_t protocol_version) = 0;
virtual void reset() = 0;
};
class selection {
private:
schema_ptr _schema;
std::vector<const column_definition*> _columns;
::shared_ptr<metadata> _metadata;
const bool _collect_timestamps;
const bool _collect_TTLs;
protected:
selection(schema_ptr schema, std::vector<const column_definition*> columns, std::vector<::shared_ptr<column_specification>> metadata_,
bool collect_timestamps, bool collect_TTLs)
: _schema(std::move(schema))
, _columns(std::move(columns))
, _metadata(::make_shared<metadata>(std::move(metadata_)))
, _collect_timestamps(collect_timestamps)
, _collect_TTLs(collect_TTLs)
{ }
virtual ~selection() {}
public:
// Overriden by SimpleSelection when appropriate.
virtual bool is_wildcard() const {
return false;
}
/**
* Checks if this selection contains static columns.
* @return <code>true</code> if this selection contains static columns, <code>false</code> otherwise;
*/
bool contains_static_columns() const {
if (!_schema->has_static_columns()) {
return false;
}
if (is_wildcard()) {
return true;
}
return std::any_of(_columns.begin(), _columns.end(), [] (auto&& def) { return def->is_static(); });
}
/**
* Checks if this selection contains only static columns.
* @return <code>true</code> if this selection contains only static columns, <code>false</code> otherwise;
*/
bool contains_only_static_columns() const {
if (!contains_static_columns()) {
return false;
}
if (is_wildcard()) {
return false;
}
for (auto&& def : _columns) {
if (!def->is_partition_key() && !def->is_static()) {
return false;
}
}
return true;
}
/**
* Checks if this selection contains a collection.
*
* @return <code>true</code> if this selection contains a collection, <code>false</code> otherwise.
*/
bool contains_a_collection() const {
if (!_schema->has_collections()) {
return false;
}
return std::any_of(_columns.begin(), _columns.end(), [] (auto&& def) {
return def->type->is_collection() && def->type->is_multi_cell();
});
}
/**
* Returns the index of the specified column.
*
* @param def the column definition
* @return the index of the specified column
*/
int32_t index_of(const column_definition& def) const {
auto i = std::find(_columns.begin(), _columns.end(), &def);
if (i == _columns.end()) {
return -1;
}
return std::distance(_columns.begin(), i);
}
bool has_column(const column_definition& def) const {
return std::find(_columns.begin(), _columns.end(), &def) != _columns.end();
}
::shared_ptr<metadata> get_result_metadata() {
return _metadata;
}
static ::shared_ptr<selection> wildcard(schema_ptr schema);
static ::shared_ptr<selection> for_columns(schema_ptr schema, std::vector<const column_definition*> columns);
virtual uint32_t add_column_for_ordering(const column_definition& c);
virtual bool uses_function(const sstring &ks_name, const sstring& function_name) const {
return false;
}
private:
static bool processes_selection(const std::vector<::shared_ptr<raw_selector>>& raw_selectors) {
return std::any_of(raw_selectors.begin(), raw_selectors.end(),
[] (auto&& s) { return s->processes_selection(); });
}
static std::vector<::shared_ptr<column_specification>> collect_metadata(schema_ptr schema,
const std::vector<::shared_ptr<raw_selector>>& raw_selectors, const selector_factories& factories);
public:
static ::shared_ptr<selection> from_selectors(schema_ptr schema, const std::vector<::shared_ptr<raw_selector>>& raw_selectors);
virtual std::unique_ptr<selectors> new_selectors() = 0;
/**
* Returns a range of CQL3 columns this selection needs.
*/
auto const& get_columns() {
return _columns;
}
uint32_t get_column_count() {
return _columns.size();
}
::shared_ptr<result_set_builder> make_result_set_builder(db_clock::time_point now, int32_t protocol_version) {
return ::make_shared<result_set_builder>(*this, now, protocol_version);
}
virtual bool is_aggregate() const = 0;
/**
* Checks that selectors are either all aggregates or that none of them is.
*
* @param selectors the selectors to test.
* @param messageTemplate the error message template
* @param messageArgs the error message arguments
* @throws InvalidRequestException if some of the selectors are aggregate but not all of them
*/
template<typename... Args>
static void validate_selectors(const std::vector<::shared_ptr<selector>>& selectors, const sstring& msg, Args&&... args) {
int32_t aggregates = 0;
for (auto&& s : selectors) {
if (s->is_aggregate()) {
++aggregates;
}
}
if (aggregates != 0 && aggregates != selectors.size()) {
throw exceptions::invalid_request_exception(sprint(msg, std::forward<Args>(args)...));
}
}
friend class result_set_builder;
};
class result_set_builder {
private:
std::unique_ptr<result_set> _result_set;
std::unique_ptr<selectors> _selectors;
public:
std::vector<bytes_opt> current;
private:
std::vector<api::timestamp_type> _timestamps;
std::vector<int32_t> _ttls;
const db_clock::time_point _now;
int32_t _protocol_version;
public:
result_set_builder(selection& s, db_clock::time_point now, int32_t protocol_version);
void add_empty() {
current.emplace_back();
}
void add(bytes_opt value) {
current.emplace_back(std::move(value));
}
void add(const column_definition& def, atomic_cell::view c) {
current.emplace_back(get_value(def.type, c));
if (!_timestamps.empty()) {
_timestamps[current.size() - 1] = c.is_dead() ? api::min_timestamp : c.timestamp();
}
if (!_ttls.empty()) {
gc_clock::duration ttl(-1);
if (c.is_live_and_has_ttl()) {
ttl = *c.ttl() - to_gc_clock(_now);
}
_ttls[current.size() - 1] = ttl.count();
}
#if 0
List<Cell> cells = row.getMultiCellColumn(def.name);
ByteBuffer buffer = cells == null
? null
: ((CollectionType)def.type).serializeForNativeProtocol(cells, options.getProtocolVersion());
result.add(buffer);
#endif
}
void new_row() {
if (!current.empty()) {
_selectors->add_input_row(_protocol_version, *this);
if (!_selectors->is_aggregate()) {
_result_set->add_row(_selectors->get_output_row(_protocol_version));
_selectors->reset();
}
current.clear();
}
}
std::unique_ptr<result_set> build() {
if (!current.empty()) {
_selectors->add_input_row(_protocol_version, *this);
_result_set->add_row(_selectors->get_output_row(_protocol_version));
_selectors->reset();
current.clear();
}
if (_result_set->empty() && _selectors->is_aggregate()) {
_result_set->add_row(_selectors->get_output_row(_protocol_version));
}
return std::move(_result_set);
}
private:
bytes_opt get_value(data_type t, atomic_cell::view c) {
if (c.is_dead()) {
return {};
}
if (t->is_counter()) {
fail(unimplemented::cause::COUNTERS);
#if 0
ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
#endif
}
return {to_bytes(c.value())};
}
};
}
}

181
cql3/selection/selector.hh Normal file
View File

@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include <vector>
#include "cql3/assignment_testable.hh"
#include "types.hh"
#include "schema.hh"
namespace cql3 {
namespace selection {
class result_set_builder;
/**
* A <code>selector</code> is used to convert the data returned by the storage engine into the data requested by the
* user. They correspond to the &lt;selector&gt; elements from the select clause.
* <p>Since the introduction of aggregation, <code>selector</code>s cannot be called anymore by multiple threads
* as they have an internal state.</p>
*/
class selector : public assignment_testable {
public:
class factory;
virtual ~selector() {}
/**
* Add the current value from the specified <code>result_set_builder</code>.
*
* @param protocol_version protocol version used for serialization
* @param rs the <code>result_set_builder</code>
* @throws InvalidRequestException if a problem occurs while add the input value
*/
virtual void add_input(int32_t protocol_version, result_set_builder& rs) = 0;
/**
* Returns the selector output.
*
* @param protocol_version protocol version used for serialization
* @return the selector output
* @throws InvalidRequestException if a problem occurs while computing the output value
*/
virtual bytes_opt get_output(int32_t protocol_version) = 0;
/**
* Returns the <code>selector</code> output type.
*
* @return the <code>selector</code> output type.
*/
virtual data_type get_type() = 0;
/**
* Checks if this <code>selector</code> is creating aggregates.
*
* @return <code>true</code> if this <code>selector</code> is creating aggregates <code>false</code>
* otherwise.
*/
virtual bool is_aggregate() {
return false;
}
/**
* Reset the internal state of this <code>selector</code>.
*/
virtual void reset() = 0;
virtual assignment_testable::test_result test_assignment(const sstring& keyspace, ::shared_ptr<column_specification> receiver) override {
if (receiver->type == get_type()) {
return assignment_testable::test_result::EXACT_MATCH;
} else if (receiver->type->is_value_compatible_with(*get_type())) {
return assignment_testable::test_result::WEAKLY_ASSIGNABLE;
} else {
return assignment_testable::test_result::NOT_ASSIGNABLE;
}
}
};
/**
* A factory for <code>selector</code> instances.
*/
class selector::factory {
public:
virtual ~factory() {}
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) {
return false;
}
/**
* Returns the column specification corresponding to the output value of the selector instances created by
* this factory.
*
* @param schema the column family schema
* @return a column specification
*/
::shared_ptr<column_specification> get_column_specification(schema_ptr schema) {
return ::make_shared<column_specification>(schema->ks_name,
schema->cf_name,
::make_shared<column_identifier>(column_name(), true),
get_return_type());
}
/**
* Creates a new <code>selector</code> instance.
*
* @return a new <code>selector</code> instance
*/
virtual ::shared_ptr<selector> new_instance() = 0;
/**
* Checks if this factory creates selectors instances that creates aggregates.
*
* @return <code>true</code> if this factory creates selectors instances that creates aggregates,
* <code>false</code> otherwise
*/
virtual bool is_aggregate_selector_factory() {
return false;
}
/**
* Checks if this factory creates <code>writetime</code> selectors instances.
*
* @return <code>true</code> if this factory creates <code>writetime</code> selectors instances,
* <code>false</code> otherwise
*/
virtual bool is_write_time_selector_factory() {
return false;
}
/**
* Checks if this factory creates <code>TTL</code> selectors instances.
*
* @return <code>true</code> if this factory creates <code>TTL</code> selectors instances,
* <code>false</code> otherwise
*/
virtual bool is_ttl_selector_factory() {
return false;
}
/**
* Returns the name of the column corresponding to the output value of the selector instances created by
* this factory.
*
* @return a column name
*/
virtual const sstring& column_name() = 0;
/**
* Returns the type of the values returned by the selector instances created by this factory.
*
* @return the selector output type
*/
virtual data_type get_return_type() = 0;
};
}
}

View File

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#include "cql3/selection/selector_factories.hh"
#include "cql3/selection/simple_selector.hh"
#include "cql3/selection/selectable.hh"
namespace cql3 {
namespace selection {
selector_factories::selector_factories(std::vector<::shared_ptr<selectable>> selectables, schema_ptr schema,
std::vector<const column_definition*>& defs)
: _contains_write_time_factory(false)
, _contains_ttl_factory(false)
, _number_of_aggregate_factories(0)
{
_factories.reserve(selectables.size());
for (auto&& selectable : selectables) {
auto factory = selectable->new_selector_factory(schema, defs);
_contains_write_time_factory |= factory->is_write_time_selector_factory();
_contains_ttl_factory |= factory->is_ttl_selector_factory();
if (factory->is_aggregate_selector_factory()) {
++_number_of_aggregate_factories;
}
_factories.emplace_back(std::move(factory));
}
}
bool selector_factories::uses_function(const sstring& ks_name, const sstring& function_name) const {
for (auto&& f : _factories) {
if (f && f->uses_function(ks_name, function_name)) {
return true;
}
}
return false;
}
void selector_factories::add_selector_for_ordering(const column_definition& def, uint32_t index) {
_factories.emplace_back(simple_selector::new_factory(def.name_as_text(), index, def.type));
}
std::vector<::shared_ptr<selector>> selector_factories::new_instances() const {
std::vector<::shared_ptr<selector>> r;
r.reserve(_factories.size());
for (auto&& f : _factories) {
r.emplace_back(f->new_instance());
}
return r;
}
std::vector<sstring> selector_factories::get_column_names() const {
std::vector<sstring> r;
r.reserve(_factories.size());
std::transform(_factories.begin(), _factories.end(), std::back_inserter(r), [] (auto&& f) {
return f->column_name();
});
return r;
}
}
}

View File

@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include <vector>
#include "cql3/selection/selector.hh"
#include "cql3/selection/selectable.hh"
namespace cql3 {
namespace selection {
/**
* A set of <code>selector</code> factories.
*/
class selector_factories {
private:
/**
* The <code>Selector</code> factories.
*/
std::vector<::shared_ptr<selector::factory>> _factories;
/**
* <code>true</code> if one of the factory creates writetime selectors.
*/
bool _contains_write_time_factory;
/**
* <code>true</code> if one of the factory creates TTL selectors.
*/
bool _contains_ttl_factory;
/**
* The number of factories creating aggregates.
*/
uint32_t _number_of_aggregate_factories;
public:
/**
* Creates a new <code>SelectorFactories</code> instance and collect the column definitions.
*
* @param selectables the <code>Selectable</code>s for which the factories must be created
* @param cfm the Column Family Definition
* @param defs the collector parameter for the column definitions
* @return a new <code>SelectorFactories</code> instance
* @throws InvalidRequestException if a problem occurs while creating the factories
*/
static ::shared_ptr<selector_factories> create_factories_and_collect_column_definitions(
std::vector<::shared_ptr<selectable>> selectables,
schema_ptr schema,
std::vector<const column_definition*>& defs) {
return ::make_shared<selector_factories>(std::move(selectables), std::move(schema), defs);
}
selector_factories(std::vector<::shared_ptr<selectable>> selectables, schema_ptr schema, std::vector<const column_definition*>& defs);
public:
bool uses_function(const sstring& ks_name, const sstring& function_name) const;
/**
* Adds a new <code>Selector.Factory</code> for a column that is needed only for ORDER BY purposes.
* @param def the column that is needed for ordering
* @param index the index of the column definition in the Selection's list of columns
*/
void add_selector_for_ordering(const column_definition& def, uint32_t index);
/**
* Checks if this <code>SelectorFactories</code> contains only factories for aggregates.
*
* @return <code>true</code> if this <code>SelectorFactories</code> contains only factories for aggregates,
* <code>false</code> otherwise.
*/
bool contains_only_aggregate_functions() const {
auto size = _factories.size();
return size != 0 && _number_of_aggregate_factories == size;
}
/**
* Whether the selector built by this factory does aggregation or not (either directly or in a sub-selector).
*
* @return <code>true</code> if the selector built by this factor does aggregation, <code>false</code> otherwise.
*/
bool does_aggregation() const {
return _number_of_aggregate_factories > 0;
}
/**
* Checks if this <code>SelectorFactories</code> contains at least one factory for writetime selectors.
*
* @return <code>true</code> if this <code>SelectorFactories</code> contains at least one factory for writetime
* selectors, <code>false</code> otherwise.
*/
bool contains_write_time_selector_factory() const {
return _contains_write_time_factory;
}
/**
* Checks if this <code>SelectorFactories</code> contains at least one factory for TTL selectors.
*
* @return <code>true</code> if this <code>SelectorFactories</code> contains at least one factory for TTL
* selectors, <code>false</code> otherwise.
*/
bool contains_ttl_selector_factory() const {
return _contains_ttl_factory;
}
/**
* Creates a list of new <code>selector</code> instances.
* @return a list of new <code>selector</code> instances.
*/
std::vector<::shared_ptr<selector>> new_instances() const;
auto begin() const {
return _factories.begin();
}
auto end() const {
return _factories.end();
}
/**
* Returns the names of the columns corresponding to the output values of the selector instances created by
* these factories.
*
* @return a list of column names
*/
std::vector<sstring> get_column_names() const;
};
}
}

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#include "cql3/selection/simple_selector.hh"
namespace cql3 {
namespace selection {
::shared_ptr<selector>
simple_selector_factory::new_instance() {
return ::make_shared<simple_selector>(_column_name, _idx, _type);
}
}
}

View File

@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include "cql3/selection/selection.hh"
#include "cql3/selection/selector.hh"
namespace cql3 {
namespace selection {
class simple_selector_factory : public selector::factory {
private:
const sstring _column_name;
const uint32_t _idx;
data_type _type;
public:
simple_selector_factory(const sstring& column_name, uint32_t idx, data_type type)
: _column_name(std::move(column_name))
, _idx(idx)
, _type(type)
{ }
virtual const sstring& column_name() override {
return _column_name;
}
virtual data_type get_return_type() override {
return _type;
}
virtual ::shared_ptr<selector> new_instance() override;
};
class simple_selector : public selector {
private:
const sstring _column_name;
const uint32_t _idx;
data_type _type;
bytes_opt _current;
public:
static ::shared_ptr<factory> new_factory(const sstring& column_name, uint32_t idx, data_type type) {
return ::make_shared<simple_selector_factory>(column_name, idx, type);
}
simple_selector(const sstring& column_name, uint32_t idx, data_type type)
: _column_name(std::move(column_name))
, _idx(idx)
, _type(type)
{ }
virtual void add_input(int32_t protocol_version, result_set_builder& rs) override {
// TODO: can we steal it?
_current = rs.current[_idx];
}
virtual bytes_opt get_output(int32_t protocol_version) override {
return std::move(_current);
}
virtual void reset() override {
_current = {};
}
virtual data_type get_type() override {
return _type;
}
#if 0
@Override
public String toString()
{
return columnName;
}
#endif
};
}
}

View File

@@ -14,6 +14,7 @@
#include "types.hh"
#include "tuple.hh"
#include "gc_clock.hh"
#include "unimplemented.hh"
using column_id = uint32_t;
@@ -149,6 +150,10 @@ public:
bool is_last_partition_key(column_definition& def) {
return &_partition_key[_partition_key.size() - 1] == &def;
}
bool has_collections() {
warn(unimplemented::cause::COLLECTIONS);
return false; // FIXME
}
bool has_static_columns() {
return !_static_columns.empty();
}