ViewBuilder.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.db.view;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;

import static java.util.stream.Collectors.toList;

/**
 * Builds a materialized view for the local token ranges.
 * <p>
 * The local ranges that are neither built nor being built are split (when the partitioner provides a splitter, using
 * {@link #NUM_TASKS} as the target number of parts), and each pending range is built by its own
 * {@link ViewBuilderTask}. The tasks are submitted to the {@link CompactionManager}, which executes them and is thus
 * able to run them in parallel.
 */
class ViewBuilder
{
    private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class);

    /** Target number of parts the new local ranges are split into: four per available processor. */
    private static final int NUM_TASKS = Runtime.getRuntime().availableProcessors() * 4;

    private final ColumnFamilyStore baseCfs;
    private final View view;
    private final String ksName;
    private final UUID localHostId = SystemKeyspace.getOrInitializeLocalHostId();

    /** Ranges whose view build has completed. */
    private final Set<Range<Token>> builtRanges = Sets.newConcurrentHashSet();

    /** Ranges still to be built, mapped to (last token processed, or null if not started; keys built so far). */
    private final Map<Range<Token>, Pair<Token, Long>> pendingRanges = Maps.newConcurrentMap();

    /** Every task submitted by this builder, kept so that they can all be stopped. */
    private final Set<ViewBuilderTask> tasks = Sets.newConcurrentHashSet();

    /** Number of keys covered by the build so far; only used for logging. */
    private volatile long keysBuilt = 0;
    private volatile boolean isStopped = false;

    /** Future of the build round currently in flight; {@link #stop()} waits on it. */
    private volatile Future<?> future = ImmediateFuture.success(null);

    ViewBuilder(ColumnFamilyStore baseCfs, View view)
    {
        this.baseCfs = baseCfs;
        this.view = view;
        ksName = baseCfs.metadata.keyspace;
    }

    /**
     * Starts building the view unless it is already marked as built locally.
     * <p>
     * If the view is already built but its status is not yet marked as replicated, only the distributed status is
     * updated. Otherwise the build is registered in the distributed keyspace, the base table is flushed, and the
     * build proceeds from the persisted per-range build status.
     */
    public void start()
    {
        if (SystemKeyspace.isViewBuilt(ksName, view.name))
        {
            logger.debug("View already marked built for {}.{}", ksName, view.name);
            if (!SystemKeyspace.isViewStatusReplicated(ksName, view.name))
                updateDistributed();
        }
        else
        {
            SystemDistributedKeyspace.startViewBuild(ksName, view.name, localHostId);

            logger.debug("Starting build of view({}.{}). Flushing base table {}.{}",
                         ksName, view.name, ksName, baseCfs.name);
            baseCfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.VIEW_BUILD_STARTED);

            loadStatusAndBuild();
        }
    }

    /** Reloads the persisted build status and starts a new build round from it. */
    private void loadStatusAndBuild()
    {
        loadStatus();
        build();
    }

    /**
     * Replaces the in-memory build state with the per-range status persisted in the system keyspace.
     * <p>
     * A range whose last processed token equals its right bound is considered built and its key count is added to
     * {@link #keysBuilt}; any other range is put back in the pending set together with its progress.
     */
    private void loadStatus()
    {
        builtRanges.clear();
        pendingRanges.clear();
        // The counter is rebuilt from the persisted status below; without this reset, every retry would count the
        // keys of already-built ranges again.
        keysBuilt = 0;
        SystemKeyspace.getViewBuildStatus(ksName, view.name)
                      .forEach((range, pair) ->
                               {
                                   Token lastToken = pair.left;
                                   if (lastToken != null && lastToken.equals(range.right))
                                   {
                                       builtRanges.add(range);
                                       keysBuilt += pair.right;
                                   }
                                   else
                                   {
                                       pendingRanges.put(range, pair);
                                   }
                               });
    }

    /**
     * Runs one build round. Does nothing if the builder has been stopped.
     * <p>
     * Computes the local ranges that are neither built nor pending, splits them and adds them to the pending set,
     * then submits one {@link ViewBuilderTask} per pending range. When all tasks succeed, the pending ranges are
     * marked as built and another round is started, so that any new local range is processed too. Once there is
     * nothing left to build, the view is marked as built.
     * <p>
     * If the tasks fail because of a compaction interruption, the builder is stopped. On any other failure, the
     * round is retried from the persisted status after a delay of five minutes.
     */
    private synchronized void build()
    {
        if (isStopped)
        {
            logger.debug("Stopped build for view({}.{}) after covering {} keys", ksName, view.name, keysBuilt);
            return;
        }

        // Get the local ranges for which the view has neither been built nor is being built
        RangesAtEndpoint replicatedRanges = StorageService.instance.getLocalReplicas(ksName);
        Replicas.temporaryAssertFull(replicatedRanges);
        Set<Range<Token>> newRanges = replicatedRanges.ranges()
                                                      .stream()
                                                      .map(r -> r.subtractAll(builtRanges))
                                                      .flatMap(Set::stream)
                                                      .map(r -> r.subtractAll(pendingRanges.keySet()))
                                                      .flatMap(Set::stream)
                                                      .collect(Collectors.toSet());
        // If there are neither new nor pending ranges, the build is complete
        if (newRanges.isEmpty() && pendingRanges.isEmpty())
        {
            finish();
            return;
        }

        // Split the new local ranges and add them to the pending set
        DatabaseDescriptor.getPartitioner()
                          .splitter()
                          .map(s -> s.split(newRanges, NUM_TASKS))
                          .orElse(newRanges)
                          .forEach(r -> pendingRanges.put(r, Pair.<Token, Long>create(null, 0L)));

        // Create one task per pending range, resuming from its recorded progress. All tasks are recorded before any
        // of them is submitted, so that every running task can be stopped.
        List<ViewBuilderTask> roundTasks = pendingRanges.entrySet()
                                                        .stream()
                                                        .map(e -> new ViewBuilderTask(baseCfs,
                                                                                      view,
                                                                                      e.getKey(),
                                                                                      e.getValue().left,
                                                                                      e.getValue().right))
                                                        .collect(toList());
        tasks.addAll(roundTasks);
        List<Future<Long>> futures = roundTasks.stream()
                                               .map(CompactionManager.instance::submitViewBuilder)
                                               .collect(toList());

        Future<List<Long>> roundFuture = FutureCombiner.allOf(futures);
        // Publish the future before registering the callback. The callback may run on this thread (for instance if
        // the tasks have already completed) and re-enter build(), which publishes the next round's future; assigning
        // afterwards would overwrite it with this completed one and make stop() wait on the wrong future.
        this.future = roundFuture;

        // Process any new local range and mark the view as built on success; on failure, either stop (interrupted)
        // or schedule a delayed retry
        roundFuture.addCallback(new FutureCallback<List<Long>>()
        {
            @Override
            public void onSuccess(List<Long> result)
            {
                keysBuilt += result.stream().mapToLong(x -> x).sum();
                builtRanges.addAll(pendingRanges.keySet());
                pendingRanges.clear();
                build();
            }

            @Override
            public void onFailure(Throwable t)
            {
                if (t instanceof CompactionInterruptedException)
                {
                    internalStop(true);
                    keysBuilt = tasks.stream().mapToLong(ViewBuilderTask::keysBuilt).sum();
                    logger.info("Interrupted build for view({}.{}) after covering {} keys", ksName, view.name, keysBuilt);
                }
                else
                {
                    ScheduledExecutors.nonPeriodicTasks.schedule(() -> loadStatusAndBuild(), 5, TimeUnit.MINUTES);
                    logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", t);
                }
            }
        });
    }

    /** Marks the view as built locally, then propagates the status to the distributed keyspace. */
    private void finish()
    {
        logger.debug("Marking view({}.{}) as built after covering {} keys ", ksName, view.name, keysBuilt);
        SystemKeyspace.finishViewBuildStatus(ksName, view.name);
        updateDistributed();
    }

    /**
     * Records the successful build in the distributed keyspace and marks the status as replicated locally.
     * Any failure is logged and the update is retried after five minutes.
     */
    private void updateDistributed()
    {
        try
        {
            SystemDistributedKeyspace.successfulViewBuild(ksName, view.name, localHostId);
            SystemKeyspace.setViewBuiltReplicated(ksName, view.name);
        }
        catch (Exception e)
        {
            ScheduledExecutors.nonPeriodicTasks.schedule(this::updateDistributed, 5, TimeUnit.MINUTES);
            logger.warn("Failed to update the distributed status of view, sleeping 5 minutes before retrying", e);
        }
    }

    /**
     * Stops the view building. The first caller also waits for the build round in flight to complete.
     */
    void stop()
    {
        boolean wasStopped;
        synchronized (this)
        {
            wasStopped = isStopped;
            internalStop(false);
        }
        // Wait outside the monitor: the future's callback calls the synchronized build(), so waiting while holding
        // the monitor could deadlock. NOTE(review): it is unclear why only the first caller to stop() waits.
        if (!wasStopped)
            FBUtilities.waitOnFuture(future);
    }

    /**
     * Flags the builder as stopped and stops every task it has submitted.
     *
     * @param isCompactionInterrupted forwarded to {@link ViewBuilderTask#stop}; true when stopping because the
     *                                tasks were interrupted as a compaction
     */
    private void internalStop(boolean isCompactionInterrupted)
    {
        isStopped = true;
        tasks.forEach(task -> task.stop(isCompactionInterrupted));
    }
}