Skip to content

Commit af97b9c

Browse files
authored
[Fix #1406] Allow adding data into lifecycle CloudEvents (#1410)
* [Fix #1406] Allow adding data into lifecycle CloudEvents Signed-off-by: fjtirado <ftirados@redhat.com> * [Fix #1406] Review comments Signed-off-by: fjtirado <ftirados@redhat.com> --------- Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 57fb070 commit af97b9c

31 files changed

Lines changed: 1721 additions & 176 deletions

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
4040
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
4141
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListenerAdapter;
42+
import io.serverlessworkflow.impl.lifecycle.ce.DefaultLifeCycleCloudEventFactory;
43+
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowLifeCycleCloudEventFactory;
4244
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
4345
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
4446
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
@@ -96,6 +98,7 @@ public class WorkflowApplication implements AutoCloseable {
9698
private final Collection<CallableTaskProxyBuilder> callableProxyBuilders;
9799
private final CloudEventPredicateFactory cloudEventPredicateFactory;
98100
private final AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;
101+
private final WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory;
99102

100103
private WorkflowApplication(Builder builder) {
101104
this.taskFactory = builder.taskFactory;
@@ -126,6 +129,7 @@ private WorkflowApplication(Builder builder) {
126129
this.callableProxyBuilders = builder.callableProxyBuilders;
127130
this.cloudEventPredicateFactory = builder.cloudEventPredicateFactory;
128131
this.allStrategyCorrelationInfoFactory = builder.allStrategyCorrelationInfoFactory;
132+
this.lifeCycleCloudEventFactory = builder.lifeCycleCloudEventFactory;
129133
}
130134

131135
public TaskExecutorFactory taskFactory() {
@@ -245,6 +249,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
245249
private URI defaultCatalogURI;
246250
private CloudEventPredicateFactory cloudEventPredicateFactory;
247251
private AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;
252+
private WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory;
248253

249254
private Builder() {
250255
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
@@ -383,6 +388,12 @@ public Builder withAllStrategyCorrelationInfoFactory(
383388
return this;
384389
}
385390

391+
public Builder withLifeCycleCloudEventFactory(
392+
WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory) {
393+
this.lifeCycleCloudEventFactory = lifeCycleCloudEventFactory;
394+
return this;
395+
}
396+
386397
public WorkflowApplication build() {
387398

388399
if (modelFactory == null) {
@@ -443,6 +454,9 @@ public WorkflowApplication build() {
443454
loadFirst(CloudEventPredicateFactory.class)
444455
.orElseGet(() -> new DefaultCloudEventPredicateFactory());
445456
}
457+
if (lifeCycleCloudEventFactory == null) {
458+
lifeCycleCloudEventFactory = new DefaultLifeCycleCloudEventFactory();
459+
}
446460
if (allStrategyCorrelationInfoFactory == null) {
447461
allStrategyCorrelationInfoFactory =
448462
definition -> InMemoryAllStrategyCorrelationInfo.instance();
@@ -579,4 +593,8 @@ public Collection<CallableTaskProxyBuilder> callableProxyBuilders() {
579593
public AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory() {
580594
return allStrategyCorrelationInfoFactory;
581595
}
596+
597+
public WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory() {
598+
return lifeCycleCloudEventFactory;
599+
}
582600
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java

Lines changed: 74 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,16 @@
2929
import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_STARTED;
3030
import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_STATUS_CHANGED;
3131
import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_SUSPENDED;
32-
import static io.serverlessworkflow.impl.WorkflowError.error;
33-
import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref;
3432

3533
import io.cloudevents.CloudEvent;
3634
import io.cloudevents.CloudEventData;
3735
import io.cloudevents.core.builder.CloudEventBuilder;
3836
import io.cloudevents.core.data.PojoCloudEventData;
3937
import io.cloudevents.core.data.PojoCloudEventData.ToBytes;
4038
import io.serverlessworkflow.impl.WorkflowApplication;
41-
import io.serverlessworkflow.impl.WorkflowModel;
4239
import io.serverlessworkflow.impl.events.CloudEventUtils;
4340
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
4441
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
45-
import io.serverlessworkflow.impl.lifecycle.TaskEvent;
4642
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
4743
import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent;
4844
import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent;
@@ -82,202 +78,177 @@ public static Collection<String> getLifeCycleTypes() {
8278
WORKFLOW_STATUS_CHANGED);
8379
}
8480

81+
private WorkflowLifeCycleCloudEventFactory lifeCycleFactory(WorkflowEvent ev) {
82+
return ev.workflowContext().definition().application().lifeCycleCloudEventFactory();
83+
}
84+
8585
@Override
8686
public void onTaskStarted(TaskStartedEvent event) {
87+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
8788
publish(
8889
event,
8990
ev ->
90-
builder()
91-
.withData(
92-
cloudEventData(
93-
new TaskStartedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
94-
this::convert))
95-
.withType(TASK_STARTED)
96-
.build());
91+
factory.build(
92+
builder()
93+
.withData(cloudEventData(factory.build(event), this::convert))
94+
.withType(TASK_STARTED)));
9795
}
9896

9997
@Override
10098
public void onTaskRetried(TaskRetriedEvent event) {
99+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
101100
publish(
102101
event,
103102
ev ->
104-
builder()
105-
.withData(
106-
cloudEventData(
107-
new TaskRetriedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
108-
this::convert))
109-
.withType(TASK_STARTED)
110-
.build());
103+
factory.build(
104+
builder()
105+
.withData(cloudEventData(factory.build(event), this::convert))
106+
.withType(TASK_RETRIED)));
111107
}
112108

113109
@Override
114110
public void onTaskCompleted(TaskCompletedEvent event) {
111+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
115112
publish(
116113
event,
117114
ev ->
118-
builder()
119-
.withData(
120-
cloudEventData(
121-
new TaskCompletedCEData(
122-
id(ev), pos(ev), ref(ev), ev.eventDate(), output(ev)),
123-
this::convert))
124-
.withType(TASK_COMPLETED)
125-
.build());
115+
factory.build(
116+
builder()
117+
.withData(cloudEventData(factory.build(event), this::convert))
118+
.withType(TASK_COMPLETED)));
126119
}
127120

128121
@Override
129122
public void onTaskSuspended(TaskSuspendedEvent event) {
123+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
130124
publish(
131125
event,
132126
ev ->
133-
builder()
134-
.withData(
135-
cloudEventData(
136-
new TaskSuspendedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
137-
this::convert))
138-
.withType(TASK_SUSPENDED)
139-
.build());
127+
factory.build(
128+
builder()
129+
.withData(cloudEventData(factory.build(event), this::convert))
130+
.withType(TASK_SUSPENDED)));
140131
}
141132

142133
@Override
143134
public void onTaskResumed(TaskResumedEvent event) {
135+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
144136
publish(
145137
event,
146138
ev ->
147-
builder()
148-
.withData(
149-
cloudEventData(
150-
new TaskResumedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
151-
this::convert))
152-
.withType(TASK_RESUMED)
153-
.build());
139+
factory.build(
140+
builder()
141+
.withData(cloudEventData(factory.build(event), this::convert))
142+
.withType(TASK_RESUMED)));
154143
}
155144

156145
@Override
157146
public void onTaskCancelled(TaskCancelledEvent event) {
147+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
158148
publish(
159149
event,
160150
ev ->
161-
builder()
162-
.withData(
163-
cloudEventData(
164-
new TaskCancelledCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
165-
this::convert))
166-
.withType(TASK_CANCELLED)
167-
.build());
151+
factory.build(
152+
builder()
153+
.withData(cloudEventData(factory.build(event), this::convert))
154+
.withType(TASK_CANCELLED)));
168155
}
169156

170157
@Override
171158
public void onTaskFailed(TaskFailedEvent event) {
159+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
172160
publish(
173161
event,
174162
ev ->
175-
builder()
176-
.withData(
177-
cloudEventData(
178-
new TaskFailedCEData(id(ev), pos(ev), ref(ev), ev.eventDate(), error(ev)),
179-
this::convert))
180-
.withType(TASK_FAULTED)
181-
.build());
163+
factory.build(
164+
builder()
165+
.withData(cloudEventData(factory.build(event), this::convert))
166+
.withType(TASK_FAULTED)));
182167
}
183168

184169
@Override
185170
public void onWorkflowStarted(WorkflowStartedEvent event) {
171+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
186172
publish(
187173
event,
188174
ev ->
189-
builder()
190-
.withData(
191-
cloudEventData(
192-
new WorkflowStartedCEData(id(ev), ref(ev), ev.eventDate()), this::convert))
193-
.withType(WORKFLOW_STARTED)
194-
.build());
175+
factory.build(
176+
builder()
177+
.withData(cloudEventData(factory.build(event), this::convert))
178+
.withType(WORKFLOW_STARTED)));
195179
}
196180

197181
@Override
198182
public void onWorkflowSuspended(WorkflowSuspendedEvent event) {
183+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
199184
publish(
200185
event,
201186
ev ->
202-
builder()
203-
.withData(
204-
cloudEventData(
205-
new WorkflowSuspendedCEData(id(ev), ref(ev), ev.eventDate()),
206-
this::convert))
207-
.withType(WORKFLOW_SUSPENDED)
208-
.build());
187+
factory.build(
188+
builder()
189+
.withData(cloudEventData(factory.build(event), this::convert))
190+
.withType(WORKFLOW_SUSPENDED)));
209191
}
210192

211193
@Override
212194
public void onWorkflowCancelled(WorkflowCancelledEvent event) {
195+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
213196
publish(
214197
event,
215198
ev ->
216-
builder()
217-
.withData(
218-
cloudEventData(
219-
new WorkflowCancelledCEData(id(ev), ref(ev), ev.eventDate()),
220-
this::convert))
221-
.withType(WORKFLOW_CANCELLED)
222-
.build());
199+
factory.build(
200+
builder()
201+
.withData(cloudEventData(factory.build(event), this::convert))
202+
.withType(WORKFLOW_CANCELLED)));
223203
}
224204

225205
@Override
226206
public void onWorkflowResumed(WorkflowResumedEvent event) {
207+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
227208
publish(
228209
event,
229210
ev ->
230-
builder()
231-
.withData(
232-
cloudEventData(
233-
new WorkflowResumedCEData(id(ev), ref(ev), ev.eventDate()), this::convert))
234-
.withType(WORKFLOW_RESUMED)
235-
.build());
211+
factory.build(
212+
builder()
213+
.withData(cloudEventData(factory.build(event), this::convert))
214+
.withType(WORKFLOW_RESUMED)));
236215
}
237216

238217
@Override
239218
public void onWorkflowCompleted(WorkflowCompletedEvent event) {
219+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
240220
publish(
241221
event,
242222
ev ->
243-
builder()
244-
.withData(
245-
cloudEventData(
246-
new WorkflowCompletedCEData(
247-
id(ev), ref(ev), ev.eventDate(), from(event.output())),
248-
this::convert))
249-
.withType(WORKFLOW_COMPLETED)
250-
.build());
223+
factory.build(
224+
builder()
225+
.withData(cloudEventData(factory.build(event), this::convert))
226+
.withType(WORKFLOW_COMPLETED)));
251227
}
252228

253229
@Override
254230
public void onWorkflowFailed(WorkflowFailedEvent event) {
231+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
255232
publish(
256233
event,
257234
ev ->
258-
builder()
259-
.withData(
260-
cloudEventData(
261-
new WorkflowFailedCEData(id(ev), ref(ev), ev.eventDate(), error(ev)),
262-
this::convert))
263-
.withType(WORKFLOW_FAULTED)
264-
.build());
235+
factory.build(
236+
builder()
237+
.withData(cloudEventData(factory.build(event), this::convert))
238+
.withType(WORKFLOW_FAULTED)));
265239
}
266240

267241
@Override
268242
public void onWorkflowStatusChanged(WorkflowStatusEvent event) {
269243
if (appl(event).isStatusChangePublishingEnabled()) {
244+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
270245
publish(
271246
event,
272247
ev ->
273-
builder()
274-
.withData(
275-
cloudEventData(
276-
new WorkflowStatusCEDataEvent(
277-
id(ev), ref(ev), ev.eventDate(), ev.status().toString()),
278-
this::convert))
279-
.withType(WORKFLOW_STATUS_CHANGED)
280-
.build());
248+
factory.build(
249+
builder()
250+
.withData(cloudEventData(factory.build(event), this::convert))
251+
.withType(WORKFLOW_STATUS_CHANGED)));
281252
}
282253
}
283254

@@ -367,20 +338,4 @@ private static CloudEventBuilder builder() {
367338
private static WorkflowApplication appl(WorkflowEvent ev) {
368339
return ev.workflowContext().definition().application();
369340
}
370-
371-
private static String id(WorkflowEvent ev) {
372-
return ev.workflowContext().instanceData().id();
373-
}
374-
375-
private static String pos(TaskEvent ev) {
376-
return ev.taskContext().position().jsonPointer();
377-
}
378-
379-
private static Object output(TaskEvent ev) {
380-
return from(ev.taskContext().output());
381-
}
382-
383-
private static Object from(WorkflowModel model) {
384-
return model.asJavaObject();
385-
}
386341
}

0 commit comments

Comments
 (0)