IndexSearchResultIterator.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sai.disk;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.QueryCancelledException;
import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
import org.apache.cassandra.index.sai.iterators.KeyRangeUnionIterator;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Throwables;

public class IndexSearchResultIterator extends KeyRangeIterator
{
    private static final Logger logger = LoggerFactory.getLogger(IndexSearchResultIterator.class);

    private final QueryContext context;
    private final KeyRangeIterator union;
    private final Collection<SSTableIndex> referencedIndexes;

    private IndexSearchResultIterator(KeyRangeIterator union, Collection<SSTableIndex> referencedIndexes, QueryContext queryContext)
    {
        super(union.getMinimum(), union.getMaximum(), union.getCount());

        this.union = union;
        this.referencedIndexes = referencedIndexes;
        this.context = queryContext;
    }

    /**
     * Builds a new {@link IndexSearchResultIterator} that wraps a {@link KeyRangeUnionIterator} over the
     * results of searching the {@link org.apache.cassandra.index.sai.memory.MemtableIndex} and the {@link SSTableIndex}es.
     */
    @SuppressWarnings({"resource", "RedundantSuppression"})
    public static IndexSearchResultIterator build(Expression expression,
                                                  Collection<SSTableIndex> sstableIndexes,
                                                  AbstractBounds<PartitionPosition> keyRange,
                                                  QueryContext queryContext)
    {
        List<KeyRangeIterator> subIterators = new ArrayList<>(1 + sstableIndexes.size());

        KeyRangeIterator memtableIterator = expression.context.getMemtableIndexManager().searchMemtableIndexes(expression, keyRange);
        if (memtableIterator != null)
            subIterators.add(memtableIterator);

        for (SSTableIndex sstableIndex : sstableIndexes)
        {
            try
            {
                queryContext.checkpoint();
                queryContext.sstablesHit++;

                if (sstableIndex.isReleased())
                    throw new IllegalStateException(sstableIndex.getIndexContext().logMessage("Index was released from the view during the query"));

                List<KeyRangeIterator> segmentIterators = sstableIndex.search(expression, keyRange, queryContext);

                if (!segmentIterators.isEmpty())
                    subIterators.addAll(segmentIterators);
            }
            catch (Throwable e)
            {
                if (!(e instanceof QueryCancelledException))
                    logger.debug(sstableIndex.getIndexContext().logMessage(String.format("Failed search an index %s, aborting query.", sstableIndex.getSSTable())), e);

                throw Throwables.cleaned(e);
            }
        }

        KeyRangeIterator union = KeyRangeUnionIterator.build(subIterators);
        return new IndexSearchResultIterator(union, sstableIndexes, queryContext);
    }

    protected PrimaryKey computeNext()
    {
        try
        {
            return union.hasNext() ? union.next() : endOfData();
        }
        finally
        {
            context.checkpoint();
        }
    }

    protected void performSkipTo(PrimaryKey nextKey)
    {
        try
        {
            union.skipTo(nextKey);
        }
        finally
        {
            context.checkpoint();
        }
    }

    public void close()
    {
        FileUtils.closeQuietly(union);
        referencedIndexes.forEach(IndexSearchResultIterator::releaseQuietly);
        referencedIndexes.clear();
    }

    private static void releaseQuietly(SSTableIndex index)
    {
        try
        {
            index.release();
        }
        catch (Throwable e)
        {
            logger.error(index.getIndexContext().logMessage(String.format("Failed to release index on SSTable %s", index.getSSTable())), e);
        }
    }
}