MultiQueue.java

  1. /*
  2.  * *********************************************************************************************************************
  3.  *
  4.  * TheseFoolishThings: Miscellaneous utilities
  5.  * http://tidalwave.it/projects/thesefoolishthings
  6.  *
  7.  * Copyright (C) 2009 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
  8.  *
  9.  * *********************************************************************************************************************
  10.  *
  11.  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  12.  * the License. You may obtain a copy of the License at
  13.  *
  14.  *     http://www.apache.org/licenses/LICENSE-2.0
  15.  *
  16.  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  17.  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the
  18.  * specific language governing permissions and limitations under the License.
  19.  *
  20.  * *********************************************************************************************************************
  21.  *
  22.  * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
  23.  * git clone https://github.com/tidalwave-it/thesefoolishthings-src
  24.  *
  25.  * *********************************************************************************************************************
  26.  */
  27. package it.tidalwave.messagebus.spi;

  28. import javax.annotation.Nonnull;
  29. import java.util.ArrayList;
  30. import java.util.Comparator;
  31. import java.util.List;
  32. import java.util.Queue;
  33. import java.util.concurrent.ConcurrentNavigableMap;
  34. import java.util.concurrent.ConcurrentSkipListMap;
  35. import java.util.concurrent.LinkedBlockingQueue;
  36. import lombok.Getter;
  37. import lombok.RequiredArgsConstructor;
  38. import lombok.ToString;
  39. import lombok.extern.slf4j.Slf4j;

  40. /***********************************************************************************************************************
  41.  *
  42.  * @author  Fabrizio Giudici
  43.  *
  44.  **********************************************************************************************************************/
  45. @Slf4j
  46. public class MultiQueue
  47.   {
  48.     @RequiredArgsConstructor @Getter @ToString
  49.     static class TopicAndMessage<T>
  50.       {
  51.         @Nonnull
  52.         private final Class<T> topic;

  53.         @Nonnull
  54.         private final T message;
  55.       }
  56.      
  57.     private final ConcurrentNavigableMap<Class<?>, Queue<?>> queueMapByTopic =
  58.             new ConcurrentSkipListMap<>(Comparator.comparing(Class::getName));
  59.    
  60.     private Class<?> latestSentTopic = null;
  61.    
  62.     /*******************************************************************************************************************
  63.      *
  64.      * Adds a message of the given topic to this queue and issues a notification.
  65.      *
  66.      * @param   <T>   the static type of the message
  67.      * @param   topic     the dynamic type of the message
  68.      * @param   message   the message
  69.      *
  70.      ******************************************************************************************************************/
  71.     public synchronized <T> void add (@Nonnull final Class<T> topic, @Nonnull final T message)
  72.       {
  73.         getQueue(topic).add(message);
  74.         notifyAll();
  75.       }
  76.    
  77.     /*******************************************************************************************************************
  78.      *
  79.      * Removes and returns the next pair (topic, message) from the queue. Blocks until one is available.
  80.      *
  81.      * @param   <T>                 the static type of the topic
  82.      * @return                          the topic and message
  83.      * @throws  InterruptedException    if interrupted while waiting
  84.      *
  85.      ******************************************************************************************************************/
  86.     @Nonnull
  87.     public synchronized <T> TopicAndMessage<T> remove()
  88.       throws InterruptedException
  89.       {
  90.         for (;;)
  91.           {
  92.             for (final var topic : reorderedTopics())
  93.               {
  94.                 final var queue = queueMapByTopic.get(topic);
  95.                 final var message = queue.poll();
  96.                
  97.                 if (message != null)
  98.                   {
  99.                     latestSentTopic = topic;
  100.                    
  101.                     if (log.isTraceEnabled())
  102.                       {
  103.                         log.trace("stats {}", stats());
  104.                       }
  105.                    
  106.                     return new TopicAndMessage<>((Class<T>)topic, (T)message);
  107.                   }
  108.               }

  109.             if (log.isTraceEnabled())
  110.               {
  111.                 log.trace("all queues empty; stats {}", stats());
  112.               }
  113.    
  114.             wait();  
  115.           }
  116.       }

  117.     /*******************************************************************************************************************
  118.      *
  119.      * Returns the list of topics reordered, so it starts just after latestSentTopic and wraps around.
  120.      *
  121.      ******************************************************************************************************************/
  122.     @Nonnull
  123.     private List<Class<?>> reorderedTopics()
  124.       {
  125.         final var keySet = queueMapByTopic.navigableKeySet();
  126.         final List<Class<?>> scanSet = new ArrayList<>();

  127.         if (latestSentTopic == null)
  128.           {
  129.             scanSet.addAll(keySet);
  130.           }
  131.         else
  132.           {
  133.             scanSet.addAll(keySet.subSet(latestSentTopic, false, keySet.last(), true));
  134.             scanSet.addAll(keySet.subSet(keySet.first(), true, latestSentTopic, true));
  135.           }
  136.        
  137.         return scanSet;
  138.       }
  139.    
  140.     /*******************************************************************************************************************
  141.      *
  142.      *
  143.      ******************************************************************************************************************/
  144.     private synchronized String stats()
  145.       {
  146.         final var b = new StringBuilder();
  147.         var separator = "";
  148.        
  149.         for (final var e : queueMapByTopic.entrySet())
  150.           {
  151.             b.append(separator).append(String.format("%s[%s]: %d",
  152.                     e.getKey().getSimpleName(), e.getKey().equals(latestSentTopic) ? "X" : " ", e.getValue().size()));
  153.             separator = ", ";
  154.           }

  155.         return b.toString();
  156.       }
  157.    
  158.     /*******************************************************************************************************************
  159.      *
  160.      * Returns the queue associated to a given topic. The queue is created if the topic is new.
  161.      *
  162.      * @param   topic       the topic
  163.      * @return              the queue
  164.      *
  165.      ******************************************************************************************************************/
  166.     @Nonnull
  167.     private synchronized <T> Queue<T> getQueue (@Nonnull final Class<T> topic)
  168.       {
  169.         // TODO Java 8 would make this easier
  170.         var queue = (Queue<T>)queueMapByTopic.get(topic);
  171.        
  172.         if (queue == null)
  173.           {
  174.             queue = new LinkedBlockingQueue<>();
  175.             queueMapByTopic.put(topic, queue);
  176.           }
  177.        
  178.         return queue;
  179.       }
  180.   }