LiteralIndexSegmentTermsReader.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.index.sai.disk.v1.segment;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.exceptions.QueryCancelledException;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.QueryContext;
import org.apache.cassandra.index.sai.disk.io.IndexFileUtils;
import org.apache.cassandra.index.sai.disk.v1.postings.PostingsReader;
import org.apache.cassandra.index.sai.disk.v1.trie.TrieTermsDictionaryReader;
import org.apache.cassandra.index.sai.metrics.QueryEventListener;
import org.apache.cassandra.index.sai.postings.PostingList;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.lucene.store.IndexInput;
import static org.apache.cassandra.index.sai.disk.v1.SAICodecUtils.validate;
/**
 * Synchronous reader of terms dictionary and postings lists to produce a {@link PostingList} with matching row ids.
 *
 * {@link #exactMatch(ByteComparable, QueryEventListener.TrieIndexEventListener, QueryContext)} does:
 * <ul>
 * <li>{@link TermQuery#lookupPostingsOffset(ByteComparable)}: does term dictionary lookup to find the posting list file
 * position</li>
 * <li>{@link TermQuery#getPostingsReader(long)}: reads posting list block summary and initializes a posting reader which
 * reads the first block of the posting list into memory</li>
 * </ul>
 *
 * The two {@link FileHandle}s passed to the constructor are closed by {@link #close()}.
 */
public class LiteralIndexSegmentTermsReader implements Closeable
{
    private static final Logger logger = LoggerFactory.getLogger(LiteralIndexSegmentTermsReader.class);

    private final IndexContext indexContext;
    private final FileHandle termDictionaryFile;
    private final FileHandle postingsFile;
    private final long termDictionaryRoot;

    /**
     * Creates a reader over a segment's terms dictionary and postings files, validating both files up front.
     *
     * @param indexContext       index context, used to prefix error log messages
     * @param termsData          handle to the terms dictionary file
     * @param postingLists       handle to the postings lists file
     * @param root               root position of the terms trie, passed to {@link TrieTermsDictionaryReader}
     * @param termsFooterPointer footer pointer passed to {@code SAICodecUtils.validate} for the terms dictionary file
     * @throws IOException propagated from opening/validating either file
     */
    public LiteralIndexSegmentTermsReader(IndexContext indexContext,
                                          FileHandle termsData,
                                          FileHandle postingLists,
                                          long root,
                                          long termsFooterPointer) throws IOException
    {
        this.indexContext = indexContext;
        termDictionaryFile = termsData;
        postingsFile = postingLists;
        termDictionaryRoot = root;

        // NOTE(review): if validation throws, the handles are not closed here; presumably the caller keeps
        // responsibility for them in that case -- confirm against call sites.
        try (final IndexInput indexInput = IndexFileUtils.instance.openInput(termDictionaryFile))
        {
            validate(indexInput, termsFooterPointer);
        }

        try (final IndexInput indexInput = IndexFileUtils.instance.openInput(postingsFile))
        {
            validate(indexInput);
        }
    }

    /**
     * Quietly closes the terms dictionary and postings file handles.
     */
    @Override
    public void close()
    {
        FileUtils.closeQuietly(termDictionaryFile);
        FileUtils.closeQuietly(postingsFile);
    }

    /**
     * Looks up {@code term} and returns the posting list of its matching row ids.
     *
     * @param term                   the term to look up
     * @param perQueryEventListener  listener notified of the segment hit, traversal time and postings events
     * @param context                query context, checkpointed after a successful dictionary lookup
     * @return a {@link PostingList} for the term, or {@code null} if the term is not present in this segment.
     *         A non-null result owns the underlying postings inputs, which are released when it is closed.
     */
    public PostingList exactMatch(ByteComparable term, QueryEventListener.TrieIndexEventListener perQueryEventListener, QueryContext context)
    {
        perQueryEventListener.onSegmentHit();
        return new TermQuery(term, perQueryEventListener, context).execute();
    }

    /**
     * A single exact-match lookup: owns two inputs over the postings file until they are handed to a
     * {@link PostingsReader} or closed.
     */
    @VisibleForTesting
    public class TermQuery
    {
        private final IndexInput postingsInput;
        private final IndexInput postingsSummaryInput;
        private final QueryEventListener.TrieIndexEventListener listener;
        private final long lookupStartTime;
        private final QueryContext context;
        private final ByteComparable term;

        TermQuery(ByteComparable term, QueryEventListener.TrieIndexEventListener listener, QueryContext context)
        {
            this.listener = listener;
            this.term = term;
            this.context = context;

            postingsInput = IndexFileUtils.instance.openInput(postingsFile);
            IndexInput summaryInput;
            try
            {
                summaryInput = IndexFileUtils.instance.openInput(postingsFile);
            }
            catch (Throwable t)
            {
                // The constructor never completes, so execute() cannot clean up: release the first input here.
                FileUtils.closeQuietly(postingsInput);
                throw t;
            }
            postingsSummaryInput = summaryInput;

            // Traversal time reported in lookupPostingsOffset is measured from this point.
            lookupStartTime = Clock.Global.nanoTime();
        }

        /**
         * Runs the lookup.
         *
         * @return a {@link PostingsReader} over the term's postings, or {@code null} if the term was not found
         *         (in which case both inputs have already been closed). Any failure is logged (unless it is a
         *         {@link QueryCancelledException}), the inputs are closed, and the error is rethrown.
         */
        public PostingList execute()
        {
            try
            {
                long postingOffset = lookupPostingsOffset(term);
                if (postingOffset == PostingList.OFFSET_NOT_FOUND)
                {
                    closeInputs();
                    return null;
                }

                context.checkpoint();

                // when posting is found, resources will be closed when posting reader is closed.
                return getPostingsReader(postingOffset);
            }
            catch (Throwable e)
            {
                if (!(e instanceof QueryCancelledException))
                    logger.error(indexContext.logMessage("Failed to execute term query"), e);

                closeInputs();
                throw Throwables.cleaned(e);
            }
        }

        /** Quietly closes both postings inputs owned by this query. */
        private void closeInputs()
        {
            FileUtils.closeQuietly(postingsInput);
            FileUtils.closeQuietly(postingsSummaryInput);
        }

        /**
         * Looks {@code term} up in the trie terms dictionary and reports the traversal time to the listener.
         *
         * @return the postings file offset for the term, or {@link PostingList#OFFSET_NOT_FOUND} if absent
         */
        public long lookupPostingsOffset(ByteComparable term)
        {
            try (TrieTermsDictionaryReader reader = new TrieTermsDictionaryReader(termDictionaryFile.instantiateRebufferer(null), termDictionaryRoot))
            {
                final long offset = reader.exactMatch(term);

                listener.onTraversalComplete(Clock.Global.nanoTime() - lookupStartTime, TimeUnit.NANOSECONDS);

                if (offset == TrieTermsDictionaryReader.NOT_FOUND)
                    return PostingList.OFFSET_NOT_FOUND;

                return offset;
            }
        }

        /**
         * Reads the block summary at {@code offset} and builds a {@link PostingsReader} over the postings input.
         *
         * @param offset postings file offset returned by {@link #lookupPostingsOffset(ByteComparable)}
         * @throws IOException propagated from reading the block summary or initializing the reader
         */
        public PostingsReader getPostingsReader(long offset) throws IOException
        {
            PostingsReader.BlocksSummary header = new PostingsReader.BlocksSummary(postingsSummaryInput, offset);

            return new PostingsReader(postingsInput, header, listener.postingListEventListener());
        }
    }
}