1 /*
2 * *************************************************************************************************************************************************************
3 *
4 * TheseFoolishThings: Miscellaneous utilities
5 * http://tidalwave.it/projects/thesefoolishthings
6 *
7 * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8 *
9 * *************************************************************************************************************************************************************
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 *
14 * http://www.apache.org/licenses/LICENSE-2.0
15 *
16 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
18 *
19 * *************************************************************************************************************************************************************
20 *
21 * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22 * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23 *
24 * *************************************************************************************************************************************************************
25 */
26 package it.tidalwave.messagebus.spi;
27
28 import java.lang.ref.WeakReference;
29 // import javax.annotation.concurrent.ThreadSafe;
30 import jakarta.annotation.Nonnull;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Executors;
38 import it.tidalwave.messagebus.MessageBus;
39 import lombok.Getter;
40 import lombok.extern.slf4j.Slf4j;
41
42 /***************************************************************************************************************************************************************
43 *
44 * A partial implementation of {@link MessageBus}.
45 *
46 * @author Fabrizio Giudici
47 *
48 **************************************************************************************************************************************************************/
49 /* @ThreadSafe */ @Slf4j @SuppressWarnings("this-escape")
50 public class SimpleMessageBus implements MessageBus
51 {
52 private final Map<Class<?>, List<WeakReference<Listener<?>>>> listenersMapByTopic = new HashMap<>();
53
54 private final MessageDelivery messageDelivery;
55
56 @Getter
57 private final Executor executor;
58
59 /***********************************************************************************************************************************************************
60 * Creates a new instance with a {@link SimpleAsyncMessageDelivery} strategy for delivery. It will use its own
61 * thread pool.
62 **********************************************************************************************************************************************************/
63 public SimpleMessageBus()
64 {
65 this(Executors.newFixedThreadPool(10));
66 }
67
68 /***********************************************************************************************************************************************************
69 * Creates a new instance given an executor and a {@link SimpleAsyncMessageDelivery} strategy for delivery.
70 *
71 * @param executor the {@link Executor}
72 **********************************************************************************************************************************************************/
73 public SimpleMessageBus (@Nonnull final Executor executor)
74 {
75 this(executor, new SimpleAsyncMessageDelivery());
76 }
77
78 /***********************************************************************************************************************************************************
79 * Creates a new instance given an executor and a strategy for delivery.
80 *
81 * @param executor the {@link Executor}
82 * @param messageDelivery the strategy for delivery
83 **********************************************************************************************************************************************************/
84 public SimpleMessageBus (@Nonnull final Executor executor, @Nonnull final MessageDelivery messageDelivery)
85 {
86 this.executor = executor;
87 this.messageDelivery = messageDelivery;
88 this.messageDelivery.initialize(this);
89 log.info("MessageBusSupport configured with {}", messageDelivery);
90 }
91
92 /***********************************************************************************************************************************************************
93 * {@inheritDoc}
94 **********************************************************************************************************************************************************/
95 @Override
96 public <T> void publish (@Nonnull final T message)
97 {
98 publish((Class<T>)message.getClass(), message);
99 }
100
101 /***********************************************************************************************************************************************************
102 * {@inheritDoc}
103 **********************************************************************************************************************************************************/
104 @Override
105 public <T> void publish (@Nonnull final Class<T> topic, @Nonnull final T message)
106 {
107 log.trace("publish({}, {})", topic, message);
108 messageDelivery.deliverMessage(topic, message);
109 }
110
111 /***********************************************************************************************************************************************************
112 * {@inheritDoc}
113 **********************************************************************************************************************************************************/
114 @Override
115 public <T> void subscribe (@Nonnull final Class<T> topic, @Nonnull final Listener<T> listener)
116 {
117 log.debug("subscribe({}, {})", topic, listener);
118 findListenersByTopic(topic).add(new WeakReference<>(listener));
119 }
120
121 /***********************************************************************************************************************************************************
122 * {@inheritDoc}
123 **********************************************************************************************************************************************************/
124 @Override
125 public void unsubscribe (@Nonnull final Listener<?> listener)
126 {
127 log.debug("unsubscribe({})", listener);
128
129 for (final var list : listenersMapByTopic.values())
130 {
131 list.removeIf(ref -> (ref.get() == null) || (ref.get() == listener));
132 }
133 }
134
135 /***********************************************************************************************************************************************************
136 * Dispatches a message.
137 *
138 * @param <T> the static type of the topic
139 * @param topic the dynamic type of the topic
140 * @param message the message
141 **********************************************************************************************************************************************************/
142 protected <T> void dispatchMessage (@Nonnull final Class<T> topic, @Nonnull final T message)
143 {
144 final var clone = new HashSet<>(listenersMapByTopic.entrySet()); // FIXME: marked as dubious by SpotBugs
145
146 for (final var e : clone)
147 {
148 if (e.getKey().isAssignableFrom(topic))
149 {
150 final List<WeakReference<MessageBus.Listener<T>>> listeners = (List)e.getValue();
151
152 for (final var listenerReference : listeners)
153 {
154 final var listener = listenerReference.get();
155
156 if (listener != null)
157 {
158 try
159 {
160 listener.notify(message);
161 }
162 catch (Throwable t)
163 {
164 log.warn("deliverMessage()", t);
165 }
166 }
167 }
168 }
169 }
170 }
171
172 /***********************************************************************************************************************************************************
173 **********************************************************************************************************************************************************/
174 @Nonnull
175 private <T> List<WeakReference<Listener<T>>> findListenersByTopic (@Nonnull final Class<T> topic)
176 {
177 // FIXME: use putIfAbsent()
178 List<WeakReference<Listener<T>>> listeners = (List)listenersMapByTopic.get(topic);
179
180 if (listeners == null)
181 {
182 listeners = new ArrayList<>();
183 listenersMapByTopic.put(topic, (List)listeners);
184 }
185
186 return listeners;
187 }
188 }