RepairMessage.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.repair.messages;

import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.RepairRetrySpec;
import org.apache.cassandra.config.RetrySpec;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.Backoff;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Future;

import static org.apache.cassandra.net.MessageFlag.CALL_BACK_ON_FAILURE;

/**
 * Base class of all repair related request/response messages.
 * <p>
 * In addition to holding the (optional) {@link RepairJobDesc} of the message, this class provides the static
 * helpers used to send repair messages; how a failed send is handled depends on the verb and on the release
 * version gossip reports for the receiving endpoint.
 *
 * @since 2.0
 */
public abstract class RepairMessage
{
    /**
     * How a failed send is dealt with:
     * {@code NONE} only logs the failure, {@code TIMEOUT} forwards the failure to the caller's callback,
     * {@code RETRY} may re-send on timeout before forwarding the failure.
     */
    private enum ErrorHandling { NONE, TIMEOUT, RETRY }

    private static final Logger logger = LoggerFactory.getLogger(RepairMessage.class);

    /** Peers at or above this version get {@link ErrorHandling#RETRY} for every verb. */
    private static final CassandraVersion SUPPORTS_RETRY = new CassandraVersion("5.0.0-alpha2.SNAPSHOT");

    /**
     * Per-verb minimum peer version required before failures are reported to the caller.
     * Must be declared after {@link #SUPPORTS_RETRY}, which the factory method reads.
     */
    private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS = buildVerbTimeoutVersions();

    /** Verbs that are always handled with {@link ErrorHandling#RETRY}, without consulting the peer version. */
    private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK =
        Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG));

    /** Descriptor of the repair job this message belongs to; may be {@code null}. */
    @Nullable
    public final RepairJobDesc desc;

    /**
     * Builds the read-only verb to minimum-version table backing {@link #VERB_TIMEOUT_VERSIONS}.
     */
    private static Map<Verb, CassandraVersion> buildVerbTimeoutVersions()
    {
        CassandraVersion requestTimeouts = new CassandraVersion("4.0.7-SNAPSHOT");
        EnumMap<Verb, CassandraVersion> minimums = new EnumMap<>(Verb.class);
        // requests
        minimums.put(Verb.VALIDATION_REQ, requestTimeouts);
        minimums.put(Verb.SYNC_REQ, requestTimeouts);
        // responses
        minimums.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY);
        minimums.put(Verb.SYNC_RSP, SUPPORTS_RETRY);
        return Collections.unmodifiableMap(minimums);
    }

    /**
     * @param desc descriptor of the repair job, or {@code null} when the message has none
     */
    protected RepairMessage(@Nullable RepairJobDesc desc)
    {
        this.desc = desc;
    }

    /**
     * Returns the parent session id read from {@link #desc}.
     * <p>
     * NOTE(review): {@link #desc} is nullable and is dereferenced unconditionally here, so this throws
     * {@link NullPointerException} when it is {@code null}; presumably subclasses created without a descriptor
     * override this method - confirm.
     */
    public TimeUUID parentRepairSession()
    {
        return desc.parentSessionId;
    }

    /**
     * Callback used by {@link #sendMessageWithFailureCB} to report a failed send.
     */
    public interface RepairFailureCallback
    {
        void onFailure(Exception e);
    }

    /**
     * Creates the backoff used when re-sending {@code verb}.
     * <p>
     * {@link Verb#VALIDATION_RSP} uses the merkle tree response spec of the configured repair retry spec, every
     * other verb uses the repair retry spec itself. A disabled spec yields {@link Backoff.None#INSTANCE}.
     */
    private static Backoff backoff(SharedContext ctx, Verb verb)
    {
        RepairRetrySpec repairSpec = DatabaseDescriptor.getRepairRetrySpec();
        RetrySpec effective;
        if (verb == Verb.VALIDATION_RSP)
            effective = repairSpec.getMerkleTreeResponseSpec();
        else
            effective = repairSpec;

        if (effective.isEnabled())
            return new Backoff.ExponentialBackoff(effective.maxAttempts.value, effective.baseSleepTime.toMilliseconds(), effective.maxSleepTime.toMilliseconds(), ctx.random().get()::nextDouble);
        return Backoff.None.INSTANCE;
    }

    /**
     * Returns a retry condition that holds for as long as {@code f} has not completed.
     */
    public static Supplier<Boolean> notDone(Future<?> f)
    {
        return () -> {
            boolean finished = f.isDone();
            return !finished;
        };
    }

    /**
     * Returns a retry condition that always holds.
     */
    private static Supplier<Boolean> always()
    {
        return () -> Boolean.TRUE;
    }

    /**
     * Sends {@code request} to {@code endpoint}; a timed out send is only re-sent while {@code allowRetry}
     * returns {@code true} (and the other retry conditions of the private overload hold).
     */
    public static <T> void sendMessageWithRetries(SharedContext ctx, Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
    {
        Backoff retryBackoff = backoff(ctx, verb);
        sendMessageWithRetries(ctx, retryBackoff, allowRetry, request, verb, endpoint, finalCallback, 0);
    }

    /**
     * Same as the overload taking a retry condition, with a condition that always allows the retry.
     */
    public static <T> void sendMessageWithRetries(SharedContext ctx, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RequestCallback<T> finalCallback)
    {
        sendMessageWithRetries(ctx, always(), request, verb, endpoint, finalCallback);
    }

    /**
     * Sends {@code request} with retries, ignoring both the response and any failure reported for it.
     */
    public static void sendMessageWithRetries(SharedContext ctx, RepairMessage request, Verb verb, InetAddressAndPort endpoint)
    {
        // deliberately empty callback: the caller is not interested in the outcome
        RequestCallback<Object> discardOutcome = new RequestCallback<>()
        {
            @Override
            public void onResponse(Message<Object> msg)
            {
            }

            @Override
            public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
            {
            }
        };
        sendMessageWithRetries(ctx, request, verb, endpoint, discardOutcome);
    }

    /**
     * Sends {@code request} once and wires a callback that decides, per failure, whether to only log it, hand it
     * to {@code finalCallback}, or schedule another send.
     *
     * @param backoff       supplies the maximum number of attempts and the wait time before each re-send
     * @param allowRetry    consulted right before a re-send is scheduled
     * @param finalCallback receives the response, and the failure once it is not (or no longer) retried
     * @param attempt       zero-based number of the current send
     */
    private static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff, Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
    {
        RequestCallback<T> retryingCallback = new RequestCallback<>()
        {
            @Override
            public void onResponse(Message<T> msg)
            {
                finalCallback.onResponse(msg);
            }

            @Override
            public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
            {
                ErrorHandling handling = errorHandlingSupported(ctx, endpoint, verb, request.parentRepairSession());
                switch (handling)
                {
                    case NONE:
                        // the failure is logged only; finalCallback is not notified
                        logger.error("[#{}] {} failed on {}: {}", request.parentRepairSession(), verb, from, failureReason);
                        break;
                    case TIMEOUT:
                        finalCallback.onFailure(from, failureReason);
                        break;
                    case RETRY:
                        retryOrFail(from, failureReason);
                        break;
                    default:
                        throw new AssertionError("Unknown error handler: " + handling);
                }
            }

            /**
             * Schedules the next send when the failure is a timeout, attempts remain and the retry condition
             * holds; otherwise reports the failure to {@code finalCallback}.
             */
            private void retryOrFail(InetAddressAndPort from, RequestFailureReason failureReason)
            {
                int attemptLimit = backoff.maxAttempts();
                boolean retry = failureReason == RequestFailureReason.TIMEOUT
                                && attempt < attemptLimit
                                && allowRetry.get();
                if (!retry)
                {
                    finalCallback.onFailure(from, failureReason);
                    return;
                }
                Runnable resend = () -> sendMessageWithRetries(ctx, backoff, allowRetry, request, verb, endpoint, finalCallback, attempt + 1);
                ctx.optionalTasks().schedule(resend, backoff.computeWaitTime(attempt), backoff.unit());
            }

            @Override
            public boolean invokeOnFailure()
            {
                return true;
            }
        };
        ctx.messaging().sendWithCallback(Message.outWithFlag(verb, request, CALL_BACK_ON_FAILURE), endpoint, retryingCallback);
    }

    /**
     * Sends {@code request} with retries; a successful delivery is logged, a failure is converted into a
     * {@link RepairException} and passed to {@code failureCallback}.
     */
    public static void sendMessageWithFailureCB(SharedContext ctx, Supplier<Boolean> allowRetry, RepairMessage request, Verb verb, InetAddressAndPort endpoint, RepairFailureCallback failureCallback)
    {
        RequestCallback<?> reportingCallback = new RequestCallback<>()
        {
            @Override
            public void onResponse(Message<Object> msg)
            {
                logger.info("[#{}] {} received by {}", request.parentRepairSession(), verb, endpoint);
                // todo: at some point we should make repair messages follow the normal path, actually using this
            }

            @Override
            public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
            {
                String reason = String.format("Got %s failure from %s: %s", verb, from, failureReason);
                failureCallback.onFailure(RepairException.error(request.desc, PreviewKind.NONE, reason));
            }

            @Override
            public boolean invokeOnFailure()
            {
                return true;
            }
        };
        sendMessageWithRetries(ctx, allowRetry, request, verb, endpoint, reportingCallback);
    }

    /**
     * Decides how a failed send of {@code verb} to {@code from} is handled.
     *
     * @param parentSessionId only used in the warning logged when the peer version is unknown
     */
    private static ErrorHandling errorHandlingSupported(SharedContext ctx, InetAddressAndPort from, Verb verb, TimeUUID parentSessionId)
    {
        if (SUPPORTS_RETRY_WITHOUT_VERSION_CHECK.contains(verb))
            return ErrorHandling.RETRY;

        // Repair in mixed mode isn't fully supported, but also not actively blocked, so the decision is based on
        // the release version gossip reports for the peer
        CassandraVersion peerVersion = ctx.gossiper().getReleaseVersion(from);
        if (peerVersion != null)
        {
            if (peerVersion.compareTo(SUPPORTS_RETRY) >= 0)
                return ErrorHandling.RETRY;
            // verbs without an entry have no minimum version for reporting failures
            CassandraVersion minimum = VERB_TIMEOUT_VERSIONS.get(verb);
            boolean reportsFailures = minimum == null || peerVersion.compareTo(minimum) >= 0;
            return reportsFailures ? ErrorHandling.TIMEOUT : ErrorHandling.NONE;
        }

        // unknown peer version: only verbs without a version requirement report their failures
        if (!VERB_TIMEOUT_VERSIONS.containsKey(verb))
            return ErrorHandling.TIMEOUT;
        logger.warn("[#{}] Not failing repair due to remote host {} not supporting repair message timeouts (version is unknown)", parentSessionId, from);
        return ErrorHandling.NONE;
    }
}