1 /* 2 * ************************************************************************************************************************************************************* 3 * 4 * TheseFoolishThings: Miscellaneous utilities 5 * http://tidalwave.it/projects/thesefoolishthings 6 * 7 * Copyright (C) 2009 - 2024 by Tidalwave s.a.s. (http://tidalwave.it) 8 * 9 * ************************************************************************************************************************************************************* 10 * 11 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 12 * You may obtain a copy of the License at 13 * 14 * http://www.apache.org/licenses/LICENSE-2.0 15 * 16 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 17 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 18 * 19 * ************************************************************************************************************************************************************* 20 * 21 * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src 22 * git clone https://github.com/tidalwave-it/thesefoolishthings-src 23 * 24 * ************************************************************************************************************************************************************* 25 */ 26 package it.tidalwave.messagebus.spi; 27 28 import java.lang.ref.WeakReference; 29 import javax.annotation.Nonnull; 30 import javax.annotation.concurrent.ThreadSafe; 31 import java.util.ArrayList; 32 import java.util.HashMap; 33 import java.util.HashSet; 34 import java.util.List; 35 import java.util.Map; 36 import java.util.concurrent.Executor; 37 import java.util.concurrent.Executors; 38 import it.tidalwave.messagebus.MessageBus; 39 import lombok.Getter; 40 import lombok.extern.slf4j.Slf4j; 41 42 /*************************************************************************************************************************************************************** 43 * 44 * A partial implementation of {@link MessageBus}. 45 * 46 * @author Fabrizio Giudici 47 * 48 **************************************************************************************************************************************************************/ 49 @ThreadSafe @Slf4j 50 public class SimpleMessageBus implements MessageBus 51 { 52 private final Map<Class<?>, List<WeakReference<Listener<?>>>> listenersMapByTopic = new HashMap<>(); 53 54 private final MessageDelivery messageDelivery; 55 56 @Getter 57 private final Executor executor; 58 59 /*********************************************************************************************************************************************************** 60 * Creates a new instance with a {@link SimpleAsyncMessageDelivery} strategy for delivery. It will use its own 61 * thread pool. 62 **********************************************************************************************************************************************************/ 63 public SimpleMessageBus() 64 { 65 this(Executors.newFixedThreadPool(10)); 66 } 67 68 /*********************************************************************************************************************************************************** 69 * Creates a new instance given an executor and a {@link SimpleAsyncMessageDelivery} strategy for delivery. 70 * 71 * @param executor the {@link Executor} 72 **********************************************************************************************************************************************************/ 73 public SimpleMessageBus (@Nonnull final Executor executor) 74 { 75 this(executor, new SimpleAsyncMessageDelivery()); 76 } 77 78 /*********************************************************************************************************************************************************** 79 * Creates a new instance given an executor and a strategy for delivery. 80 * 81 * @param executor the {@link Executor} 82 * @param messageDelivery the strategy for delivery 83 **********************************************************************************************************************************************************/ 84 public SimpleMessageBus (@Nonnull final Executor executor, @Nonnull final MessageDelivery messageDelivery) 85 { 86 this.executor = executor; 87 this.messageDelivery = messageDelivery; 88 this.messageDelivery.initialize(this); 89 log.info("MessageBusSupport configured with {}", messageDelivery); 90 } 91 92 /*********************************************************************************************************************************************************** 93 * {@inheritDoc} 94 **********************************************************************************************************************************************************/ 95 @Override 96 public <T> void publish (@Nonnull final T message) 97 { 98 publish((Class<T>)message.getClass(), message); 99 } 100 101 /*********************************************************************************************************************************************************** 102 * {@inheritDoc} 103 **********************************************************************************************************************************************************/ 104 @Override 105 public <T> void publish (@Nonnull final Class<T> topic, @Nonnull final T message) 106 { 107 log.trace("publish({}, {})", topic, message); 108 messageDelivery.deliverMessage(topic, message); 109 } 110 111 /*********************************************************************************************************************************************************** 112 * {@inheritDoc} 113 **********************************************************************************************************************************************************/ 114 @Override 115 public <T> void subscribe (@Nonnull final Class<T> topic, @Nonnull final Listener<T> listener) 116 { 117 log.debug("subscribe({}, {})", topic, listener); 118 findListenersByTopic(topic).add(new WeakReference<>(listener)); 119 } 120 121 /*********************************************************************************************************************************************************** 122 * {@inheritDoc} 123 **********************************************************************************************************************************************************/ 124 @Override 125 public void unsubscribe (@Nonnull final Listener<?> listener) 126 { 127 log.debug("unsubscribe({})", listener); 128 129 for (final var list : listenersMapByTopic.values()) 130 { 131 list.removeIf(ref -> (ref.get() == null) || (ref.get() == listener)); 132 } 133 } 134 135 /*********************************************************************************************************************************************************** 136 * Dispatches a message. 137 * 138 * @param <T> the static type of the topic 139 * @param topic the dynamic type of the topic 140 * @param message the message 141 **********************************************************************************************************************************************************/ 142 protected <T> void dispatchMessage (@Nonnull final Class<T> topic, @Nonnull final T message) 143 { 144 final var clone = new HashSet<>(listenersMapByTopic.entrySet()); // FIXME: marked as dubious by SpotBugs 145 146 for (final var e : clone) 147 { 148 if (e.getKey().isAssignableFrom(topic)) 149 { 150 final List<WeakReference<MessageBus.Listener<T>>> listeners = (List)e.getValue(); 151 152 for (final var listenerReference : listeners) 153 { 154 final var listener = listenerReference.get(); 155 156 if (listener != null) 157 { 158 try 159 { 160 listener.notify(message); 161 } 162 catch (Throwable t) 163 { 164 log.warn("deliverMessage()", t); 165 } 166 } 167 } 168 } 169 } 170 } 171 172 /*********************************************************************************************************************************************************** 173 **********************************************************************************************************************************************************/ 174 @Nonnull 175 private <T> List<WeakReference<Listener<T>>> findListenersByTopic (@Nonnull final Class<T> topic) 176 { 177 // FIXME: use putIfAbsent() 178 List<WeakReference<Listener<T>>> listeners = (List)listenersMapByTopic.get(topic); 179 180 if (listeners == null) 181 { 182 listeners = new ArrayList<>(); 183 listenersMapByTopic.put(topic, (List)listeners); 184 } 185 186 return listeners; 187 } 188 }