RepairSession.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.repair.consistent.ConsistentSession;
import org.apache.cassandra.repair.consistent.LocalSession;
import org.apache.cassandra.repair.consistent.LocalSessions;
import org.apache.cassandra.repair.messages.SyncResponse;
import org.apache.cassandra.repair.messages.ValidationResponse;
import org.apache.cassandra.repair.state.SessionState;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
/**
* Coordinates the (active) repair of a list of non overlapping token ranges.
*
* A given RepairSession repairs a set of replicas for a given set of ranges on a list
* of column families. For each of the column family to repair, RepairSession
* creates a {@link RepairJob} that handles the repair of that CF.
*
* A given RepairJob has the 3 main phases:
* <ol>
* <li>
* Paxos repair: unfinished paxos operations in the range/keyspace/table are first completed
* </li>
* <li>Validation phase: the job requests merkle trees from each of the replica involves
* ({@link org.apache.cassandra.repair.ValidationTask}) and waits until all trees are received (in
* validationComplete()).
* </li>
* <li>Synchronization phase: once all trees are received, the job compares each tree with all the others. If there is
* difference between 2 trees, the differences between the 2 endpoints will be streamed with a {@link SyncTask}.
* </li>
* </ol>
* The job is done once all its SyncTasks are done (i.e. have either computed no differences
* or the streaming they started is done (syncComplete())).
*
* A given session will execute the first phase (validation phase) of each of it's job
* sequentially. In other words, it will start the first job and only start the next one
* once that first job validation phase is complete. This is done so that the replica only
* create one merkle tree per range at a time, which is our way to ensure that such creation starts
* roughly at the same time on every node (see CASSANDRA-2816). However the synchronization
* phases are allowed to run concurrently (with each other and with validation phases).
*
* A given RepairJob has 2 modes: either sequential or not (RepairParallelism). If sequential,
* it will requests merkle tree creation from each replica in sequence (though in that case
* we still first send a message to each node to flush and snapshot data so each merkle tree
* creation is still done on similar data, even if the actual creation is not
* done simulatneously). If not sequential, all merkle tree are requested in parallel.
* Similarly, if a job is sequential, it will handle one SymmetricSyncTask at a time, but will handle
* all of them in parallel otherwise.
*/
public class RepairSession extends AsyncFuture<RepairSessionResult> implements IEndpointStateChangeSubscriber,
IFailureDetectionEventListener,
LocalSessions.Listener
{
private static final Logger logger = LoggerFactory.getLogger(RepairSession.class);
public final SessionState state;
public final RepairParallelism parallelismDegree;
public final boolean pullRepair;
/** Range to repair */
public final boolean isIncremental;
public final PreviewKind previewKind;
public final boolean repairPaxos;
public final boolean paxosOnly;
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<>();
// Remote syncing jobs wait response in syncingTasks map
private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
// Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
public final SafeExecutor taskExecutor;
public final boolean optimiseStreams;
public final SharedContext ctx;
private volatile List<RepairJob> jobs = Collections.emptyList();
private volatile boolean terminated = false;
/**
* Create new repair session.
* @param parentRepairSession the parent sessions id
* @param commonRange ranges to repair
* @param keyspace name of keyspace
* @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
* @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption)
* @param repairPaxos true if incomplete paxos operations should be completed as part of repair
* @param paxosOnly true if we should only complete paxos operations, not run a normal repair
* @param cfnames names of columnfamilies
*/
public RepairSession(SharedContext ctx,
TimeUUID parentRepairSession,
CommonRange commonRange,
String keyspace,
RepairParallelism parallelismDegree,
boolean isIncremental,
boolean pullRepair,
PreviewKind previewKind,
boolean optimiseStreams,
boolean repairPaxos,
boolean paxosOnly,
String... cfnames)
{
this.ctx = ctx;
this.repairPaxos = repairPaxos;
this.paxosOnly = paxosOnly;
assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
this.state = new SessionState(ctx.clock(), parentRepairSession, keyspace, cfnames, commonRange);
this.parallelismDegree = parallelismDegree;
this.isIncremental = isIncremental;
this.previewKind = previewKind;
this.pullRepair = pullRepair;
this.optimiseStreams = optimiseStreams;
this.taskExecutor = new SafeExecutor(createExecutor(ctx));
}
@VisibleForTesting
protected ExecutorPlus createExecutor(SharedContext ctx)
{
return ctx.executorFactory().pooled("RepairJobTask", Integer.MAX_VALUE);
}
public TimeUUID getId()
{
return state.id;
}
public Collection<Range<Token>> ranges()
{
return state.commonRange.ranges;
}
public Collection<InetAddressAndPort> endpoints()
{
return state.commonRange.endpoints;
}
public synchronized void trackValidationCompletion(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task)
{
if (terminated)
{
task.abort(new RuntimeException("Session terminated"));
return;
}
validating.put(key, task);
}
public synchronized void trackSyncCompletion(Pair<RepairJobDesc, SyncNodePair> key, CompletableRemoteSyncTask task)
{
if (terminated)
return;
syncingTasks.put(key, task);
}
/**
* Receive merkle tree response or failed response from {@code endpoint} for current repair job.
*
* @param desc repair job description
* @param message containing the merkle trees or an error
*/
public void validationComplete(RepairJobDesc desc, Message<ValidationResponse> message)
{
InetAddressAndPort endpoint = message.from();
MerkleTrees trees = message.payload.trees;
ValidationTask task = validating.remove(Pair.create(desc, endpoint));
// replies without a callback get dropped, so if in mixed mode this should be ignored
ctx.messaging().send(message.emptyResponse(), message.from());
if (task == null)
{
// The trees may be off-heap, and will therefore need to be released.
if (trees != null)
trees.release();
// either the session completed so the validation is no longer needed, or this is a retry; in both cases there is nothing to do
return;
}
String msg = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
logger.info("{} {}", previewKind.logPrefix(getId()), msg);
Tracing.traceRepair(msg);
task.treesReceived(trees);
}
/**
* Notify this session that sync completed/failed with given {@code SyncNodePair}.
*
* @param desc synced repair job
* @param message nodes that completed sync and if they were successful
*/
public void syncComplete(RepairJobDesc desc, Message<SyncResponse> message)
{
SyncNodePair nodes = message.payload.nodes;
CompletableRemoteSyncTask task = syncingTasks.remove(Pair.create(desc, nodes));
// replies without a callback get dropped, so if in mixed mode this should be ignored
ctx.messaging().send(message.emptyResponse(), message.from());
if (task == null)
return;
if (logger.isDebugEnabled())
logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.coordinator, nodes.peer, desc.columnFamily);
task.syncComplete(message.payload.success, message.payload.summaries);
}
@VisibleForTesting
Map<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> getSyncingTasks()
{
return Collections.unmodifiableMap(syncingTasks);
}
private String repairedNodes()
{
StringBuilder sb = new StringBuilder();
sb.append(FBUtilities.getBroadcastAddressAndPort());
for (InetAddressAndPort ep : state.commonRange.endpoints)
sb.append(", ").append(ep);
return sb.toString();
}
/**
* Start RepairJob on given ColumnFamilies.
*
* This first validates if all replica are available, and if they are,
* creates RepairJobs and submit to run on given executor.
*
* @param executor Executor to run validation
*/
public void start(ExecutorPlus executor)
{
state.phase.start();
String message;
if (terminated)
return;
logger.info("{} parentSessionId = {}: new session: will sync {} on range {} for {}.{}",
previewKind.logPrefix(getId()), state.parentRepairSession, repairedNodes(), state.commonRange, state.keyspace, Arrays.toString(state.cfnames));
Tracing.traceRepair("Syncing range {}", state.commonRange);
if (!previewKind.isPreview() && !paxosOnly)
{
SystemDistributedKeyspace.startRepairs(getId(), state.parentRepairSession, state.keyspace, state.cfnames, state.commonRange);
}
if (state.commonRange.endpoints.isEmpty())
{
logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", state.commonRange));
state.phase.skip(message);
Tracing.traceRepair(message);
trySuccess(new RepairSessionResult(state.id, state.keyspace, state.commonRange.ranges, Lists.<RepairResult>newArrayList(), state.commonRange.hasSkippedReplicas));
if (!previewKind.isPreview())
{
SystemDistributedKeyspace.failRepairs(getId(), state.keyspace, state.cfnames, new RuntimeException(message));
}
return;
}
// Checking all nodes are live
for (InetAddressAndPort endpoint : state.commonRange.endpoints)
{
if (!ctx.failureDetector().isAlive(endpoint) && !state.commonRange.hasSkippedReplicas)
{
message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
state.phase.fail(message);
logger.error("{} {}", previewKind.logPrefix(getId()), message);
Exception e = new IOException(message);
tryFailure(e);
if (!previewKind.isPreview())
{
SystemDistributedKeyspace.failRepairs(getId(), state.keyspace, state.cfnames, e);
}
return;
}
}
// Create and submit RepairJob for each ColumnFamily
state.phase.jobsSubmitted();
List<RepairJob> jobs = new ArrayList<>(state.cfnames.length);
for (String cfname : state.cfnames)
{
RepairJob job = new RepairJob(this, cfname);
state.register(job.state);
executor.execute(job);
jobs.add(job);
}
this.jobs = jobs;
// When all RepairJobs are done without error, cleanup and set the final result
FBUtilities.allOf(jobs).addCallback(new FutureCallback<List<RepairResult>>()
{
public void onSuccess(List<RepairResult> results)
{
state.phase.success();
// this repair session is completed
logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully");
Tracing.traceRepair("Completed sync of range {}", state.commonRange);
trySuccess(new RepairSessionResult(state.id, state.keyspace, state.commonRange.ranges, results, state.commonRange.hasSkippedReplicas));
// mark this session as terminated
terminate(null);
taskExecutor.shutdown();
}
public void onFailure(Throwable t)
{
state.phase.fail(t);
String msg = "{} Session completed with the following error";
if (Throwables.anyCauseMatches(t, RepairException::shouldWarn))
logger.warn(msg+ ": {}", previewKind.logPrefix(getId()), t.getMessage());
else
logger.error(msg, previewKind.logPrefix(getId()), t);
Tracing.traceRepair("Session completed with the following error: {}", t);
forceShutdown(t);
}
}, taskExecutor);
}
public synchronized void terminate(@Nullable Throwable reason)
{
terminated = true;
List<RepairJob> jobs = this.jobs;
if (jobs != null)
{
for (RepairJob job : jobs)
job.abort(reason);
}
this.jobs = null;
validating.clear();
syncingTasks.clear();
}
/**
* clear all RepairJobs and terminate this session.
*
* @param reason Cause of error for shutdown
*/
public void forceShutdown(Throwable reason)
{
tryFailure(reason);
terminate(reason);
taskExecutor.shutdown();
}
public void onRemove(InetAddressAndPort endpoint)
{
convict(endpoint, Double.MAX_VALUE);
}
public void onRestart(InetAddressAndPort endpoint, EndpointState epState)
{
convict(endpoint, Double.MAX_VALUE);
}
public void convict(InetAddressAndPort endpoint, double phi)
{
if (!state.commonRange.endpoints.contains(endpoint))
return;
// We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
return;
// Though unlikely, it is possible to arrive here multiple time and we
// want to avoid print an error message twice
if (!isFailed.compareAndSet(false, true))
return;
Exception exception = new IOException(String.format("Endpoint %s died", endpoint));
logger.error("{} session completed with the following error", previewKind.logPrefix(getId()), exception);
// If a node failed, we stop everything (though there could still be some activity in the background)
forceShutdown(exception);
}
public void onIRStateChange(LocalSession session)
{
// we should only be registered as listeners for PreviewKind.REPAIRED, but double check here
if (previewKind == PreviewKind.REPAIRED &&
session.getState() == ConsistentSession.State.FINALIZED &&
includesTables(session.tableIds))
{
for (Range<Token> range : session.ranges)
{
if (range.intersects(ranges()))
{
logger.warn("{} An intersecting incremental repair with session id = {} finished, preview repair might not be accurate", previewKind.logPrefix(getId()), session.sessionID);
forceShutdown(RepairException.warn("An incremental repair with session id "+session.sessionID+" finished during this preview repair runtime"));
return;
}
}
}
}
private boolean includesTables(Set<TableId> tableIds)
{
Keyspace ks = Keyspace.open(state.keyspace);
if (ks != null)
{
for (String table : state.cfnames)
{
ColumnFamilyStore cfs = ks.getColumnFamilyStore(table);
if (tableIds.contains(cfs.metadata.id))
return true;
}
}
return false;
}
private static class SafeExecutor implements Executor
{
private final ExecutorPlus delegate;
private SafeExecutor(ExecutorPlus delegate)
{
this.delegate = delegate;
}
@Override
public void execute(Runnable command)
{
try
{
delegate.execute(command);
}
catch (RejectedExecutionException e)
{
// task executor was shutdown, so fall back to a known good executor to finish callbacks
Stage.INTERNAL_RESPONSE.execute(command);
}
}
public void shutdown()
{
delegate.shutdown();
}
}
}