RoundRobinAsyncMessageDelivery.java

  1. /*
  2.  * *********************************************************************************************************************
  3.  *
  4.  * TheseFoolishThings: Miscellaneous utilities
  5.  * http://tidalwave.it/projects/thesefoolishthings
  6.  *
  7.  * Copyright (C) 2009 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
  8.  *
  9.  * *********************************************************************************************************************
  10.  *
  11.  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  12.  * the License. You may obtain a copy of the License at
  13.  *
  14.  *     http://www.apache.org/licenses/LICENSE-2.0
  15.  *
  16.  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  17.  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the
  18.  * specific language governing permissions and limitations under the License.
  19.  *
  20.  * *********************************************************************************************************************
  21.  *
  22.  * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
  23.  * git clone https://github.com/tidalwave-it/thesefoolishthings-src
  24.  *
  25.  * *********************************************************************************************************************
  26.  */
  27. package it.tidalwave.messagebus.spi;

  28. import javax.annotation.Nonnull;
  29. import java.util.concurrent.Executor;
  30. import it.tidalwave.messagebus.spi.MultiQueue.TopicAndMessage;
  31. import lombok.Getter;
  32. import lombok.Setter;
  33. import lombok.ToString;
  34. import lombok.extern.slf4j.Slf4j;

  35. /***********************************************************************************************************************
  36.  *
  37.  * An implementation of {@link MessageDelivery} that dispatches messages in a round-robin fashion, topic by topic.
  38.  * Each delivery is performed in a separated thread.
  39.  *
  40.  * @author  Fabrizio Giudici
  41.  * @since   2.2
  42.  *
  43.  **********************************************************************************************************************/
  44. @Slf4j @ToString(of = "workers")
  45. public class RoundRobinAsyncMessageDelivery implements MessageDelivery
  46.   {
  47.     @Nonnull
  48.     private SimpleMessageBus messageBusSupport;
  49.    
  50.     @Getter @Setter
  51.     private int workers = 10;
  52.    
  53.     private final MultiQueue multiQueue = new MultiQueue();
  54.    
  55.     /*******************************************************************************************************************
  56.      *
  57.      *
  58.      ******************************************************************************************************************/
  59.     private final Runnable dispatcher = new Runnable()
  60.       {
  61.         @Override
  62.         public void run()
  63.           {
  64.             for (;;)
  65.               {
  66.                 try
  67.                   {
  68.                     dispatchMessage(multiQueue.remove());
  69.                   }
  70.                 catch (InterruptedException e)
  71.                   {
  72.                     break;
  73.                   }
  74.               }
  75.           }
  76.        
  77.         private <TOPIC> void dispatchMessage (@Nonnull final TopicAndMessage<TOPIC> tam)
  78.           {
  79.             messageBusSupport.dispatchMessage(tam.getTopic(), tam.getMessage());
  80.           }
  81.       };

  82.     /*******************************************************************************************************************
  83.      *
  84.      * {@inheritDoc}
  85.      *
  86.      ******************************************************************************************************************/
  87.     @Override
  88.     public void initialize (@Nonnull final SimpleMessageBus messageBusSupport)
  89.       {
  90.         this.messageBusSupport = messageBusSupport;
  91.         final Executor executor = this.messageBusSupport.getExecutor();
  92.        
  93.         for (int i = 0; i < workers; i++)
  94.           {
  95.             executor.execute(dispatcher);
  96.           }
  97.       }
  98.    
  99.     /*******************************************************************************************************************
  100.      *
  101.      * {@inheritDoc}
  102.      *
  103.      ******************************************************************************************************************/
  104.     @Override
  105.     public <TOPIC> void deliverMessage (@Nonnull final Class<TOPIC> topic, @Nonnull final TOPIC message)
  106.       {
  107.         multiQueue.add(topic, message);
  108.       }
  109.   }