1 /*
2 * *************************************************************************************************************************************************************
3 *
4 * TheseFoolishThings: Miscellaneous utilities
5 * http://tidalwave.it/projects/thesefoolishthings
6 *
7 * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8 *
9 * *************************************************************************************************************************************************************
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 *
14 * http://www.apache.org/licenses/LICENSE-2.0
15 *
16 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
18 *
19 * *************************************************************************************************************************************************************
20 *
21 * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22 * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23 *
24 * *************************************************************************************************************************************************************
25 */
26 package it.tidalwave.messagebus.spi;
27
28 import javax.annotation.Nonnull;
29 import java.util.ArrayList;
30 import java.util.Comparator;
31 import java.util.List;
32 import java.util.Queue;
33 import java.util.concurrent.ConcurrentNavigableMap;
34 import java.util.concurrent.ConcurrentSkipListMap;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import lombok.Getter;
37 import lombok.RequiredArgsConstructor;
38 import lombok.ToString;
39 import lombok.extern.slf4j.Slf4j;
40
41 /***************************************************************************************************************************************************************
42 *
43 * @author Fabrizio Giudici
44 *
45 **************************************************************************************************************************************************************/
46 @Slf4j
47 public class MultiQueue
48 {
49 @RequiredArgsConstructor @Getter @ToString
50 static class TopicAndMessage<T>
51 {
52 @Nonnull
53 private final Class<T> topic;
54
55 @Nonnull
56 private final T message;
57 }
58
59 private final ConcurrentNavigableMap<Class<?>, Queue<?>> queueMapByTopic =
60 new ConcurrentSkipListMap<>(Comparator.comparing(Class::getName));
61
62 private Class<?> latestSentTopic = null;
63
64 /***********************************************************************************************************************************************************
65 * Adds a message of the given topic to this queue and issues a notification.
66 *
67 * @param <T> the static type of the message
68 * @param topic the dynamic type of the message
69 * @param message the message
70 **********************************************************************************************************************************************************/
71 public synchronized <T> void add (@Nonnull final Class<T> topic, @Nonnull final T message)
72 {
73 getQueue(topic).add(message);
74 notifyAll();
75 }
76
77 /***********************************************************************************************************************************************************
78 * Removes and returns the next pair (topic, message) from the queue. Blocks until one is available.
79 *
80 * @param <T> the static type of the topic
81 * @return the topic and message
82 * @throws InterruptedException if interrupted while waiting
83 **********************************************************************************************************************************************************/
84 @Nonnull
85 public synchronized <T> TopicAndMessage<T> remove()
86 throws InterruptedException
87 {
88 for (;;)
89 {
90 for (final var topic : reorderedTopics())
91 {
92 final var queue = queueMapByTopic.get(topic);
93 final var message = queue.poll();
94
95 if (message != null)
96 {
97 latestSentTopic = topic;
98
99 if (log.isTraceEnabled())
100 {
101 log.trace("stats {}", stats());
102 }
103
104 return new TopicAndMessage<>((Class<T>)topic, (T)message);
105 }
106 }
107
108 if (log.isTraceEnabled())
109 {
110 log.trace("all queues empty; stats {}", stats());
111 }
112
113 wait();
114 }
115 }
116
117 /***********************************************************************************************************************************************************
118 * Returns the list of topics reordered, so it starts just after latestSentTopic and wraps around.
119 **********************************************************************************************************************************************************/
120 @Nonnull
121 private List<Class<?>> reorderedTopics()
122 {
123 final var keySet = queueMapByTopic.navigableKeySet();
124 final List<Class<?>> scanSet = new ArrayList<>();
125
126 if (latestSentTopic == null)
127 {
128 scanSet.addAll(keySet);
129 }
130 else
131 {
132 scanSet.addAll(keySet.subSet(latestSentTopic, false, keySet.last(), true));
133 scanSet.addAll(keySet.subSet(keySet.first(), true, latestSentTopic, true));
134 }
135
136 return scanSet;
137 }
138
139 /***********************************************************************************************************************************************************
140 **********************************************************************************************************************************************************/
141 private synchronized String stats()
142 {
143 final var b = new StringBuilder();
144 var separator = "";
145
146 for (final var e : queueMapByTopic.entrySet())
147 {
148 b.append(separator).append(String.format("%s[%s]: %d",
149 e.getKey().getSimpleName(), e.getKey().equals(latestSentTopic) ? "X" : " ", e.getValue().size()));
150 separator = ", ";
151 }
152
153 return b.toString();
154 }
155
156 /***********************************************************************************************************************************************************
157 * Returns the queue associated to a given topic. The queue is created if the topic is new.
158 *
159 * @param topic the topic
160 * @return the queue
161 **********************************************************************************************************************************************************/
162 @Nonnull
163 private synchronized <T> Queue<T> getQueue (@Nonnull final Class<T> topic)
164 {
165 // TODO Java 8 would make this easier
166 var queue = (Queue<T>)queueMapByTopic.get(topic);
167
168 if (queue == null)
169 {
170 queue = new LinkedBlockingQueue<>();
171 queueMapByTopic.put(topic, queue);
172 }
173
174 return queue;
175 }
176 }