NumericIndexWriter.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.index.sai.disk.v1.bbtree;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter;
import org.apache.cassandra.index.sai.disk.v1.segment.SegmentMetadata;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Specialized writer for values, that builds them into a {@link BlockBalancedTreeWriter} with auxiliary
* posting lists on eligible tree levels.
* <p>
* Given a sorted input {@link BlockBalancedTreeIterator}, the flush process is optimised because we don't need to
* buffer all point values to sort them.
*/
public class NumericIndexWriter
{
public static final int MAX_POINTS_IN_LEAF_NODE = BlockBalancedTreeWriter.DEFAULT_MAX_POINTS_IN_LEAF_NODE;
private static final int DEFAULT_POSTINGS_SIZE = 128;
private final BlockBalancedTreeWriter writer;
private final IndexDescriptor indexDescriptor;
private final IndexContext indexContext;
private final int bytesPerValue;
/**
* @param maxSegmentRowId maximum possible segment row ID, used to create `maxRows` for the balanced tree
*/
public NumericIndexWriter(IndexDescriptor indexDescriptor,
IndexContext indexContext,
int bytesPerValue,
long maxSegmentRowId)
{
this(indexDescriptor, indexContext, MAX_POINTS_IN_LEAF_NODE, bytesPerValue, maxSegmentRowId);
}
@VisibleForTesting
public NumericIndexWriter(IndexDescriptor indexDescriptor,
IndexContext indexContext,
int maxPointsInLeafNode,
int bytesPerValue,
long maxSegmentRowId)
{
checkArgument(maxSegmentRowId >= 0, "[%s] maxSegmentRowId must be non-negative value, but got %s", indexContext.getIndexName(), maxSegmentRowId);
this.indexDescriptor = indexDescriptor;
this.indexContext = indexContext;
this.bytesPerValue = bytesPerValue;
this.writer = new BlockBalancedTreeWriter(bytesPerValue, maxPointsInLeafNode);
}
@Override
public String toString()
{
return MoreObjects.toStringHelper(this)
.add("indexContext", indexContext)
.add("bytesPerValue", bytesPerValue)
.toString();
}
private static class LeafCallback implements BlockBalancedTreeWriter.Callback
{
final List<PackedLongValues> leafPostings = new ArrayList<>(DEFAULT_POSTINGS_SIZE);
public int numLeaves()
{
return leafPostings.size();
}
@Override
public void writeLeafPostings(BlockBalancedTreeWriter.RowIDAndIndex[] leafPostings, int offset, int count)
{
PackedLongValues.Builder builder = PackedLongValues.monotonicBuilder(PackedInts.COMPACT);
for (int i = offset; i < count; ++i)
{
builder.add(leafPostings[i].rowID);
}
this.leafPostings.add(builder.build());
}
}
/**
* Writes a balanced tree and posting lists from a {@link BlockBalancedTreeIterator}.
*
* @param values sorted {@link BlockBalancedTreeIterator} values to write
*
* @return metadata describing the location and size of this balanced tree in the overall SSTable balanced tree component file
*/
public SegmentMetadata.ComponentMetadataMap writeCompleteSegment(BlockBalancedTreeIterator values) throws IOException
{
long treePosition;
SegmentMetadata.ComponentMetadataMap components = new SegmentMetadata.ComponentMetadataMap();
LeafCallback leafCallback = new LeafCallback();
try (IndexOutput treeOutput = indexDescriptor.openPerIndexOutput(IndexComponent.BALANCED_TREE, indexContext, true))
{
// The SSTable balanced tree component file is opened in append mode, so our offset is the current file pointer.
long treeOffset = treeOutput.getFilePointer();
treePosition = writer.write(treeOutput, values, leafCallback);
// If the treePosition is less than 0 then we didn't write any values out and the index is empty
if (treePosition < 0)
return components;
long treeLength = treeOutput.getFilePointer() - treeOffset;
Map<String, String> attributes = new LinkedHashMap<>();
attributes.put("max_points_in_leaf_node", Integer.toString(writer.getMaxPointsInLeafNode()));
attributes.put("num_leaves", Integer.toString(leafCallback.numLeaves()));
attributes.put("num_values", Long.toString(writer.getValueCount()));
attributes.put("bytes_per_value", Long.toString(writer.getBytesPerValue()));
components.put(IndexComponent.BALANCED_TREE, treePosition, treeOffset, treeLength, attributes);
}
try (BlockBalancedTreeWalker reader = new BlockBalancedTreeWalker(indexDescriptor.createPerIndexFileHandle(IndexComponent.BALANCED_TREE, indexContext, null), treePosition);
IndexOutputWriter postingsOutput = indexDescriptor.openPerIndexOutput(IndexComponent.POSTING_LISTS, indexContext, true))
{
long postingsOffset = postingsOutput.getFilePointer();
BlockBalancedTreePostingsWriter postingsWriter = new BlockBalancedTreePostingsWriter();
reader.traverse(postingsWriter);
// The balanced tree postings writer already writes its own header & footer.
long postingsPosition = postingsWriter.finish(postingsOutput, leafCallback.leafPostings, indexContext);
Map<String, String> attributes = new LinkedHashMap<>();
attributes.put("num_leaf_postings", Integer.toString(postingsWriter.numLeafPostings));
attributes.put("num_non_leaf_postings", Integer.toString(postingsWriter.numNonLeafPostings));
long postingsLength = postingsOutput.getFilePointer() - postingsOffset;
components.put(IndexComponent.POSTING_LISTS, postingsPosition, postingsOffset, postingsLength, attributes);
}
return components;
}
/**
* @return number of values added
*/
public long getValueCount()
{
return writer.getValueCount();
}
}