1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.onehippo.forge.camel.component.hippo;
17
18 import java.util.Arrays;
19 import java.util.Collections;
20 import java.util.HashSet;
21 import java.util.Set;
22
23 import org.apache.camel.Exchange;
24 import org.apache.camel.Processor;
25 import org.apache.camel.RuntimeCamelException;
26 import org.apache.camel.SuspendableService;
27 import org.apache.camel.support.DefaultConsumer;
28 import org.apache.camel.support.DefaultEndpoint;
29
30 import org.apache.commons.lang3.ArrayUtils;
31 import org.apache.commons.lang3.BooleanUtils;
32 import org.apache.commons.lang3.StringUtils;
33 import org.onehippo.cms7.event.HippoEvent;
34 import org.onehippo.cms7.services.eventbus.HippoEventListenerRegistry;
35 import org.onehippo.cms7.services.eventbus.Subscribe;
36 import org.onehippo.repository.events.PersistedHippoEventListener;
37 import org.onehippo.repository.events.PersistedHippoEventListenerRegistry;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import org.json.JSONObject;
42
43
44
45
46 public class HippoEventConsumer extends DefaultConsumer implements SuspendableService {
47
48 private static final Logger LOG = LoggerFactory.getLogger(HippoEventConsumer.class);
49
50 public static final String PERSISTED_LISTENER_FLAG = "_persisted";
51
52 public static final String PERSISTED_LISTENER_CHANNEL_NAME = "_channelName";
53
54 public static final String PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG = "_onlyNewEvents";
55
56 private static final Set<String> PERSISTED_LISTENER_OPTION_PARAM_NAME_SET = Collections
57 .unmodifiableSet(new HashSet<>(Arrays.asList(
58 PERSISTED_LISTENER_FLAG,
59 PERSISTED_LISTENER_CHANNEL_NAME,
60 PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)));
61
62 private final HippoEventEndpoint endpoint;
63 private final Processor processor;
64
65 private HippoLocalEventListener localEventListener;
66 private HippoPersistedEventListener persistedEventListener;
67
68 public HippoEventConsumer(HippoEventEndpoint endpoint, Processor processor) {
69 super(endpoint, processor);
70
71 this.endpoint = endpoint;
72 this.processor = processor;
73 }
74
75 @Override
76 protected void doStart() throws Exception {
77 super.doStart();
78
79 final boolean persistedEventConsumer = BooleanUtils.toBoolean((String) endpoint.getProperty(PERSISTED_LISTENER_FLAG));
80
81 if (persistedEventConsumer) {
82 LOG.info("Registering a persisted event consumer because the _persisted parameter set to true.");
83
84 HippoPersistedEventListener listener = new HippoPersistedEventListener();
85
86 final String channelName = (String) endpoint.getProperty(PERSISTED_LISTENER_CHANNEL_NAME);
87
88 if (StringUtils.isEmpty(channelName)) {
89 throw new RuntimeCamelException("Channel name must be specified for a persisted event consumer with '_channelName' parameter!");
90 }
91
92 listener.setChannelName(channelName);
93
94 final String [] eventCategories = StringUtils.split((String) endpoint.getProperty("category"), ",");
95
96 if (ArrayUtils.isNotEmpty(eventCategories) && StringUtils.isNotEmpty(eventCategories[0])) {
97 listener.setEventCategory(eventCategories[0]);
98 }
99
100 if (endpoint.hasProperty(PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)) {
101 listener.setOnlyNewEvents(BooleanUtils.toBoolean((String) endpoint.getProperty(PERSISTED_LISTENER_ONLY_NEW_EVENTS_FLAG)));
102 }
103
104 PersistedHippoEventListenerRegistry.get().register(listener);
105
106 persistedEventListener = listener;
107
108 } else {
109 LOG.info("Registering a local event consumer because the _persisted parameter unspecified or set to false.");
110
111 HippoLocalEventListener listener = new HippoLocalEventListener();
112 HippoEventListenerRegistry.get().register(listener);
113 localEventListener = listener;
114
115 }
116 }
117
118 @Override
119 protected void doStop() throws Exception {
120 super.doStop();
121
122 if (persistedEventListener != null) {
123 PersistedHippoEventListenerRegistry.get().unregister(persistedEventListener);
124 }
125
126 if (localEventListener != null) {
127 HippoEventListenerRegistry.get().unregister(localEventListener);
128 }
129 }
130
131 protected JSONObject createMessageBody(HippoEvent<?> event) {
132 return HippoEventConverter.toJSONObject(event);
133 }
134
135 protected Exchange createExchange(HippoEvent<?> event, JSONObject messageBody) {
136 Exchange exchange = ((DefaultEndpoint) getEndpoint()).createExchange();
137 HippoEventMessage hippoEventMessage = new HippoEventMessage(getEndpoint().getCamelContext());
138 hippoEventMessage.setBody(messageBody);
139 exchange.setIn(hippoEventMessage);
140 return exchange;
141 }
142
143 protected boolean isConsumable(final HippoEvent<?> event, final JSONObject messageBody) {
144 String [] availableValues;
145 String value;
146
147 for (String propName : endpoint.getPropertyNameSet()) {
148 if (PERSISTED_LISTENER_OPTION_PARAM_NAME_SET.contains(propName)) {
149 continue;
150 }
151
152 availableValues = StringUtils.split((String) endpoint.getProperty(propName), ",");
153 value = null;
154
155 if (messageBody.has(propName)) {
156 value = messageBody.getString(propName);
157 }
158
159 if (value == null && ArrayUtils.isNotEmpty(availableValues)) {
160 return false;
161 }
162
163 if (!ArrayUtils.contains(availableValues, value)) {
164 return false;
165 }
166 }
167
168 return true;
169 }
170
171 protected void handleHippoEvent(HippoEvent<?> event) {
172 RuntimeCamelException rce = null;
173
174 Exchange exchange = null;
175
176 try {
177 JSONObject messageBody = createMessageBody(event);
178
179 if (!isConsumable(event, messageBody)) {
180 return;
181 }
182
183 exchange = createExchange(event, messageBody);
184 processor.process(exchange);
185 } catch (Exception e) {
186 if (exchange != null) {
187 exchange.setException(e);
188 } else {
189 rce = new RuntimeCamelException(e);
190 }
191 }
192
193 if (exchange != null) {
194 rce = exchange.getException(RuntimeCamelException.class);
195 }
196
197 if (rce != null) {
198 throw rce;
199 }
200 }
201
202 public class HippoLocalEventListener {
203
204 @Subscribe
205 public void handleEvent(HippoEvent<?> event) {
206 handleHippoEvent(event);
207 }
208 }
209
210 public class HippoPersistedEventListener implements PersistedHippoEventListener {
211
212 private String channelName;
213 private String eventCategory;
214 private boolean onlyNewEvents = true;
215
216 @Override
217 public String getChannelName() {
218 return channelName;
219 }
220
221 public void setChannelName(final String channelName) {
222 this.channelName = channelName;
223 }
224
225 @Override
226 public String getEventCategory() {
227 return eventCategory;
228 }
229
230 public void setEventCategory(final String eventCategory) {
231 this.eventCategory = eventCategory;
232 }
233
234 @Override
235 public boolean onlyNewEvents() {
236 return isOnlyNewEvents();
237 }
238
239 public boolean isOnlyNewEvents() {
240 return onlyNewEvents;
241 }
242
243 public void setOnlyNewEvents(boolean onlyNewEvents) {
244 this.onlyNewEvents = onlyNewEvents;
245 }
246
247 @Override
248 public void onHippoEvent(HippoEvent event) {
249 handleHippoEvent(event);
250 }
251 }
252
253 }