MigrationCoordinator.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.schema;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.FutureTask;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Simulate;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS;
import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS;
import static org.apache.cassandra.config.CassandraRelevantProperties.SCHEMA_PULL_INTERVAL_MS;
import static org.apache.cassandra.net.Verb.SCHEMA_PUSH_REQ;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.Simulate.With.MONITORS;
import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
/**
 * Migration coordinator is responsible for tracking schema versions on various nodes and, if needed, synchronize the
 * schema. It performs periodic checks and if there is a schema version mismatch between the current node and the other
 * node, it pulls the schema and applies the changes locally through the callback.
 *
 * In particular the Migration Coordinator keeps track of all schema versions reported from each node in the cluster.
 * As long as a certain version is advertised by some node, it is being tracked. As long as a version is tracked,
 * the migration coordinator tries to fetch it by its periodic job.
 *
 * It works in close cooperation with {@link DefaultSchemaUpdateHandler} which is responsible for maintaining local
 * schema metadata stored in {@link SchemaKeyspace}.
 */
@Simulate(with = MONITORS)
public class MigrationCoordinator
{
    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(MigrationCoordinator.logger, 1, TimeUnit.MINUTES);
    private static final Future<Void> FINISHED_FUTURE = ImmediateFuture.success(null);
    private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime();
    @VisibleForTesting
    public static void setUptimeFn(LongSupplier supplier)
    {
        getUptimeFn = supplier;
    }
    private static final int MIGRATION_DELAY_IN_MS = CassandraRelevantProperties.MIGRATION_DELAY.getInt();
    public static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
    private static ImmutableSet<UUID> getIgnoredVersions()
    {
        String s = IGNORED_SCHEMA_CHECK_VERSIONS.getString();
        if (s == null || s.isEmpty())
            return ImmutableSet.of();
        ImmutableSet.Builder<UUID> versions = ImmutableSet.builder();
        for (String version : s.split(","))
        {
            versions.add(UUID.fromString(version));
        }
        return versions.build();
    }
    private static final Set<UUID> IGNORED_VERSIONS = getIgnoredVersions();
    private static Set<InetAddressAndPort> getIgnoredEndpoints()
    {
        Set<InetAddressAndPort> endpoints = new HashSet<>();
        String s = IGNORED_SCHEMA_CHECK_ENDPOINTS.getString();
        if (s == null || s.isEmpty())
            return endpoints;
        for (String endpoint : s.split(","))
        {
            try
            {
                endpoints.add(InetAddressAndPort.getByName(endpoint));
            }
            catch (UnknownHostException e)
            {
                throw new RuntimeException(e);
            }
        }
        return endpoints;
    }
    static class VersionInfo
    {
        final UUID version;
        /**
         * The set of endpoints containing this schema version
         */
        final Set<InetAddressAndPort> endpoints           = Sets.newConcurrentHashSet();
        /**
         * The set of endpoints from which we are already fetching the schema
         */
        final Set<InetAddressAndPort> outstandingRequests = Sets.newConcurrentHashSet();
        /**
         * The queue of endpoints from which we are going to fetch the schema
         */
        final Deque<InetAddressAndPort> requestQueue      = new ArrayDeque<>();
        /**
         * Threads waiting for schema synchronization are waiting until this object is signalled
         */
        private final WaitQueue waitQueue = newWaitQueue();
        /**
         * Whether this schema version have been received
         */
        volatile boolean receivedSchema;
        VersionInfo(UUID version)
        {
            this.version = version;
        }
        WaitQueue.Signal register()
        {
            return waitQueue.register();
        }
        void markReceived()
        {
            if (receivedSchema)
                return;
            receivedSchema = true;
            waitQueue.signalAll();
        }
        boolean wasReceived()
        {
            return receivedSchema;
        }
        @Override
        public String toString()
        {
            return "VersionInfo{" +
                   "version=" + version +
                   ", outstandingRequests=" + outstandingRequests +
                   ", requestQueue=" + requestQueue +
                   ", waitQueue.waiting=" + waitQueue.getWaiting() +
                   ", receivedSchema=" + receivedSchema +
                   '}';
        }
    }
    private final Map<UUID, VersionInfo> versionInfo = new HashMap<>();
    private final Map<InetAddressAndPort, UUID> endpointVersions = new HashMap<>();
    private final Set<InetAddressAndPort> ignoredEndpoints = getIgnoredEndpoints();
    private final ScheduledExecutorService periodicCheckExecutor;
    private final MessagingService messagingService;
    private final AtomicReference<ScheduledFuture<?>> periodicPullTask = new AtomicReference<>();
    private final int maxOutstandingVersionRequests;
    private final Gossiper gossiper;
    private final Supplier<UUID> schemaVersion;
    private final BiConsumer<InetAddressAndPort, Collection<Mutation>> schemaUpdateCallback;
    final ExecutorPlus executor;
    /**
     * Creates but does not start migration coordinator instance.
     * @param messagingService      messaging service instance used to communicate with other nodes for pulling schema
     *                              and pushing changes
     * @param periodicCheckExecutor executor on which the periodic checks are scheduled
     */
    MigrationCoordinator(MessagingService messagingService,
                         ExecutorPlus executor,
                         ScheduledExecutorService periodicCheckExecutor,
                         int maxOutstandingVersionRequests,
                         Gossiper gossiper,
                         Supplier<UUID> schemaVersionSupplier,
                         BiConsumer<InetAddressAndPort, Collection<Mutation>> schemaUpdateCallback)
    {
        this.messagingService = messagingService;
        this.executor = executor;
        this.periodicCheckExecutor = periodicCheckExecutor;
        this.maxOutstandingVersionRequests = maxOutstandingVersionRequests;
        this.gossiper = gossiper;
        this.schemaVersion = schemaVersionSupplier;
        this.schemaUpdateCallback = schemaUpdateCallback;
    }
    void start()
    {
        long interval = SCHEMA_PULL_INTERVAL_MS.getLong();
        logger.info("Starting migration coordinator and scheduling pulling schema versions every {}", Duration.ofMillis(interval));
        announce(schemaVersion.get());
        periodicPullTask.updateAndGet(curTask -> curTask == null
                                                 ? periodicCheckExecutor.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, interval, interval, TimeUnit.MILLISECONDS)
                                                 : curTask);
    }
    private synchronized void pullUnreceivedSchemaVersions()
    {
        logger.debug("Pulling unreceived schema versions...");
        for (VersionInfo info : versionInfo.values())
        {
            if (info.wasReceived() || info.outstandingRequests.size() > 0)
            {
                logger.trace("Skipping pull of schema {} because it has been already recevied, or it is being received ({})", info.version, info);
                continue;
            }
            maybePullSchema(info);
        }
    }
    private synchronized Future<Void> maybePullSchema(VersionInfo info)
    {
        if (info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version))
        {
            logger.trace("Not pulling schema {} because it was received, there is no endpoint to provide it, or we should not pull it ({})", info.version, info);
            return FINISHED_FUTURE;
        }
        if (info.outstandingRequests.size() >= maxOutstandingVersionRequests)
        {
            logger.trace("Not pulling schema {} because the number of outstanding requests has been exceeded ({} >= {})", info.version, info.outstandingRequests.size(), maxOutstandingVersionRequests);
            return FINISHED_FUTURE;
        }
        for (int i = 0, isize = info.requestQueue.size(); i < isize; i++)
        {
            InetAddressAndPort endpoint = info.requestQueue.remove();
            if (!info.endpoints.contains(endpoint))
            {
                logger.trace("Skipping request of schema {} from {} because the endpoint does not have that schema any longer", info.version, endpoint);
                continue;
            }
            if (shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint))
            {
                return scheduleSchemaPull(endpoint, info);
            }
            else
            {
                // return to queue
                logger.trace("Could not pull schema {} from {} - the request will be added back to the queue", info.version, endpoint);
                info.requestQueue.offer(endpoint);
            }
        }
        // no suitable endpoints were found, check again in a minute, the periodic task will pick it up
        return FINISHED_FUTURE;
    }
    synchronized Map<UUID, Set<InetAddressAndPort>> outstandingVersions()
    {
        HashMap<UUID, Set<InetAddressAndPort>> map = new HashMap<>();
        for (VersionInfo info : versionInfo.values())
            if (!info.wasReceived())
                map.put(info.version, ImmutableSet.copyOf(info.endpoints));
        return map;
    }
    @VisibleForTesting
    VersionInfo getVersionInfoUnsafe(UUID version)
    {
        return versionInfo.get(version);
    }
    private boolean shouldPullSchema(UUID version)
    {
        UUID localSchemaVersion = schemaVersion.get();
        if (localSchemaVersion == null)
        {
            logger.debug("Not pulling schema {} because the local schama version is not known yet", version);
            return false;
        }
        if (localSchemaVersion.equals(version))
        {
            logger.debug("Not pulling schema {} because it is the same as the local schema", version);
            return false;
        }
        return true;
    }
    private boolean shouldPullFromEndpoint(InetAddressAndPort endpoint)
    {
        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
        {
            logger.trace("Not pulling schema from local endpoint");
            return false;
        }
        EndpointState state = gossiper.getEndpointStateForEndpoint(endpoint);
        if (state == null)
        {
            logger.trace("Not pulling schema from endpoint {} because its state is unknown", endpoint);
            return false;
        }
        VersionedValue releaseVersionValue = state.getApplicationState(ApplicationState.RELEASE_VERSION);
        if (releaseVersionValue == null)
            return false;
        final String releaseVersion = releaseVersionValue.value;
        final String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
        if (!releaseVersion.startsWith(ourMajorVersion))
        {
            logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}",
                         endpoint, ourMajorVersion, releaseVersion);
            return false;
        }
        if (!messagingService.versions.knows(endpoint))
        {
            logger.debug("Not pulling schema from {} because their messaging version is unknown", endpoint);
            return false;
        }
        if (messagingService.versions.getRaw(endpoint) != MessagingService.current_version)
        {
            logger.debug("Not pulling schema from {} because their schema format is incompatible", endpoint);
            return false;
        }
        if (gossiper.isGossipOnlyMember(endpoint))
        {
            logger.debug("Not pulling schema from {} because it's a gossip only member", endpoint);
            return false;
        }
        return true;
    }
    private boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
    {
        UUID localSchemaVersion = schemaVersion.get();
        if (SchemaConstants.emptyVersion.equals(localSchemaVersion) || getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS)
        {
            // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
            logger.debug("Immediately submitting migration task for {}, " +
                         "schema versions: local={}, remote={}",
                         endpoint,
                         DistributedSchema.schemaVersionToString(localSchemaVersion),
                         DistributedSchema.schemaVersionToString(version));
            return true;
        }
        return false;
    }
    /**
     * If a previous schema update brought our version the same as the incoming schema, don't apply it
     */
    private synchronized boolean shouldApplySchemaFor(VersionInfo info)
    {
        if (info.wasReceived())
            return false;
        return !Objects.equals(schemaVersion.get(), info.version);
    }
    synchronized Future<Void> reportEndpointVersion(InetAddressAndPort endpoint, UUID version)
    {
        logger.debug("Reported schema {} at endpoint {}", version, endpoint);
        if (ignoredEndpoints.contains(endpoint) || IGNORED_VERSIONS.contains(version))
        {
            endpointVersions.remove(endpoint);
            removeEndpointFromVersion(endpoint, null);
            logger.debug("Discarding endpoint {} or schema {} because either endpoint or schema version were marked as ignored", endpoint, version);
            return FINISHED_FUTURE;
        }
        UUID current = endpointVersions.put(endpoint, version);
        if (current != null && current.equals(version))
        {
            logger.trace("Skipping report of schema {} from {} because we already know that", version, endpoint);
            return FINISHED_FUTURE;
        }
        VersionInfo info = versionInfo.computeIfAbsent(version, VersionInfo::new);
        if (Objects.equals(schemaVersion.get(), version))
        {
            info.markReceived();
            logger.trace("Schema {} from {} has been marked as recevied because it is equal the local schema", version, endpoint);
        }
        else
        {
            info.requestQueue.addFirst(endpoint);
        }
        info.endpoints.add(endpoint);
        logger.trace("Added endpoint {} to schema {}: {}", endpoint, info.version, info);
        // disassociate this endpoint from its (now) previous schema version
        removeEndpointFromVersion(endpoint, current);
        return maybePullSchema(info);
    }
    private synchronized void removeEndpointFromVersion(InetAddressAndPort endpoint, UUID version)
    {
        if (version == null)
            return;
        VersionInfo info = versionInfo.get(version);
        if (info == null)
            return;
        info.endpoints.remove(endpoint);
        logger.trace("Removed endpoint {} from schema {}: {}", endpoint, version, info);
        if (info.endpoints.isEmpty())
        {
            info.waitQueue.signalAll();
            versionInfo.remove(version);
            logger.trace("Removed schema info: {}", info);
        }
    }
    private void clearVersionsInfo()
    {
        Iterator<Map.Entry<UUID, VersionInfo>> it = versionInfo.entrySet().iterator();
        while (it.hasNext())
        {
            Map.Entry<UUID, VersionInfo> entry = it.next();
            it.remove();
            entry.getValue().waitQueue.signal();
        }
    }
    private void reportCurrentSchemaVersionOnEndpoint(InetAddressAndPort endpoint)
    {
        if (FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
        {
            reportEndpointVersion(endpoint, schemaVersion.get());
        }
        else
        {
            EndpointState state = gossiper.getEndpointStateForEndpoint(endpoint);
            if (state != null)
            {
                UUID v = state.getSchemaVersion();
                if (v != null)
                {
                    reportEndpointVersion(endpoint, v);
                }
            }
        }
    }
    /**
     * Resets the migration coordinator by notifying all waiting threads and removing all the existing version info.
     * Then, it is populated with the information about schema versions on different endpoints provided by Gossiper.
     * Each version is marked as unreceived so the migration coordinator will start pulling schemas from other nodes.
     */
    synchronized void reset()
    {
        logger.info("Resetting migration coordinator...");
        // clear all the managed information
        this.endpointVersions.clear();
        clearVersionsInfo();
        // now report again the versions we are aware of
        gossiper.getLiveMembers().forEach(this::reportCurrentSchemaVersionOnEndpoint);
    }
    synchronized void removeAndIgnoreEndpoint(InetAddressAndPort endpoint)
    {
        logger.debug("Removing and ignoring endpoint {}", endpoint);
        Preconditions.checkArgument(endpoint != null);
        // TODO The endpoint address is now ignored but when a node with the same address is added again later,
        //  there will be no way to include it in schema synchronization other than restarting each other node
        //  see https://issues.apache.org/jira/browse/CASSANDRA-17883 for details
        ignoredEndpoints.add(endpoint);
        Set<UUID> versions = ImmutableSet.copyOf(versionInfo.keySet());
        for (UUID version : versions)
        {
            removeEndpointFromVersion(endpoint, version);
        }
    }
    private Future<Void> scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo info)
    {
        FutureTask<Void> task = new FutureTask<>(() -> pullSchema(endpoint, new Callback(endpoint, info)));
        if (shouldPullImmediately(endpoint, info.version))
        {
            logger.debug("Pulling {} immediately from {}", info, endpoint);
            submitToMigrationIfNotShutdown(task);
        }
        else
        {
            logger.debug("Postponing pull of {} from {} for {}ms", info, endpoint, MIGRATION_DELAY_IN_MS);
            ScheduledExecutors.nonPeriodicTasks.schedule(() -> submitToMigrationIfNotShutdown(task), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
        }
        return task;
    }
    void announce(UUID schemaVersion)
    {
        if (gossiper.isEnabled())
            gossiper.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(schemaVersion));
        SchemaDiagnostics.versionAnnounced(Schema.instance);
    }
    private Future<?> submitToMigrationIfNotShutdown(Runnable task)
    {
        boolean skipped = false;
        try
        {
            if (executor.isShutdown() || executor.isTerminated())
            {
                skipped = true;
                return ImmediateFuture.success(null);
            }
            return executor.submit(task);
        }
        catch (RejectedExecutionException ex)
        {
            skipped = true;
            return ImmediateFuture.success(null);
        }
        finally
        {
            if (skipped)
            {
                logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
            }
        }
    }
    private class Callback implements RequestCallback<Collection<Mutation>>
    {
        final InetAddressAndPort endpoint;
        final VersionInfo info;
        public Callback(InetAddressAndPort endpoint, VersionInfo info)
        {
            this.endpoint = endpoint;
            this.info = info;
        }
        public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
        {
            fail();
        }
        Future<Void> fail()
        {
            return pullComplete(endpoint, info, false);
        }
        public void onResponse(Message<Collection<Mutation>> message)
        {
            response(message.payload);
        }
        Future<Void> response(Collection<Mutation> mutations)
        {
            synchronized (info)
            {
                if (shouldApplySchemaFor(info))
                {
                    try
                    {
                        schemaUpdateCallback.accept(endpoint, mutations);
                    }
                    catch (Exception e)
                    {
                        logger.error(String.format("Unable to merge schema from %s", endpoint), e);
                        return fail();
                    }
                }
                return pullComplete(endpoint, info, true);
            }
        }
        public boolean isLatencyForSnitch()
        {
            return false;
        }
    }
    private void pullSchema(InetAddressAndPort endpoint, RequestCallback<Collection<Mutation>> callback)
    {
        if (!gossiper.isAlive(endpoint))
        {
            noSpamLogger.warn("Can't send schema pull request: node {} is down.", endpoint);
            callback.onFailure(endpoint, RequestFailureReason.UNKNOWN);
            return;
        }
        // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
        // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
        // a higher major.
        if (!shouldPullFromEndpoint(endpoint))
        {
            logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint);
            callback.onFailure(endpoint, RequestFailureReason.UNKNOWN);
            return;
        }
        logger.debug("Requesting schema from {}", endpoint);
        sendMigrationMessage(endpoint, callback);
    }
    private void sendMigrationMessage(InetAddressAndPort endpoint, RequestCallback<Collection<Mutation>> callback)
    {
        Message<NoPayload> message = Message.out(Verb.SCHEMA_PULL_REQ, NoPayload.noPayload);
        logger.info("Sending schema pull request to {}", endpoint);
        messagingService.sendWithCallback(message, endpoint, callback);
    }
    private synchronized Future<Void> pullComplete(InetAddressAndPort endpoint, VersionInfo info, boolean wasSuccessful)
    {
        if (wasSuccessful)
            info.markReceived();
        info.outstandingRequests.remove(endpoint);
        info.requestQueue.add(endpoint);
        return maybePullSchema(info);
    }
    /**
     * Wait until we've received schema responses for all versions we're aware of
     * @param waitMillis
     * @return true if response for all schemas were received, false if we timed out waiting
     */
    boolean awaitSchemaRequests(long waitMillis)
    {
        if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
            Gossiper.waitToSettle();
        if (versionInfo.isEmpty())
            logger.debug("Nothing in versionInfo - so no schemas to wait for");
        List<WaitQueue.Signal> signalList = null;
        try
        {
            synchronized (this)
            {
                signalList = new ArrayList<>(versionInfo.size());
                for (VersionInfo version : versionInfo.values())
                {
                    if (version.wasReceived())
                        continue;
                    signalList.add(version.register());
                }
                if (signalList.isEmpty())
                    return true;
            }
            long deadline = nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
            return signalList.stream().allMatch(signal -> signal.awaitUntilUninterruptibly(deadline));
        }
        finally
        {
            if (signalList != null)
                signalList.forEach(WaitQueue.Signal::cancel);
        }
    }
    Pair<Set<InetAddressAndPort>, Set<InetAddressAndPort>> pushSchemaMutations(Collection<Mutation> schemaMutations)
    {
        logger.debug("Pushing schema mutations: {}", schemaMutations);
        Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>();
        Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>();
        Message<Collection<Mutation>> message = Message.out(SCHEMA_PUSH_REQ, schemaMutations);
        for (InetAddressAndPort endpoint : gossiper.getLiveMembers())
        {
            if (shouldPushSchemaTo(endpoint))
            {
                logger.debug("Pushing schema mutations to {}: {}", endpoint, schemaMutations);
                messagingService.send(message, endpoint);
                schemaDestinationEndpoints.add(endpoint);
            }
            else
            {
                schemaEndpointsIgnored.add(endpoint);
            }
        }
        return Pair.create(schemaDestinationEndpoints, schemaEndpointsIgnored);
    }
    private boolean shouldPushSchemaTo(InetAddressAndPort endpoint)
    {
        // only push schema to nodes with known and equal versions
        return !endpoint.equals(FBUtilities.getBroadcastAddressAndPort())
               && messagingService.versions.knows(endpoint)
               && messagingService.versions.getRaw(endpoint) == MessagingService.current_version;
    }
}