ListenerList.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils.concurrent;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import com.google.common.util.concurrent.FutureCallback;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.concurrent.ExecutionFailure;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import static org.apache.cassandra.utils.concurrent.ListenerList.Notifying.NOTIFYING;
/**
* Encapsulate one or more items in a linked-list that is immutable whilst shared, forming a prepend-only list (or stack).
* Once the list is ready to consume, exclusive ownership is taken by clearing the shared variable containing it, after
* which the list may be invoked using {@link #notifyExclusive(ListenerList, Future)}, which reverses the list before invoking the work it contains.
*/
abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
{
abstract void notifySelf(Executor notifyExecutor, Future<V> future);
static ListenerList pushHead(ListenerList prev, ListenerList next)
{
if (prev instanceof Notifying<?>)
{
Notifying result = new Notifying();
result.next = next;
next.next = prev == NOTIFYING ? null : prev;
return result;
}
next.next = prev;
return next;
}
/**
* Logically append {@code newListener} to {@link #listeners}
* (at this stage it is a stack, so we actually prepend)
*
* @param newListener must be either a {@link ListenerList} or {@link GenericFutureListener}
*/
@Inline
static <T> void push(AtomicReferenceFieldUpdater<? super T, ListenerList> updater, T in, ListenerList newListener)
{
IntrusiveStack.push(updater, in, newListener, ListenerList::pushHead);
}
/**
* Logically append {@code newListener} to {@link #listeners}
* (at this stage it is a stack, so we actually prepend)
*
* @param newListener must be either a {@link ListenerList} or {@link GenericFutureListener}
*/
@Inline
static <T> void pushExclusive(AtomicReferenceFieldUpdater<? super T, ListenerList> updater, T in, ListenerList newListener)
{
IntrusiveStack.pushExclusive(updater, in, newListener, ListenerList::pushHead);
}
static <V, T extends Future<V>> void notify(AtomicReferenceFieldUpdater<? super T, ListenerList> updater, T in)
{
while (true)
{
ListenerList<V> listeners = updater.get(in);
if (listeners == null || listeners instanceof Notifying)
return; // either no listeners, or we are already notifying listeners, so we'll get to the new one when ready
if (updater.compareAndSet(in, listeners, NOTIFYING))
{
while (true)
{
notifyExclusive(listeners, in);
if (updater.compareAndSet(in, NOTIFYING, null))
return;
listeners = updater.getAndSet(in, NOTIFYING);
}
}
}
}
/**
* Requires exclusive ownership of {@code head}.
*
* Task submission occurs in the order the operations were submitted; if all of the executors
* are immediate or unspecified this guarantees execution order.
* Tasks are submitted to the executor individually, as this simplifies semantics and
* we anticipate few listeners in practice, and even fewer with indirect executors.
*
* @param head must be either a {@link ListenerList} or {@link GenericFutureListener}
*/
static <T> void notifyExclusive(ListenerList<T> head, Future<T> future)
{
Executor notifyExecutor; {
Executor exec = future.notifyExecutor();
notifyExecutor = inExecutor(exec) ? null : exec;
}
head = reverse(head);
forEach(head, i -> i.notifySelf(notifyExecutor, future));
}
/**
* Notify {@code listener} on the invoking thread, handling any exceptions
*/
static <F extends io.netty.util.concurrent.Future<?>> void notifyListener(GenericFutureListener<F> listener, F future)
{
try
{
listener.operationComplete(future);
}
catch (Throwable t)
{
// TODO: suboptimal package interdependency - move FutureTask etc here?
ExecutionFailure.handle(t);
}
}
/**
* Notify {@code listener} using {@code notifyExecutor} if set, and handling exceptions otherwise
*/
static <F extends io.netty.util.concurrent.Future<?>> void notifyListener(Executor notifyExecutor, GenericFutureListener<F> listener, F future)
{
if (notifyExecutor == null) notifyListener(listener, future);
else safeExecute(notifyExecutor, () -> notifyListener(listener, future));
}
/**
* Notify {@code listener} using {@code notifyExecutor} if set, and handling exceptions otherwise
*/
static void notifyListener(@Nullable Executor notifyExecutor, Runnable listener)
{
safeExecute(notifyExecutor, listener);
}
private static void safeExecute(@Nullable Executor notifyExecutor, Runnable runnable)
{
if (notifyExecutor == null)
notifyExecutor = ImmediateExecutor.INSTANCE;
try
{
notifyExecutor.execute(runnable);
}
catch (Exception | Error e)
{
// TODO: suboptimal package interdependency - move FutureTask etc here?
ExecutionFailure.handle(e);
}
}
/**
* Encapsulate a regular listener in a linked list
*/
static class GenericFutureListenerList<V> extends ListenerList<V>
{
final GenericFutureListener listener;
GenericFutureListenerList(GenericFutureListener listener)
{
this.listener = listener;
}
@Override
void notifySelf(Executor notifyExecutor, Future<V> future)
{
notifyListener(notifyExecutor, listener, future);
}
}
/**
* Encapsulates the invocation of a callback with everything needed to submit for execution
* without incurring significant further overhead as a list
*/
static class CallbackListener<V> extends ListenerList<V> implements Runnable
{
final Future<V> future;
final FutureCallback<? super V> callback;
CallbackListener(Future<V> future, FutureCallback<? super V> callback)
{
this.future = future;
this.callback = callback;
}
@Override
public void run()
{
if (future.isSuccess()) callback.onSuccess(future.getNow());
else callback.onFailure(future.cause());
}
@Override
void notifySelf(Executor notifyExecutor, Future<V> future)
{
notifyListener(notifyExecutor, this);
}
}
/**
* Encapsulates the invocation of a callback with everything needed to submit for execution
* without incurring significant further overhead as a list
*/
static class CallbackBiConsumerListener<V> extends ListenerList<V> implements Runnable
{
final Future<V> future;
final BiConsumer<? super V, Throwable> callback;
final Executor executor;
CallbackBiConsumerListener(Future<V> future, BiConsumer<? super V, Throwable> callback, Executor executor)
{
this.future = future;
this.callback = callback;
this.executor = executor;
}
@Override
public void run()
{
if (future.isSuccess()) callback.accept(future.getNow(), null);
else callback.accept(null, future.cause());
}
@Override
void notifySelf(Executor notifyExecutor, Future<V> future)
{
notifyListener(executor == null ? notifyExecutor : executor, this);
}
}
/**
* Encapsulates the invocation of a callback with everything needed to submit for execution
* without incurring significant further overhead as a list
*/
static class CallbackListenerWithExecutor<V> extends CallbackListener<V>
{
final Executor executor;
CallbackListenerWithExecutor(Future<V> future, FutureCallback<? super V> callback, Executor executor)
{
super(future, callback);
this.executor = executor;
}
@Override
void notifySelf(Executor notifyExecutor, Future<V> future)
{
notifyListener(executor, this);
}
}
/**
* Encapsulates the invocation of a callback with everything needed to submit for execution
* without incurring significant further overhead as a list
*/
static class CallbackLambdaListener<V> extends ListenerList<V> implements Runnable
{
final Future<V> future;
final Consumer<? super V> onSuccess;
final Consumer<? super Throwable> onFailure;
final Executor executor;
CallbackLambdaListener(Future<V> future, Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure, Executor executor)
{
this.future = future;
this.onSuccess = onSuccess;
this.onFailure = onFailure;
this.executor = executor;
}
@Override
public void run()
{
if (future.isSuccess()) onSuccess.accept(future.getNow());
else onFailure.accept(future.cause());
}
@Override
void notifySelf(Executor notifyExecutor, Future future)
{
notifyListener(executor == null ? notifyExecutor : executor, this);
}
}
/**
* Encapsulate a task, executable on completion by {@link Future#notifyExecutor}, in a linked list for storing in
* {@link #listeners}, either as a listener on its own (since we need to encapsulate it anyway), or alongside
* other listeners in a list
*/
static class RunnableWithNotifyExecutor<V> extends ListenerList<V>
{
final Runnable task;
RunnableWithNotifyExecutor(Runnable task)
{
this.task = task;
}
@Override
void notifySelf(Executor notifyExecutor, Future<V> future)
{
notifyListener(notifyExecutor, task);
}
}
/**
* Encapsulate a task executable on completion in a linked list for storing in {@link #listeners},
* either as a listener on its own (since we need to encapsulate it anyway), or alongside other listeners
* in a list
*/
static class RunnableWithExecutor<V> extends ListenerList<V>
{
final Runnable task;
@Nullable final Executor executor;
RunnableWithExecutor(Runnable task, @Nullable Executor executor)
{
this.task = task;
this.executor = executor;
}
@Override
void notifySelf(Executor notifyExecutor, Future<V> future)
{
notifyListener(inExecutor(executor) ? null : executor, task);
}
}
/**
* Dummy that indicates listeners are already being notified after the future was completed,
* so we cannot notify them ourselves whilst maintaining the guaranteed invocation order.
* The invocation of the list can be left to the thread already notifying listeners.
*/
static class Notifying<V> extends ListenerList<V>
{
static final Notifying NOTIFYING = new Notifying();
@Override
void notifySelf(Executor notifyExecutor, Future<V> future)
{
}
}
/**
* @return true iff the invoking thread is executing {@code executor}
*/
static boolean inExecutor(Executor executor)
{
return (executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop())
|| (executor instanceof ExecutorPlus && ((ExecutorPlus) executor).inExecutor());
}
}