CommitLogSegmentManagerCDC.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.commitlog;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState;
import org.apache.cassandra.exceptions.CDCWriteException;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.DirectorySizeCalculator;
import org.apache.cassandra.utils.NoSpamLogger;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
{
    static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManagerCDC.class);
    private final CDCSizeTracker cdcSizeTracker;
    public CommitLogSegmentManagerCDC(final CommitLog commitLog, String storageDirectory)
    {
        super(commitLog, storageDirectory);
        cdcSizeTracker = new CDCSizeTracker(this, new File(DatabaseDescriptor.getCDCLogLocation()));
    }
    @Override
    void start()
    {
        cdcSizeTracker.start();
        super.start();
    }
    public void discard(CommitLogSegment segment, boolean delete)
    {
        segment.close();
        addSize(-segment.onDiskSize());
        cdcSizeTracker.processDiscardedSegment(segment);
        if (delete)
            segment.logFile.delete();
        if (segment.getCDCState() != CDCState.CONTAINS)
        {
            // Always delete hard-link from cdc folder if this segment didn't contain CDC data. Note: File may not exist
            // if processing discard during startup.
            File cdcLink = segment.getCDCFile();
            File cdcIndexFile = segment.getCDCIndexFile();
            deleteCDCFiles(cdcLink, cdcIndexFile);
        }
    }
    /**
     * Delete the oldest hard-linked CDC commit log segment to free up space.
     * @param bytesToFree, the minimum space to free up
     * @return total size under the CDC folder in bytes after deletion
     */
    public long deleteOldLinkedCDCCommitLogSegment(long bytesToFree)
    {
        if (bytesToFree <= 0)
            return 0;
        File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
        Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does not exist.");
        File[] files = cdcDir.tryList(f -> CommitLogDescriptor.isValid(f.name()));
        if (files == null || files.length == 0)
        {
            logger.warn("Skip deleting due to no CDC commit log segments found.");
            return 0;
        }
        List<File> sorted = Arrays.stream(files)
                                  // sort by the commmit log segment id
                                  .sorted(new CommitLogSegment.CommitLogSegmentFileComparator())
                                  .collect(Collectors.toList());
        long bytesDeleted = 0;
        long bytesRemaining = 0;
        boolean deletionCompleted = false;
        // keep deleting from old to new until it reaches to the goal or the current writing segment
        for (File linkedCdcFile : sorted)
        {
            // only evaluate/update when deletionCompleted is false
            if (!deletionCompleted)
            {
                deletionCompleted = bytesDeleted >= bytesToFree || linkedCdcFile.equals(allocatingFrom().getCDCFile());
            }
            if (deletionCompleted)
            {
                bytesRemaining += linkedCdcFile.length();
            }
            else
            {
                File cdcIndexFile = CommitLogDescriptor.inferCdcIndexFile(linkedCdcFile);
                bytesDeleted += deleteCDCFiles(linkedCdcFile, cdcIndexFile);
            }
        }
        return bytesRemaining;
    }
    private long deleteCDCFiles(File cdcLink, File cdcIndexFile)
    {
        long total = 0;
        if (cdcLink != null && cdcLink.exists())
        {
            total += cdcLink.length();
            cdcLink.delete();
        }
        if (cdcIndexFile != null && cdcIndexFile.exists())
        {
            total += cdcIndexFile.length();
            cdcIndexFile.delete();
        }
        return total;
    }
    /**
     * Initiates the shutdown process for the management thread. Also stops the cdc on-disk size calculator executor.
     */
    public void shutdown()
    {
        cdcSizeTracker.shutdown();
        super.shutdown();
    }
    /**
     * Reserve space in the current segment for the provided mutation or, if there isn't space available,
     * create a new segment. For CDC mutations, allocation is expected to throw WTE if the segment disallows CDC mutations.
     *
     * @param mutation Mutation to allocate in segment manager
     * @param size total size (overhead + serialized) of mutation
     * @return the created Allocation object
     * @throws CDCWriteException If segment disallows CDC mutations, we throw
     */
    @Override
    public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws CDCWriteException
    {
        CommitLogSegment segment = allocatingFrom();
        CommitLogSegment.Allocation alloc;
        permitSegmentMaybe(segment);
        throwIfForbidden(mutation, segment);
        while ( null == (alloc = segment.allocate(mutation, size)) )
        {
            // Failed to allocate, so move to a new segment with enough room if possible.
            advanceAllocatingFrom(segment);
            segment = allocatingFrom();
            permitSegmentMaybe(segment);
            throwIfForbidden(mutation, segment);
        }
        if (mutation.trackedByCDC())
            segment.setCDCState(CDCState.CONTAINS);
        return alloc;
    }
    // Permit a forbidden segment under the following conditions.
    // - Non-blocking mode has just recently been enabled for CDC.
    // - The CDC total space has droppped below the limit (e.g. CDC consumer cleans up).
    private void permitSegmentMaybe(CommitLogSegment segment)
    {
        if (segment.getCDCState() != CDCState.FORBIDDEN)
            return;
        if (!DatabaseDescriptor.getCDCBlockWrites()
            || cdcSizeTracker.sizeInProgress.get() + DatabaseDescriptor.getCommitLogSegmentSize() < DatabaseDescriptor.getCDCTotalSpace())
        {
            CDCState oldState = segment.setCDCState(CDCState.PERMITTED);
            if (oldState == CDCState.FORBIDDEN)
            {
                FileUtils.createHardLink(segment.logFile, segment.getCDCFile());
                cdcSizeTracker.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
            }
        }
    }
    private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws CDCWriteException
    {
        if (mutation.trackedByCDC() && segment.getCDCState() == CDCState.FORBIDDEN)
        {
            cdcSizeTracker.submitOverflowSizeRecalculation();
            String logMsg = String.format("Rejecting mutation to keyspace %s. Free up space in %s by processing CDC logs. " +
                                          "Total CDC bytes on disk is %s.",
                                          mutation.getKeyspaceName(), DatabaseDescriptor.getCDCLogLocation(),
                                          cdcSizeTracker.sizeInProgress.get());
            NoSpamLogger.log(logger,
                             NoSpamLogger.Level.WARN,
                             10,
                             TimeUnit.SECONDS,
                             logMsg);
            throw new CDCWriteException(logMsg);
        }
    }
    /**
     * On segment creation, flag whether the segment should accept CDC mutations or not based on the total currently
     * allocated unflushed CDC segments and the contents of cdc_raw
     *
     * Synchronized on this
     */
    @Override
    public CommitLogSegment createSegment()
    {
        CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this);
        cdcSizeTracker.processNewSegment(segment);
        // After processing, the state of the segment can either be PERMITTED or FORBIDDEN
        if (segment.getCDCState() == CDCState.PERMITTED)
        {
            // Hard link file in cdc folder for realtime tracking
            FileUtils.createHardLink(segment.logFile, segment.getCDCFile());
        }
        return segment;
    }
    /**
     * Delete untracked segment files after replay
     *
     * @param file segment file that is no longer in use.
     */
    @Override
    void handleReplayedSegment(final File file)
    {
        super.handleReplayedSegment(file);
        // delete untracked cdc segment hard link files if their index files do not exist
        File cdcFile = new File(DatabaseDescriptor.getCDCLogLocation(), file.name());
        File cdcIndexFile = new File(DatabaseDescriptor.getCDCLogLocation(), CommitLogDescriptor.fromFileName(file.name()).cdcIndexFileName());
        if (cdcFile.exists() && !cdcIndexFile.exists())
        {
            logger.trace("(Unopened) CDC segment {} is no longer needed and will be deleted now", cdcFile);
            cdcFile.delete();
        }
    }
    /**
     * For use after replay when replayer hard-links / adds tracking of replayed segments
     */
    public void addCDCSize(long size)
    {
        cdcSizeTracker.addSize(size);
    }
    /**
     * Tracks total disk usage of CDC subsystem, defined by the summation of all unflushed CommitLogSegments with CDC
     * data in them and all segments archived into cdc_raw.
     *
     * Allows atomic increment/decrement of unflushed size, however only allows increment on flushed and requires a full
     * directory walk to determine any potential deletions by CDC consumer.
     */
    private static class CDCSizeTracker extends DirectorySizeCalculator
    {
        private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / DatabaseDescriptor.getCDCDiskCheckInterval());
        private ExecutorService cdcSizeCalculationExecutor;
        private final CommitLogSegmentManagerCDC segmentManager;
        // track the total size between two dictionary size calculations
        private final AtomicLong sizeInProgress;
        private final File path;
        CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path)
        {
            this.path = path;
            this.segmentManager = segmentManager;
            this.sizeInProgress = new AtomicLong(0);
        }
        /**
         * Needed for stop/restart during unit tests
         */
        public void start()
        {
            sizeInProgress.getAndSet(0);
            cdcSizeCalculationExecutor = executorFactory().configureSequential("CDCSizeCalculationExecutor")
                                                          .withRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy())
                                                          .withQueueLimit(0)
                                                          .withKeepAlive(1000, TimeUnit.SECONDS)
                                                          .build();
        }
        /**
         * Synchronous size recalculation on each segment creation/deletion call could lead to very long delays in new
         * segment allocation, thus long delays in thread signaling to wake waiting allocation / writer threads.
         *
         * This can be reached either from the segment management thread in AbstractCommitLogSegmentManager or from the
         * size recalculation executor, so we synchronize on this object to reduce the race overlap window available for
         * size to get off.
         *
         * Reference DirectorySizerBench for more information about performance of the directory size recalc.
         */
        void processNewSegment(CommitLogSegment segment)
        {
            int segmentSize = defaultSegmentSize();
            long allowance = DatabaseDescriptor.getCDCTotalSpace();
            boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
            // See synchronization in CommitLogSegment.setCDCState
            synchronized (segment.cdcStateLock)
            {
                segment.setCDCState(blocking && segmentSize + sizeInProgress.get() > allowance
                                    ? CDCState.FORBIDDEN
                                    : CDCState.PERMITTED);
                // Aggressively count in the (estimated) size of new segments.
                if (segment.getCDCState() == CDCState.PERMITTED)
                    addSize(segmentSize);
            }
            // Remove the oldest cdc segment file when exceeding the CDC storage allowance
            if (!blocking && sizeInProgress.get() > allowance)
            {
                long bytesToFree = sizeInProgress.get() - allowance;
                long remainingSize = segmentManager.deleteOldLinkedCDCCommitLogSegment(bytesToFree);
                long releasedSize = sizeInProgress.get() - remainingSize;
                sizeInProgress.getAndSet(remainingSize);
                logger.debug("Freed up {} ({}) bytes after deleting the oldest CDC commit log segments in non-blocking mode. " +
                             "Total on-disk CDC size: {}; allowed CDC size: {}",
                             releasedSize, bytesToFree, remainingSize, allowance);
            }
            // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
            submitOverflowSizeRecalculation();
        }
        void processDiscardedSegment(CommitLogSegment segment)
        {
            if (!segment.getCDCFile().exists())
            {
                logger.debug("Not processing discarded CommitLogSegment {}; this segment appears to have been deleted already.", segment);
                return;
            }
            synchronized (segment.cdcStateLock)
            {
                // Add to flushed size before decrementing unflushed, so we don't have a window of false generosity
                if (segment.getCDCState() == CDCState.CONTAINS)
                    addSize(segment.onDiskSize());
                // Subtract the (estimated) size of the segment from processNewSegment.
                // For the segement that CONTAINS, we update with adding the actual onDiskSize and removing the estimated size.
                // For the segment that remains in PERMITTED, the file is to be deleted and the estimate should be returned.
                if (segment.getCDCState() != CDCState.FORBIDDEN)
                    addSize(-defaultSegmentSize());
            }
            // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
            submitOverflowSizeRecalculation();
        }
        public void submitOverflowSizeRecalculation()
        {
            try
            {
                cdcSizeCalculationExecutor.submit(() -> {
                    rateLimiter.acquire();
                    calculateSize();
                });
            }
            catch (RejectedExecutionException e)
            {
                // Do nothing. Means we have one in flight so this req. should be satisfied when it completes.
            }
        }
        private int defaultSegmentSize()
        {
            // CommitLogSegmentSize is only loaded from yaml.
            // There is a setter but is used only for testing.
            return DatabaseDescriptor.getCommitLogSegmentSize();
        }
        private void calculateSize()
        {
            try
            {
                resetSize();
                Files.walkFileTree(path.toPath(), this);
                sizeInProgress.getAndSet(getAllocatedSize());
            }
            catch (IOException ie)
            {
                CommitLog.handleCommitError("Failed CDC Size Calculation", ie);
            }
        }
        public void shutdown()
        {
            if (cdcSizeCalculationExecutor != null && !cdcSizeCalculationExecutor.isShutdown())
            {
                cdcSizeCalculationExecutor.shutdown();
            }
        }
        private void addSize(long toAdd)
        {
            sizeInProgress.getAndAdd(toAdd);
        }
    }
    /**
     * Only use for testing / validation that size tracker is working. Not for production use.
     */
    @VisibleForTesting
    public long updateCDCTotalSize()
    {
        long sleepTime = DatabaseDescriptor.getCDCDiskCheckInterval() + 50L;
        // Give the update time to finish the last run if any. Therefore, avoid modifying production code only for testing purpose.
        Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
        cdcSizeTracker.submitOverflowSizeRecalculation();
        // Give the update time to run
        Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
        // then update the state of the segment it is allocating from. In produciton, the state is updated during "allocate"
        if (allocatingFrom().getCDCState() == CDCState.FORBIDDEN)
            cdcSizeTracker.processNewSegment(allocatingFrom());
        return cdcSizeTracker.getAllocatedSize();
    }
}