View Javadoc
1   /*
2    * *************************************************************************************************************************************************************
3    *
4    * TheseFoolishThings: Miscellaneous utilities
5    * http://tidalwave.it/projects/thesefoolishthings
6    *
7    * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8    *
9    * *************************************************************************************************************************************************************
10   *
11   * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12   * You may obtain a copy of the License at
13   *
14   *     http://www.apache.org/licenses/LICENSE-2.0
15   *
16   * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17   * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and limitations under the License.
18   *
19   * *************************************************************************************************************************************************************
20   *
21   * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22   * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23   *
24   * *************************************************************************************************************************************************************
25   */
26  package it.tidalwave.messagebus.spi;
27  
28  import jakarta.annotation.Nonnull;
29  import java.util.ArrayList;
30  import java.util.Comparator;
31  import java.util.List;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentNavigableMap;
34  import java.util.concurrent.ConcurrentSkipListMap;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import lombok.Getter;
37  import lombok.RequiredArgsConstructor;
38  import lombok.ToString;
39  import lombok.extern.slf4j.Slf4j;
40  
41  /***************************************************************************************************************************************************************
42   *
43   * @author  Fabrizio Giudici
44   *
45   **************************************************************************************************************************************************************/
46  @Slf4j
47  public class MultiQueue 
48    {
49      @RequiredArgsConstructor @Getter @ToString
50      static class TopicAndMessage<T>
51        {
52          @Nonnull
53          private final Class<T> topic;
54  
55          @Nonnull
56          private final T message;
57        }
58       
59      private final ConcurrentNavigableMap<Class<?>, Queue<?>> queueMapByTopic =
60              new ConcurrentSkipListMap<>(Comparator.comparing(Class::getName));
61      
62      private Class<?> latestSentTopic = null;
63      
64      /***********************************************************************************************************************************************************
65       * Adds a message of the given topic to this queue and issues a notification.
66       *
67       * @param   <T>   the static type of the message
68       * @param   topic     the dynamic type of the message
69       * @param   message   the message
70       **********************************************************************************************************************************************************/
71      public synchronized <T> void add (@Nonnull final Class<T> topic, @Nonnull final T message)
72        {
73          getQueue(topic).add(message);
74          notifyAll();
75        }
76      
77      /***********************************************************************************************************************************************************
78       * Removes and returns the next pair (topic, message) from the queue. Blocks until one is available.
79       *
80       * @param   <T>                 the static type of the topic
81       * @return                          the topic and message
82       * @throws  InterruptedException    if interrupted while waiting
83       **********************************************************************************************************************************************************/
84      @Nonnull
85      public synchronized <T> TopicAndMessage<T> remove()
86        throws InterruptedException
87        {
88          for (;;)
89            {
90              for (final var topic : reorderedTopics())
91                {
92                  final var queue = queueMapByTopic.get(topic);
93                  final var message = queue.poll();
94                  
95                  if (message != null)
96                    {
97                      latestSentTopic = topic;
98                      
99                      if (log.isTraceEnabled())
100                       {
101                         log.trace("stats {}", stats());
102                       }
103                     
104                     return new TopicAndMessage<>((Class<T>)topic, (T)message);
105                   }
106               }
107 
108             if (log.isTraceEnabled())
109               {
110                 log.trace("all queues empty; stats {}", stats());
111               }
112     
113             wait();  
114           }
115       }
116 
117     /***********************************************************************************************************************************************************
118      * Returns the list of topics reordered, so it starts just after latestSentTopic and wraps around.
119      **********************************************************************************************************************************************************/
120     @Nonnull
121     private List<Class<?>> reorderedTopics() 
122       {
123         final var keySet = queueMapByTopic.navigableKeySet();
124         final List<Class<?>> scanSet = new ArrayList<>();
125 
126         if (latestSentTopic == null)
127           {
128             scanSet.addAll(keySet);
129           }
130         else
131           {
132             scanSet.addAll(keySet.subSet(latestSentTopic, false, keySet.last(), true));
133             scanSet.addAll(keySet.subSet(keySet.first(), true, latestSentTopic, true));
134           }
135         
136         return scanSet;
137       }
138     
139     /***********************************************************************************************************************************************************
140      **********************************************************************************************************************************************************/
141     private synchronized String stats()
142       {
143         final var b = new StringBuilder();
144         var separator = "";
145         
146         for (final var e : queueMapByTopic.entrySet())
147           {
148             b.append(separator).append(String.format("%s[%s]: %d", 
149                     e.getKey().getSimpleName(), e.getKey().equals(latestSentTopic) ? "X" : " ", e.getValue().size()));
150             separator = ", ";
151           }
152 
153         return b.toString();
154       }
155     
156     /***********************************************************************************************************************************************************
157      * Returns the queue associated to a given topic. The queue is created if the topic is new.
158      * 
159      * @param   topic       the topic
160      * @return              the queue
161      **********************************************************************************************************************************************************/
162     @Nonnull
163     private synchronized <T> Queue<T> getQueue (@Nonnull final Class<T> topic)
164       {
165         // TODO Java 8 would make this easier
166         var queue = (Queue<T>)queueMapByTopic.get(topic);
167         
168         if (queue == null)
169           {
170             queue = new LinkedBlockingQueue<>();
171             queueMapByTopic.put(topic, queue);
172           }
173         
174         return queue;
175       }
176   }