The AsyncAPI call lets workflows interact with external services described by an AsyncAPI document, supporting both publishing messages and subscribing to them.

Overview

AsyncAPI calls allow you to interact with message brokers and event-driven systems, supporting protocols like AMQP, Kafka, MQTT, WebSocket, and more.

Properties

document
externalResource
required
The AsyncAPI document that defines the operation to call. It can follow either the AsyncAPI 2.6.0 or the 3.0.0 specification.
channel
string
The name of the channel on which to perform the operation. Used only for AsyncAPI v2.6.0. The operation to perform is defined by declaring either message (publish) or subscription (subscribe).
operation
string
A reference to the AsyncAPI operation to call. Used only for AsyncAPI v3.0.0.
server
asyncApiServer
An object used to configure the server to call the specified AsyncAPI operation on. If not set, defaults to the first server matching the operation’s channel.
protocol
string
The protocol to use to select the target server. Ignored if server has been set. Supported values: amqp, amqp1, anypointmq, googlepubsub, http, ibmmq, jms, kafka, mercure, mqtt, mqtt5, nats, pulsar, redis, sns, solace, sqs, stomp, ws.
message
asyncApiOutboundMessage
An object used to configure the message to publish using the target operation. Required if subscription has not been set.
subscription
asyncApiSubscription
An object used to configure the subscription to messages consumed using the target operation. Required if message has not been set.
authentication
authentication
The authentication policy, or the name of the authentication policy, to use when calling the AsyncAPI operation.
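
The examples later on this page all use operation, which applies to AsyncAPI v3.0.0 documents. As a minimal sketch of the v2.6.0 form described above, the call below selects a channel instead and declares message to publish on it. The document URL and the channel name greetings are hypothetical placeholders, not values taken from a real AsyncAPI document.

```yaml
document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-v2-channel-example
  version: '0.1.0'
do:
  - publishGreetings:
      call: asyncapi
      with:
        document:
          # Hypothetical URL, assumed to serve an AsyncAPI 2.6.0 document
          endpoint: https://fake.com/docs/asyncapi-v2.json
        # Hypothetical channel name; must match a channel declared in that document
        channel: greetings
        # Declaring `message` (rather than `subscription`) makes this a publish
        message:
          payload:
            greetings: Hello, World!
```

Replacing message with a subscription block on the same channel would turn the call into a subscribe.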

AsyncAPI Server

Configures the target server:
server.name
string
required
The name of the server to call the specified AsyncAPI operation on.
server.variables
object
The target server’s variables, if any.

AsyncAPI Outbound Message

Configures a message to publish:
message.payload
object
The message’s payload, if any.
message.headers
object
The message’s headers, if any.

AsyncAPI Subscription

Configures a subscription to messages:
subscription.filter
string
A runtime expression used to filter consumed messages.
subscription.consume
subscriptionLifetime
required
An object used to configure the subscription’s lifetime.
subscription.foreach
subscriptionIterator
Configures the iterator, if any, for processing each consumed message.

Subscription Lifetime

consume.amount
integer
The number of messages to consume. Required if while and until have not been set.
consume.for
duration
The maximum duration for which to keep consuming messages.
consume.while
string
A runtime expression used to determine whether or not to keep consuming messages. Required if amount and until have not been set.
consume.until
string
A runtime expression used to determine until when to consume messages. Required if amount and while have not been set.

Examples

Publishing a Message

document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-publish-example
  version: '0.1.0'
do:
  - publishGreetings:
      call: asyncapi
      with:
        document:
          endpoint: https://fake.com/docs/asyncapi.json
        operation: greet
        server:
          name: greetingsServer
          variables:
            environment: dev
        message:
          payload:
            greetings: Hello, World!
          headers:
            foo: bar
            bar: baz

Subscribing to Messages (Amount)

document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-subscribe-amount
  version: '0.1.0'
do:
  - getNotifications:
      call: asyncapi
      with:
        document:
          endpoint: https://fake.com/docs/asyncapi.json
        operation: getNotifications
        protocol: ws
        subscription:
          filter: '${ .correlationId == $context.userId }'
          consume:
            amount: 5

Subscribing Until Condition

document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-subscribe-until
  version: '0.1.0'
do:
  - subscribeToChatInbox:
      call: asyncapi
      with:
        document:
          endpoint: https://fake.com/docs/asyncapi.json
        operation: chat-inbox
        protocol: http
        subscription:
          filter: '${ .roomId == $workflow.input.chat.roomId }'
          consume:
            until: '${ ($context.messages | length) == 5 }'
            for:
              seconds: 10

Subscribing While Condition

document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-subscribe-while
  version: '0.1.0'
do:
  - monitorEvents:
      call: asyncapi
      with:
        document:
          endpoint: https://fake.com/docs/asyncapi.json
        operation: events
        protocol: kafka
        subscription:
          consume:
            while: '${ .status == "active" }'
            for:
              minutes: 5

Processing Each Message

document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-foreach-example
  version: '0.1.0'
do:
  - processMessages:
      call: asyncapi
      with:
        document:
          endpoint: https://fake.com/docs/asyncapi.json
        operation: chat-inbox
        protocol: http
        subscription:
          consume:
            amount: 10
          foreach:
            item: message
            at: index
            do:
              - emitEvent:
                  emit:
                    event:
                      with:
                        source: https://serverlessworkflow.io/samples
                        type: io.serverlessworkflow.samples.asyncapi.message.consumed.v1
                        data:
                          message: '${ $message }'

With Authentication

document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-auth-example
  version: '0.1.0'
use:
  authentications:
    messageQueueAuth:
      bearer:
        token: '${ .token }'
do:
  - subscribeToQueue:
      call: asyncapi
      with:
        document:
          endpoint: https://fake.com/docs/asyncapi.json
        operation: orderEvents
        protocol: amqp
        subscription:
          consume:
            amount: 100
        authentication:
          use: messageQueueAuth
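
The authentication property accepts either the name of a policy, as in the example above, or the policy itself. The sketch below shows the inline form under the assumption that the same bearer policy is declared directly on the call; the workflow name and the .token input field are illustrative only.

```yaml
document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-inline-auth-example
  version: '0.1.0'
do:
  - subscribeToQueue:
      call: asyncapi
      with:
        document:
          endpoint: https://fake.com/docs/asyncapi.json
        operation: orderEvents
        protocol: amqp
        subscription:
          consume:
            amount: 100
        # Inline policy instead of `use: <name>`; no `use.authentications` section needed
        authentication:
          bearer:
            # Assumes the task input carries a `token` field
            token: '${ .token }'
```

A named policy is usually the better fit when several calls share the same credentials; the inline form keeps a one-off call self-contained.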

Kafka Example

document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-kafka-example
  version: '0.1.0'
do:
  - publishToKafka:
      call: asyncapi
      with:
        document:
          endpoint: https://example.com/kafka-api.json
        operation: publishOrder
        protocol: kafka
        message:
          payload:
            orderId: '${ .order.id }'
            customerId: '${ .order.customerId }'
            items: '${ .order.items }'
          headers:
            contentType: application/json

Supported Protocols

AsyncAPI supports a wide range of protocols:
  • AMQP/AMQP1 - Advanced Message Queuing Protocol
  • Kafka - Distributed event streaming platform
  • MQTT/MQTT5 - Lightweight messaging for IoT
  • WebSocket (ws) - Full-duplex communication
  • HTTP - Standard web protocol
  • NATS - Cloud-native messaging system
  • Redis - In-memory data structure store
  • And more; see the protocol property above for the full list of supported values

Subscription Output

An AsyncAPI subscribe operation produces a sequentially ordered array of all the messages it has consumed, potentially transformed using foreach.output.as.
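
To illustrate the transformation step, here is a hedged sketch that keeps only one field of each consumed message. It assumes that foreach.output.as is evaluated once per consumed message with that message as the expression input, that do may be omitted from the iterator, and that each message exposes a payload.text field. All three are assumptions to verify against your runtime and your AsyncAPI document.

```yaml
document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-output-example
  version: '0.1.0'
do:
  - collectChatTexts:
      call: asyncapi
      with:
        document:
          endpoint: https://fake.com/docs/asyncapi.json
        operation: chat-inbox
        protocol: http
        subscription:
          consume:
            amount: 3
          foreach:
            item: message
            output:
              # Assumption: `.` is the consumed message; `payload.text` is a hypothetical field
              as: '${ .payload.text }'
```

Under those assumptions, the task output would be an array of three text values in the order the messages were consumed, rather than three full message objects.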

Best Practices

  • Use Filters: Apply filters to consume only relevant messages
  • Set Timeouts: Always set a timeout with consume.for to prevent indefinite subscriptions
  • Handle Errors: Wrap AsyncAPI calls in try/catch blocks for robust error handling
  • Version Documents: Keep AsyncAPI documents versioned and accessible
  • Process Incrementally: Use foreach to process messages one at a time for better error handling
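
The error-handling recommendation can be sketched as follows: the AsyncAPI call is wrapped in a try task, and failures are retried with exponential backoff. The sketch assumes the runtime reports broker failures with the standard communication error type; the task names, delay, and retry count are illustrative choices.

```yaml
document:
  dsl: '1.0.3'
  namespace: examples
  name: asyncapi-try-catch-example
  version: '0.1.0'
do:
  - publishWithRetry:
      try:
        - publishGreetings:
            call: asyncapi
            with:
              document:
                endpoint: https://fake.com/docs/asyncapi.json
              operation: greet
              message:
                payload:
                  greetings: Hello, World!
      catch:
        errors:
          with:
            # Assumption: the runtime raises this standard error type for broker failures
            type: https://serverlessworkflow.io/spec/1.0.0/errors/communication
        retry:
          delay:
            seconds: 2
          backoff:
            exponential: {}
          limit:
            attempt:
              count: 3
```

Errors of any other type are not caught by this block and propagate to the caller as usual.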
