service: Convert StorageProxy

This commit is contained in:
Tomasz Grabiec
2015-01-23 21:00:00 +01:00
parent 09a893aed4
commit 58677dc911
4 changed files with 143 additions and 87 deletions

View File

@@ -235,6 +235,7 @@ deps = {
'utils/murmur_hash.cc',
'utils/uuid.cc',
'types.cc',
'service/storage_proxy.cc',
'db/db.cc',
'io/io.cc',
'utils/utils.cc',

View File

@@ -15,58 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
import com.google.common.base.Predicate;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.*;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.metrics.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
#include "db/consistency_level.hh"
#include "storage_proxy.hh"
#include "unimplemented.hh"
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.paxos.*;
import org.apache.cassandra.sink.SinkManager;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.*;
namespace service {
public class StorageProxy implements StorageProxyMBean
{
#if 0
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
@@ -500,19 +462,22 @@ public class StorageProxy implements StorageProxyMBean
if (shouldBlock)
responseHandler.get();
}
#endif
/**
* Use this method to have these Mutations applied
* across all replicas. This method will take care
* of the possibility of a replica being down and hint
* the data across to some other replica.
*
* @param mutations the mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level)
throws UnavailableException, OverloadedException, WriteTimeoutException
{
/**
* Use this method to have these Mutations applied
* across all replicas. This method will take care
* of the possibility of a replica being down and hint
* the data across to some other replica.
*
* @param mutations the mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
future<>
storage_proxy::mutate(std::vector<api::mutation> mutations, db::consistency_level cl) {
throw std::runtime_error("NOT IMPLEMENTED");
#if 0
Tracing.trace("Determining replicas for mutation");
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -590,36 +555,41 @@ public class StorageProxy implements StorageProxyMBean
{
writeMetrics.addNano(System.nanoTime() - startTime);
}
}
#endif
}
@SuppressWarnings("unchecked")
public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
ConsistencyLevel consistencyLevel,
boolean mutateAtomically)
throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
{
future<>
storage_proxy::mutate_with_triggers(std::vector<api::mutation> mutations, db::consistency_level cl,
bool should_mutate_atomically) {
unimplemented::triggers();
#if 0
Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
if (augmented != null)
mutateAtomically(augmented, consistencyLevel);
else if (mutateAtomically)
mutateAtomically((Collection<Mutation>) mutations, consistencyLevel);
else
mutate(mutations, consistencyLevel);
if (augmented != null) {
return mutate_atomically(augmented, consistencyLevel);
} else {
#endif
if (should_mutate_atomically) {
return mutate_atomically(std::move(mutations), cl);
}
return mutate(std::move(mutations), cl);
#if 0
}
#endif
}
/**
* See mutate. Adds additional steps before and after writing a batch.
* Before writing the batch (but after doing availability check against the FD for the row replicas):
* write the entire batch to a batchlog elsewhere in the cluster.
* After: remove the batchlog entry (after writing hints for the batch rows, if necessary).
*
* @param mutations the Mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
public static void mutateAtomically(Collection<Mutation> mutations, ConsistencyLevel consistency_level)
throws UnavailableException, OverloadedException, WriteTimeoutException
{
/**
* See mutate. Adds additional steps before and after writing a batch.
* Before writing the batch (but after doing availability check against the FD for the row replicas):
* write the entire batch to a batchlog elsewhere in the cluster.
* After: remove the batchlog entry (after writing hints for the batch rows, if necessary).
*
* @param mutations the Mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
future<>
storage_proxy::mutate_atomically(std::vector<api::mutation> mutations, db::consistency_level cl) {
unimplemented::lwt();
#if 0
Tracing.trace("Determining replicas for atomic batch");
long startTime = System.nanoTime();
@@ -666,8 +636,10 @@ public class StorageProxy implements StorageProxyMBean
{
writeMetrics.addNano(System.nanoTime() - startTime);
}
}
#endif
}
#if 0
private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
throws WriteTimeoutException
{
@@ -2307,4 +2279,6 @@ public class StorageProxy implements StorageProxyMBean
public long getReadRepairRepairedBackground() {
return ReadRepairMetrics.repairedBackground.count();
}
#endif
}

59
service/storage_proxy.hh Normal file
View File

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
#pragma once
#include "db/api.hh"
namespace service {
class storage_proxy /*implements StorageProxyMBean*/ {
public:
/**
* Use this method to have these Mutations applied
* across all replicas. This method will take care
* of the possibility of a replica being down and hint
* the data across to some other replica.
*
* @param mutations the mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
static future<> mutate(std::vector<api::mutation> mutations, db::consistency_level cl);
static future<> mutate_with_triggers(std::vector<api::mutation> mutations,
db::consistency_level cl, bool should_mutate_atomically);
/**
* See mutate. Adds additional steps before and after writing a batch.
* Before writing the batch (but after doing availability check against the FD for the row replicas):
* write the entire batch to a batchlog elsewhere in the cluster.
* After: remove the batchlog entry (after writing hints for the batch rows, if necessary).
*
* @param mutations the Mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
static future<> mutate_atomically(std::vector<api::mutation> mutations, db::consistency_level cl);
};
}

View File

@@ -5,9 +5,18 @@
#pragma once
#include <iostream>
#include "core/print.hh"
namespace unimplemented {
static inline
void fail(sstring what) __attribute__((noreturn));
static inline
void fail(sstring what) {
throw std::runtime_error(sprint("not implemented: %s", what));
}
static inline
void warn(sstring what) {
std::cerr << "WARNING: Not implemented: " << what << std::endl;
@@ -18,9 +27,22 @@ void indexes() {
warn("indexes");
}
static inline
void lwt() __attribute__((noreturn));
static inline
void lwt() {
fail("light-weight transactions");
}
static inline
void auth() {
warn("auth");
}
static inline
void triggers() {
warn("triggers");
}
}