LoadingMap.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.utils.concurrent;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;
import org.cliffc.high_scale_lib.NonBlockingHashMap;

/**
 * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
 * <p>
 * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like
 * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
 * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
 * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help
 * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
 * scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
 * does not guarantee at-most-once semantics of running the mapping function for a single key.
 *
 * @param <K>
 * @param <V>
 */
public class LoadingMap<K, V>
{
    private final NonBlockingHashMap<K, Future<V>> internalMap = new NonBlockingHashMap<>();

    /**
     * Returns a promise for the given key or null if there is nothing associated with the key.
     * <p/>if the promise is not fulfilled, it means that there a loading process associated with the key
     * <p/>if the promise is fulfilled with {@code null} value, it means that there is an unloading process associated with the key
     * <p/>if the promise is fulfilled with a failure, it means that a loading process associated with the key failed
     * but the exception was not propagated yet (a failed promise is eventually removed from the map)
     */
    @VisibleForTesting
    Future<V> get(K key)
    {
        return internalMap.get(key);
    }

    /**
     * Get a value for a given key. Returns a non-null object only if there is a successfully initialized value associated,
     * with the provided key. It returns {@code null} if there is no value for the key, or the value is being initialized
     * or removed. It does not throw if the last attempt to initialize the value failed.
     */
    public V getIfReady(K key)
    {
        Future<V> future = internalMap.get(key);
        return future != null ? future.getNow() : null;
    }

    /**
     * If the value for the given key is missing, execute a load function to obtain a value and put it into the map.
     * It is guaranteed that the loading and unloading a value for a single key are executed serially. It is also
     * guaranteed that the load function is executed exactly once to load a value into the map (regardless of the concurrent attempts).
     * <p/>
     * In case there is a concurrent attempt to load a value for this key, this attempt waits until the concurrent attempt
     * is done and returns its result (if succeeded). If the concurrent attempt fails, this attempt is retried. However,
     * if this attempt fails, it is not retried and the exception is rethrown. In case there is a concurrent attempt
     * to unload a value for this key, this attempt waits until the concurrent attempt is done and retries loading.
     * <p/>
     * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function
     * throws exception, it is rethrown by this method. In both cases nothing gets added to the map.
     * <p/>
     * It is allowed to nest loading for a different key, though nested loading for the same key results in a deadlock.
     */
    public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException
    {
        while (true)
        {
            Future<V> future = internalMap.get(key);
            boolean attemptedInThisThread = false;
            if (future == null)
            {
                AsyncPromise<V> newEntry = new AsyncPromise<>();
                future = internalMap.putIfAbsent(key, newEntry);
                if (future == null)
                {
                    // We managed to create an entry for the value. Now initialize it.
                    attemptedInThisThread = true;
                    future = newEntry;
                    try
                    {
                        V v = loadFunction.get();
                        if (v == null)
                            throw new NullPointerException("The mapping function returned null");
                        else
                            newEntry.setSuccess(v);
                    }
                    catch (Throwable t)
                    {
                        newEntry.setFailure(t);
                        // Remove future so that construction can be retried later
                        internalMap.remove(key, future);
                    }
                }

                // Else some other thread beat us to it, but we now have the reference to the future which we can wait for.
            }

            V v = future.awaitUninterruptibly().getNow();

            if (v != null) // implies success
                return v;

            if (attemptedInThisThread)
                // Rethrow if the failing attempt was initiated by us (failed and attemptedInThisThread)
                future.rethrowIfFailed();

            // Retry in other cases, that is, if blockingUnloadIfPresent was called in the meantime
            // (success and getNow == null) hoping that unloading gets finished soon, and if the concurrent attempt
            // to load entry fails
            Thread.yield();
        }
    }

    /**
     * If a value for the given key is present, unload function is run and the value is removed from the map.
     * Similarly to {@link #blockingLoadIfAbsent(Object, Supplier)} at-most-once semantics is guaranteed for unload
     * function.
     * <p/>
     * When unload function fails, the value is removed from the map anyway and the failure is rethrown.
     * <p/>
     * When the key was not found, the method returns {@code null}.
     *
     * @throws UnloadExecutionException when the unloading failed to complete - this is checked exception because
     *                                  the value is removed from the map regardless of the result of unloading;
     *                                  therefore if the unloading failed, the caller is responsible for handling that
     */
    public V blockingUnloadIfPresent(K key, Consumer<? super V> unloadFunction) throws UnloadExecutionException
    {
        Promise<V> droppedFuture = new AsyncPromise<V>().setSuccess(null);

        Future<V> existingFuture;
        do
        {
            existingFuture = internalMap.get(key);
            if (existingFuture == null || existingFuture.isDone() && existingFuture.getNow() == null)
                return null;
        } while (!internalMap.replace(key, existingFuture, droppedFuture));

        V v = existingFuture.awaitUninterruptibly().getNow();
        try
        {
            if (v == null)
                // which means that either the value failed to load or a concurrent attempt to unload already did the work
                return null;

            unloadFunction.accept(v);
            return v;
        }
        catch (Throwable t)
        {
            throw new UnloadExecutionException(v, t);
        }
        finally
        {
            Future<V> future = internalMap.remove(key);
            assert future == droppedFuture;
        }
    }

    /**
     * Thrown when unloading a value failed. It encapsulates the value which was failed to unload.
     */
    public static class UnloadExecutionException extends ExecutionException
    {
        private final Object value;

        public UnloadExecutionException(Object value, Throwable cause)
        {
            super(cause);
            this.value = value;
        }

        public <T> T value()
        {
            return (T) value;
        }
    }
}