View Javadoc
1   /*
2    * *************************************************************************************************************************************************************
3    *
4    * TheseFoolishThings: Miscellaneous utilities
5    * http://tidalwave.it/projects/thesefoolishthings
6    *
7    * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8    *
9    * *************************************************************************************************************************************************************
10   *
11   * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12   * You may obtain a copy of the License at
13   *
14   *     http://www.apache.org/licenses/LICENSE-2.0
15   *
16   * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17   * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and limitations under the License.
18   *
19   * *************************************************************************************************************************************************************
20   *
21   * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22   * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23   *
24   * *************************************************************************************************************************************************************
25   */
26  package it.tidalwave.actor.impl;
27  
28  import javax.annotation.Nonnegative;
29  import jakarta.annotation.Nonnull;
30  import java.time.Duration;
31  import java.time.ZonedDateTime;
32  import java.util.ArrayList;
33  import java.util.List;
34  import java.util.UUID;
35  import java.io.Serializable;
36  import it.tidalwave.actor.Collaboration;
37  import it.tidalwave.actor.CollaborationCompletedMessage;
38  import it.tidalwave.actor.CollaborationStartedMessage;
39  import it.tidalwave.actor.annotation.Message;
40  import lombok.EqualsAndHashCode;
41  import lombok.Getter;
42  import lombok.RequiredArgsConstructor;
43  import lombok.ToString;
44  import lombok.extern.slf4j.Slf4j;
45  import static lombok.AccessLevel.PRIVATE;
46  
47  /***************************************************************************************************************************************************************
48   *
49   * @author  Fabrizio Giudici
50   *
51   **************************************************************************************************************************************************************/
52  @RequiredArgsConstructor(access = PRIVATE) @EqualsAndHashCode(of = "id") @Slf4j @ToString(of = "id")
53  public class DefaultCollaboration implements Serializable, Collaboration
54    {
55      /***********************************************************************************************************************************************************
56       *
57       **********************************************************************************************************************************************************/
58      private static final DefaultCollaboration NULL_DEFAULT_COLLABORATION = new DefaultCollaboration(new Object())
59        {
60          @Override
61          public void bindToCurrentThread()
62            {
63            }
64  
65          @Override
66          public void unbindFromCurrentThread()
67            {
68            }
69        };
70  
71      /***********************************************************************************************************************************************************
72       *
73       **********************************************************************************************************************************************************/
74      @RequiredArgsConstructor @ToString
75      private static class IdentityWrapper
76        {
77          @Nonnull
78          private final Object object;
79  
80          @Override
81          public boolean equals (final Object object)
82            {
83              if ((object == null) || (getClass() != object.getClass()))
84                {
85                  return false;
86                }
87  
88              final var other = (IdentityWrapper)object;
89              return this.object == other.object;
90            }
91  
92          @Override
93          public int hashCode()
94            {
95              return object.hashCode();
96            }
97        }
98  
99      private static final ThreadLocal<DefaultCollaboration> THREAD_LOCAL = new ThreadLocal<>();
100 
101     private final UUID id = UUID.randomUUID();
102 
103     @Nonnull @Getter
104     private final Object originatingMessage;
105 
106     private final ZonedDateTime startTime = ZonedDateTime.now();
107 
108     @Getter
109     private boolean completed;
110 
111     private final List<Object> suspensionTokens = new ArrayList<>();
112 
113     /** List of threads currently working for this Collaboration. */
114     // No need for being weak, since objects are explicitly removed
115     private final List<Thread> runningThreads = new ArrayList<>();
116 
117     /** List of messages currently being delivered as part of this Collaboration. */
118     // No need for being weak, since objects are explicitly removed
119     private final List<Object> deliveringMessages = new ArrayList<>();
120 
121     /** List of messages pending to be consumed as part of this Collaboration. */
122     // No need for being weak, since objects are explicitly removed
123     private final List<IdentityWrapper> pendingMessages = new ArrayList<>();
124 
125     private boolean collaborationStartedMessageSent = false;
126 
127     /***********************************************************************************************************************************************************
128      * Factory method to retrieve a {@link Collaboration}. This method accepts any object; if it is an implementation
129      * of {@link Provider}, the object is queried; otherwise {@link #NULL_DEFAULT_COLLABORATION} is returned.
130      *
131      * @param   object    the object associated to the {@code Collaboration}
132      * @return            the {@code Collaboration}
133      **********************************************************************************************************************************************************/
134     @Nonnull
135     public static DefaultCollaboration getCollaboration (@Nonnull final Object object)
136       {
137         return (object instanceof Provider) ? (DefaultCollaboration)((Provider)object).getCollaboration()
138                                             : NULL_DEFAULT_COLLABORATION;
139       }
140 
141     /***********************************************************************************************************************************************************
142      * Gets the {@link Collaboration} bound to the current thread or creates a new one.
143      *
144      * @param   originator    the object that will be considered the originator of the {@code Collaboration} in case it
145      *                        is created
146      * @return                the {@code Collaboration}
147      **********************************************************************************************************************************************************/
148     @Nonnull
149     public static DefaultCollaboration getOrCreateCollaboration (@Nonnull final Object originator)
150       {
151         var collaboration = THREAD_LOCAL.get();
152 
153         if (collaboration == null)
154           {
155             collaboration = new DefaultCollaboration(originator);
156           }
157 
158         return collaboration;
159       }
160 
161     /***********************************************************************************************************************************************************
162      * {@inheritDoc}
163      **********************************************************************************************************************************************************/
164     @Override @Nonnull
165     public ZonedDateTime getStartTime()
166       {
167         return startTime;
168       }
169 
170     /***********************************************************************************************************************************************************
171      * {@inheritDoc}
172      **********************************************************************************************************************************************************/
173     @Override @Nonnull
174     public Duration getDuration()
175       {
176         return Duration.between(startTime, ZonedDateTime.now());
177       }
178 
179     /***********************************************************************************************************************************************************
180      * {@inheritDoc}
181      **********************************************************************************************************************************************************/
182     @Override
183     public synchronized void waitForCompletion()
184       throws InterruptedException
185       {
186         while (!isCompleted())
187           {
188             wait();
189           }
190       }
191 
192     /***********************************************************************************************************************************************************
193      * {@inheritDoc}
194      **********************************************************************************************************************************************************/
195     @Override @Nonnegative
196     public synchronized int getDeliveringMessagesCount()
197       {
198         return deliveringMessages.size();
199       }
200 
201     /***********************************************************************************************************************************************************
202      * {@inheritDoc}
203      **********************************************************************************************************************************************************/
204     @Override @Nonnegative
205     public synchronized int getPendingMessagesCount()
206       {
207         return pendingMessages.size();
208       }
209 
210     /***********************************************************************************************************************************************************
211      * {@inheritDoc}
212      **********************************************************************************************************************************************************/
213     @Override @Nonnegative
214     public synchronized int getRunningThreadsCount()
215       {
216         return runningThreads.size();
217       }
218 
219     /***********************************************************************************************************************************************************
220      * {@inheritDoc}
221      **********************************************************************************************************************************************************/
222     @Override
223     public void resume (@Nonnull final Object suspensionToken, @Nonnull final Runnable runnable)
224       {
225         try
226           {
227             bindToCurrentThread();
228             runnable.run();
229             suspensionTokens.remove(suspensionToken);
230           }
231         finally
232           {
233             unbindFromCurrentThread();
234           }
235       }
236 
237     /***********************************************************************************************************************************************************
238      * {@inheritDoc}
239      **********************************************************************************************************************************************************/
240     @Override
241     public synchronized void resumeAndDie (@Nonnull final Object suspensionToken)
242       {
243         resume(suspensionToken, () -> {});
244       }
245 
246     /***********************************************************************************************************************************************************
247      * {@inheritDoc}
248      **********************************************************************************************************************************************************/
249     @Override
250     public synchronized Object suspend()
251       {
252         final Object suspensionToken = UUID.randomUUID();
253         suspensionTokens.add(suspensionToken);
254         return suspensionToken;
255       }
256 
257     /***********************************************************************************************************************************************************
258      * {@inheritDoc}
259      **********************************************************************************************************************************************************/
260     @Override
261     public synchronized boolean isSuspended()
262       {
263         return !suspensionTokens.isEmpty();
264       }
265 
266     /***********************************************************************************************************************************************************
267      *
268      **********************************************************************************************************************************************************/
269     public synchronized void bindToCurrentThread()
270       {
271         log.trace("bindToCurrentThread()");
272         THREAD_LOCAL.set(this);
273         runningThreads.add(Thread.currentThread());
274         notifyAll();
275         log();
276       }
277 
278     /***********************************************************************************************************************************************************
279      *
280      **********************************************************************************************************************************************************/
281     public synchronized void unbindFromCurrentThread()
282       {
283         log.trace("unbindFromCurrentThread()");
284         runningThreads.remove(Thread.currentThread());
285         THREAD_LOCAL.remove();
286         notifyAll();
287         log();
288         eventuallySendCompletionMessage();
289       }
290 
291     /***********************************************************************************************************************************************************
292      * Registers that the given {@link Message} is being delivered.
293      *
294      * @param  message  the {@code Message}
295      **********************************************************************************************************************************************************/
296     public synchronized void registerDeliveringMessage (@Nonnull final Object message)
297       {
298         log.trace("registerDeliveringMessage({})", message);
299 
300         final var annotation = message.getClass().getAnnotation(Message.class);
301 
302         if (annotation == null)
303           {
304             throw new IllegalArgumentException("Message must be annotated with @Message: " + message.getClass());
305           }
306 
307         if (annotation.daemon())
308           {
309             deliveringMessages.add(message);
310 
311             // Do this *after* enlisting message in deliveringMessages
312             if (!collaborationStartedMessageSent && !(message instanceof CollaborationStartedMessage))
313               {
314                 CollaborationStartedMessage.forCollaboration(this).send();
315                 collaborationStartedMessageSent = true;
316               }
317 
318             notifyAll();
319             log();
320           }
321       }
322 
323     /***********************************************************************************************************************************************************
324      * Registers that the given {@link Message} is no more being delivered.
325      *
326      * @param  message  the {@code Message}
327      **********************************************************************************************************************************************************/
328     public synchronized void unregisterDeliveringMessage (@Nonnull final Object message)
329       {
330         log.trace("unregisterDeliveringMessage({})", message);
331 
332         if (message.getClass().getAnnotation(Message.class).daemon())
333           {
334             deliveringMessages.remove(message);
335             notifyAll();
336             log();
337             eventuallySendCompletionMessage();
338           }
339       }
340 
341     /***********************************************************************************************************************************************************
342      * Registers that the given {@link Message} is pending - this means it is in the recipient's queue, waiting to be
343      * consumed.
344      *
345      * @param  message  the {@code Message}
346      **********************************************************************************************************************************************************/
347     public synchronized void registerPendingMessage (@Nonnull final Object message)
348       {
349         log.trace("registerPendingMessage({})", message);
350 
351         if (message.getClass().getAnnotation(Message.class).daemon())
352           {
353             pendingMessages.add(new IdentityWrapper(message));
354             notifyAll();
355             log();
356           }
357       }
358 
359     /***********************************************************************************************************************************************************
360      * Registers that the given {@link Message} is no more pending.
361      *
362      * @param  message  the {@code Message}
363      **********************************************************************************************************************************************************/
364     public synchronized void unregisterPendingMessage (@Nonnull final Object message)
365       {
366         log.trace("unregisterPendingMessage({})", message);
367 
368         if (message.getClass().getAnnotation(Message.class).daemon())
369           {
370             pendingMessages.remove(new IdentityWrapper(message));
371             notifyAll();
372             log();
373             eventuallySendCompletionMessage();
374           }
375       }
376 
377     /***********************************************************************************************************************************************************
378      * If this {@link Collaboration} has been completed (that is, there are no more messages around for it), sends a
379      * {@link CollaborationCompletedMessage}.
380      **********************************************************************************************************************************************************/
381     private void eventuallySendCompletionMessage()
382       {
383         final var enqueuedMessageCount = deliveringMessages.size()
384                                          + pendingMessages.size()
385                                          + runningThreads.size()
386                                          + suspensionTokens.size();
387 
388         if (!completed && (enqueuedMessageCount == 0))
389           {
390             log.debug(">>>> sending completion message for {}", this);
391             completed = true;
392             THREAD_LOCAL.remove();
393             CollaborationCompletedMessage.forCollaboration(this).send();
394           }
395       }
396 
397     /***********************************************************************************************************************************************************
398      *
399      **********************************************************************************************************************************************************/
400     private void log() // FIXME: drop or move out of synchronized
401       {
402 //        log.trace("{}: delivering messages: {}, pending messages: {}, running threads: {}, suspension tokens: {}",
403 //                  new Object[] {this, deliveringMessages.size(), pendingMessages.size(), runningThreads.size(), suspensionTokens.size()});
404 //
405 //        if (pendingMessages.size() < 2)
406 //          {
407 //            log.trace(">>>> pending messages: {}", pendingMessages);
408 //          }
409       }
410   }