1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.onehippo.forge.camel.component.hippo;
17
18 import java.util.Arrays;
19 import java.util.Collections;
20 import java.util.HashSet;
21 import java.util.Set;
22
23 import org.apache.camel.Exchange;
24 import org.apache.camel.Processor;
25 import org.apache.camel.RuntimeCamelException;
26 import org.apache.camel.SuspendableService;
27 import org.apache.camel.support.DefaultConsumer;
28 import org.apache.camel.support.DefaultEndpoint;
29 import org.apache.commons.lang.ArrayUtils;
30 import org.apache.commons.lang.BooleanUtils;
31 import org.apache.commons.lang.StringUtils;
32 import org.onehippo.cms7.event.HippoEvent;
33 import org.onehippo.cms7.services.eventbus.HippoEventListenerRegistry;
34 import org.onehippo.cms7.services.eventbus.Subscribe;
35 import org.onehippo.repository.events.PersistedHippoEventListener;
36 import org.onehippo.repository.events.PersistedHippoEventListenerRegistry;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import net.sf.json.JSONObject;
41
42
43
44
45 public class HippoEventConsumer extends DefaultConsumer implements SuspendableService {
46
47 private static final Logger LOG = LoggerFactory.getLogger(HippoEventConsumer.class);
48
49 public static final String PERSISTED_LISTENER_FLAG = "_persisted";
50
51 public static final String PERSISTED_LISTENER_CHANNEL_NAME = "_channelName";
52
53 public static final String PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG = "_onlyNewEvents";
54
55 private static final Set<String> PERSISTED_LISTENER_OPTION_PARAM_NAME_SET = Collections
56 .unmodifiableSet(new HashSet<>(Arrays.asList(
57 PERSISTED_LISTENER_FLAG,
58 PERSISTED_LISTENER_CHANNEL_NAME,
59 PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)));
60
61 private final HippoEventEndpoint endpoint;
62 private final Processor processor;
63
64 private HippoLocalEventListener localEventListener;
65 private HippoPersistedEventListener persistedEventListener;
66
67 public HippoEventConsumer(HippoEventEndpoint endpoint, Processor processor) {
68 super(endpoint, processor);
69
70 this.endpoint = endpoint;
71 this.processor = processor;
72 }
73
74 @Override
75 protected void doStart() throws Exception {
76 super.doStart();
77
78 final boolean persistedEventConsumer = BooleanUtils.toBoolean((String) endpoint.getProperty(PERSISTED_LISTENER_FLAG));
79
80 if (persistedEventConsumer) {
81 LOG.info("Registering a persisted event consumer because the _persisted parameter set to true.");
82
83 HippoPersistedEventListener listener = new HippoPersistedEventListener();
84
85 final String channelName = (String) endpoint.getProperty(PERSISTED_LISTENER_CHANNEL_NAME);
86
87 if (StringUtils.isEmpty(channelName)) {
88 throw new RuntimeCamelException("Channel name must be specified for a persisted event consumer with '_channelName' parameter!");
89 }
90
91 listener.setChannelName(channelName);
92
93 final String [] eventCategories = StringUtils.split((String) endpoint.getProperty("category"), ",");
94
95 if (ArrayUtils.isNotEmpty(eventCategories) && StringUtils.isNotEmpty(eventCategories[0])) {
96 listener.setEventCategory(eventCategories[0]);
97 }
98
99 if (endpoint.hasProperty(PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)) {
100 listener.setOnlyNewEvents(BooleanUtils.toBoolean((String) endpoint.getProperty(PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)));
101 }
102
103 PersistedHippoEventListenerRegistry.get().register(listener);
104
105 persistedEventListener = listener;
106
107 } else {
108 LOG.info("Registering a local event consumer because the _persisted parameter unspecified or set to false.");
109
110 HippoLocalEventListener listener = new HippoLocalEventListener();
111 HippoEventListenerRegistry.get().register(listener);
112 localEventListener = listener;
113
114 }
115 }
116
117 @Override
118 protected void doStop() throws Exception {
119 super.doStop();
120
121 if (persistedEventListener != null) {
122 PersistedHippoEventListenerRegistry.get().unregister(persistedEventListener);
123 }
124
125 if (localEventListener != null) {
126 HippoEventListenerRegistry.get().unregister(localEventListener);
127 }
128 }
129
130 protected JSONObject createMessageBody(HippoEvent<?> event) {
131 return HippoEventConverter.toJSONObject(event);
132 }
133
134 protected Exchange createExchange(HippoEvent<?> event, JSONObject messageBody) {
135 Exchange exchange = ((DefaultEndpoint) getEndpoint()).createExchange();
136 HippoEventMessage hippoEventMessage = new HippoEventMessage(getEndpoint().getCamelContext());
137 hippoEventMessage.setBody(messageBody);
138 exchange.setIn(hippoEventMessage);
139 return exchange;
140 }
141
142 protected boolean isConsumable(final HippoEvent<?> event, final JSONObject messageBody) {
143 String [] availableValues;
144 String value;
145
146 for (String propName : endpoint.getPropertyNameSet()) {
147 if (PERSISTED_LISTENER_OPTION_PARAM_NAME_SET.contains(propName)) {
148 continue;
149 }
150
151 availableValues = StringUtils.split((String) endpoint.getProperty(propName), ",");
152 value = null;
153
154 if (messageBody.has(propName)) {
155 value = messageBody.getString(propName);
156 }
157
158 if (value == null && ArrayUtils.isNotEmpty(availableValues)) {
159 return false;
160 }
161
162 if (!ArrayUtils.contains(availableValues, value)) {
163 return false;
164 }
165 }
166
167 return true;
168 }
169
170 protected void handleHippoEvent(HippoEvent<?> event) {
171 RuntimeCamelException rce = null;
172
173 Exchange exchange = null;
174
175 try {
176 JSONObject messageBody = createMessageBody(event);
177
178 if (!isConsumable(event, messageBody)) {
179 return;
180 }
181
182 exchange = createExchange(event, messageBody);
183 processor.process(exchange);
184 } catch (Exception e) {
185 if (exchange != null) {
186 exchange.setException(e);
187 } else {
188 rce = new RuntimeCamelException(e);
189 }
190 }
191
192 if (exchange != null) {
193 rce = exchange.getException(RuntimeCamelException.class);
194 }
195
196 if (rce != null) {
197 throw rce;
198 }
199 }
200
201 public class HippoLocalEventListener {
202
203 @Subscribe
204 public void handleEvent(HippoEvent<?> event) {
205 handleHippoEvent(event);
206 }
207 }
208
209 public class HippoPersistedEventListener implements PersistedHippoEventListener {
210
211 private String channelName;
212 private String eventCategory;
213 private boolean onlyNewEvents = true;
214
215 @Override
216 public String getChannelName() {
217 return channelName;
218 }
219
220 public void setChannelName(final String channelName) {
221 this.channelName = channelName;
222 }
223
224 @Override
225 public String getEventCategory() {
226 return eventCategory;
227 }
228
229 public void setEventCategory(final String eventCategory) {
230 this.eventCategory = eventCategory;
231 }
232
233 @Override
234 public boolean onlyNewEvents() {
235 return isOnlyNewEvents();
236 }
237
238 public boolean isOnlyNewEvents() {
239 return onlyNewEvents;
240 }
241
242 public void setOnlyNewEvents(boolean onlyNewEvents) {
243 this.onlyNewEvents = onlyNewEvents;
244 }
245
246 @Override
247 public void onHippoEvent(HippoEvent event) {
248 handleHippoEvent(event);
249 }
250 }
251
252 }