SegmentBuilder.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.index.sai.disk.v1.segment;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.NotThreadSafe;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.v1.bbtree.BlockBalancedTreeRamBuffer;
import org.apache.cassandra.index.sai.disk.v1.bbtree.NumericIndexWriter;
import org.apache.cassandra.index.sai.disk.v1.trie.LiteralIndexWriter;
import org.apache.cassandra.index.sai.memory.RAMStringIndexer;
import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.utils.FastByteOperations;
import org.apache.lucene.util.BytesRefBuilder;
/**
* Creates an on-heap index data structure to be flushed to an SSTable index.
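* <p>
* Typical lifecycle, as a sketch (the iteration and the names {@code term}, {@code key} and
* {@code sstableRowId} are assumptions about the caller, not part of this class):
* <pre>{@code
* SegmentBuilder builder = new RAMStringSegmentBuilder(termComparator, limiter);
* try
* {
*     // terms must be added in ascending SSTable row ID order
*     long bytesAllocated = builder.add(term, key, sstableRowId);
*     // ... more add() calls; the caller accounts bytesAllocated against the shared limiter ...
*     if (builder.hasReachedMinimumFlushSize())
*     {
*         SegmentMetadata metadata = builder.flush(indexDescriptor, indexContext);
*     }
* }
* finally
* {
*     builder.release(indexContext);
* }
* }</pre>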
*/
@NotThreadSafe
public abstract class SegmentBuilder
{
private static final Logger logger = LoggerFactory.getLogger(SegmentBuilder.class);
// Serves as a safety net in case the memory limit is not triggered, or when the merger merges small segments.
public static final long LAST_VALID_SEGMENT_ROW_ID = (Integer.MAX_VALUE / 2) - 1L;
private static long testLastValidSegmentRowId = -1;
/** The number of column indexes being built globally. Incremented before being used as a divisor (and read before being decremented on release), so the minimum flush size calculation never divides by zero. */
private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(0);
/** Minimum flush size, dynamically updated as segment builds are started and completed/aborted. */
private static volatile long minimumFlushBytes;
private final NamedMemoryLimiter limiter;
private final long lastValidSegmentRowID;
private boolean flushed = false;
private boolean active = true;
// segment metadata
private long minSSTableRowId = -1;
private long maxSSTableRowId = -1;
private long segmentRowIdOffset = 0;
// in token order
private PrimaryKey minKey;
private PrimaryKey maxKey;
// in termComparator order
private ByteBuffer minTerm;
private ByteBuffer maxTerm;
final AbstractType<?> termComparator;
long totalBytesAllocated;
int rowCount = 0;
int maxSegmentRowId = -1;
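/**
 * Segment builder for fixed-size (e.g. numeric) terms: terms are accumulated in a
 * {@link BlockBalancedTreeRamBuffer} and flushed with a {@link NumericIndexWriter}.
 */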
public static class BlockBalancedTreeSegmentBuilder extends SegmentBuilder
{
private final byte[] scratch;
private final BlockBalancedTreeRamBuffer treeBuffer;
public BlockBalancedTreeSegmentBuilder(AbstractType<?> termComparator, NamedMemoryLimiter limiter)
{
super(termComparator, limiter);
scratch = new byte[TypeUtil.fixedSizeOf(termComparator)];
treeBuffer = new BlockBalancedTreeRamBuffer(TypeUtil.fixedSizeOf(termComparator));
totalBytesAllocated = treeBuffer.memoryUsed();
}
@Override
public boolean isEmpty()
{
return treeBuffer.numRows() == 0;
}
@Override
protected long addInternal(ByteBuffer term, int segmentRowId)
{
TypeUtil.toComparableBytes(term, termComparator, scratch);
return treeBuffer.add(segmentRowId, scratch);
}
@Override
protected SegmentMetadata.ComponentMetadataMap flushInternal(IndexDescriptor indexDescriptor, IndexContext indexContext) throws IOException
{
NumericIndexWriter writer = new NumericIndexWriter(indexDescriptor,
indexContext,
TypeUtil.fixedSizeOf(termComparator),
maxSegmentRowId);
return writer.writeCompleteSegment(treeBuffer.iterator());
}
}
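/**
 * Segment builder for variable-length literal (string) terms: terms are accumulated in a
 * {@link RAMStringIndexer} and flushed with a {@link LiteralIndexWriter}.
 */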
public static class RAMStringSegmentBuilder extends SegmentBuilder
{
final RAMStringIndexer ramIndexer;
final BytesRefBuilder stringBuffer = new BytesRefBuilder();
public RAMStringSegmentBuilder(AbstractType<?> termComparator, NamedMemoryLimiter limiter)
{
super(termComparator, limiter);
ramIndexer = new RAMStringIndexer();
totalBytesAllocated = ramIndexer.estimatedBytesUsed();
}
@Override
public boolean isEmpty()
{
return ramIndexer.isEmpty();
}
@Override
protected long addInternal(ByteBuffer term, int segmentRowId)
{
copyBufferToBytesRef(term, stringBuffer);
return ramIndexer.add(stringBuffer.get(), segmentRowId);
}
@Override
protected SegmentMetadata.ComponentMetadataMap flushInternal(IndexDescriptor indexDescriptor, IndexContext indexContext) throws IOException
{
try (LiteralIndexWriter writer = new LiteralIndexWriter(indexDescriptor, indexContext))
{
return writer.writeCompleteSegment(ramIndexer.getTermsWithPostings());
}
}
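// Copies the term bytes into the reusable BytesRefBuilder, avoiding a new allocation per term.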
private void copyBufferToBytesRef(ByteBuffer buffer, BytesRefBuilder stringBuffer)
{
int length = buffer.remaining();
stringBuffer.clear();
stringBuffer.grow(length);
FastByteOperations.copy(buffer, buffer.position(), stringBuffer.bytes(), 0, length);
stringBuffer.setLength(length);
}
}
public static int getActiveBuilderCount()
{
return ACTIVE_BUILDER_COUNT.get();
}
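/**
 * Registers this builder against the global active-builder count and recomputes the shared
 * minimum flush size so that concurrent builders divide the memory limit between them.
 */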
private SegmentBuilder(AbstractType<?> termComparator, NamedMemoryLimiter limiter)
{
this.termComparator = termComparator;
this.limiter = limiter;
lastValidSegmentRowID = testLastValidSegmentRowId >= 0 ? testLastValidSegmentRowId : LAST_VALID_SEGMENT_ROW_ID;
minimumFlushBytes = limiter.limitBytes() / ACTIVE_BUILDER_COUNT.incrementAndGet();
}
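/**
 * Writes the in-memory index to on-disk segment components.
 *
 * @return the metadata describing the flushed segment, or null if no rows were added to this builder
 */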
public SegmentMetadata flush(IndexDescriptor indexDescriptor, IndexContext indexContext) throws IOException
{
assert !flushed : "Cannot flush an already flushed segment";
flushed = true;
if (getRowCount() == 0)
{
logger.warn(indexContext.logMessage("No rows to index during flush of SSTable {}."), indexDescriptor.sstableDescriptor);
return null;
}
SegmentMetadata.ComponentMetadataMap indexMetas = flushInternal(indexDescriptor, indexContext);
return new SegmentMetadata(segmentRowIdOffset, rowCount, minSSTableRowId, maxSSTableRowId, minKey, maxKey, minTerm, maxTerm, indexMetas);
}
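/**
 * Adds a term for the given primary key and SSTable row ID. Terms must be added in ascending
 * SSTable row ID order; the first row ID added becomes the segment row ID offset.
 *
 * @return the number of additional bytes allocated on-heap by this addition
 */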
public long add(ByteBuffer term, PrimaryKey key, long sstableRowId)
{
assert !flushed : "Cannot add to a flushed segment.";
assert sstableRowId >= maxSSTableRowId;
minSSTableRowId = minSSTableRowId < 0 ? sstableRowId : minSSTableRowId;
maxSSTableRowId = sstableRowId;
assert maxKey == null || maxKey.compareTo(key) <= 0;
if (minKey == null)
minKey = key;
maxKey = key;
minTerm = TypeUtil.min(term, minTerm, termComparator);
maxTerm = TypeUtil.max(term, maxTerm, termComparator);
if (rowCount == 0)
{
// use the first SSTable row ID in the segment as the segment row ID offset
segmentRowIdOffset = sstableRowId;
}
rowCount++;
// the segment row ID is the SSTable row ID relative to segmentRowIdOffset, so it must fit in an int
int segmentRowId = castToSegmentRowId(sstableRowId, segmentRowIdOffset);
maxSegmentRowId = Math.max(maxSegmentRowId, segmentRowId);
long bytesAllocated = addInternal(term, segmentRowId);
totalBytesAllocated += bytesAllocated;
return bytesAllocated;
}
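/**
 * Maps an absolute SSTable row ID to a segment-local row ID relative to the given offset.
 * For example, an offset of 5_000_000_000L maps SSTable row ID 5_000_000_123L to segment row ID 123.
 * Throws {@link ArithmeticException} if the difference does not fit in an int.
 */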
public static int castToSegmentRowId(long sstableRowId, long segmentRowIdOffset)
{
return Math.toIntExact(sstableRowId - segmentRowIdOffset);
}
public long totalBytesAllocated()
{
return totalBytesAllocated;
}
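/**
 * @return true if this builder has accumulated enough on-heap data to justify flushing a segment,
 * based on the memory limit divided across all active builders
 */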
public boolean hasReachedMinimumFlushSize()
{
return totalBytesAllocated >= minimumFlushBytes;
}
public long getMinimumFlushBytes()
{
return minimumFlushBytes;
}
/**
* This method does three things:
* <p>
* 1. It decrements the active builder count and updates the global minimum flush size to reflect that.
* 2. It releases the builder's memory against its limiter.
* 3. It defensively marks the builder inactive, so a second release is logged and ignored rather than double-decrementing the limiter.
*
* @param indexContext an {@link IndexContext} used for creating logging messages
*
* @return the number of bytes used by the memory limiter after releasing this builder
*/
public long release(IndexContext indexContext)
{
if (active)
{
minimumFlushBytes = limiter.limitBytes() / ACTIVE_BUILDER_COUNT.getAndDecrement();
long used = limiter.decrement(totalBytesAllocated);
active = false;
return used;
}
logger.warn(indexContext.logMessage("Attempted to release storage-attached index segment builder memory after builder marked inactive."));
return limiter.currentBytesUsed();
}
public abstract boolean isEmpty();
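/**
 * Adds the term to the implementation-specific in-memory structure.
 *
 * @return the number of additional bytes allocated on-heap by this addition
 */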
protected abstract long addInternal(ByteBuffer term, int segmentRowId);
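/**
 * Writes the implementation-specific in-memory structure to on-disk components and returns their metadata.
 */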
protected abstract SegmentMetadata.ComponentMetadataMap flushInternal(IndexDescriptor indexDescriptor, IndexContext indexContext) throws IOException;
public int getRowCount()
{
return rowCount;
}
/**
* @return true if the given SSTable row ID would exceed the maximum valid segment row ID for the current segment
*/
public boolean exceedsSegmentLimit(long ssTableRowId)
{
if (getRowCount() == 0)
return false;
// This handles the case where there are many non-indexable rows, e.g. rowId 1 and rowId 3B are indexable
// and the rest are not. They should be flushed as two separate segments, because a segment row ID near 3B
// would exceed the ~2B (Integer.MAX_VALUE) limit of the on-disk index structure.
return ssTableRowId - segmentRowIdOffset > lastValidSegmentRowID;
}
@VisibleForTesting
public static void updateLastValidSegmentRowId(long lastValidSegmentRowID)
{
testLastValidSegmentRowId = lastValidSegmentRowID;
}
}