View Javadoc
1   /*
2    * *************************************************************************************************************************************************************
3    *
4    * TheseFoolishThings: Miscellaneous utilities
5    * http://tidalwave.it/projects/thesefoolishthings
6    *
7    * Copyright (C) 2009 - 2024 by Tidalwave s.a.s. (http://tidalwave.it)
8    *
9    * *************************************************************************************************************************************************************
10   *
11   * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12   * You may obtain a copy of the License at
13   *
14   *     http://www.apache.org/licenses/LICENSE-2.0
15   *
16   * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17   * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and limitations under the License.
18   *
19   * *************************************************************************************************************************************************************
20   *
21   * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22   * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23   *
24   * *************************************************************************************************************************************************************
25   */
26  package it.tidalwave.actor.impl;
27  
28  import javax.annotation.Nonnegative;
29  import javax.annotation.Nonnull;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.LinkedBlockingDeque;
33  import java.util.concurrent.LinkedBlockingQueue;
34  import java.util.concurrent.ThreadFactory;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  import lombok.extern.slf4j.Slf4j;
38  
39  /***************************************************************************************************************************************************************
40   *
41   * An executor that propagates the {@link it.tidalwave.actor.Collaboration}.
42   *
43   * @author  Fabrizio Giudici
44   *
45   **************************************************************************************************************************************************************/
46  @Slf4j
47  public class ExecutorWithPriority
48    {
49      private final Runnable consumer = new Runnable()
50        {
51          @Override
52          public void run()
53            {
54              for (;;)
55                {
56                  final var runnable = runnableQueue.poll();
57  
58                  if (runnable == null)
59                    {
60                      break;
61                    }
62  
63                  runnable.run();
64                }
65            }
66        };
67  
68      private static interface PriorityRunnable extends Runnable
69        {
70        }
71  
72      private final BlockingQueue<Runnable> runnableQueue = new LinkedBlockingDeque<>()
73        {
74          @Override
75          public boolean add (@Nonnull final Runnable runnable)
76            {
77              if (runnable instanceof PriorityRunnable)
78                {
79                  addFirst(runnable);
80                }
81              else
82                {
83                  addLast(runnable);
84                }
85  
86              return true;
87            }
88        };
89  
90      private final Executor executor;
91  
92      /***********************************************************************************************************************************************************
93       * Creates an executor with the given pool size.
94       *
95       * @param  poolSize          the pool size
96       * @param  name              the thread base name
97       * @param  initialPriority   the initial thread priority in this executor
98       **********************************************************************************************************************************************************/
99      public ExecutorWithPriority (@Nonnegative final int poolSize,
100                                  @Nonnull final String name,
101                                  @Nonnegative final int initialPriority)
102       {
103         final var threadFactory = new ThreadFactory()
104           {
105             private int count = 0;
106 
107             @Override @Nonnull
108             public Thread newThread (@Nonnull final Runnable runnable)
109               {
110                 final var thread = new Thread(runnable, name + "-" + count++);
111                 thread.setPriority(initialPriority);
112                 return thread;
113               }
114           };
115 
116         // first parameter should be 0, but in this case it goes single thread
117         executor = new ThreadPoolExecutor(poolSize, poolSize, 2, TimeUnit.SECONDS,
118                                           new LinkedBlockingQueue<>(), threadFactory);
119 //        executor = new ThreadPoolExecutor(poolSize, poolSize, 2, TimeUnit.SECONDS, runnableQueue, threadFactory);
120       }
121 
122     /***********************************************************************************************************************************************************
123      * Schedules the execution of a worker at the end of the queue.
124      *
125      * @param  worker  the worker
126      **********************************************************************************************************************************************************/
127     public void execute (@Nonnull final Runnable worker)
128       {
129         runnableQueue.add(worker);
130         executor.execute(consumer);
131 //        executor.execute(worker);
132       }
133 
134     /***********************************************************************************************************************************************************
135      * Schedules the execution of a worker as soon as possible.
136      *
137      * @param  worker  the worker
138      **********************************************************************************************************************************************************/
139     public void executeWithPriority (@Nonnull final Runnable worker)
140       {
141 //        executor.execute(new PriorityRunnable()
142         runnableQueue.add((PriorityRunnable)worker::run);
143         executor.execute(consumer);
144       }
145   }