SortedTableWriter.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.io.sstable.format;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.Collection;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionPurger;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.guardrails.Threshold;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.PartitionSerializationException;
import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FilterFactory;
import org.apache.cassandra.utils.IFilter;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Transactional;

import static com.google.common.base.Preconditions.checkNotNull;

/**
 * A generic implementation of a writer which assumes the existence of some partition index and bloom filter.
 */
public abstract class SortedTableWriter<P extends SortedTablePartitionWriter, I extends SortedTableWriter.AbstractIndexWriter> extends SSTableWriter
{
    private final static Logger logger = LoggerFactory.getLogger(SortedTableWriter.class);

    // TODO dataWriter is not needed to be directly accessible - we can access everything we need for the dataWriter
    //   from a partition writer
    protected final SequentialWriter dataWriter;
    protected final I indexWriter;
    protected final P partitionWriter;
    private final FileHandle.Builder dataFileBuilder = new FileHandle.Builder(descriptor.fileFor(Components.DATA));
    private DecoratedKey lastWrittenKey;
    private DataPosition dataMark;
    private long lastEarlyOpenLength;

    public SortedTableWriter(Builder<P, I, ?, ?> builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner)
    {
        super(builder, lifecycleNewTracker, owner);

        SequentialWriter dataWriter = null;
        I indexWriter = null;
        P partitionWriter = null;
        try
        {
            dataWriter = builder.openDataWriter();
            checkNotNull(dataWriter);

            indexWriter = builder.openIndexWriter(dataWriter);
            checkNotNull(indexWriter);

            partitionWriter = builder.openPartitionWriter(dataWriter, indexWriter);
            checkNotNull(partitionWriter);

            this.dataWriter = dataWriter;
            this.indexWriter = indexWriter;
            this.partitionWriter = partitionWriter;
        }
        catch (RuntimeException | Error ex)
        {
            Throwables.closeNonNullAndAddSuppressed(ex, partitionWriter, indexWriter, dataWriter);
            handleConstructionFailure(ex);
            throw ex;
        }
    }

    /**
     * Appends partition data to this writer.
     *
     * @param partition the partition to write
     * @return the created index entry if something was written, that is if {@code iterator} wasn't empty,
     * {@code null} otherwise.
     * @throws FSWriteError if write to the dataFile fails
     */
    @Override
    public final AbstractRowIndexEntry append(UnfilteredRowIterator partition)
    {
        if (partition.isEmpty())
            return null;

        try
        {
            if (!verifyPartition(partition.partitionKey()))
                return null;

            startPartition(partition.partitionKey(), partition.partitionLevelDeletion());

            AbstractRowIndexEntry indexEntry;
            if (header.hasStatic())
                addStaticRow(partition.partitionKey(), partition.staticRow());

            while (partition.hasNext())
                addUnfiltered(partition.partitionKey(), partition.next());

            indexEntry = endPartition(partition.partitionKey(), partition.partitionLevelDeletion());

            return indexEntry;
        }
        catch (BufferOverflowException boe)
        {
            throw new PartitionSerializationException(partition, boe);
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, getFilename());
        }
    }

    private boolean verifyPartition(DecoratedKey key)
    {
        assert key != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values

        if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
        {
            logger.error("Key size {} exceeds maximum of {}, skipping row", key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT);
            return false;
        }

        if (lastWrittenKey != null && lastWrittenKey.compareTo(key) >= 0)
            throw new RuntimeException(String.format("Last written key %s >= current key %s, writing into %s", lastWrittenKey, key, getFilename()));

        return true;
    }

    private void startPartition(DecoratedKey key, DeletionTime partitionLevelDeletion) throws IOException
    {
        partitionWriter.start(key, partitionLevelDeletion);
        metadataCollector.updatePartitionDeletion(partitionLevelDeletion);

        onStartPartition(key);
    }

    private void addStaticRow(DecoratedKey key, Row row) throws IOException
    {
        guardCollectionSize(key, row);

        partitionWriter.addStaticRow(row);
        if (!row.isEmpty())
            Rows.collectStats(row, metadataCollector);

        onStaticRow(row);
    }

    private void addUnfiltered(DecoratedKey key, Unfiltered unfiltered) throws IOException
    {
        if (unfiltered.isRow())
            addRow(key, (Row) unfiltered);
        else
            addRangeTomstoneMarker((RangeTombstoneMarker) unfiltered);
    }

    private void addRow(DecoratedKey key, Row row) throws IOException
    {
        guardCollectionSize(key, row);

        partitionWriter.addUnfiltered(row);
        metadataCollector.updateClusteringValues(row.clustering());
        Rows.collectStats(row, metadataCollector);

        onRow(row);
    }

    private void addRangeTomstoneMarker(RangeTombstoneMarker marker) throws IOException
    {
        partitionWriter.addUnfiltered(marker);

        metadataCollector.updateClusteringValuesByBoundOrBoundary(marker.clustering());
        if (marker.isBoundary())
        {
            RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker) marker;
            metadataCollector.update(bm.endDeletionTime());
            metadataCollector.update(bm.startDeletionTime());
        }
        else
        {
            metadataCollector.update(((RangeTombstoneBoundMarker) marker).deletionTime());
        }

        onRangeTombstoneMarker(marker);
    }

    private AbstractRowIndexEntry endPartition(DecoratedKey key, DeletionTime partitionLevelDeletion) throws IOException
    {
        long finishResult = partitionWriter.finish();

        long endPosition = dataWriter.position();
        long rowSize = endPosition - partitionWriter.getInitialPosition();
        guardPartitionThreshold(Guardrails.partitionSize, key, rowSize);
        guardPartitionThreshold(Guardrails.partitionTombstones, key, metadataCollector.totalTombstones);
        metadataCollector.addPartitionSizeInBytes(rowSize);
        metadataCollector.addKey(key.getKey());
        metadataCollector.addCellPerPartitionCount();

        lastWrittenKey = key;
        last = lastWrittenKey;
        if (first == null)
            first = lastWrittenKey;

        if (logger.isTraceEnabled())
            logger.trace("wrote {} at {}", key, endPosition);

        return createRowIndexEntry(key, partitionLevelDeletion, finishResult);
    }

    protected void onStartPartition(DecoratedKey key)
    {
        notifyObservers(o -> o.startPartition(key, partitionWriter.getInitialPosition(), partitionWriter.getInitialPosition()));
    }

    protected void onStaticRow(Row row)
    {
        notifyObservers(o -> o.staticRow(row));
    }

    protected void onRow(Row row)
    {
        notifyObservers(o -> o.nextUnfilteredCluster(row));
    }

    protected void onRangeTombstoneMarker(RangeTombstoneMarker marker)
    {
        notifyObservers(o -> o.nextUnfilteredCluster(marker));
    }

    protected abstract AbstractRowIndexEntry createRowIndexEntry(DecoratedKey key, DeletionTime partitionLevelDeletion, long finishResult) throws IOException;

    protected final void notifyObservers(Consumer<SSTableFlushObserver> action)
    {
        if (observers != null && !observers.isEmpty())
            observers.forEach(action);
    }

    @Override
    public void mark()
    {
        dataMark = dataWriter.mark();
        indexWriter.mark();
    }

    @Override
    public void resetAndTruncate()
    {
        dataWriter.resetAndTruncate(dataMark);
        partitionWriter.reset();
        indexWriter.resetAndTruncate();
    }

    @Override
    protected SSTableWriter.TransactionalProxy txnProxy()
    {
        return new TransactionalProxy(() -> FBUtilities.immutableListWithFilteredNulls(indexWriter, dataWriter));
    }

    protected class TransactionalProxy extends SSTableWriter.TransactionalProxy
    {
        public TransactionalProxy(Supplier<ImmutableList<Transactional>> transactionals)
        {
            super(transactionals);
        }

        @Override
        protected Throwable doPostCleanup(Throwable accumulate)
        {
            accumulate = Throwables.close(accumulate, partitionWriter);
            accumulate = super.doPostCleanup(accumulate);
            return accumulate;
        }
    }

    @Override
    public long getFilePointer()
    {
        return dataWriter.position();
    }

    @Override
    public long getOnDiskFilePointer()
    {
        return dataWriter.getOnDiskFilePointer();
    }

    @Override
    public long getEstimatedOnDiskBytesWritten()
    {
        return dataWriter.getEstimatedOnDiskBytesWritten();
    }

    protected FileHandle openDataFile(long lengthOverride, StatsMetadata statsMetadata)
    {
        int dataBufferSize = ioOptions.diskOptimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(ioOptions.diskOptimizationEstimatePercentile));


        FileHandle dataFile;
        try (CompressionMetadata compressionMetadata = compression ? ((CompressedSequentialWriter) dataWriter).open(lengthOverride) : null)
        {
            dataFile = dataFileBuilder.mmapped(ioOptions.defaultDiskAccessMode)
                                      .withMmappedRegionsCache(mmappedRegionsCache)
                                      .withChunkCache(chunkCache)
                                      .withCompressionMetadata(compressionMetadata)
                                      .bufferSize(dataBufferSize)
                                      .withLengthOverride(lengthOverride)
                                      .complete();
        }

        try
        {
            if (chunkCache != null)
            {
                if (lastEarlyOpenLength != 0 && dataFile.dataLength() > lastEarlyOpenLength)
                    chunkCache.invalidatePosition(dataFile, lastEarlyOpenLength);
            }
            lastEarlyOpenLength = dataFile.dataLength();
        }
        catch (RuntimeException | Error ex)
        {
            Throwables.closeNonNullAndAddSuppressed(ex, dataFile);
            throw ex;
        }

        return dataFile;
    }

    private void guardPartitionThreshold(Threshold guardrail, DecoratedKey key, long size)
    {
        if (guardrail.triggersOn(size, null))
        {
            String message = String.format("%s.%s:%s on sstable %s",
                                           metadata.keyspace,
                                           metadata.name,
                                           metadata().partitionKeyType.getString(key.getKey()),
                                           getFilename());
            guardrail.guard(size, message, true, null);
        }
    }

    private void guardCollectionSize(DecoratedKey partitionKey, Row row)
    {
        if (!Guardrails.collectionSize.enabled() && !Guardrails.itemsPerCollection.enabled())
            return;

        if (row.isEmpty() || SchemaConstants.isSystemKeyspace(metadata.keyspace))
            return;

        for (ColumnMetadata column : row.columns())
        {
            if (!column.type.isCollection() || !column.type.isMultiCell())
                continue;

            ComplexColumnData cells = row.getComplexColumnData(column);
            if (cells == null)
                continue;

            ComplexColumnData liveCells = cells.purge(DeletionPurger.PURGE_ALL, FBUtilities.nowInSeconds());
            if (liveCells == null)
                continue;

            int cellsSize = liveCells.dataSize();
            int cellsCount = liveCells.cellsCount();

            if (!Guardrails.collectionSize.triggersOn(cellsSize, null) &&
                !Guardrails.itemsPerCollection.triggersOn(cellsCount, null))
                continue;

            String keyString = metadata.getLocal().primaryKeyAsCQLLiteral(partitionKey.getKey(), row.clustering());
            String msg = String.format("%s in row %s in table %s",
                                       column.name.toString(),
                                       keyString,
                                       metadata);
            Guardrails.collectionSize.guard(cellsSize, msg, true, null);
            Guardrails.itemsPerCollection.guard(cellsCount, msg, true, null);
        }
    }

    protected static abstract class AbstractIndexWriter extends AbstractTransactional implements Transactional
    {
        protected final Descriptor descriptor;
        protected final TableMetadataRef metadata;
        protected final Set<Component> components;

        protected final IFilter bf;

        protected AbstractIndexWriter(Builder<?, ?, ?, ?> b)
        {
            this.descriptor = b.descriptor;
            this.metadata = b.getTableMetadataRef();
            this.components = b.getComponents();

            bf = FilterFactory.getFilter(b.getKeyCount(), b.getTableMetadataRef().getLocal().params.bloomFilterFpChance);
        }

        protected void flushBf()
        {
            if (components.contains(Components.FILTER))
            {
                try
                {
                    FilterComponent.save(bf, descriptor, true);
                }
                catch (IOException ex)
                {
                    throw new FSWriteError(ex, descriptor.fileFor(Components.FILTER));
                }
            }
        }

        public abstract void mark();

        public abstract void resetAndTruncate();

        protected void doPrepare()
        {
            flushBf();
        }

        @Override
        protected Throwable doPostCleanup(Throwable accumulate)
        {
            accumulate = bf.close(accumulate);
            return accumulate;
        }

        public IFilter getFilterCopy()
        {
            return bf.sharedCopy();
        }
    }

    public abstract static class Builder<P extends SortedTablePartitionWriter,
                                        I extends AbstractIndexWriter,
                                        W extends SortedTableWriter<P, I>,
                                        B extends Builder<P, I, W, B>> extends SSTableWriter.Builder<W, B>
    {

        public Builder(Descriptor descriptor)
        {
            super(descriptor);
        }

        @Override
        public B addDefaultComponents(Collection<Index.Group> indexGroups)
        {
            super.addDefaultComponents(indexGroups);

            if (FilterComponent.shouldUseBloomFilter(getTableMetadataRef().getLocal().params.bloomFilterFpChance))
            {
                addComponents(ImmutableSet.of(SSTableFormat.Components.FILTER));
            }

            return (B) this;
        }

        protected abstract SequentialWriter openDataWriter();

        protected abstract I openIndexWriter(SequentialWriter dataWriter);

        protected abstract P openPartitionWriter(SequentialWriter dataWriter, I indexWriter);
    }
}