IndexDescriptor.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.index.sai.disk.format;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.io.Files;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.IndexValidation;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter;
import org.apache.cassandra.index.sai.disk.PerSSTableIndexWriter;
import org.apache.cassandra.index.sai.disk.PrimaryKeyMap;
import org.apache.cassandra.index.sai.disk.RowMapping;
import org.apache.cassandra.index.sai.disk.SSTableIndex;
import org.apache.cassandra.index.sai.disk.io.IndexFileUtils;
import org.apache.cassandra.index.sai.disk.io.IndexOutputWriter;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Throwables;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.IOUtils;

/**
 * The {@link IndexDescriptor} is an analog of the SSTable {@link Descriptor} and provides version
 * specific information about the on-disk state of a {@link StorageAttachedIndex}.
 * <p>
 * The {@link IndexDescriptor} is primarily responsible for maintaining a view of the on-disk state
 * of an index for a specific {@link org.apache.cassandra.io.sstable.SSTable}.
 * <p>
 * It is responsible for opening files for use by writers and readers.
 * <p>
 * Its remaining responsibility is to act as a proxy to the {@link OnDiskFormat} associated with the
 * index {@link Version}.
 */
public class IndexDescriptor
{
    private static final Logger logger = LoggerFactory.getLogger(IndexDescriptor.class);

    public final Version version;
    public final Descriptor sstableDescriptor;
    public final ClusteringComparator clusteringComparator;
    public final PrimaryKey.Factory primaryKeyFactory;

    private IndexDescriptor(Version version, Descriptor sstableDescriptor, IPartitioner partitioner, ClusteringComparator clusteringComparator)
    {
        this.version = version;
        this.sstableDescriptor = sstableDescriptor;
        this.clusteringComparator = clusteringComparator;
        this.primaryKeyFactory = new PrimaryKey.Factory(partitioner, clusteringComparator);
    }

    public static IndexDescriptor create(Descriptor descriptor, IPartitioner partitioner, ClusteringComparator clusteringComparator)
    {
        return new IndexDescriptor(Version.LATEST, descriptor, partitioner, clusteringComparator);
    }

    public static IndexDescriptor create(SSTableReader sstable)
    {
        for (Version version : Version.ALL)
        {
            IndexDescriptor indexDescriptor = new IndexDescriptor(version,
                                                                  sstable.descriptor,
                                                                  sstable.getPartitioner(),
                                                                  sstable.metadata().comparator);

            if (version.onDiskFormat().isPerSSTableIndexBuildComplete(indexDescriptor))
            {
                return indexDescriptor;
            }
        }
        return new IndexDescriptor(Version.LATEST,
                                   sstable.descriptor,
                                   sstable.getPartitioner(),
                                   sstable.metadata().comparator);
    }

    public boolean hasClustering()
    {
        return clusteringComparator.size() > 0;
    }

    public String componentName(IndexComponent indexComponent)
    {
        return version.fileNameFormatter().format(indexComponent, null);
    }

    public PrimaryKeyMap.Factory newPrimaryKeyMapFactory(SSTableReader sstable)
    {
        return version.onDiskFormat().newPrimaryKeyMapFactory(this, sstable);
    }

    public SSTableIndex newSSTableIndex(SSTableContext sstableContext, IndexContext indexContext)
    {
        return version.onDiskFormat().newSSTableIndex(sstableContext, indexContext);
    }

    public PerSSTableIndexWriter newPerSSTableIndexWriter() throws IOException
    {
        return version.onDiskFormat().newPerSSTableIndexWriter(this);
    }

    public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index,
                                                        LifecycleNewTracker tracker,
                                                        RowMapping rowMapping)
    {
        return version.onDiskFormat().newPerColumnIndexWriter(index, this, tracker, rowMapping);
    }

    public boolean isPerSSTableIndexBuildComplete()
    {
        return version.onDiskFormat().isPerSSTableIndexBuildComplete(this);
    }

    public boolean isPerColumnIndexBuildComplete(IndexContext indexContext)
    {
        return version.onDiskFormat().isPerColumnIndexBuildComplete(this, indexContext);
    }

    public boolean hasComponent(IndexComponent indexComponent)
    {
        return fileFor(indexComponent).exists();
    }

    public boolean hasComponent(IndexComponent indexComponent, IndexContext indexContext)
    {
        return fileFor(indexComponent, indexContext).exists();
    }

    public File fileFor(IndexComponent indexComponent)
    {
        return createFile(indexComponent, null);
    }

    public File fileFor(IndexComponent indexComponent, IndexContext indexContext)
    {
        return createFile(indexComponent, indexContext);
    }

    public boolean isIndexEmpty(IndexContext indexContext)
    {
        // The index is empty if the index build completed successfully in that both
        // a GROUP_COMPLETION_MARKER companent and a COLUMN_COMPLETION_MARKER exist for
        // the index and the number of per-index components is 1 indicating that only the
        // COLUMN_COMPLETION_MARKER exists for the index, as this is the only file that
        // will be written if the index is empty
        return isPerColumnIndexBuildComplete(indexContext) && numberOfPerIndexComponents(indexContext) == 1;
    }

    public void createComponentOnDisk(IndexComponent component) throws IOException
    {
        Files.touch(fileFor(component).toJavaIOFile());
    }

    public void createComponentOnDisk(IndexComponent component, IndexContext indexContext) throws IOException
    {
        Files.touch(fileFor(component, indexContext).toJavaIOFile());
    }

    public IndexInput openPerSSTableInput(IndexComponent indexComponent)
    {
        File file = fileFor(indexComponent);
        if (logger.isTraceEnabled())
            logger.trace(logMessage("Opening blocking index input for file {} ({})"),
                         file,
                         FBUtilities.prettyPrintMemory(file.length()));

        return IndexFileUtils.instance.openBlockingInput(file);
    }

    public IndexInput openPerIndexInput(IndexComponent indexComponent, IndexContext indexContext)
    {
        final File file = fileFor(indexComponent, indexContext);
        if (logger.isTraceEnabled())
            logger.trace(logMessage("Opening blocking index input for file {} ({})"),
                         file,
                         FBUtilities.prettyPrintMemory(file.length()));

        return IndexFileUtils.instance.openBlockingInput(file);
    }

    public IndexOutputWriter openPerSSTableOutput(IndexComponent component) throws IOException
    {
        return openPerSSTableOutput(component, false);
    }

    public IndexOutputWriter openPerSSTableOutput(IndexComponent component, boolean append) throws IOException
    {
        final File file = fileFor(component);

        if (logger.isTraceEnabled())
            logger.trace(logMessage("Creating SSTable attached index output for component {} on file {}..."),
                         component,
                         file);

        IndexOutputWriter writer = IndexFileUtils.instance.openOutput(file);

        if (append)
        {
            writer.skipBytes(file.length());
        }

        return writer;
    }

    public IndexOutputWriter openPerIndexOutput(IndexComponent indexComponent, IndexContext indexContext) throws IOException
    {
        return openPerIndexOutput(indexComponent, indexContext, false);
    }

    public IndexOutputWriter openPerIndexOutput(IndexComponent component, IndexContext indexContext, boolean append) throws IOException
    {
        final File file = fileFor(component, indexContext);

        if (logger.isTraceEnabled())
            logger.trace(indexContext.logMessage("Creating sstable attached index output for component {} on file {}..."),
                         component,
                         file);

        IndexOutputWriter writer = IndexFileUtils.instance.openOutput(file);

        if (append)
        {
            writer.skipBytes(file.length());
        }

        return writer;
    }

    public FileHandle createPerSSTableFileHandle(IndexComponent indexComponent, Throwables.DiscreteAction<?> cleanup)
    {
        try
        {
            final File file = fileFor(indexComponent);

            if (logger.isTraceEnabled())
            {
                logger.trace(logMessage("Opening {} file handle for {} ({})"),
                             file, FBUtilities.prettyPrintMemory(file.length()));
            }

            return new FileHandle.Builder(file).mmapped(true).complete();
        }
        catch (Throwable t)
        {
            throw handleFileHandleCleanup(t, cleanup);
        }
    }

    public FileHandle createPerIndexFileHandle(IndexComponent indexComponent, IndexContext indexContext, Throwables.DiscreteAction<?> cleanup)
    {
        try
        {
            final File file = fileFor(indexComponent, indexContext);

            if (logger.isTraceEnabled())
            {
                logger.trace(indexContext.logMessage("Opening file handle for {} ({})"),
                             file, FBUtilities.prettyPrintMemory(file.length()));
            }

            return new FileHandle.Builder(file).mmapped(true).complete();
        }
        catch (Throwable t)
        {
            throw handleFileHandleCleanup(t, cleanup);
        }
    }

    private RuntimeException handleFileHandleCleanup(Throwable t, Throwables.DiscreteAction<?> cleanup)
    {
        if (cleanup != null)
        {
            try
            {
                cleanup.perform();
            }
            catch (Exception e)
            {
                return Throwables.unchecked(Throwables.merge(t, e));
            }
        }
        return Throwables.unchecked(t);
    }

    public Set<Component> getLivePerSSTableComponents()
    {
        return version.onDiskFormat()
                      .perSSTableIndexComponents(hasClustering())
                      .stream()
                      .filter(c -> fileFor(c).exists())
                      .map(version::makePerSSTableComponent)
                      .collect(Collectors.toSet());
    }

    public Set<Component> getLivePerIndexComponents(IndexContext indexContext)
    {
        return version.onDiskFormat()
                      .perColumnIndexComponents(indexContext)
                      .stream()
                      .filter(c -> fileFor(c, indexContext).exists())
                      .map(c -> version.makePerIndexComponent(c, indexContext))
                      .collect(Collectors.toSet());
    }

    public long sizeOnDiskOfPerSSTableComponents()
    {
        return version.onDiskFormat()
                      .perSSTableIndexComponents(hasClustering())
                      .stream()
                      .map(this::fileFor)
                      .filter(File::exists)
                      .mapToLong(File::length)
                      .sum();
    }

    public long sizeOnDiskOfPerIndexComponents(IndexContext indexContext)
    {
        return version.onDiskFormat()
                      .perColumnIndexComponents(indexContext)
                      .stream()
                      .map(c -> fileFor(c, indexContext))
                      .filter(File::exists)
                      .mapToLong(File::length)
                      .sum();
    }

    @VisibleForTesting
    public long sizeOnDiskOfPerIndexComponent(IndexComponent indexComponent, IndexContext indexContext)
    {
        File componentFile = fileFor(indexComponent, indexContext);
        return componentFile.exists() ? componentFile.length() : 0;
    }

    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
    public boolean validatePerIndexComponents(IndexContext indexContext, IndexValidation validation)
    {
        if (validation == IndexValidation.NONE)
            return true;

        logger.info(indexContext.logMessage("Validating per-column index components using mode " + validation));
        boolean checksum = validation == IndexValidation.CHECKSUM;

        try
        {
            version.onDiskFormat().validatePerColumnIndexComponents(this, indexContext, checksum);
            return true;
        }
        catch (UncheckedIOException e)
        {
            return false;
        }
    }

    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
    public boolean validatePerSSTableComponents(IndexValidation validation)
    {
        if (validation == IndexValidation.NONE)
            return true;

        logger.info(logMessage("Validating per-sstable index components using mode " + validation));
        boolean checksum = validation == IndexValidation.CHECKSUM;

        try
        {
            version.onDiskFormat().validatePerSSTableIndexComponents(this, checksum);
            return true;
        }
        catch (UncheckedIOException e)
        {
            return false;
        }
    }

    public void checksumPerIndexComponents(IndexContext indexContext)
    {
        version.onDiskFormat().validatePerColumnIndexComponents(this, indexContext, true);
    }

    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
    public void checksumPerSSTableComponents()
    {
        version.onDiskFormat().validatePerSSTableIndexComponents(this, true);
    }

    public void deletePerSSTableIndexComponents()
    {
        version.onDiskFormat()
               .perSSTableIndexComponents(hasClustering())
               .stream()
               .map(this::fileFor)
               .filter(File::exists)
               .forEach(this::deleteComponent);
    }

    public void deleteColumnIndex(IndexContext indexContext)
    {
        version.onDiskFormat()
               .perColumnIndexComponents(indexContext)
               .stream()
               .map(c -> fileFor(c, indexContext))
               .filter(File::exists)
               .forEach(this::deleteComponent);
    }

    @Override
    public int hashCode()
    {
        return Objects.hashCode(sstableDescriptor, version);
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        IndexDescriptor other = (IndexDescriptor)o;
        return Objects.equal(sstableDescriptor, other.sstableDescriptor) &&
               Objects.equal(version, other.version);
    }

    @Override
    public String toString()
    {
        return sstableDescriptor.toString() + "-SAI";
    }

    public String logMessage(String message)
    {
        // Index names are unique only within a keyspace.
        return String.format("[%s.%s.*] %s",
                             sstableDescriptor.ksname,
                             sstableDescriptor.cfname,
                             message);
    }

    private File createFile(IndexComponent component, IndexContext indexContext)
    {
        Component customComponent = version.makePerIndexComponent(component, indexContext);
        return sstableDescriptor.fileFor(customComponent);
    }

    private long numberOfPerIndexComponents(IndexContext indexContext)
    {
        return version.onDiskFormat()
                      .perColumnIndexComponents(indexContext)
                      .stream()
                      .map(c -> fileFor(c, indexContext))
                      .filter(File::exists)
                      .count();
    }

    private void deleteComponent(File file)
    {
        logger.debug(logMessage("Deleting storage-attached index component file {}"), file);
        try
        {
            IOUtils.deleteFilesIfExist(file.toPath());
        }
        catch (IOException e)
        {
            logger.warn(logMessage("Unable to delete storage-attached index component file {} due to {}."), file, e.getMessage(), e);
        }
    }
}