View Javadoc
1   /*
2    * Copyright 2025 Bloomreach B.V. (https://www.bloomreach.com)
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *         http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.onehippo.forge.camel.component.hippo;
17  
18  import java.util.Arrays;
19  import java.util.Collections;
20  import java.util.HashSet;
21  import java.util.Set;
22  
23  import org.apache.camel.Exchange;
24  import org.apache.camel.Processor;
25  import org.apache.camel.RuntimeCamelException;
26  import org.apache.camel.SuspendableService;
27  import org.apache.camel.support.DefaultConsumer;
28  import org.apache.camel.support.DefaultEndpoint;
29  
30  import org.apache.commons.lang3.ArrayUtils;
31  import org.apache.commons.lang3.BooleanUtils;
32  import org.apache.commons.lang3.StringUtils;
33  import org.onehippo.cms7.event.HippoEvent;
34  import org.onehippo.cms7.services.eventbus.HippoEventListenerRegistry;
35  import org.onehippo.cms7.services.eventbus.Subscribe;
36  import org.onehippo.repository.events.PersistedHippoEventListener;
37  import org.onehippo.repository.events.PersistedHippoEventListenerRegistry;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  import org.json.JSONObject;
42  
43  /**
44   * HippoEventConsumer.
45   */
46  public class HippoEventConsumer extends DefaultConsumer implements SuspendableService {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(HippoEventConsumer.class);
49  
50      public static final String PERSISTED_LISTENER_FLAG = "_persisted";
51  
52      public static final String PERSISTED_LISTENER_CHANNEL_NAME = "_channelName";
53  
54      public static final String PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG = "_onlyNewEvents";
55  
56      private static final Set<String> PERSISTED_LISTENER_OPTION_PARAM_NAME_SET = Collections
57              .unmodifiableSet(new HashSet<>(Arrays.asList(
58                      PERSISTED_LISTENER_FLAG,
59                      PERSISTED_LISTENER_CHANNEL_NAME,
60                      PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)));
61  
62      private final HippoEventEndpoint endpoint;
63      private final Processor processor;
64  
65      private HippoLocalEventListener localEventListener;
66      private HippoPersistedEventListener persistedEventListener;
67  
68      public HippoEventConsumer(HippoEventEndpoint endpoint, Processor processor) {
69          super(endpoint, processor);
70  
71          this.endpoint = endpoint;
72          this.processor = processor;
73      }
74  
75      @Override
76      protected void doStart() throws Exception {
77          super.doStart();
78  
79          final boolean persistedEventConsumer = BooleanUtils.toBoolean((String) endpoint.getProperty(PERSISTED_LISTENER_FLAG));
80  
81          if (persistedEventConsumer) {
82              LOG.info("Registering a persisted event consumer because the _persisted parameter set to true.");
83  
84              HippoPersistedEventListener listener = new HippoPersistedEventListener();
85  
86              final String channelName = (String) endpoint.getProperty(PERSISTED_LISTENER_CHANNEL_NAME);
87  
88              if (StringUtils.isEmpty(channelName)) {
89                  throw new RuntimeCamelException("Channel name must be specified for a persisted event consumer with '_channelName' parameter!");
90              }
91  
92              listener.setChannelName(channelName);
93  
94              final String [] eventCategories = StringUtils.split((String) endpoint.getProperty("category"), ",");
95  
96              if (ArrayUtils.isNotEmpty(eventCategories) && StringUtils.isNotEmpty(eventCategories[0])) {
97                  listener.setEventCategory(eventCategories[0]);
98              }
99  
100             if (endpoint.hasProperty(PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)) {
101                 listener.setOnlyNewEvents(BooleanUtils.toBoolean((String) endpoint.getProperty(PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)));
102             }
103 
104             PersistedHippoEventListenerRegistry.get().register(listener);
105 
106             persistedEventListener = listener;
107 
108         } else {
109             LOG.info("Registering a local event consumer because the _persisted parameter unspecified or set to false.");
110 
111             HippoLocalEventListener listener = new HippoLocalEventListener();
112             HippoEventListenerRegistry.get().register(listener);
113             localEventListener = listener;
114 
115         }
116     }
117 
118     @Override
119     protected void doStop() throws Exception {
120         super.doStop();
121 
122         if (persistedEventListener != null) {
123             PersistedHippoEventListenerRegistry.get().unregister(persistedEventListener);
124         }
125 
126         if (localEventListener != null) {
127             HippoEventListenerRegistry.get().unregister(localEventListener);
128         }
129     }
130 
131     protected JSONObject createMessageBody(HippoEvent<?> event) {
132         return HippoEventConverter.toJSONObject(event);
133     }
134 
135     protected Exchange createExchange(HippoEvent<?> event, JSONObject messageBody) {
136         Exchange exchange = ((DefaultEndpoint) getEndpoint()).createExchange();
137         HippoEventMessage hippoEventMessage = new HippoEventMessage(getEndpoint().getCamelContext());
138         hippoEventMessage.setBody(messageBody);
139         exchange.setIn(hippoEventMessage);
140         return exchange;
141     }
142 
143     protected boolean isConsumable(final HippoEvent<?> event, final JSONObject messageBody) {
144         String [] availableValues;
145         String value;
146 
147         for (String propName : endpoint.getPropertyNameSet()) {
148             if (PERSISTED_LISTENER_OPTION_PARAM_NAME_SET.contains(propName)) {
149                 continue;
150             }
151 
152             availableValues = StringUtils.split((String) endpoint.getProperty(propName), ",");
153             value = null;
154 
155             if (messageBody.has(propName)) {
156                 value = messageBody.getString(propName);
157             }
158 
159             if (value == null && ArrayUtils.isNotEmpty(availableValues)) {
160                 return false;
161             }
162 
163             if (!ArrayUtils.contains(availableValues, value)) {
164                 return false;
165             }
166         }
167 
168         return true;
169     }
170 
171     protected void handleHippoEvent(HippoEvent<?> event) {
172         RuntimeCamelException rce = null;
173 
174         Exchange exchange = null;
175 
176         try {
177             JSONObject messageBody = createMessageBody(event);
178 
179             if (!isConsumable(event, messageBody)) {
180                 return;
181             }
182 
183             exchange = createExchange(event, messageBody);
184             processor.process(exchange);
185         } catch (Exception e) {
186             if (exchange != null) {
187                 exchange.setException(e);
188             } else {
189                 rce = new RuntimeCamelException(e);
190             }
191         }
192 
193         if (exchange != null) {
194             rce = exchange.getException(RuntimeCamelException.class);
195         }
196 
197         if (rce != null) {
198             throw rce;
199         }
200     }
201 
202     public class HippoLocalEventListener {
203 
204         @Subscribe
205         public void handleEvent(HippoEvent<?> event) {
206             handleHippoEvent(event);
207         }
208     }
209 
210     public class HippoPersistedEventListener implements PersistedHippoEventListener {
211 
212         private String channelName;
213         private String eventCategory;
214         private boolean onlyNewEvents = true;
215 
216         @Override
217         public String getChannelName() {
218             return channelName;
219         }
220 
221         public void setChannelName(final String channelName) {
222             this.channelName = channelName;
223         }
224 
225         @Override
226         public String getEventCategory() {
227             return eventCategory;
228         }
229 
230         public void setEventCategory(final String eventCategory) {
231             this.eventCategory = eventCategory;
232         }
233 
234         @Override
235         public boolean onlyNewEvents() {
236             return isOnlyNewEvents();
237         }
238 
239         public boolean isOnlyNewEvents() {
240             return onlyNewEvents;
241         }
242 
243         public void setOnlyNewEvents(boolean onlyNewEvents) {
244             this.onlyNewEvents = onlyNewEvents;
245         }
246 
247         @Override
248         public void onHippoEvent(HippoEvent event) {
249             handleHippoEvent(event);
250         }
251     }
252 
253 }