1 /*
2 * *************************************************************************************************************************************************************
3 *
4 * TheseFoolishThings: Miscellaneous utilities
5 * http://tidalwave.it/projects/thesefoolishthings
6 *
7 * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8 *
9 * *************************************************************************************************************************************************************
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 *
14 * http://www.apache.org/licenses/LICENSE-2.0
15 *
16 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
18 *
19 * *************************************************************************************************************************************************************
20 *
21 * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22 * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23 *
24 * *************************************************************************************************************************************************************
25 */
26 package it.tidalwave.actor.impl;
27
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
38
39 /***************************************************************************************************************************************************************
40 *
41 * An executor that propagates the {@link it.tidalwave.actor.Collaboration}.
42 *
43 * @author Fabrizio Giudici
44 *
45 **************************************************************************************************************************************************************/
46 @Slf4j
47 public class ExecutorWithPriority
48 {
49 private final Runnable consumer = new Runnable()
50 {
51 @Override
52 public void run()
53 {
54 for (;;)
55 {
56 final var runnable = runnableQueue.poll();
57
58 if (runnable == null)
59 {
60 break;
61 }
62
63 runnable.run();
64 }
65 }
66 };
67
68 private static interface PriorityRunnable extends Runnable
69 {
70 }
71
72 private final BlockingQueue<Runnable> runnableQueue = new LinkedBlockingDeque<>()
73 {
74 @Override
75 public boolean add (@Nonnull final Runnable runnable)
76 {
77 if (runnable instanceof PriorityRunnable)
78 {
79 addFirst(runnable);
80 }
81 else
82 {
83 addLast(runnable);
84 }
85
86 return true;
87 }
88 };
89
90 private final Executor executor;
91
92 /***********************************************************************************************************************************************************
93 * Creates an executor with the given pool size.
94 *
95 * @param poolSize the pool size
96 * @param name the thread base name
97 * @param initialPriority the initial thread priority in this executor
98 **********************************************************************************************************************************************************/
99 public ExecutorWithPriority (@Nonnegative final int poolSize,
100 @Nonnull final String name,
101 @Nonnegative final int initialPriority)
102 {
103 final var threadFactory = new ThreadFactory()
104 {
105 private int count = 0;
106
107 @Override @Nonnull
108 public Thread newThread (@Nonnull final Runnable runnable)
109 {
110 final var thread = new Thread(runnable, name + "-" + count++);
111 thread.setPriority(initialPriority);
112 return thread;
113 }
114 };
115
116 // first parameter should be 0, but in this case it goes single thread
117 executor = new ThreadPoolExecutor(poolSize, poolSize, 2, TimeUnit.SECONDS,
118 new LinkedBlockingQueue<>(), threadFactory);
119 // executor = new ThreadPoolExecutor(poolSize, poolSize, 2, TimeUnit.SECONDS, runnableQueue, threadFactory);
120 }
121
122 /***********************************************************************************************************************************************************
123 * Schedules the execution of a worker at the end of the queue.
124 *
125 * @param worker the worker
126 **********************************************************************************************************************************************************/
127 public void execute (@Nonnull final Runnable worker)
128 {
129 runnableQueue.add(worker);
130 executor.execute(consumer);
131 // executor.execute(worker);
132 }
133
134 /***********************************************************************************************************************************************************
135 * Schedules the execution of a worker as soon as possible.
136 *
137 * @param worker the worker
138 **********************************************************************************************************************************************************/
139 public void executeWithPriority (@Nonnull final Runnable worker)
140 {
141 // executor.execute(new PriorityRunnable()
142 runnableQueue.add((PriorityRunnable)worker::run);
143 executor.execute(consumer);
144 }
145 }