V1OnDiskFormat.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.index.sai.disk.v1;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.EnumSet;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.utils.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Gauge;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter;
import org.apache.cassandra.index.sai.disk.PerSSTableIndexWriter;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.RowMapping;
import org.apache.cassandra.index.sai.disk.SSTableIndex;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.format.OnDiskFormat;
import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder;
import org.apache.cassandra.index.sai.metrics.AbstractMetrics;
import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.lucene.store.IndexInput;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
public class V1OnDiskFormat implements OnDiskFormat
{
private static final Logger logger = LoggerFactory.getLogger(V1OnDiskFormat.class);
@VisibleForTesting
public static final Set<IndexComponent> SKINNY_PER_SSTABLE_COMPONENTS = EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER,
IndexComponent.GROUP_META,
IndexComponent.TOKEN_VALUES,
IndexComponent.PARTITION_SIZES,
IndexComponent.PARTITION_KEY_BLOCKS,
IndexComponent.PARTITION_KEY_BLOCK_OFFSETS);
@VisibleForTesting
public static final Set<IndexComponent> WIDE_PER_SSTABLE_COMPONENTS = EnumSet.of(IndexComponent.GROUP_COMPLETION_MARKER,
IndexComponent.GROUP_META,
IndexComponent.TOKEN_VALUES,
IndexComponent.PARTITION_SIZES,
IndexComponent.PARTITION_KEY_BLOCKS,
IndexComponent.PARTITION_KEY_BLOCK_OFFSETS,
IndexComponent.CLUSTERING_KEY_BLOCKS,
IndexComponent.CLUSTERING_KEY_BLOCK_OFFSETS);
@VisibleForTesting
public static final Set<IndexComponent> LITERAL_COMPONENTS = EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER,
IndexComponent.META,
IndexComponent.TERMS_DATA,
IndexComponent.POSTING_LISTS);
@VisibleForTesting
public static final Set<IndexComponent> NUMERIC_COMPONENTS = EnumSet.of(IndexComponent.COLUMN_COMPLETION_MARKER,
IndexComponent.META,
IndexComponent.BALANCED_TREE,
IndexComponent.POSTING_LISTS);
/**
* Global limit on heap consumed by all index segment building that occurs outside the context of Memtable flush.
* <p>
* Note that to avoid flushing small index segments, a segment is only flushed when
* both the global size of all building segments has breached the limit and the size of the
* segment in question reaches (segment_write_buffer_space_mb / # currently building column indexes).
* <p>
* ex. If there is only one column index building, it can buffer up to segment_write_buffer_space_mb.
* <p>
* ex. If there is one column index building per table across 8 compactors, each index will be
* eligible to flush once it reaches (segment_write_buffer_space_mb / 8) MBs.
*/
public static final long SEGMENT_BUILD_MEMORY_LIMIT = DatabaseDescriptor.getSAISegmentWriteBufferSpace().toBytes();
public static final NamedMemoryLimiter SEGMENT_BUILD_MEMORY_LIMITER = new NamedMemoryLimiter(SEGMENT_BUILD_MEMORY_LIMIT,
"Storage Attached Index Segment Builder");
static
{
CassandraMetricsRegistry.MetricName bufferSpaceUsed = DefaultNameFactory.createMetricName(AbstractMetrics.TYPE, "SegmentBufferSpaceUsedBytes", null);
CassandraMetricsRegistry.Metrics.register(bufferSpaceUsed, (Gauge<Long>) SEGMENT_BUILD_MEMORY_LIMITER::currentBytesUsed);
CassandraMetricsRegistry.MetricName bufferSpaceLimit = DefaultNameFactory.createMetricName(AbstractMetrics.TYPE, "SegmentBufferSpaceLimitBytes", null);
CassandraMetricsRegistry.Metrics.register(bufferSpaceLimit, (Gauge<Long>) () -> SEGMENT_BUILD_MEMORY_LIMIT);
CassandraMetricsRegistry.MetricName buildsInProgress = DefaultNameFactory.createMetricName(AbstractMetrics.TYPE, "ColumnIndexBuildsInProgress", null);
CassandraMetricsRegistry.Metrics.register(buildsInProgress, (Gauge<Integer>) SegmentBuilder::getActiveBuilderCount);
}
public static final V1OnDiskFormat instance = new V1OnDiskFormat();
protected V1OnDiskFormat()
{}
@Override
public PrimaryKeyMap.Factory newPrimaryKeyMapFactory(IndexDescriptor indexDescriptor, SSTableReader sstable)
{
return indexDescriptor.hasClustering() ? new WidePrimaryKeyMap.Factory(indexDescriptor, sstable)
: new SkinnyPrimaryKeyMap.Factory(indexDescriptor, sstable);
}
@Override
public SSTableIndex newSSTableIndex(SSTableContext sstableContext, IndexContext indexContext)
{
return new V1SSTableIndex(sstableContext, indexContext);
}
@Override
public PerSSTableIndexWriter newPerSSTableIndexWriter(IndexDescriptor indexDescriptor) throws IOException
{
return new SSTableComponentsWriter(indexDescriptor);
}
@Override
public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index,
IndexDescriptor indexDescriptor,
LifecycleNewTracker tracker,
RowMapping rowMapping)
{
// If we're not flushing, or we haven't yet started the initialization build, flush from SSTable contents.
if (tracker.opType() != OperationType.FLUSH || !index.isInitBuildStarted())
{
NamedMemoryLimiter limiter = SEGMENT_BUILD_MEMORY_LIMITER;
logger.info(index.getIndexContext().logMessage("Starting a compaction index build. Global segment memory usage: {}"),
prettyPrintMemory(limiter.currentBytesUsed()));
return new SSTableIndexWriter(indexDescriptor, index.getIndexContext(), limiter, index.isIndexValid());
}
return new MemtableIndexWriter(index.getIndexContext().getMemtableIndexManager().getPendingMemtableIndex(tracker),
indexDescriptor,
index.getIndexContext(),
rowMapping);
}
@Override
public boolean isPerSSTableIndexBuildComplete(IndexDescriptor indexDescriptor)
{
return indexDescriptor.hasComponent(IndexComponent.GROUP_COMPLETION_MARKER);
}
@Override
public boolean isPerColumnIndexBuildComplete(IndexDescriptor indexDescriptor, IndexContext indexContext)
{
return indexDescriptor.hasComponent(IndexComponent.GROUP_COMPLETION_MARKER) &&
indexDescriptor.hasComponent(IndexComponent.COLUMN_COMPLETION_MARKER, indexContext);
}
@Override
public void validatePerSSTableIndexComponents(IndexDescriptor indexDescriptor, boolean checksum)
{
for (IndexComponent indexComponent : perSSTableIndexComponents(indexDescriptor.hasClustering()))
{
if (isNotBuildCompletionMarker(indexComponent))
{
try (IndexInput input = indexDescriptor.openPerSSTableInput(indexComponent))
{
if (checksum)
SAICodecUtils.validateChecksum(input);
else
SAICodecUtils.validate(input);
}
catch (Exception e)
{
logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}."),
checksum ? "Checksum validation" : "Validation",
indexComponent,
indexDescriptor.sstableDescriptor);
rethrowIOException(e);
}
}
}
}
@Override
public void validatePerColumnIndexComponents(IndexDescriptor indexDescriptor, IndexContext indexContext, boolean checksum)
{
for (IndexComponent indexComponent : perColumnIndexComponents(indexContext))
{
if (isNotBuildCompletionMarker(indexComponent))
{
try (IndexInput input = indexDescriptor.openPerIndexInput(indexComponent, indexContext))
{
if (checksum)
SAICodecUtils.validateChecksum(input);
else
SAICodecUtils.validate(input);
}
catch (Exception e)
{
logger.warn(indexDescriptor.logMessage("{} failed for index component {} on SSTable {}"),
checksum ? "Checksum validation" : "Validation",
indexComponent,
indexDescriptor.sstableDescriptor);
rethrowIOException(e);
}
}
}
}
private static void rethrowIOException(Exception e)
{
if (e instanceof IOException)
throw new UncheckedIOException((IOException) e);
if (e.getCause() instanceof IOException)
throw new UncheckedIOException((IOException) e.getCause());
throw Throwables.unchecked(e);
}
@Override
public Set<IndexComponent> perSSTableIndexComponents(boolean hasClustering)
{
return hasClustering ? WIDE_PER_SSTABLE_COMPONENTS : SKINNY_PER_SSTABLE_COMPONENTS;
}
@Override
public Set<IndexComponent> perColumnIndexComponents(IndexContext indexContext)
{
return indexContext.isLiteral() ? LITERAL_COMPONENTS : NUMERIC_COMPONENTS;
}
@Override
public int openFilesPerSSTableIndex(boolean hasClustering)
{
// For the V1 format the number of open files depends on whether the table has clustering. For wide tables
// the number of open files will be 6 per SSTable - token values, partition sizes index, partition key blocks,
// partition key block offsets, clustering key blocks & clustering key block offsets and for skinny tables
// the number of files will be 4 per SSTable - token values, partition key sizes, partition key blocks &
// partition key block offsets.
return hasClustering ? 6 : 4;
}
@Override
public int openFilesPerColumnIndex(IndexContext indexContext)
{
// For the V1 format there are always 2 open files per index - index (balanced tree or terms) + auxiliary postings
// for the balanced tree and postings for the literal terms
return 2;
}
protected boolean isNotBuildCompletionMarker(IndexComponent indexComponent)
{
return indexComponent != IndexComponent.GROUP_COMPLETION_MARKER &&
indexComponent != IndexComponent.COLUMN_COMPLETION_MARKER;
}
}