1 /*
2 * *************************************************************************************************************************************************************
3 *
4 * TheseFoolishThings: Miscellaneous utilities
5 * http://tidalwave.it/projects/thesefoolishthings
6 *
7 * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8 *
9 * *************************************************************************************************************************************************************
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 *
14 * http://www.apache.org/licenses/LICENSE-2.0
15 *
16 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
18 *
19 * *************************************************************************************************************************************************************
20 *
21 * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22 * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23 *
24 * *************************************************************************************************************************************************************
25 */
26 package it.tidalwave.actor.impl;
27
28 import java.lang.reflect.Method;
29 import jakarta.annotation.Nonnull;
30 import javax.inject.Provider;
31 import java.util.ArrayList;
32 import java.util.List;
33 import it.tidalwave.actor.CollaborationCompletedMessage;
34 import it.tidalwave.actor.CollaborationStartedMessage;
35 import it.tidalwave.actor.annotation.OriginatedBy;
36 import it.tidalwave.actor.spi.ActorActivatorStats;
37 import it.tidalwave.actor.spi.CollaborationAwareMessageBus;
38 import it.tidalwave.messagebus.MessageBus;
39 import it.tidalwave.messagebus.annotation.ListensTo;
40 import lombok.RequiredArgsConstructor;
41 import lombok.extern.slf4j.Slf4j;
42 import static it.tidalwave.messagebus.spi.ReflectionUtils.*;
43
44 /***************************************************************************************************************************************************************
45 *
46 * @author Fabrizio Giudici
47 *
48 **************************************************************************************************************************************************************/
49 @RequiredArgsConstructor @Slf4j
50 public class CollaborationAwareMessageBusAdapter implements MethodProcessor
51 {
52 @Nonnull
53 private final Object owner;
54
55 @Nonnull
56 private final ExecutorWithPriority executor;
57
58 @Nonnull
59 private final ActorActivatorStats stats;
60
61 private final List<MessageBus.Listener<?>> messageBusListeners = new ArrayList<>();
62
63 private final Provider<CollaborationAwareMessageBus> messageBus =
64 Locator.createProviderFor(CollaborationAwareMessageBus.class);
65
66 /***********************************************************************************************************************************************************
67 *
68 **********************************************************************************************************************************************************/
69 @Override @Nonnull
70 public FilterResult filter (@Nonnull final Class<?> clazz)
71 {
72 return FilterResult.ACCEPT; // TODO: should filter with @Actor?
73 }
74
75 /***********************************************************************************************************************************************************
76 *
77 **********************************************************************************************************************************************************/
78 @Override
79 public void process (@Nonnull final Method method)
80 {
81 final var parameterAnnotations = method.getParameterAnnotations();
82
83 if ((parameterAnnotations.length == 1) && containsAnnotation(parameterAnnotations[0], ListensTo.class))
84 {
85 registerMessageListener(method);
86 }
87 else if ((parameterAnnotations.length == 2) && containsAnnotation(parameterAnnotations[0], ListensTo.class)
88 && containsAnnotation(parameterAnnotations[1], OriginatedBy.class))
89 {
90 registerCollaborationListener(method);
91 }
92 }
93
94 /***********************************************************************************************************************************************************
95 *
96 **********************************************************************************************************************************************************/
97 public void unsubscribe()
98 {
99 for (final var listener : messageBusListeners)
100 {
101 messageBus.get().unsubscribe(listener);
102 }
103 }
104
105 /***********************************************************************************************************************************************************
106 *
107 **********************************************************************************************************************************************************/
108 private <T> void registerMessageListener (@Nonnull final Method method)
109 {
110 log.info("registerMessageListener({})", method);
111
112 final var topic = (Class<T>)method.getParameterTypes()[0];
113 addListener(method, new MessageListenerAdapter<>(owner, method, executor, stats), topic);
114 }
115
116 /***********************************************************************************************************************************************************
117 *
118 **********************************************************************************************************************************************************/
119 private <T> void registerCollaborationListener (@Nonnull final Method method)
120 {
121 log.info("registerCollaborationListener({})", method);
122 final var collaborationMessageType = method.getParameterTypes()[0];
123 final var messageType = method.getParameterTypes()[1];
124
125 final MessageBus.Listener messageListener = collaborationMessageType.equals(CollaborationStartedMessage.class)
126 ? new CollaborationMessageListenerAdapter<CollaborationStartedMessage>(owner, method, executor, messageType, stats)
127 : new CollaborationMessageListenerAdapter<CollaborationCompletedMessage>(owner, method, executor, messageType, stats);
128 addListener(method, messageListener, collaborationMessageType);
129 }
130
131 /***********************************************************************************************************************************************************
132 *
133 **********************************************************************************************************************************************************/
134 private <T> void addListener (@Nonnull final Method method,
135 @Nonnull final MessageBus.Listener<T> messageListener,
136 @Nonnull final Class<T> topic) throws SecurityException
137 {
138 method.setAccessible(true);
139 messageBusListeners.add(messageListener);
140 messageBus.get().subscribe(topic, messageListener);
141 }
142 }