RepairedDataInfo.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
import java.util.function.LongPredicate;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PurgeFunction;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.transform.MoreRows;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
@NotThreadSafe
class RepairedDataInfo
{
    public static final RepairedDataInfo NO_OP_REPAIRED_DATA_INFO = new RepairedDataInfo(null)
    {
        @Override
        public UnfilteredPartitionIterator withRepairedDataInfo(UnfilteredPartitionIterator iterator)
        {
            return iterator;
        }
        @Override
        public UnfilteredRowIterator withRepairedDataInfo(UnfilteredRowIterator iterator)
        {
            return iterator;
        }
            
        @Override
        public UnfilteredPartitionIterator extend(UnfilteredPartitionIterator partitions, DataLimits.Counter limit)
        {
           return partitions;
        }
    };
    // Keeps a digest of the partition currently being processed. Since we won't know
    // whether a partition will be fully purged from a read result until it's been
    // consumed, we buffer this per-partition digest and add it to the final digest
    // when the partition is closed (if it wasn't fully purged).
    private Digest perPartitionDigest;
    private Digest perCommandDigest;
    private boolean isConclusive = true;
    private ByteBuffer calculatedDigest = null;
    // Doesn't actually purge from the underlying iterators, but excludes from the digest
    // the purger can't be initialized until we've iterated all the sstables for the query
    // as it requires the oldest repaired tombstone
    private RepairedDataPurger purger;
    private boolean isFullyPurged = true;
    // Supplies additional partitions from the repaired data set to be consumed when the limit of
    // executing ReadCommand has been reached. This is to ensure that each replica attempts to
    // read the same amount of repaired data, otherwise comparisons of the repaired data digests
    // may be invalidated by varying amounts of repaired data being present on each replica.
    // This can't be initialized until after the underlying repaired iterators have been merged.
    private UnfilteredPartitionIterator postLimitPartitions = null;
    private final DataLimits.Counter repairedCounter;
    private UnfilteredRowIterator currentPartition;
    private TableMetrics metrics;
    public RepairedDataInfo(DataLimits.Counter repairedCounter)
    {
        this.repairedCounter = repairedCounter;
    }
    /**
     * If either repaired status tracking is not active or the command has not yet been
     * executed, then this digest will be an empty buffer.
     * Otherwise, it will contain a digest of the repaired data read, or an empty buffer
     * if no repaired data was read.
     *
     * @return a digest of the repaired data read during local execution of a command
     */
    ByteBuffer getDigest()
    {
        if (calculatedDigest != null)
            return calculatedDigest;
        calculatedDigest = perCommandDigest == null
                           ? ByteBufferUtil.EMPTY_BYTE_BUFFER
                           : ByteBuffer.wrap(perCommandDigest.digest());
        return calculatedDigest;
    }
    void prepare(ColumnFamilyStore cfs, long nowInSec, long oldestUnrepairedTombstone)
    {
        this.purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone);
        this.metrics = cfs.metric;
    }
    void finalize(UnfilteredPartitionIterator postLimitPartitions)
    {
        this.postLimitPartitions = postLimitPartitions;
    }
    /**
     * Returns a boolean indicating whether any relevant sstables were skipped during the read
     * that produced the repaired data digest.
     *
     * If true, then no pending repair sessions or partition deletes have influenced the extent
     * of the repaired sstables that went into generating the digest.
     * This indicates whether or not the digest can reliably be used to infer consistency
     * issues between the repaired sets across replicas.
     *
     * If either repaired status tracking is not active or the command has not yet been
     * executed, then this will always return true.
     *
     * @return boolean to indicate confidence in the whether or not the digest of the repaired data can be
     *         reliably be used to infer inconsistency issues between the repaired sets across replicas
     */
    boolean isConclusive()
    {
        return isConclusive;
    }
    void markInconclusive()
    {
        isConclusive = false;
    }
    private void onNewPartition(UnfilteredRowIterator partition)
    {
        assert purger != null;
        purger.setCurrentKey(partition.partitionKey());
        purger.setIsReverseOrder(partition.isReverseOrder());
        this.currentPartition = partition;
    }
    private Digest getPerPartitionDigest()
    {
        if (perPartitionDigest == null)
            perPartitionDigest = Digest.forRepairedDataTracking();
        return perPartitionDigest;
    }
    public UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator)
    {
        class WithTracking extends Transformation<UnfilteredRowIterator>
        {
            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
            {
                return withRepairedDataInfo(partition);
            }
        }
        return Transformation.apply(iterator, new WithTracking());
    }
    public UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator)
    {
        class WithTracking extends Transformation<UnfilteredRowIterator>
        {
            protected DecoratedKey applyToPartitionKey(DecoratedKey key)
            {
                getPerPartitionDigest().update(key.getKey());
                return key;
            }
            protected DeletionTime applyToDeletion(DeletionTime deletionTime)
            {
                if (repairedCounter.isDone())
                    return deletionTime;
                assert purger != null;
                DeletionTime purged = purger.applyToDeletion(deletionTime);
                if (!purged.isLive())
                    isFullyPurged = false;
                purged.digest(getPerPartitionDigest());
                return deletionTime;
            }
            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
            {
                if (repairedCounter.isDone())
                    return marker;
                assert purger != null;
                RangeTombstoneMarker purged = purger.applyToMarker(marker);
                if (purged != null)
                {
                    isFullyPurged = false;
                    purged.digest(getPerPartitionDigest());
                }
                return marker;
            }
            protected Row applyToStatic(Row row)
            {
                return applyToRow(row);
            }
            protected Row applyToRow(Row row)
            {
                if (repairedCounter.isDone())
                    return row;
                assert purger != null;
                Row purged = purger.applyToRow(row);
                if (purged != null && !purged.isEmpty())
                {
                    isFullyPurged = false;
                    purged.digest(getPerPartitionDigest());
                }
                return row;
            }
            protected void onPartitionClose()
            {
                if (perPartitionDigest != null)
                {
                    // If the partition wasn't completely emptied by the purger,
                    // calculate the digest for the partition and use it to
                    // update the overall digest
                    if (!isFullyPurged)
                    {
                        if (perCommandDigest == null)
                            perCommandDigest = Digest.forRepairedDataTracking();
                        byte[] partitionDigest = perPartitionDigest.digest();
                        perCommandDigest.update(partitionDigest, 0, partitionDigest.length);
                    }
                    perPartitionDigest = null;
                }
                isFullyPurged = true;
            }
        }
        if (repairedCounter.isDone())
            return iterator;
        UnfilteredRowIterator tracked = repairedCounter.applyTo(Transformation.apply(iterator, new WithTracking()));
        onNewPartition(tracked);
        return tracked;
    }
    public UnfilteredPartitionIterator extend(final UnfilteredPartitionIterator partitions,
                                              final DataLimits.Counter limit)
    {
        class OverreadRepairedData extends Transformation<UnfilteredRowIterator> implements MoreRows<UnfilteredRowIterator>
        {
            protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
            {
                return MoreRows.extend(partition, this, partition.columns());
            }
            public UnfilteredRowIterator moreContents()
            {
                // We don't need to do anything until the DataLimits of the
                // of the read have been reached
                if (!limit.isDone() || repairedCounter.isDone())
                    return null;
                long countBeforeOverreads = repairedCounter.counted();
                long overreadStartTime = nanoTime();
                if (currentPartition != null)
                    consumePartition(currentPartition, repairedCounter);
                if (postLimitPartitions != null)
                    while (postLimitPartitions.hasNext() && !repairedCounter.isDone())
                        consumePartition(postLimitPartitions.next(), repairedCounter);
                // we're not actually providing any more rows, just consuming the repaired data
                long rows = repairedCounter.counted() - countBeforeOverreads;
                long nanos = nanoTime() - overreadStartTime;
                metrics.repairedDataTrackingOverreadRows.update(rows);
                metrics.repairedDataTrackingOverreadTime.update(nanos, TimeUnit.NANOSECONDS);
                Tracing.trace("Read {} additional rows of repaired data for tracking in {}ps", rows, TimeUnit.NANOSECONDS.toMicros(nanos));
                return null;
            }
            private void consumePartition(UnfilteredRowIterator partition, DataLimits.Counter counter)
            {
                if (partition == null)
                    return;
                while (!counter.isDone() && partition.hasNext())
                    partition.next();
                partition.close();
            }
        }
        // If the read didn't touch any sstables prepare() hasn't been called and
        // we can skip this transformation
        if (metrics == null || repairedCounter.isDone())
            return partitions;
        return Transformation.apply(partitions, new OverreadRepairedData());
    }
    /**
     * Although PurgeFunction extends Transformation, this is never applied to an iterator.
     * Instead, it is used by RepairedDataInfo during the generation of a repaired data
     * digest to exclude data which will actually be purged later on in the read pipeline.
     */
    private static class RepairedDataPurger extends PurgeFunction
    {
        RepairedDataPurger(ColumnFamilyStore cfs,
                           long nowInSec,
                           long oldestUnrepairedTombstone)
        {
            super(nowInSec,
                  cfs.gcBefore(nowInSec),
                  oldestUnrepairedTombstone,
                  cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
                  cfs.metadata.get().enforceStrictLiveness());
        }
        protected LongPredicate getPurgeEvaluator()
        {
            return (time) -> true;
        }
        void setCurrentKey(DecoratedKey key)
        {
            super.onNewPartition(key);
        }
        void setIsReverseOrder(boolean isReverseOrder)
        {
            super.setReverseOrder(isReverseOrder);
        }
        public DeletionTime applyToDeletion(DeletionTime deletionTime)
        {
            return super.applyToDeletion(deletionTime);
        }
        public Row applyToRow(Row row)
        {
            return super.applyToRow(row);
        }
        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
        {
            return super.applyToMarker(marker);
        }
    }
}