PartitionIndexBuilder.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.io.sstable.format.bti;

import java.io.IOException;
import java.util.function.Consumer;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.tries.IncrementalTrieWriter;
import org.apache.cassandra.io.tries.Walker;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;

/**
 * Partition index builder: stores index or data positions in an incrementally built, page aware on-disk trie.
 * <p>
 * The files created by this builder are read by {@link PartitionIndex}.
 */
class PartitionIndexBuilder implements AutoCloseable
{
    private final SequentialWriter writer;
    private final IncrementalTrieWriter<PartitionIndex.Payload> trieWriter;
    private final FileHandle.Builder fhBuilder;

    // the last synced data file position
    private long dataSyncPosition;
    // the last synced row index file position
    private long rowIndexSyncPosition;
    // the last synced partition index file position
    private long partitionIndexSyncPosition;

    // Partial index can only be used after all three files have been synced to the required positions.
    private long partialIndexDataEnd;
    private long partialIndexRowEnd;
    private long partialIndexPartitionEnd;
    private IncrementalTrieWriter.PartialTail partialIndexTail;
    private Consumer<PartitionIndex> partialIndexConsumer;
    private DecoratedKey partialIndexLastKey;

    private int lastDiffPoint;
    private DecoratedKey firstKey;
    private DecoratedKey lastKey;
    private DecoratedKey lastWrittenKey;
    private PartitionIndex.Payload lastPayload;

    public PartitionIndexBuilder(SequentialWriter writer, FileHandle.Builder fhBuilder)
    {
        this.writer = writer;
        this.trieWriter = IncrementalTrieWriter.open(PartitionIndex.TRIE_SERIALIZER, writer);
        this.fhBuilder = fhBuilder;
    }

    /*
     * Called when partition index has been flushed to the given position.
     * If this makes all required positions for a partial view flushed, this will call the partialIndexConsumer.
     */
    public void markPartitionIndexSynced(long upToPosition)
    {
        partitionIndexSyncPosition = upToPosition;
        refreshReadableBoundary();
    }

    /*
     * Called when row index has been flushed to the given position.
     * If this makes all required positions for a partial view flushed, this will call the partialIndexConsumer.
     */
    public void markRowIndexSynced(long upToPosition)
    {
        rowIndexSyncPosition = upToPosition;
        refreshReadableBoundary();
    }

    /*
     * Called when data file has been flushed to the given position.
     * If this makes all required positions for a partial view flushed, this will call the partialIndexConsumer.
     */
    public void markDataSynced(long upToPosition)
    {
        dataSyncPosition = upToPosition;
        refreshReadableBoundary();
    }

    private void refreshReadableBoundary()
    {
        if (partialIndexConsumer == null)
            return;
        if (dataSyncPosition < partialIndexDataEnd)
            return;
        if (rowIndexSyncPosition < partialIndexRowEnd)
            return;
        if (partitionIndexSyncPosition < partialIndexPartitionEnd)
            return;

        try (FileHandle fh = fhBuilder.withLengthOverride(writer.getLastFlushOffset()).complete())
        {
            @SuppressWarnings({ "resource", "RedundantSuppression" })
            PartitionIndex pi = new PartitionIndexEarly(fh, partialIndexTail.root(), partialIndexTail.count(), firstKey, partialIndexLastKey, partialIndexTail.cutoff(), partialIndexTail.tail());
            partialIndexConsumer.accept(pi);
            partialIndexConsumer = null;
        }
        finally
        {
            fhBuilder.withLengthOverride(-1);
        }

    }

    /**
    * @param decoratedKey the key for this record
    * @param position the position to write with the record:
    *    - positive if position points to an index entry in the index file
    *    - negative if ~position points directly to the key in the data file
    */
    public void addEntry(DecoratedKey decoratedKey, long position) throws IOException
    {
        if (lastKey == null)
        {
            firstKey = decoratedKey;
            lastDiffPoint = 0;
        }
        else
        {
            int diffPoint = ByteComparable.diffPoint(lastKey, decoratedKey, Walker.BYTE_COMPARABLE_VERSION);
            ByteComparable prevPrefix = ByteComparable.cut(lastKey, Math.max(diffPoint, lastDiffPoint));
            trieWriter.add(prevPrefix, lastPayload);
            lastWrittenKey = lastKey;
            lastDiffPoint = diffPoint;
        }
        lastKey = decoratedKey;
        lastPayload = new PartitionIndex.Payload(position, decoratedKey.filterHashLowerBits());
    }

    public long complete() throws IOException
    {
        // Do not trigger pending partial builds.
        partialIndexConsumer = null;

        if (lastKey != lastWrittenKey)
        {
            ByteComparable prevPrefix = ByteComparable.cut(lastKey, lastDiffPoint);
            trieWriter.add(prevPrefix, lastPayload);
        }

        long root = trieWriter.complete();
        long count = trieWriter.count();
        long firstKeyPos = writer.position();
        if (firstKey != null)
        {
            ByteBufferUtil.writeWithShortLength(firstKey.getKey(), writer);
            ByteBufferUtil.writeWithShortLength(lastKey.getKey(), writer);
        }
        else
        {
            assert lastKey == null;
            writer.writeShort(0);
            writer.writeShort(0);
        }

        writer.writeLong(firstKeyPos);
        writer.writeLong(count);
        writer.writeLong(root);

        writer.sync();
        fhBuilder.withLengthOverride(writer.getLastFlushOffset());

        return root;
    }

    /**
     * Builds a PartitionIndex representing the records written until this point without interrupting writes. Because
     * data in buffered writers does not get immediately flushed to the file system, and we do not want to force flushing
     * of the relevant files (which e.g. could cause a problem for compressed data files), this call cannot return
     * immediately. Instead, it will take an index snapshot but wait with making it active (by calling the provided
     * callback) until it registers that all relevant files (data, row index and partition index) have been flushed at
     * least as far as the required positions.
     *
     * @param callWhenReady callback that is given the prepared partial index when all relevant data has been flushed
     * @param rowIndexEnd the position in the row index file we need to be able to read to (exclusive) to read all
     *                    records written so far
     * @param dataEnd the position in the data file we need to be able to read to (exclusive) to read all records
     *                    written so far
     * @return true if the request was accepted, false if there's no point to do this at this time (e.g. another
     *         partial representation is prepared but still isn't usable).
     */
    public boolean buildPartial(Consumer<PartitionIndex> callWhenReady, long rowIndexEnd, long dataEnd)
    {
        // If we haven't advanced since the last time we prepared, there's nothing to do.
        if (lastWrittenKey == partialIndexLastKey)
            return false;

        // Don't waste time if an index was already prepared but hasn't reached usability yet.
        if (partialIndexConsumer != null)
            return false;

        try
        {
            partialIndexTail = trieWriter.makePartialRoot();
            partialIndexDataEnd = dataEnd;
            partialIndexRowEnd = rowIndexEnd;
            partialIndexPartitionEnd = writer.position();
            partialIndexLastKey = lastWrittenKey;
            partialIndexConsumer = callWhenReady;
            return true;
        }
        catch (IOException e)
        {
            // As writes happen on in-memory buffers, failure here is not expected.
            throw new AssertionError(e);
        }
    }

    // close the builder and release any associated memory
    public void close()
    {
        trieWriter.close();
    }
}