1 /* 2 * ************************************************************************************************************************************************************* 3 * 4 * TheseFoolishThings: Miscellaneous utilities 5 * http://tidalwave.it/projects/thesefoolishthings 6 * 7 * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it) 8 * 9 * ************************************************************************************************************************************************************* 10 * 11 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 12 * You may obtain a copy of the License at 13 * 14 * http://www.apache.org/licenses/LICENSE-2.0 15 * 16 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 17 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 18 * 19 * ************************************************************************************************************************************************************* 20 * 21 * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src 22 * git clone https://github.com/tidalwave-it/thesefoolishthings-src 23 * 24 * ************************************************************************************************************************************************************* 25 */ 26 package it.tidalwave.actor.impl; 27 28 import java.lang.reflect.Method; 29 import javax.annotation.Nonnull; 30 import javax.inject.Provider; 31 import java.util.ArrayList; 32 import java.util.List; 33 import it.tidalwave.actor.CollaborationCompletedMessage; 34 import it.tidalwave.actor.CollaborationStartedMessage; 35 import it.tidalwave.actor.annotation.OriginatedBy; 36 import it.tidalwave.actor.spi.ActorActivatorStats; 37 import it.tidalwave.actor.spi.CollaborationAwareMessageBus; 38 import it.tidalwave.messagebus.MessageBus; 39 import it.tidalwave.messagebus.annotation.ListensTo; 40 import lombok.RequiredArgsConstructor; 41 import lombok.extern.slf4j.Slf4j; 42 import static it.tidalwave.messagebus.spi.ReflectionUtils.*; 43 44 /*************************************************************************************************************************************************************** 45 * 46 * @author Fabrizio Giudici 47 * 48 **************************************************************************************************************************************************************/ 49 @RequiredArgsConstructor @Slf4j 50 public class CollaborationAwareMessageBusAdapter implements MethodProcessor 51 { 52 @Nonnull 53 private final Object owner; 54 55 @Nonnull 56 private final ExecutorWithPriority executor; 57 58 @Nonnull 59 private final ActorActivatorStats stats; 60 61 private final List<MessageBus.Listener<?>> messageBusListeners = new ArrayList<>(); 62 63 private final Provider<CollaborationAwareMessageBus> messageBus = 64 Locator.createProviderFor(CollaborationAwareMessageBus.class); 65 66 /*********************************************************************************************************************************************************** 67 * 68 **********************************************************************************************************************************************************/ 69 @Override @Nonnull 70 public FilterResult filter (@Nonnull final Class<?> clazz) 71 { 72 return FilterResult.ACCEPT; // TODO: should filter with @Actor? 73 } 74 75 /*********************************************************************************************************************************************************** 76 * 77 **********************************************************************************************************************************************************/ 78 @Override 79 public void process (@Nonnull final Method method) 80 { 81 final var parameterAnnotations = method.getParameterAnnotations(); 82 83 if ((parameterAnnotations.length == 1) && containsAnnotation(parameterAnnotations[0], ListensTo.class)) 84 { 85 registerMessageListener(method); 86 } 87 else if ((parameterAnnotations.length == 2) && containsAnnotation(parameterAnnotations[0], ListensTo.class) 88 && containsAnnotation(parameterAnnotations[1], OriginatedBy.class)) 89 { 90 registerCollaborationListener(method); 91 } 92 } 93 94 /*********************************************************************************************************************************************************** 95 * 96 **********************************************************************************************************************************************************/ 97 public void unsubscribe() 98 { 99 for (final var listener : messageBusListeners) 100 { 101 messageBus.get().unsubscribe(listener); 102 } 103 } 104 105 /*********************************************************************************************************************************************************** 106 * 107 **********************************************************************************************************************************************************/ 108 private <T> void registerMessageListener (@Nonnull final Method method) 109 { 110 log.info("registerMessageListener({})", method); 111 112 final var topic = (Class<T>)method.getParameterTypes()[0]; 113 addListener(method, new MessageListenerAdapter<>(owner, method, executor, stats), topic); 114 } 115 116 /*********************************************************************************************************************************************************** 117 * 118 **********************************************************************************************************************************************************/ 119 private <T> void registerCollaborationListener (@Nonnull final Method method) 120 { 121 log.info("registerCollaborationListener({})", method); 122 final var collaborationMessageType = method.getParameterTypes()[0]; 123 final var messageType = method.getParameterTypes()[1]; 124 125 final MessageBus.Listener messageListener = collaborationMessageType.equals(CollaborationStartedMessage.class) 126 ? new CollaborationMessageListenerAdapter<CollaborationStartedMessage>(owner, method, executor, messageType, stats) 127 : new CollaborationMessageListenerAdapter<CollaborationCompletedMessage>(owner, method, executor, messageType, stats); 128 addListener(method, messageListener, collaborationMessageType); 129 } 130 131 /*********************************************************************************************************************************************************** 132 * 133 **********************************************************************************************************************************************************/ 134 private <T> void addListener (@Nonnull final Method method, 135 @Nonnull final MessageBus.Listener<T> messageListener, 136 @Nonnull final Class<T> topic) throws SecurityException 137 { 138 method.setAccessible(true); 139 messageBusListeners.add(messageListener); 140 messageBus.get().subscribe(topic, messageListener); 141 } 142 }