View Javadoc
1   /*
2    * *************************************************************************************************************************************************************
3    *
4    * TheseFoolishThings: Miscellaneous utilities
5    * http://tidalwave.it/projects/thesefoolishthings
6    *
7    * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8    *
9    * *************************************************************************************************************************************************************
10   *
11   * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12   * You may obtain a copy of the License at
13   *
14   *     http://www.apache.org/licenses/LICENSE-2.0
15   *
16   * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17   * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and limitations under the License.
18   *
19   * *************************************************************************************************************************************************************
20   *
21   * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22   * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23   *
24   * *************************************************************************************************************************************************************
25   */
26  package it.tidalwave.messagebus.spi;
27  
28  import java.lang.ref.WeakReference;
29  import javax.annotation.concurrent.ThreadSafe;
30  import jakarta.annotation.Nonnull;
31  import java.util.ArrayList;
32  import java.util.HashMap;
33  import java.util.HashSet;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.concurrent.Executor;
37  import java.util.concurrent.Executors;
38  import it.tidalwave.messagebus.MessageBus;
39  import lombok.Getter;
40  import lombok.extern.slf4j.Slf4j;
41  
42  /***************************************************************************************************************************************************************
43   *
44   * A partial implementation of {@link MessageBus}.
45   *
46   * @author  Fabrizio Giudici
47   *
48   **************************************************************************************************************************************************************/
49  @ThreadSafe @Slf4j
50  public class SimpleMessageBus implements MessageBus
51    {
52      private final Map<Class<?>, List<WeakReference<Listener<?>>>> listenersMapByTopic = new HashMap<>();
53  
54      private final MessageDelivery messageDelivery;
55      
56      @Getter
57      private final Executor executor;
58      
59      /***********************************************************************************************************************************************************
60       * Creates a new instance with a {@link SimpleAsyncMessageDelivery} strategy for delivery. It will use its own
61       * thread pool.
62       **********************************************************************************************************************************************************/
63      public SimpleMessageBus() 
64        {
65          this(Executors.newFixedThreadPool(10));
66        }
67  
68      /***********************************************************************************************************************************************************
69       * Creates a new instance given an executor and a {@link SimpleAsyncMessageDelivery} strategy for delivery.
70       *
71       * @param   executor          the {@link Executor}
72       **********************************************************************************************************************************************************/
73      public SimpleMessageBus (@Nonnull final Executor executor)
74        {
75          this(executor, new SimpleAsyncMessageDelivery());
76        }
77      
78      /***********************************************************************************************************************************************************
79       * Creates a new instance given an executor and a strategy for delivery.
80       *
81       * @param   executor          the {@link Executor}
82       * @param   messageDelivery   the strategy for delivery
83       **********************************************************************************************************************************************************/
84      public SimpleMessageBus (@Nonnull final Executor executor, @Nonnull final MessageDelivery messageDelivery)
85        {
86          this.executor = executor;
87          this.messageDelivery = messageDelivery;
88          this.messageDelivery.initialize(this);
89          log.info("MessageBusSupport configured with {}", messageDelivery);
90        }
91       
92      /***********************************************************************************************************************************************************
93       * {@inheritDoc}
94       **********************************************************************************************************************************************************/
95      @Override
96      public <T> void publish (@Nonnull final T message)
97        {
98          publish((Class<T>)message.getClass(), message);
99        }
100 
101     /***********************************************************************************************************************************************************
102      * {@inheritDoc}
103      **********************************************************************************************************************************************************/
104     @Override
105     public <T> void publish (@Nonnull final Class<T> topic, @Nonnull final T message)
106       {
107         log.trace("publish({}, {})", topic, message);
108         messageDelivery.deliverMessage(topic, message);
109       }
110 
111     /***********************************************************************************************************************************************************
112      * {@inheritDoc}
113      **********************************************************************************************************************************************************/
114     @Override
115     public <T> void subscribe (@Nonnull final Class<T> topic, @Nonnull final Listener<T> listener)
116       {
117         log.debug("subscribe({}, {})", topic, listener);
118         findListenersByTopic(topic).add(new WeakReference<>(listener));
119       }
120 
121     /***********************************************************************************************************************************************************
122      * {@inheritDoc}
123      **********************************************************************************************************************************************************/
124     @Override
125     public void unsubscribe (@Nonnull final Listener<?> listener)
126       {
127         log.debug("unsubscribe({})", listener);
128 
129         for (final var list : listenersMapByTopic.values())
130           {
131             list.removeIf(ref -> (ref.get() == null) || (ref.get() == listener));
132           }
133       }
134 
135     /***********************************************************************************************************************************************************
136      * Dispatches a message.
137      *
138      * @param   <T>   the static type of the topic
139      * @param   topic     the dynamic type of the topic
140      * @param   message   the message
141      **********************************************************************************************************************************************************/
142     protected <T> void dispatchMessage (@Nonnull final Class<T> topic, @Nonnull final T message)
143       {
144         final var clone = new HashSet<>(listenersMapByTopic.entrySet()); // FIXME: marked as dubious by SpotBugs
145 
146         for (final var e : clone)
147           {
148             if (e.getKey().isAssignableFrom(topic))
149               {
150                 final List<WeakReference<MessageBus.Listener<T>>> listeners = (List)e.getValue();
151 
152                 for (final var listenerReference : listeners)
153                   {
154                     final var listener = listenerReference.get();
155 
156                     if (listener != null)
157                       {
158                         try
159                           {
160                             listener.notify(message);
161                           }
162                         catch (Throwable t)
163                           {
164                             log.warn("deliverMessage()", t);
165                           }
166                       }
167                   }
168               }
169           }
170       }
171     
172     /***********************************************************************************************************************************************************
173      **********************************************************************************************************************************************************/
174     @Nonnull
175     private <T> List<WeakReference<Listener<T>>> findListenersByTopic (@Nonnull final Class<T> topic)
176       {
177         // FIXME: use putIfAbsent()
178         List<WeakReference<Listener<T>>> listeners = (List)listenersMapByTopic.get(topic);
179 
180         if (listeners == null)
181           {
182             listeners = new ArrayList<>();
183             listenersMapByTopic.put(topic, (List)listeners);
184           }
185 
186         return listeners;
187       }
188   }