1 /* 2 * ************************************************************************************************************************************************************* 3 * 4 * TheseFoolishThings: Miscellaneous utilities 5 * http://tidalwave.it/projects/thesefoolishthings 6 * 7 * Copyright (C) 2009 - 2024 by Tidalwave s.a.s. (http://tidalwave.it) 8 * 9 * ************************************************************************************************************************************************************* 10 * 11 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 12 * You may obtain a copy of the License at 13 * 14 * http://www.apache.org/licenses/LICENSE-2.0 15 * 16 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 17 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 18 * 19 * ************************************************************************************************************************************************************* 20 * 21 * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src 22 * git clone https://github.com/tidalwave-it/thesefoolishthings-src 23 * 24 * ************************************************************************************************************************************************************* 25 */ 26 package it.tidalwave.messagebus.spi; 27 28 import javax.annotation.Nonnull; 29 import java.util.ArrayList; 30 import java.util.Comparator; 31 import java.util.List; 32 import java.util.Queue; 33 import java.util.concurrent.ConcurrentNavigableMap; 34 import java.util.concurrent.ConcurrentSkipListMap; 35 import java.util.concurrent.LinkedBlockingQueue; 36 import lombok.Getter; 37 import lombok.RequiredArgsConstructor; 38 import lombok.ToString; 39 import lombok.extern.slf4j.Slf4j; 40 41 /*************************************************************************************************************************************************************** 42 * 43 * @author Fabrizio Giudici 44 * 45 **************************************************************************************************************************************************************/ 46 @Slf4j 47 public class MultiQueue 48 { 49 @RequiredArgsConstructor @Getter @ToString 50 static class TopicAndMessage<T> 51 { 52 @Nonnull 53 private final Class<T> topic; 54 55 @Nonnull 56 private final T message; 57 } 58 59 private final ConcurrentNavigableMap<Class<?>, Queue<?>> queueMapByTopic = 60 new ConcurrentSkipListMap<>(Comparator.comparing(Class::getName)); 61 62 private Class<?> latestSentTopic = null; 63 64 /*********************************************************************************************************************************************************** 65 * Adds a message of the given topic to this queue and issues a notification. 66 * 67 * @param <T> the static type of the message 68 * @param topic the dynamic type of the message 69 * @param message the message 70 **********************************************************************************************************************************************************/ 71 public synchronized <T> void add (@Nonnull final Class<T> topic, @Nonnull final T message) 72 { 73 getQueue(topic).add(message); 74 notifyAll(); 75 } 76 77 /*********************************************************************************************************************************************************** 78 * Removes and returns the next pair (topic, message) from the queue. Blocks until one is available. 79 * 80 * @param <T> the static type of the topic 81 * @return the topic and message 82 * @throws InterruptedException if interrupted while waiting 83 **********************************************************************************************************************************************************/ 84 @Nonnull 85 public synchronized <T> TopicAndMessage<T> remove() 86 throws InterruptedException 87 { 88 for (;;) 89 { 90 for (final var topic : reorderedTopics()) 91 { 92 final var queue = queueMapByTopic.get(topic); 93 final var message = queue.poll(); 94 95 if (message != null) 96 { 97 latestSentTopic = topic; 98 99 if (log.isTraceEnabled()) 100 { 101 log.trace("stats {}", stats()); 102 } 103 104 return new TopicAndMessage<>((Class<T>)topic, (T)message); 105 } 106 } 107 108 if (log.isTraceEnabled()) 109 { 110 log.trace("all queues empty; stats {}", stats()); 111 } 112 113 wait(); 114 } 115 } 116 117 /*********************************************************************************************************************************************************** 118 * Returns the list of topics reordered, so it starts just after latestSentTopic and wraps around. 119 **********************************************************************************************************************************************************/ 120 @Nonnull 121 private List<Class<?>> reorderedTopics() 122 { 123 final var keySet = queueMapByTopic.navigableKeySet(); 124 final List<Class<?>> scanSet = new ArrayList<>(); 125 126 if (latestSentTopic == null) 127 { 128 scanSet.addAll(keySet); 129 } 130 else 131 { 132 scanSet.addAll(keySet.subSet(latestSentTopic, false, keySet.last(), true)); 133 scanSet.addAll(keySet.subSet(keySet.first(), true, latestSentTopic, true)); 134 } 135 136 return scanSet; 137 } 138 139 /*********************************************************************************************************************************************************** 140 **********************************************************************************************************************************************************/ 141 private synchronized String stats() 142 { 143 final var b = new StringBuilder(); 144 var separator = ""; 145 146 for (final var e : queueMapByTopic.entrySet()) 147 { 148 b.append(separator).append(String.format("%s[%s]: %d", 149 e.getKey().getSimpleName(), e.getKey().equals(latestSentTopic) ? "X" : " ", e.getValue().size())); 150 separator = ", "; 151 } 152 153 return b.toString(); 154 } 155 156 /*********************************************************************************************************************************************************** 157 * Returns the queue associated to a given topic. The queue is created if the topic is new. 158 * 159 * @param topic the topic 160 * @return the queue 161 **********************************************************************************************************************************************************/ 162 @Nonnull 163 private synchronized <T> Queue<T> getQueue (@Nonnull final Class<T> topic) 164 { 165 // TODO Java 8 would make this easier 166 var queue = (Queue<T>)queueMapByTopic.get(topic); 167 168 if (queue == null) 169 { 170 queue = new LinkedBlockingQueue<>(); 171 queueMapByTopic.put(topic, queue); 172 } 173 174 return queue; 175 } 176 }