View Javadoc
1   /*
2    * *************************************************************************************************************************************************************
3    *
4    * TheseFoolishThings: Miscellaneous utilities
5    * http://tidalwave.it/projects/thesefoolishthings
6    *
7    * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8    *
9    * *************************************************************************************************************************************************************
10   *
11   * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12   * You may obtain a copy of the License at
13   *
14   *     http://www.apache.org/licenses/LICENSE-2.0
15   *
16   * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17   * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and limitations under the License.
18   *
19   * *************************************************************************************************************************************************************
20   *
21   * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22   * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23   *
24   * *************************************************************************************************************************************************************
25   */
26  package it.tidalwave.actor.impl;
27  
28  import java.lang.reflect.Method;
29  import jakarta.annotation.Nonnull;
30  import javax.inject.Provider;
31  import java.util.ArrayList;
32  import java.util.List;
33  import it.tidalwave.actor.CollaborationCompletedMessage;
34  import it.tidalwave.actor.CollaborationStartedMessage;
35  import it.tidalwave.actor.annotation.OriginatedBy;
36  import it.tidalwave.actor.spi.ActorActivatorStats;
37  import it.tidalwave.actor.spi.CollaborationAwareMessageBus;
38  import it.tidalwave.messagebus.MessageBus;
39  import it.tidalwave.messagebus.annotation.ListensTo;
40  import lombok.RequiredArgsConstructor;
41  import lombok.extern.slf4j.Slf4j;
42  import static it.tidalwave.messagebus.spi.ReflectionUtils.*;
43  
44  /***************************************************************************************************************************************************************
45   *
46   * @author  Fabrizio Giudici
47   *
48   **************************************************************************************************************************************************************/
49  @RequiredArgsConstructor @Slf4j
50  public class CollaborationAwareMessageBusAdapter implements MethodProcessor
51    {
52      @Nonnull
53      private final Object owner;
54  
55      @Nonnull
56      private final ExecutorWithPriority executor;
57  
58      @Nonnull
59      private final ActorActivatorStats stats;
60  
61      private final List<MessageBus.Listener<?>> messageBusListeners = new ArrayList<>();
62  
63      private final Provider<CollaborationAwareMessageBus> messageBus =
64              Locator.createProviderFor(CollaborationAwareMessageBus.class);
65  
66      /***********************************************************************************************************************************************************
67       *
68       **********************************************************************************************************************************************************/
69      @Override @Nonnull
70      public FilterResult filter (@Nonnull final Class<?> clazz)
71        {
72          return FilterResult.ACCEPT; // TODO: should filter with @Actor?
73        }
74  
75      /***********************************************************************************************************************************************************
76       *
77       **********************************************************************************************************************************************************/
78      @Override
79      public void process (@Nonnull final Method method)
80        {
81          final var parameterAnnotations = method.getParameterAnnotations();
82  
83          if ((parameterAnnotations.length == 1) && containsAnnotation(parameterAnnotations[0], ListensTo.class))
84            {
85              registerMessageListener(method);
86            }
87          else if ((parameterAnnotations.length == 2) && containsAnnotation(parameterAnnotations[0], ListensTo.class)
88                                                      && containsAnnotation(parameterAnnotations[1], OriginatedBy.class))
89            {
90              registerCollaborationListener(method);
91            }
92        }
93  
94      /***********************************************************************************************************************************************************
95       *
96       **********************************************************************************************************************************************************/
97      public void unsubscribe()
98        {
99          for (final var listener : messageBusListeners)
100           {
101             messageBus.get().unsubscribe(listener);
102           }
103       }
104 
105     /***********************************************************************************************************************************************************
106      *
107      **********************************************************************************************************************************************************/
108     private <T> void registerMessageListener (@Nonnull final Method method)
109       {
110         log.info("registerMessageListener({})", method);
111 
112         final var topic = (Class<T>)method.getParameterTypes()[0];
113         addListener(method, new MessageListenerAdapter<>(owner, method, executor, stats), topic);
114       }
115 
116     /***********************************************************************************************************************************************************
117      *
118      **********************************************************************************************************************************************************/
119     private <T> void registerCollaborationListener (@Nonnull final Method method)
120       {
121         log.info("registerCollaborationListener({})", method);
122         final var collaborationMessageType = method.getParameterTypes()[0];
123         final var messageType = method.getParameterTypes()[1];
124 
125         final MessageBus.Listener messageListener = collaborationMessageType.equals(CollaborationStartedMessage.class)
126             ? new CollaborationMessageListenerAdapter<CollaborationStartedMessage>(owner, method, executor, messageType, stats)
127             : new CollaborationMessageListenerAdapter<CollaborationCompletedMessage>(owner, method, executor, messageType, stats);
128         addListener(method, messageListener, collaborationMessageType);
129       }
130 
131     /***********************************************************************************************************************************************************
132      *
133      **********************************************************************************************************************************************************/
134     private <T> void addListener (@Nonnull final Method method,
135                                   @Nonnull final MessageBus.Listener<T> messageListener,
136                                   @Nonnull final Class<T> topic) throws SecurityException
137       {
138         method.setAccessible(true);
139         messageBusListeners.add(messageListener);
140         messageBus.get().subscribe(topic, messageListener);
141       }
142   }