RoundRobinAsyncMessageDelivery.java

  1. /*
  2.  * *********************************************************************************************************************
  3.  *
  4.  * TheseFoolishThings: Miscellaneous utilities
  5.  * http://tidalwave.it/projects/thesefoolishthings
  6.  *
  7.  * Copyright (C) 2009 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
  8.  *
  9.  * *********************************************************************************************************************
  10.  *
  11.  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  12.  * the License. You may obtain a copy of the License at
  13.  *
  14.  *     http://www.apache.org/licenses/LICENSE-2.0
  15.  *
  16.  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  17.  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the
  18.  * specific language governing permissions and limitations under the License.
  19.  *
  20.  * *********************************************************************************************************************
  21.  *
  22.  * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
  23.  * git clone https://github.com/tidalwave-it/thesefoolishthings-src
  24.  *
  25.  * *********************************************************************************************************************
  26.  */
  27. package it.tidalwave.messagebus.spi;

  28. import javax.annotation.Nonnull;
  29. import it.tidalwave.messagebus.spi.MultiQueue.TopicAndMessage;
  30. import lombok.Getter;
  31. import lombok.Setter;
  32. import lombok.ToString;
  33. import lombok.extern.slf4j.Slf4j;

  34. /***********************************************************************************************************************
  35.  *
  36.  * An implementation of {@link MessageDelivery} that dispatches messages in a round-robin fashion, topic by topic.
  37.  * Each delivery is performed in a separated thread.
  38.  *
  39.  * @author  Fabrizio Giudici
  40.  * @since   2.2
  41.  *
  42.  **********************************************************************************************************************/
  43. @Slf4j @ToString(of = "workers")
  44. public class RoundRobinAsyncMessageDelivery implements MessageDelivery
  45.   {
  46.     @Nonnull
  47.     private SimpleMessageBus messageBusSupport;
  48.    
  49.     @Getter @Setter
  50.     private int workers = 10;
  51.    
  52.     private final MultiQueue multiQueue = new MultiQueue();
  53.    
  54.     /*******************************************************************************************************************
  55.      *
  56.      *
  57.      ******************************************************************************************************************/
  58.     private final Runnable dispatcher = new Runnable()
  59.       {
  60.         @Override
  61.         public void run()
  62.           {
  63.             for (;;)
  64.               {
  65.                 try
  66.                   {
  67.                     dispatchMessage(multiQueue.remove());
  68.                   }
  69.                 catch (InterruptedException e)
  70.                   {
  71.                     break;
  72.                   }
  73.               }
  74.           }
  75.        
  76.         private <T> void dispatchMessage (@Nonnull final TopicAndMessage<T> tam)
  77.           {
  78.             messageBusSupport.dispatchMessage(tam.getTopic(), tam.getMessage());
  79.           }
  80.       };

  81.     /*******************************************************************************************************************
  82.      *
  83.      * {@inheritDoc}
  84.      *
  85.      ******************************************************************************************************************/
  86.     @Override
  87.     public void initialize (@Nonnull final SimpleMessageBus messageBusSupport)
  88.       {
  89.         this.messageBusSupport = messageBusSupport;
  90.         final var executor = this.messageBusSupport.getExecutor();
  91.        
  92.         for (var i = 0; i < workers; i++)
  93.           {
  94.             executor.execute(dispatcher);
  95.           }
  96.       }
  97.    
  98.     /*******************************************************************************************************************
  99.      *
  100.      * {@inheritDoc}
  101.      *
  102.      ******************************************************************************************************************/
  103.     @Override
  104.     public <T> void deliverMessage (@Nonnull final Class<T> topic, @Nonnull final T message)
  105.       {
  106.         multiQueue.add(topic, message);
  107.       }
  108.   }