KeyRangeConcatIterator.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.index.sai.iterators;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.io.util.FileUtils;
/**
* {@link KeyRangeConcatIterator} takes a list of sorted range iterator and concatenates them, leaving duplicates in
* place, to produce a new stably sorted iterator. Duplicates are eliminated later in
* {@link org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher}
* as results from multiple SSTable indexes and their respective segments are consumed.
*
* ex. (1, 2, 3) + (3, 3, 4, 5) -> (1, 2, 3, 3, 3, 4, 5)
* ex. (1, 2, 2, 3) + (3, 4, 4, 6, 6, 7) -> (1, 2, 2, 3, 3, 4, 4, 6, 6, 7)
*
* TODO Investigate removing the use of PriorityQueue from this class <a href="https://issues.apache.org/jira/browse/CASSANDRA-18165">CASSANDRA-18165</a>
*/
public class KeyRangeConcatIterator extends KeyRangeIterator
{
public static final String MUST_BE_SORTED_ERROR = "RangeIterator must be sorted, previous max: %s, next min: %s";
private final PriorityQueue<KeyRangeIterator> ranges;
private final List<KeyRangeIterator> toRelease;
protected KeyRangeConcatIterator(KeyRangeIterator.Builder.Statistics statistics, PriorityQueue<KeyRangeIterator> ranges)
{
super(statistics);
this.ranges = ranges;
this.toRelease = new ArrayList<>(ranges);
}
@Override
@SuppressWarnings({"resource", "RedundantSuppression"})
protected void performSkipTo(PrimaryKey nextKey)
{
while (!ranges.isEmpty())
{
if (ranges.peek().getCurrent().compareTo(nextKey) >= 0)
break;
KeyRangeIterator head = ranges.poll();
if (head.getMaximum().compareTo(nextKey) >= 0)
{
head.skipTo(nextKey);
ranges.add(head);
break;
}
}
}
@Override
public void close()
{
// due to lazy key fetching, we cannot close iterator immediately
FileUtils.closeQuietly(toRelease);
}
@Override
@SuppressWarnings({"resource", "RedundantSuppression"})
protected PrimaryKey computeNext()
{
while (!ranges.isEmpty())
{
KeyRangeIterator current = ranges.poll();
if (current.hasNext())
{
PrimaryKey next = current.next();
// hasNext will update RangeIterator's current which is used to sort in PQ
if (current.hasNext())
ranges.add(current);
return next;
}
}
return endOfData();
}
public static Builder builder(int size)
{
return new Builder(size);
}
@VisibleForTesting
public static class Builder extends KeyRangeIterator.Builder
{
private final PriorityQueue<KeyRangeIterator> ranges;
Builder(int size)
{
super(new ConcatStatistics());
ranges = new PriorityQueue<>(size, Comparator.comparing(KeyRangeIterator::getCurrent));
}
@Override
public KeyRangeIterator.Builder add(KeyRangeIterator range)
{
if (range == null)
return this;
if (range.getCount() > 0)
ranges.add(range);
else
FileUtils.closeQuietly(range);
statistics.update(range);
return this;
}
@Override
public int rangeCount()
{
return ranges.size();
}
@Override
public void cleanup()
{
FileUtils.closeQuietly(ranges);
}
@Override
protected KeyRangeIterator buildIterator()
{
if (rangeCount() == 1)
return ranges.poll();
return new KeyRangeConcatIterator(statistics, ranges);
}
}
private static class ConcatStatistics extends KeyRangeIterator.Builder.Statistics
{
@Override
public void update(KeyRangeIterator range)
{
// range iterators should be sorted, but previous max must not be greater than next min.
if (range.getCount() > 0)
{
if (count == 0)
{
min = range.getMinimum();
}
else if (count > 0 && max.compareTo(range.getMinimum()) > 0)
{
throw new IllegalArgumentException(String.format(MUST_BE_SORTED_ERROR, max, range.getMinimum()));
}
max = range.getMaximum();
count += range.getCount();
}
}
}
}