Rows.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.rows;

import java.util.*;

import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;

import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
import org.apache.cassandra.utils.MergeIterator;

/**
 * Static utilities to work on Row objects.
 */
public abstract class Rows
{
    private Rows() {}

    public static final Row EMPTY_STATIC_ROW = BTreeRow.emptyRow(Clustering.STATIC_CLUSTERING);

    /**
     * Creates a new simple row builder.
     *
     * @param metadata the metadata of the table this is a row of.
     * @param clusteringValues the value for the clustering columns of the row to add to this build. There may be no
     * values if either the table has no clustering column, or if you want to edit the static row. Note that as a
     * shortcut it is also allowed to pass a {@code Clustering} object directly, in which case that should be the
     * only argument.
     * @return a newly created builder.
     */
    public static Row.SimpleBuilder simpleBuilder(TableMetadata metadata, Object... clusteringValues)
    {
        return new SimpleBuilders.RowBuilder(metadata, clusteringValues);
    }

    private static class StatsAccumulation
    {
        private static final long COLUMN_INCR = 1L << 32;
        private static final long CELL_INCR = 1L;

        private static long accumulateOnCell(PartitionStatisticsCollector collector, Cell<?> cell, long l)
        {
            Cells.collectStats(cell, collector);
            return l + CELL_INCR;
        }

        private static long accumulateOnColumnData(PartitionStatisticsCollector collector, ColumnData cd, long l)
        {
            if (cd.column().isSimple())
            {
                l = accumulateOnCell(collector, (Cell<?>) cd, l) + COLUMN_INCR;
            }
            else
            {
                ComplexColumnData complexData = (ComplexColumnData)cd;
                collector.update(complexData.complexDeletion());
                int startingCells = unpackCellCount(l);
                l = complexData.accumulate(StatsAccumulation::accumulateOnCell, collector, l);
                if (unpackCellCount(l) > startingCells)
                    l += COLUMN_INCR;
            }
            return l;
        }

        private static int unpackCellCount(long v)
        {
            return (int) (v & 0xFFFFFFFFL);
        }

        private static int unpackColumnCount(long v)
        {
            return (int) (v >>> 32);
        }
    }

    /**
     * Collect statistics on a given row.
     *
     * @param row the row for which to collect stats.
     * @param collector the stats collector.
     * @return the total number of cells in {@code row}.
     */
    public static int collectStats(Row row, PartitionStatisticsCollector collector)
    {
        assert !row.isEmpty();

        collector.update(row.primaryKeyLivenessInfo());
        collector.update(row.deletion().time());

        long result = row.accumulate(StatsAccumulation::accumulateOnColumnData, collector, 0);

        collector.updateColumnSetPerRow(StatsAccumulation.unpackColumnCount(result));
        return StatsAccumulation.unpackCellCount(result);
    }

    /**
     * Given the result ({@code merged}) of merging multiple {@code inputs}, signals the difference between
     * each input and {@code merged} to {@code diffListener}.
     * <p>
     * Note that this method doesn't only emit cells etc where there's a difference. The listener is informed
     * of every corresponding entity between the merged and input rows, including those that are equal.
     *
     * @param diffListener the listener to which to signal the differences between the inputs and the merged result.
     * @param merged the result of merging {@code inputs}.
     * @param inputs the inputs whose merge yielded {@code merged}.
     */
    @SuppressWarnings("resource")
    public static void diff(RowDiffListener diffListener, Row merged, Row...inputs)
    {
        Clustering<?> clustering = merged.clustering();
        LivenessInfo mergedInfo = merged.primaryKeyLivenessInfo().isEmpty() ? null : merged.primaryKeyLivenessInfo();
        Row.Deletion mergedDeletion = merged.deletion().isLive() ? null : merged.deletion();
        for (int i = 0; i < inputs.length; i++)
        {
            Row input = inputs[i];
            LivenessInfo inputInfo = input == null || input.primaryKeyLivenessInfo().isEmpty() ? null : input.primaryKeyLivenessInfo();
            Row.Deletion inputDeletion = input == null || input.deletion().isLive() ? null : input.deletion();

            if (mergedInfo != null || inputInfo != null)
                diffListener.onPrimaryKeyLivenessInfo(i, clustering, mergedInfo, inputInfo);
            if (mergedDeletion != null || inputDeletion != null)
                diffListener.onDeletion(i, clustering, mergedDeletion, inputDeletion);
        }

        List<Iterator<ColumnData>> inputIterators = new ArrayList<>(1 + inputs.length);
        inputIterators.add(merged.iterator());
        for (Row row : inputs)
            inputIterators.add(row == null ? Collections.emptyIterator() : row.iterator());

        Iterator<?> iter = MergeIterator.get(inputIterators, ColumnData.comparator, new MergeIterator.Reducer<ColumnData, Object>()
        {
            ColumnData mergedData;
            ColumnData[] inputDatas = new ColumnData[inputs.length];
            public void reduce(int idx, ColumnData current)
            {
                if (idx == 0)
                    mergedData = current;
                else
                    inputDatas[idx - 1] = current;
            }

            protected Object getReduced()
            {
                for (int i = 0 ; i != inputDatas.length ; i++)
                {
                    ColumnData input = inputDatas[i];
                    if (mergedData != null || input != null)
                    {
                        ColumnMetadata column = (mergedData != null ? mergedData : input).column;
                        if (column.isSimple())
                        {
                            diffListener.onCell(i, clustering, (Cell<?>) mergedData, (Cell<?>) input);
                        }
                        else
                        {
                            ComplexColumnData mergedData = (ComplexColumnData) this.mergedData;
                            ComplexColumnData inputData = (ComplexColumnData) input;
                            if (mergedData == null)
                            {
                                // Everything in inputData has been shadowed
                                if (!inputData.complexDeletion().isLive())
                                    diffListener.onComplexDeletion(i, clustering, column, null, inputData.complexDeletion());
                                for (Cell<?> inputCell : inputData)
                                    diffListener.onCell(i, clustering, null, inputCell);
                            }
                            else if (inputData == null)
                            {
                                // Everything in inputData is new
                                if (!mergedData.complexDeletion().isLive())
                                    diffListener.onComplexDeletion(i, clustering, column, mergedData.complexDeletion(), null);
                                for (Cell<?> mergedCell : mergedData)
                                    diffListener.onCell(i, clustering, mergedCell, null);
                            }
                            else
                            {

                                if (!mergedData.complexDeletion().isLive() || !inputData.complexDeletion().isLive())
                                    diffListener.onComplexDeletion(i, clustering, column, mergedData.complexDeletion(), inputData.complexDeletion());

                                PeekingIterator<Cell<?>> mergedCells = Iterators.peekingIterator(mergedData.iterator());
                                PeekingIterator<Cell<?>> inputCells = Iterators.peekingIterator(inputData.iterator());
                                while (mergedCells.hasNext() && inputCells.hasNext())
                                {
                                    int cmp = column.cellPathComparator().compare(mergedCells.peek().path(), inputCells.peek().path());
                                    if (cmp == 0)
                                        diffListener.onCell(i, clustering, mergedCells.next(), inputCells.next());
                                    else if (cmp < 0)
                                        diffListener.onCell(i, clustering, mergedCells.next(), null);
                                    else // cmp > 0
                                        diffListener.onCell(i, clustering, null, inputCells.next());
                                }
                                while (mergedCells.hasNext())
                                    diffListener.onCell(i, clustering, mergedCells.next(), null);
                                while (inputCells.hasNext())
                                    diffListener.onCell(i, clustering, null, inputCells.next());
                            }
                        }
                    }

                }
                return null;
            }

            protected void onKeyChange()
            {
                mergedData = null;
                Arrays.fill(inputDatas, null);
            }
        });

        while (iter.hasNext())
            iter.next();
    }

    public static Row merge(Row existing, Row update)
    {
        return merge(existing, update, ColumnData.noOp);
    }

    /**
     * Merges two rows. In addition to reconciling the cells in each row, the liveness info, and deletion times for
     * the row and complex columns are also merged.
     * <p>
     * Note that this method assumes that the provided rows can meaningfully be reconciled together. That is,
     * that the rows share the same clustering value, and belong to the same partition.
     *
     * @param existing
     * @param update
     *
     * @return the row resulting from the merge.
     */
    public static Row merge(Row existing, Row update, ColumnData.PostReconciliationFunction onReconcile)
    {
        assert existing instanceof BTreeRow;
        assert update instanceof BTreeRow;
        return BTreeRow.merge((BTreeRow) existing, (BTreeRow) update, onReconcile);
    }

    /**
     * Returns a row that is obtained from the given existing row by removing everything that is shadowed by data in
     * the update row. In other words, produces the smallest result row such that
     * {@code merge(result, update, nowInSec) == merge(existing, update, nowInSec)} after filtering by rangeDeletion.
     *
     * @param existing source row
     * @param update shadowing row
     * @param rangeDeletion extra {@code DeletionTime} from covering tombstone
     */
    public static Row removeShadowedCells(Row existing, Row update, DeletionTime rangeDeletion)
    {
        Row.Builder builder = BTreeRow.sortedBuilder();
        Clustering<?> clustering = existing.clustering();
        builder.newRow(clustering);

        DeletionTime deletion = update.deletion().time();
        if (rangeDeletion.supersedes(deletion))
            deletion = rangeDeletion;

        LivenessInfo existingInfo = existing.primaryKeyLivenessInfo();
        if (!deletion.deletes(existingInfo))
            builder.addPrimaryKeyLivenessInfo(existingInfo);
        Row.Deletion rowDeletion = existing.deletion();
        if (!deletion.supersedes(rowDeletion.time()))
            builder.addRowDeletion(rowDeletion);

        Iterator<ColumnData> a = existing.iterator();
        Iterator<ColumnData> b = update.iterator();
        ColumnData nexta = a.hasNext() ? a.next() : null, nextb = b.hasNext() ? b.next() : null;
        while (nexta != null)
        {
            int comparison = nextb == null ? -1 : nexta.column.compareTo(nextb.column);
            if (comparison <= 0)
            {
                ColumnData cura = nexta;
                ColumnMetadata column = cura.column;
                ColumnData curb = comparison == 0 ? nextb : null;
                if (column.isSimple())
                {
                    Cells.addNonShadowed((Cell<?>) cura, (Cell<?>) curb, deletion, builder);
                }
                else
                {
                    ComplexColumnData existingData = (ComplexColumnData) cura;
                    ComplexColumnData updateData = (ComplexColumnData) curb;

                    DeletionTime existingDt = existingData.complexDeletion();
                    DeletionTime updateDt = updateData == null ? DeletionTime.LIVE : updateData.complexDeletion();

                    DeletionTime maxDt = updateDt.supersedes(deletion) ? updateDt : deletion;
                    if (existingDt.supersedes(maxDt))
                    {
                        builder.addComplexDeletion(column, existingDt);
                        maxDt = existingDt;
                    }

                    Iterator<Cell<?>> existingCells = existingData.iterator();
                    Iterator<Cell<?>> updateCells = updateData == null ? null : updateData.iterator();
                    Cells.addNonShadowedComplex(column, existingCells, updateCells, maxDt, builder);
                }
                nexta = a.hasNext() ? a.next() : null;
                if (curb != null)
                    nextb = b.hasNext() ? b.next() : null;
            }
            else
            {
                nextb = b.hasNext() ? b.next() : null;
            }
        }
        Row row = builder.build();
        return row != null && !row.isEmpty() ? row : null;
    }
}