SimpleMessageBus.java
/*
* *********************************************************************************************************************
*
* TheseFoolishThings: Miscellaneous utilities
* http://tidalwave.it/projects/thesefoolishthings
*
* Copyright (C) 2009 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
*
* *********************************************************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* *********************************************************************************************************************
*
* git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
* git clone https://github.com/tidalwave-it/thesefoolishthings-src
*
* *********************************************************************************************************************
*/
package it.tidalwave.messagebus.spi;
import java.lang.ref.WeakReference;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import it.tidalwave.messagebus.MessageBus;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/***********************************************************************************************************************
 *
 * An in-memory implementation of {@link MessageBus}. Listeners are registered by topic and held through
 * {@link WeakReference}s, so a listener that is no longer strongly referenced elsewhere can be garbage collected and
 * will then no longer be notified. The way messages travel from {@code publish()} to the listeners is delegated to a
 * {@link MessageDelivery} strategy.
 *
 * Thread safety: every access to the listener registry is guarded by the registry's own monitor. Listeners are
 * notified outside of that monitor, on a snapshot, so a listener may safely subscribe or unsubscribe while it is being
 * notified.
 *
 * @author Fabrizio Giudici
 *
 **********************************************************************************************************************/
@ThreadSafe @Slf4j
public class SimpleMessageBus implements MessageBus
{
    /** The registered listeners, by topic. Must only be accessed while holding this map's monitor. */
    private final Map<Class<?>, List<WeakReference<Listener<?>>>> listenersMapByTopic = new HashMap<>();

    /** The strategy that receives every published message. */
    private final MessageDelivery messageDelivery;

    /** The executor passed at construction time (or the internally created pool), exposed through a getter. */
    @Getter
    private final Executor executor;

    /*******************************************************************************************************************
     *
     * Creates a new instance with a {@link SimpleAsyncMessageDelivery} strategy for delivery. It will use its own
     * thread pool (a fixed pool of 10 threads), which is not shut down by this class.
     *
     ******************************************************************************************************************/
    public SimpleMessageBus()
    {
        this(Executors.newFixedThreadPool(10));
    }

    /*******************************************************************************************************************
     *
     * Creates a new instance given an executor and a {@link SimpleAsyncMessageDelivery} strategy for delivery.
     *
     * @param   executor          the {@link Executor}
     *
     ******************************************************************************************************************/
    public SimpleMessageBus (@Nonnull final Executor executor)
    {
        this(executor, new SimpleAsyncMessageDelivery());
    }

    /*******************************************************************************************************************
     *
     * Creates a new instance given an executor and a strategy for delivery. The strategy is initialized with this
     * instance before the constructor returns, after both fields have been assigned.
     *
     * @param   executor          the {@link Executor}
     * @param   messageDelivery   the strategy for delivery
     *
     ******************************************************************************************************************/
    public SimpleMessageBus (@Nonnull final Executor executor, @Nonnull final MessageDelivery messageDelivery)
    {
        this.executor = executor;
        this.messageDelivery = messageDelivery;
        this.messageDelivery.initialize(this);
        log.info("MessageBusSupport configured with {}", messageDelivery);
    }

    /*******************************************************************************************************************
     *
     * {@inheritDoc}
     *
     * The topic is the runtime class of the message.
     *
     ******************************************************************************************************************/
    @Override
    public <T> void publish (@Nonnull final T message)
    {
        // getClass() is typed Class<? extends Object>; the runtime class of a T is always a valid topic for that T.
        @SuppressWarnings("unchecked")
        final Class<T> topic = (Class<T>)message.getClass();
        publish(topic, message);
    }

    /*******************************************************************************************************************
     *
     * {@inheritDoc}
     *
     * The message is handed over to the {@link MessageDelivery} strategy configured at construction time.
     *
     ******************************************************************************************************************/
    @Override
    public <T> void publish (@Nonnull final Class<T> topic, @Nonnull final T message)
    {
        log.trace("publish({}, {})", topic, message);
        messageDelivery.deliverMessage(topic, message);
    }

    /*******************************************************************************************************************
     *
     * {@inheritDoc}
     *
     * Only a weak reference to the listener is retained: the caller must keep a strong reference to it for as long as
     * it wants to receive messages.
     *
     ******************************************************************************************************************/
    @Override
    public <T> void subscribe (@Nonnull final Class<T> topic, @Nonnull final Listener<T> listener)
    {
        log.debug("subscribe({}, {})", topic, listener);

        synchronized (listenersMapByTopic)
        {
            listenersMapByTopic.computeIfAbsent(topic, key -> new ArrayList<>())
                               .add(new WeakReference<Listener<?>>(listener));
        }
    }

    /*******************************************************************************************************************
     *
     * {@inheritDoc}
     *
     * The listener is removed from every topic it was subscribed to (matching by identity). References to listeners
     * that have been garbage collected in the meantime are discarded as well.
     *
     ******************************************************************************************************************/
    @Override
    public void unsubscribe (@Nonnull final Listener<?> listener)
    {
        log.debug("unsubscribe({})", listener);

        synchronized (listenersMapByTopic)
        {
            for (final var references : listenersMapByTopic.values())
            {
                references.removeIf(reference ->
                {
                    final var target = reference.get();
                    return (target == null) || (target == listener);
                });
            }
        }
    }

    /*******************************************************************************************************************
     *
     * Dispatches a message to every live listener subscribed to the given topic or to any of its supertypes. The call
     * is synchronous: listeners are notified one after the other in the calling thread. A failure in a listener is
     * logged and does not prevent the remaining listeners from being notified.
     *
     * @param   <T>               the static type of the topic
     * @param   topic             the dynamic type of the topic
     * @param   message           the message
     *
     ******************************************************************************************************************/
    protected <T> void dispatchMessage (@Nonnull final Class<T> topic, @Nonnull final T message)
    {
        // Iterate over a snapshot, so listener code never runs while the registry monitor is held.
        for (final var listener : snapshotListeners(topic))
        {
            try
            {
                listener.notify(message);
            }
            catch (Throwable t)
            {
                log.warn("deliverMessage()", t);
            }
        }
    }

    /*******************************************************************************************************************
     *
     * Collects, under the registry monitor, the live listeners whose subscription topic is assignable from the given
     * topic. References found to be cleared along the way are removed from the registry.
     *
     * @param   <T>               the static type of the topic
     * @param   topic             the dynamic type of the topic
     * @return                    a private copy of the matching listeners
     *
     ******************************************************************************************************************/
    @Nonnull
    @SuppressWarnings("unchecked")
    private <T> List<Listener<? super T>> snapshotListeners (@Nonnull final Class<T> topic)
    {
        final List<Listener<? super T>> snapshot = new ArrayList<>();

        synchronized (listenersMapByTopic)
        {
            for (final var entry : listenersMapByTopic.entrySet())
            {
                if (entry.getKey().isAssignableFrom(topic))
                {
                    final var references = entry.getValue().iterator();

                    while (references.hasNext())
                    {
                        final var listener = references.next().get();

                        if (listener == null)
                        {
                            references.remove(); // the listener has been garbage collected
                        }
                        else
                        {
                            // Safe: the entry key is a supertype of the topic, so the listener accepts a T.
                            snapshot.add((Listener<? super T>)listener);
                        }
                    }
                }
            }
        }

        return snapshot;
    }
}