/*
 * *************************************************************************************************************************************************************
 *
 * TheseFoolishThings: Miscellaneous utilities
 * http://tidalwave.it/projects/thesefoolishthings
 *
 * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
 *
 * *************************************************************************************************************************************************************
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and limitations under the License.
 *
 * *************************************************************************************************************************************************************
 *
 * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
 * git clone https://github.com/tidalwave-it/thesefoolishthings-src
 *
 * *************************************************************************************************************************************************************
 */
package it.tidalwave.actor.impl;

import javax.annotation.Nonnegative;
import jakarta.annotation.Nonnull;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.io.Serializable;
import it.tidalwave.actor.Collaboration;
import it.tidalwave.actor.CollaborationCompletedMessage;
import it.tidalwave.actor.CollaborationStartedMessage;
import it.tidalwave.actor.annotation.Message;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import static lombok.AccessLevel.PRIVATE;

/***************************************************************************************************************************************************************
 *
 * An implementation of {@link Collaboration} that keeps track of the threads bound to it, of the messages being
 * delivered, of the messages pending in a queue and of the outstanding suspension tokens. When all of these become
 * empty a {@link CollaborationCompletedMessage} is sent (see {@link #eventuallySendCompletionMessage()}).
 *
 * Access to the mutable state is guarded by the monitor of the instance.
 *
 * @author  Fabrizio Giudici
 *
 **************************************************************************************************************************************************************/
@RequiredArgsConstructor(access = PRIVATE) @EqualsAndHashCode(of = "id") @Slf4j @ToString(of = "id")
public class DefaultCollaboration implements Serializable, Collaboration
  {
    /***********************************************************************************************************************************************************
     * The instance returned by {@link #getCollaboration(Object)} for objects that are not a {@link Provider}. Binding
     * to and unbinding from the current thread are no-ops for it.
     **********************************************************************************************************************************************************/
    private static final DefaultCollaboration NULL_DEFAULT_COLLABORATION = new DefaultCollaboration(new Object())
      {
        @Override
        public void bindToCurrentThread()
          {
          }

        @Override
        public void unbindFromCurrentThread()
          {
          }
      };

    /***********************************************************************************************************************************************************
     * Wraps an object so that it is compared by reference rather than by {@code equals()}. It is used for the elements
     * of {@link #pendingMessages}.
     **********************************************************************************************************************************************************/
    @RequiredArgsConstructor @ToString
    private static class IdentityWrapper
      {
        @Nonnull
        private final Object object;

        @Override
        public boolean equals (final Object that)
          {
            if ((that == null) || (getClass() != that.getClass()))
              {
                return false;
              }

            final var other = (IdentityWrapper)that;
            return this.object == other.object;
          }

        @Override
        public int hashCode()
          {
            // equals() compares by reference, so the hash must not depend on the wrapped object's own hashCode(),
            // which might be overridden.
            return System.identityHashCode(object);
          }
      }

    /** Holds the collaboration bound to each thread by {@link #bindToCurrentThread()}. */
    private static final ThreadLocal<DefaultCollaboration> THREAD_LOCAL = new ThreadLocal<>();

    /** The only attribute used by {@code equals()}, {@code hashCode()} and {@code toString()}. */
    private final UUID id = UUID.randomUUID();

    @Nonnull @Getter
    private final Object originatingMessage;

    /** The time at which this object was created. */
    private final ZonedDateTime startTime = ZonedDateTime.now();

    /** Set by {@link #eventuallySendCompletionMessage()} and never reset. */
    @Getter
    private boolean completed;

    /** The tokens returned by {@link #suspend()} and not yet passed back to {@link #resume(Object, Runnable)}. */
    private final List<Object> suspensionTokens = new ArrayList<>();

    /** List of threads currently working for this Collaboration. */
    // No need for being weak, since objects are explicitly removed
    private final List<Thread> runningThreads = new ArrayList<>();

    /** List of messages currently being delivered as part of this Collaboration. */
    // No need for being weak, since objects are explicitly removed
    private final List<Object> deliveringMessages = new ArrayList<>();

    /** List of messages pending to be consumed as part of this Collaboration. */
    // No need for being weak, since objects are explicitly removed
    private final List<IdentityWrapper> pendingMessages = new ArrayList<>();

    /** Ensures that {@link CollaborationStartedMessage} is sent at most once. */
    private boolean collaborationStartedMessageSent = false;

    /***********************************************************************************************************************************************************
     * Factory method to retrieve a {@link Collaboration}. This method accepts any object; if it is an implementation
     * of {@link Provider}, the object is queried; otherwise {@link #NULL_DEFAULT_COLLABORATION} is returned.
     *
     * @param   object    the object associated to the {@code Collaboration}
     * @return            the {@code Collaboration}
     **********************************************************************************************************************************************************/
    @Nonnull
    public static DefaultCollaboration getCollaboration (@Nonnull final Object object)
      {
        return (object instanceof Provider) ? (DefaultCollaboration)((Provider)object).getCollaboration()
                                            : NULL_DEFAULT_COLLABORATION;
      }

    /***********************************************************************************************************************************************************
     * Gets the {@link Collaboration} bound to the current thread or creates a new one. A newly created instance is
     * not bound to the current thread by this method.
     *
     * @param   originator    the object that will be considered the originator of the {@code Collaboration} in case it
     *                        is created
     * @return                the {@code Collaboration}
     **********************************************************************************************************************************************************/
    @Nonnull
    public static DefaultCollaboration getOrCreateCollaboration (@Nonnull final Object originator)
      {
        var collaboration = THREAD_LOCAL.get();

        if (collaboration == null)
          {
            collaboration = new DefaultCollaboration(originator);
          }

        return collaboration;
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override @Nonnull
    public ZonedDateTime getStartTime()
      {
        return startTime;
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override @Nonnull
    public Duration getDuration()
      {
        return Duration.between(startTime, ZonedDateTime.now());
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override
    public synchronized void waitForCompletion()
      throws InterruptedException
      {
        // Every state change calls notifyAll() while holding the monitor; 'completed' is set before the monitor is
        // released, so a thread woken up here sees the updated flag.
        while (!isCompleted())
          {
            wait();
          }
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override @Nonnegative
    public synchronized int getDeliveringMessagesCount()
      {
        return deliveringMessages.size();
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override @Nonnegative
    public synchronized int getPendingMessagesCount()
      {
        return pendingMessages.size();
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override @Nonnegative
    public synchronized int getRunningThreadsCount()
      {
        return runningThreads.size();
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override
    public void resume (@Nonnull final Object suspensionToken, @Nonnull final Runnable runnable)
      {
        try
          {
            bindToCurrentThread();
            runnable.run(); // deliberately run without holding the monitor

            // suspensionTokens is a plain ArrayList also accessed by synchronized methods: guard the mutation.
            synchronized (this)
              {
                suspensionTokens.remove(suspensionToken);
              }
          }
        finally
          {
            unbindFromCurrentThread();
          }
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override
    public synchronized void resumeAndDie (@Nonnull final Object suspensionToken)
      {
        resume(suspensionToken, () -> {});
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override
    public synchronized Object suspend()
      {
        final Object suspensionToken = UUID.randomUUID();
        suspensionTokens.add(suspensionToken);
        return suspensionToken;
      }

    /***********************************************************************************************************************************************************
     * {@inheritDoc}
     **********************************************************************************************************************************************************/
    @Override
    public synchronized boolean isSuspended()
      {
        return !suspensionTokens.isEmpty();
      }

    /***********************************************************************************************************************************************************
     * Binds this object to the current thread: the thread-local reference is set to this instance, the current thread
     * is added to the list of running threads and waiting threads are notified.
     **********************************************************************************************************************************************************/
    public synchronized void bindToCurrentThread()
      {
        log.trace("bindToCurrentThread()");
        THREAD_LOCAL.set(this);
        runningThreads.add(Thread.currentThread());
        notifyAll();
        log();
      }

    /***********************************************************************************************************************************************************
     * Unbinds this object from the current thread: the current thread is removed from the list of running threads,
     * the thread-local reference is cleared, waiting threads are notified and eventually the completion message is
     * sent.
     **********************************************************************************************************************************************************/
    public synchronized void unbindFromCurrentThread()
      {
        log.trace("unbindFromCurrentThread()");
        runningThreads.remove(Thread.currentThread());
        THREAD_LOCAL.remove();
        notifyAll();
        log();
        eventuallySendCompletionMessage();
      }

    /***********************************************************************************************************************************************************
     * Registers that the given {@link Message} is being delivered. The message is tracked only when {@code daemon()}
     * of its {@code Message} annotation returns {@code true}. The first time a tracked message other than a
     * {@link CollaborationStartedMessage} is registered, a {@code CollaborationStartedMessage} is sent.
     *
     * @param   message                     the {@code Message}
     * @throws  IllegalArgumentException    if the class of the message is not annotated with {@code Message}
     **********************************************************************************************************************************************************/
    public synchronized void registerDeliveringMessage (@Nonnull final Object message)
      {
        log.trace("registerDeliveringMessage({})", message);

        if (isDaemon(message))
          {
            deliveringMessages.add(message);

            // Do this *after* enlisting message in deliveringMessages
            if (!collaborationStartedMessageSent && !(message instanceof CollaborationStartedMessage))
              {
                CollaborationStartedMessage.forCollaboration(this).send();
                collaborationStartedMessageSent = true;
              }

            notifyAll();
            log();
          }
      }

    /***********************************************************************************************************************************************************
     * Registers that the given {@link Message} is no more being delivered. Nothing is done unless {@code daemon()} of
     * its {@code Message} annotation returns {@code true}.
     *
     * @param   message                     the {@code Message}
     * @throws  IllegalArgumentException    if the class of the message is not annotated with {@code Message}
     **********************************************************************************************************************************************************/
    public synchronized void unregisterDeliveringMessage (@Nonnull final Object message)
      {
        log.trace("unregisterDeliveringMessage({})", message);

        if (isDaemon(message))
          {
            deliveringMessages.remove(message);
            notifyAll();
            log();
            eventuallySendCompletionMessage();
          }
      }

    /***********************************************************************************************************************************************************
     * Registers that the given {@link Message} is pending - this means it is in the recipient's queue, waiting to be
     * consumed. The message is tracked, by reference, only when {@code daemon()} of its {@code Message} annotation
     * returns {@code true}.
     *
     * @param   message                     the {@code Message}
     * @throws  IllegalArgumentException    if the class of the message is not annotated with {@code Message}
     **********************************************************************************************************************************************************/
    public synchronized void registerPendingMessage (@Nonnull final Object message)
      {
        log.trace("registerPendingMessage({})", message);

        if (isDaemon(message))
          {
            pendingMessages.add(new IdentityWrapper(message));
            notifyAll();
            log();
          }
      }

    /***********************************************************************************************************************************************************
     * Registers that the given {@link Message} is no more pending. Nothing is done unless {@code daemon()} of its
     * {@code Message} annotation returns {@code true}.
     *
     * @param   message                     the {@code Message}
     * @throws  IllegalArgumentException    if the class of the message is not annotated with {@code Message}
     **********************************************************************************************************************************************************/
    public synchronized void unregisterPendingMessage (@Nonnull final Object message)
      {
        log.trace("unregisterPendingMessage({})", message);

        if (isDaemon(message))
          {
            pendingMessages.remove(new IdentityWrapper(message));
            notifyAll();
            log();
            eventuallySendCompletionMessage();
          }
      }

    /***********************************************************************************************************************************************************
     * Returns the value of {@code daemon()} from the {@link Message} annotation of the class of the given message.
     *
     * @param   message                     the message
     * @return                              the value of the {@code daemon()} attribute
     * @throws  IllegalArgumentException    if the class of the message is not annotated with {@code Message}
     **********************************************************************************************************************************************************/
    private static boolean isDaemon (@Nonnull final Object message)
      {
        final var annotation = message.getClass().getAnnotation(Message.class);

        if (annotation == null)
          {
            throw new IllegalArgumentException("Message must be annotated with @Message: " + message.getClass());
          }

        return annotation.daemon();
      }

    /***********************************************************************************************************************************************************
     * If this {@link Collaboration} has been completed (that is, there are no more messages around for it), sends a
     * {@link CollaborationCompletedMessage}. This method doesn't synchronize by itself: all of its callers in this
     * class are {@code synchronized} methods.
     **********************************************************************************************************************************************************/
    private void eventuallySendCompletionMessage()
      {
        final var enqueuedMessageCount = deliveringMessages.size()
                                       + pendingMessages.size()
                                       + runningThreads.size()
                                       + suspensionTokens.size();

        if (!completed && (enqueuedMessageCount == 0))
          {
            log.debug(">>>> sending completion message for {}", this);
            completed = true;
            THREAD_LOCAL.remove();
            CollaborationCompletedMessage.forCollaboration(this).send();
          }
      }

    /***********************************************************************************************************************************************************
     * Placeholder for diagnostic logging of the internal counters; the body is currently disabled.
     **********************************************************************************************************************************************************/
    private void log() // FIXME: drop or move out of synchronized
      {
//        log.trace("{}: delivering messages: {}, pending messages: {}, running threads: {}, suspension tokens: {}",
//                  new Object[] {this, deliveringMessages.size(), pendingMessages.size(), runningThreads.size(), suspensionTokens.size()});
//
//        if (pendingMessages.size() < 2)
//          {
//            log.trace(">>>> pending messages: {}", pendingMessages);
//          }
      }
  }