Events
Direktiv has built-in support for CloudEvents, which can be a great way to interact with flows. The following flow has a start condition based on events. It would only start if an event of type com.github.pull.create
arrives with the source set to https://github.com/cloudevents/spec/pull
direktiv_api: workflow/v1
start:
type: event
event:
type: com.github.pull.create
context:
source: https://github.com/cloudevents/spec/pull
functions:
- id: httprequest
image: gcr.io/direktiv/functions/http-request:1.0
type: knative-workflow
states:
- id: notify
type: action
action:
function: httprequest
input:
method: "POST"
url: "https://jsonplaceholder.typicode.com/todos/1"
body: 'jq(."com.github.pull.create")'
CloudEvents
CloudEvents are specification for describing event data in a common way. They're JSON objects with a number of required fields, some optional fields, and a payload.
{
"specversion" : "1.0",
"type" : "com.github.pull.create",
"source" : "https://github.com/cloudevents/spec/pull",
"subject" : "123",
"id" : "A234-1234-1234",
"time" : "2018-04-05T17:31:00Z",
"comexampleextension1" : "value",
"comexampleothervalue" : 5,
"datacontenttype" : "text/xml",
"data" : "<much wow=\"xml\"/>"
}
CloudEvents can be sent via the API to a namespace or generated by flow within Direktiv, to be handled by any number of interested receivers on that namespace.
Start Types
The most common use for events in Direktiv is to have external services generate CloudEvents and send them to Direktiv to trigger your flows. But to make your flows trigger on an event you need to register the flow's interest in the event by adding the appropriate start type to your workflow definition:
direktiv_api: workflow/v1
start:
type: event
event:
type: com.github.pull.create
filters:
source: "https://github.com/cloudevents/spec/pull"
In this example a new instance will be created whenever a cloudevent is received that has the matching type
and source
values.
Two other event-based start types exist in Direktiv: the eventsXor
, and the eventsAnd
.
The eventsXor
registers an interest in multiple events and will trigger a new instance as soon as any one of them is received. The eventsAnd
also registers an interest in multiple events, but will only trigger once all have been received.
Event Payloads
Whenever an event is received its payload will be added to the instance data under a field with the same name as the event "type". This allows for a uniform approach to accepting events that supports single events, eventsXor, and eventsAnd. The payload itself consists of the full cloudevent including attributes, extension context attributes and data.
Instances Waiting for Events
Triggering flows is not the only thing you can do with events. Flows can be constructed to run some logic and then wait for an event before proceeding. Like the event-based start types, there are three event consuming states: consumeEvent
, eventsXor
, and eventsAnd
.
- id: wait-event
type: consumeEvent
event:
type: com.github.pull.create
context:
source: "https://github.com/cloudevents/spec/pull"
repository: 'jq(.repo)'
timeout: PT5M
transform: 'jq(."com.github.pull.create")'
transition: next-state
Timeouts
It's rarely a good idea to leave a flow waiting indefinitely. Direktiv allows you to define timeouts in ISO8601 format when waiting on an event. If the state is not ready to proceed before the timeout has elapsed an error will be thrown. It's possible to catch the error direktiv.cancels.timeout.soft
.
The timeout
field is not required, but Direktiv caps the maximum timeout whether specified or not to prevent flows from living forever. The default timeout is 15 minutes.
Context
Event-consuming states have a context
field. The context field can restrict which events are considered matches by requiring an exact match on a CloudEvent context field. This can be used to link certain events to e.g. customer ids or transaction ids.
GenerateEvent State
Flows can generate events for their namespace. The fields for this state are fairly self-explanatory. Here's an example:
- id: gen-event
type: generateEvent
event:
type: "my.custom.event"
source: "direktiv"
data: 'jq(.)'
datacontenttype: "application/json"
If the jq
command that populates the data
field outputs a plain base64 encoded string and the datacontenttype
field is set to anything other than application/json
Direktiv will decode the string before sending the event.