KeyStoreWriter.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.index.sai.disk.v1.keystore;

import java.io.Closeable;
import java.io.IOException;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;

import org.apache.cassandra.index.sai.disk.v1.MetadataWriter;
import org.apache.cassandra.index.sai.disk.v1.SAICodecUtils;
import org.apache.cassandra.index.sai.disk.v1.bitpack.NumericValuesWriter;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.StringHelper;

/**
 * Writes a sequence of partition keys or clustering keys for use with {@link KeyLookup}.
 * <p>
 * Partition keys are written unordered and clustering keys are written in ordered partitions determined by calls to
 * {@link #startPartition()}. In either case keys can be of varying lengths.
 * <p>
 * The {@link #blockShift} field is used to quickly determine the id of the current block
 * based on a point id or to check if we are exactly at the beginning of the block.
 * <p>
 * Keys are organized in blocks of (2 ^ {@link #blockShift}) keys.
 * <p>
 * The blocks should not be too small because they allow prefix compression of the keys except the first key in a block.
 * <p>
 * The blocks should not be too large because we can't just randomly jump to the key inside the block, but we have to
 * iterate through all the keys from the start of the block.
 *
 * @see KeyLookup
 */
@NotThreadSafe
public class KeyStoreWriter implements Closeable
{
    private final int blockShift;
    private final int blockMask;
    private final boolean clustering;
    private final IndexOutput keysOutput;
    private final NumericValuesWriter offsetsWriter;
    private final String componentName;
    private final MetadataWriter metadataWriter;

    private BytesRefBuilder prevKey = new BytesRefBuilder();
    private BytesRefBuilder tempKey = new BytesRefBuilder();

    private final long bytesStartFP;

    private boolean inPartition = false;
    private int maxKeyLength = -1;
    private long pointId = 0;

    /**
     * Creates a new writer.
     * <p>
     * It does not own the components, so you must close the components by yourself
     * after you're done with the writer.
     *
     * @param componentName the component name for the {@link KeyLookupMeta}
     * @param metadataWriter the {@link MetadataWriter} for storing the {@link KeyLookupMeta}
     * @param keysOutput where to write the prefix-compressed keys
     * @param keysBlockOffsets  where to write the offsets of each block of keys
     * @param blockShift the block shift that is used to determine the block size
     * @param clustering determines whether the keys will be written as ordered partitions
     */
    public KeyStoreWriter(String componentName,
                          MetadataWriter metadataWriter,
                          IndexOutput keysOutput,
                          NumericValuesWriter keysBlockOffsets,
                          int blockShift,
                          boolean clustering) throws IOException
    {
        this.componentName = componentName;
        this.metadataWriter = metadataWriter;
        SAICodecUtils.writeHeader(keysOutput);
        this.blockShift = blockShift;
        this.blockMask = (1 << this.blockShift) - 1;
        this.clustering = clustering;
        this.keysOutput = keysOutput;
        this.keysOutput.writeVInt(blockShift);
        this.keysOutput.writeByte((byte ) (clustering ? 1 : 0));
        this.bytesStartFP = keysOutput.getFilePointer();
        this.offsetsWriter = keysBlockOffsets;
    }

    public void startPartition()
    {
        assert clustering : "Cannot start a partition on a non-clustering key store";

        inPartition = false;
    }

    /**
     * Appends a key at the end of the sequence.
     *
     * @throws IOException if write to disk fails
     * @throws IllegalArgumentException if the key is not greater than the previous added key
     */
    public void add(final @Nonnull ByteComparable key) throws IOException
    {
        tempKey.clear();
        copyBytes(key, tempKey);

        BytesRef keyRef = tempKey.get();

        if (clustering && inPartition)
        {
            if (compareKeys(keyRef, prevKey.get()) <= 0)
                throw new IllegalArgumentException("Clustering keys must be in ascending lexographical order");
        }

        inPartition = true;

        writeKey(keyRef);

        maxKeyLength = Math.max(maxKeyLength, keyRef.length);

        BytesRefBuilder temp = this.tempKey;
        this.tempKey = this.prevKey;
        this.prevKey = temp;

        pointId++;
    }

    private void writeKey(BytesRef key) throws IOException
    {
        if ((pointId & blockMask) == 0)
        {
            offsetsWriter.add(keysOutput.getFilePointer() - bytesStartFP);

            keysOutput.writeVInt(key.length);
            keysOutput.writeBytes(key.bytes, key.offset, key.length);
        }
        else
        {
            int prefixLength = 0;
            int suffixLength = 0;

            // If the key is the same as the previous key then we use prefix and suffix lengths of 0.
            // This means that we store a byte of 0 and don't write any data for the key.
            if (compareKeys(prevKey.get(), key) != 0)
            {
                prefixLength = StringHelper.bytesDifference(prevKey.get(), key);
                suffixLength = key.length - prefixLength;
            }
            // The prefix and suffix lengths are written as a byte followed by up to 2 vints. An attempt is
            // made to compress the lengths into the byte (if prefix length < 15 and/or suffix length < 15).
            // If either length exceeds the compressed byte maximum, it is written as a vint following the byte.
            keysOutput.writeByte((byte) (Math.min(prefixLength, 15) | (Math.min(15, suffixLength) << 4)));

            if (prefixLength + suffixLength > 0)
            {
                if (prefixLength >= 15)
                    keysOutput.writeVInt(prefixLength - 15);
                if (suffixLength >= 15)
                    keysOutput.writeVInt(suffixLength - 15);

                keysOutput.writeBytes(key.bytes, key.offset + prefixLength, key.length - prefixLength);
            }
        }
    }

    /**
     * Flushes any in-memory buffers to the output streams.
     * Does not close the output streams.
     * No more writes are allowed.
     */
    @Override
    public void close() throws IOException
    {
        try (IndexOutput output = metadataWriter.builder(componentName))
        {
            SAICodecUtils.writeFooter(keysOutput);
            KeyLookupMeta.write(output, pointId, maxKeyLength);
        }
        finally
        {
            FileUtils.close(offsetsWriter, keysOutput);
        }
    }

    private int compareKeys(BytesRef left, BytesRef right)
    {
        return FastByteOperations.compareUnsigned(left.bytes, left.offset, left.offset + left.length,
                                                  right.bytes, right.offset, right.offset + right.length);
    }

    private void copyBytes(ByteComparable source, BytesRefBuilder dest)
    {
        ByteSource byteSource = source.asComparableBytes(ByteComparable.Version.OSS50);
        int val;
        while ((val = byteSource.next()) != ByteSource.END_OF_STREAM)
            dest.append((byte) val);
    }
}