View Javadoc
1   /*
2    * Copyright 2024 Bloomreach B.V. (https://www.bloomreach.com)
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *         http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.onehippo.forge.camel.component.hippo;
17  
18  import java.util.Arrays;
19  import java.util.Collections;
20  import java.util.HashSet;
21  import java.util.Set;
22  
23  import org.apache.camel.Exchange;
24  import org.apache.camel.Processor;
25  import org.apache.camel.RuntimeCamelException;
26  import org.apache.camel.SuspendableService;
27  import org.apache.camel.support.DefaultConsumer;
28  import org.apache.camel.support.DefaultEndpoint;
29  import org.apache.commons.lang.ArrayUtils;
30  import org.apache.commons.lang.BooleanUtils;
31  import org.apache.commons.lang.StringUtils;
32  import org.onehippo.cms7.event.HippoEvent;
33  import org.onehippo.cms7.services.eventbus.HippoEventListenerRegistry;
34  import org.onehippo.cms7.services.eventbus.Subscribe;
35  import org.onehippo.repository.events.PersistedHippoEventListener;
36  import org.onehippo.repository.events.PersistedHippoEventListenerRegistry;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import net.sf.json.JSONObject;
41  
42  /**
43   * HippoEventConsumer.
44   */
45  public class HippoEventConsumer extends DefaultConsumer implements SuspendableService {
46  
47      private static final Logger LOG = LoggerFactory.getLogger(HippoEventConsumer.class);
48  
49      public static final String PERSISTED_LISTENER_FLAG = "_persisted";
50  
51      public static final String PERSISTED_LISTENER_CHANNEL_NAME = "_channelName";
52  
53      public static final String PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG = "_onlyNewEvents";
54  
55      private static final Set<String> PERSISTED_LISTENER_OPTION_PARAM_NAME_SET = Collections
56              .unmodifiableSet(new HashSet<>(Arrays.asList(
57                      PERSISTED_LISTENER_FLAG,
58                      PERSISTED_LISTENER_CHANNEL_NAME,
59                      PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)));
60  
61      private final HippoEventEndpoint endpoint;
62      private final Processor processor;
63  
64      private HippoLocalEventListener localEventListener;
65      private HippoPersistedEventListener persistedEventListener;
66  
67      public HippoEventConsumer(HippoEventEndpoint endpoint, Processor processor) {
68          super(endpoint, processor);
69  
70          this.endpoint = endpoint;
71          this.processor = processor;
72      }
73  
74      @Override
75      protected void doStart() throws Exception {
76          super.doStart();
77  
78          final boolean persistedEventConsumer = BooleanUtils.toBoolean((String) endpoint.getProperty(PERSISTED_LISTENER_FLAG));
79  
80          if (persistedEventConsumer) {
81              LOG.info("Registering a persisted event consumer because the _persisted parameter set to true.");
82  
83              HippoPersistedEventListener listener = new HippoPersistedEventListener();
84  
85              final String channelName = (String) endpoint.getProperty(PERSISTED_LISTENER_CHANNEL_NAME);
86  
87              if (StringUtils.isEmpty(channelName)) {
88                  throw new RuntimeCamelException("Channel name must be specified for a persisted event consumer with '_channelName' parameter!");
89              }
90  
91              listener.setChannelName(channelName);
92  
93              final String [] eventCategories = StringUtils.split((String) endpoint.getProperty("category"), ",");
94  
95              if (ArrayUtils.isNotEmpty(eventCategories) && StringUtils.isNotEmpty(eventCategories[0])) {
96                  listener.setEventCategory(eventCategories[0]);
97              }
98  
99              if (endpoint.hasProperty(PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)) {
100                 listener.setOnlyNewEvents(BooleanUtils.toBoolean((String) endpoint.getProperty(PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)));
101             }
102 
103             PersistedHippoEventListenerRegistry.get().register(listener);
104 
105             persistedEventListener = listener;
106 
107         } else {
108             LOG.info("Registering a local event consumer because the _persisted parameter unspecified or set to false.");
109 
110             HippoLocalEventListener listener = new HippoLocalEventListener();
111             HippoEventListenerRegistry.get().register(listener);
112             localEventListener = listener;
113 
114         }
115     }
116 
117     @Override
118     protected void doStop() throws Exception {
119         super.doStop();
120 
121         if (persistedEventListener != null) {
122             PersistedHippoEventListenerRegistry.get().unregister(persistedEventListener);
123         }
124 
125         if (localEventListener != null) {
126             HippoEventListenerRegistry.get().unregister(localEventListener);
127         }
128     }
129 
130     protected JSONObject createMessageBody(HippoEvent<?> event) {
131         return HippoEventConverter.toJSONObject(event);
132     }
133 
134     protected Exchange createExchange(HippoEvent<?> event, JSONObject messageBody) {
135         Exchange exchange = ((DefaultEndpoint) getEndpoint()).createExchange();
136         HippoEventMessage hippoEventMessage = new HippoEventMessage(getEndpoint().getCamelContext());
137         hippoEventMessage.setBody(messageBody);
138         exchange.setIn(hippoEventMessage);
139         return exchange;
140     }
141 
142     protected boolean isConsumable(final HippoEvent<?> event, final JSONObject messageBody) {
143         String [] availableValues;
144         String value;
145 
146         for (String propName : endpoint.getPropertyNameSet()) {
147             if (PERSISTED_LISTENER_OPTION_PARAM_NAME_SET.contains(propName)) {
148                 continue;
149             }
150 
151             availableValues = StringUtils.split((String) endpoint.getProperty(propName), ",");
152             value = null;
153 
154             if (messageBody.has(propName)) {
155                 value = messageBody.getString(propName);
156             }
157 
158             if (value == null && ArrayUtils.isNotEmpty(availableValues)) {
159                 return false;
160             }
161 
162             if (!ArrayUtils.contains(availableValues, value)) {
163                 return false;
164             }
165         }
166 
167         return true;
168     }
169 
170     protected void handleHippoEvent(HippoEvent<?> event) {
171         RuntimeCamelException rce = null;
172 
173         Exchange exchange = null;
174 
175         try {
176             JSONObject messageBody = createMessageBody(event);
177 
178             if (!isConsumable(event, messageBody)) {
179                 return;
180             }
181 
182             exchange = createExchange(event, messageBody);
183             processor.process(exchange);
184         } catch (Exception e) {
185             if (exchange != null) {
186                 exchange.setException(e);
187             } else {
188                 rce = new RuntimeCamelException(e);
189             }
190         }
191 
192         if (exchange != null) {
193             rce = exchange.getException(RuntimeCamelException.class);
194         }
195 
196         if (rce != null) {
197             throw rce;
198         }
199     }
200 
201     public class HippoLocalEventListener {
202 
203         @Subscribe
204         public void handleEvent(HippoEvent<?> event) {
205             handleHippoEvent(event);
206         }
207     }
208 
209     public class HippoPersistedEventListener implements PersistedHippoEventListener {
210 
211         private String channelName;
212         private String eventCategory;
213         private boolean onlyNewEvents = true;
214 
215         @Override
216         public String getChannelName() {
217             return channelName;
218         }
219 
220         public void setChannelName(final String channelName) {
221             this.channelName = channelName;
222         }
223 
224         @Override
225         public String getEventCategory() {
226             return eventCategory;
227         }
228 
229         public void setEventCategory(final String eventCategory) {
230             this.eventCategory = eventCategory;
231         }
232 
233         @Override
234         public boolean onlyNewEvents() {
235             return isOnlyNewEvents();
236         }
237 
238         public boolean isOnlyNewEvents() {
239             return onlyNewEvents;
240         }
241 
242         public void setOnlyNewEvents(boolean onlyNewEvents) {
243             this.onlyNewEvents = onlyNewEvents;
244         }
245 
246         @Override
247         public void onHippoEvent(HippoEvent event) {
248             handleHippoEvent(event);
249         }
250     }
251 
252 }