Messaging Specification

The Blueapi service publishes Bluesky documents and other events to the message bus, allowing subscribers to keep track of the status of plans, as well as other status changes. This page documents the channels to which clients can subscribe to receive these messages and their structure.

# AsyncAPI specification version this document is written against.
asyncapi: '2.6.0'
# Identifier of the application described by this document.
id: 'https://github.com/DiamondLightSource/blueapi'
info:
  title: Bluesky Worker Service
  # Quoted so the version is always read as a string, never as a number.
  version: '0.0.2'
  description: >-
    Service for controlling access to and running scans based on
    Bluesky Plans and Ophyd Devices
  contact:
    name: Callum Forrester
    email: callum.forrester@diamond.ac.uk
  license:
    name: Apache 2.0
    url: 'https://www.apache.org/licenses/LICENSE-2.0.html'
# Default content type for message payloads.
defaultContentType: application/json
channels:
  # Channel carrying both worker events and the data documents of runs.
  public.worker.event:
    description: >-
      Produces events relating to worker operation and supplying data
      produced by runs associated with a plan
    subscribe:
      operationId: dataEvent
      summary: Events representing collection of data
      description: Data documents produced by running plans
      message:
        # A message on this channel is one of the following types.
        oneOf:
          # Worker state and progress events
          - $ref: '#/components/messages/workerStateEvent'
          - $ref: '#/components/messages/workerProgressEvent'
          # Run documents tagged with their document name
          - $ref: '#/components/messages/taggedStartDocument'
          - $ref: '#/components/messages/taggedDescriptorDocument'
          - $ref: '#/components/messages/taggedEventDocument'
          - $ref: '#/components/messages/taggedStopDocument'
          # References to externally stored data
          - $ref: '#/components/messages/taggedResourceDocument'
          - $ref: '#/components/messages/taggedDatumDocument'
          - $ref: '#/components/messages/taggedResourceStream'
          - $ref: '#/components/messages/taggedDatumStream'
components:
  # Defined as Components to allow referencing from other APIs.
  # TODO: Define protocol specific correlationId bindings
  messages:
    # Run start document, tagged with the constant name "start".
    taggedStartDocument:
      messageId: runStart
      summary: >-
        Indicates the start of a Bluesky run based on a previously specified
        Plan; describes the initial position/metadata of a scan and its
        involved devices
      headers:
        $ref: '#/components/schemas/contextHeaders'
      payload:
        type: object
        properties:
          # Constant tag paired with the document in `doc`.
          name:
            const: 'start'
          # Document body, defined by the event-model run_start schema (v1.22.1).
          doc:
            $ref: 'https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/run_start.json'
    # Run stop document, tagged with the constant name "stop".
    taggedStopDocument:
      messageId: runStop
      summary: >-
        Indicates the completion of a Bluesky run based on a previously
        specified Plan; describes end conditions and metadata
      headers:
        $ref: '#/components/schemas/contextHeaders'
      payload:
        type: object
        properties:
          # Constant tag paired with the document in `doc`.
          name:
            const: 'stop'
          # Document body, defined by the event-model run_stop schema (v1.22.1).
          doc:
            $ref: 'https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/run_stop.json'
    # Event descriptor document, tagged with the constant name "descriptor".
    taggedDescriptorDocument:
      messageId: eventStreamDescriptor
      summary: >-
        Describes the devices to be within a scientifically related stream of
        measurements
      headers:
        $ref: '#/components/schemas/contextHeaders'
      payload:
        type: object
        properties:
          # Constant tag paired with the document in `doc`.
          name:
            const: 'descriptor'
          # Document body, defined by the event-model event_descriptor schema (v1.22.1).
          doc:
            $ref: 'https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/event_descriptor.json'
    # Event document, tagged with the constant name "event".
    taggedEventDocument:
      messageId: dataEvent
      summary: >-
        Describes a point measurement for a number of scientifically related
        devices
      headers:
        $ref: '#/components/schemas/contextHeaders'
      payload:
        type: object
        properties:
          # Constant tag paired with the document in `doc`.
          name:
            const: 'event'
          # Document body, defined by the event-model event schema (v1.22.1).
          doc:
            $ref: 'https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/event.json'
    # Resource document, tagged with the constant name "resource".
    taggedResourceDocument:
      messageId: resource
      summary: >-
        Describes an external resource (file, database entry etc.) that is to
        be referenced by later datum
      headers:
        $ref: '#/components/schemas/contextHeaders'
      payload:
        type: object
        properties:
          # Constant tag paired with the document in `doc`.
          name:
            const: 'resource'
          # Document body, defined by the event-model resource schema (v1.22.1).
          doc:
            $ref: 'https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/resource.json'
    # Datum document, tagged with the constant name "datum".
    taggedDatumDocument:
      messageId: datum
      summary: >-
        Describes how to access a point measurement within an external
        resource
      headers:
        $ref: '#/components/schemas/contextHeaders'
      payload:
        type: object
        properties:
          # Constant tag paired with the document in `doc`.
          name:
            const: 'datum'
          # Document body, defined by the event-model datum schema (v1.22.1).
          doc:
            $ref: 'https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/datum.json'
    # Stream resource document, tagged with the constant name "stream_resource".
    taggedResourceStream:
      messageId: streamResource
      summary: >-
        Describes an external resource (file, database entry etc.) that is to
        be referenced by later Stream Datum
      headers:
        $ref: '#/components/schemas/contextHeaders'
      payload:
        type: object
        properties:
          # Constant tag paired with the document in `doc`.
          name:
            const: 'stream_resource'
          # Document body, defined by the event-model stream_resource schema (v1.22.1).
          doc:
            $ref: 'https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/stream_resource.json'
    # Stream datum document, tagged with the constant name "stream_datum".
    taggedDatumStream:
      messageId: streamDatum
      summary: 'Describes how to access a slice of an external resource'
      headers:
        $ref: '#/components/schemas/contextHeaders'
      payload:
        type: object
        properties:
          # Constant tag paired with the document in `doc`.
          name:
            const: 'stream_datum'
          # Document body, defined by the event-model stream_datum schema (v1.22.1).
          doc:
            $ref: 'https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/stream_datum.json'
    # Worker state message. `state`, `errors` and `warnings` are always
    # present; `taskStatus` is optional (it is not listed under `required`).
    workerStateEvent:
      messageId: stateEvent
      # Summary added for consistency with the document messages, which all
      # carry one; wording is derived from the payload fields below.
      summary: Reports the state of the worker, optionally with the status of a task it is running, plus any errors and warnings
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        required:
          - state
          - errors
          - warnings
        properties:
          state:
            $ref: "#/components/schemas/workerState"
          taskStatus:
            $ref: "#/components/schemas/taskStatus"
          errors:
            description: A list of any errors generated during the execution of a task
            type: array
            items:
              type: string
          warnings:
            description: A list of any warnings generated during the execution of a task
            type: array
            items:
              type: string
    # Task progress message: a task id plus a map of status views.
    workerProgressEvent:
      messageId: progressEvent
      # Summary added for consistency with the document messages, which all
      # carry one; wording is derived from the payload fields below.
      summary: Reports the progress of a task as a set of status views
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        required:
          - taskName
          - statuses
        properties:
          taskName:
            description: Unique id of the task, returned when it was originally submitted
            type: string
          statuses:
            type: object
            description: Status object providing various indicators for the task
            # Every value in this map must be a statusView; the meaning of
            # the keys is not defined by this schema.
            additionalProperties:
              $ref: "#/components/schemas/statusView"
  schemas:
    # Header schema referenced by the messages under components/messages.
    # Only `destination` is mandatory.
    contextHeaders:
      type: object
      required: [destination]
      properties:
        destination:
          type: string
          description: Name of the channel on which the message is being sent
        replyDestination:
          type: string
          description: >-
            Name of the temporary queue, specified by the caller where they
            will listen for response messages to requests submitted on the
            worker.X channels
        correlationId:
          type: string
          description: Unique identifier of an exchange supplied by its originator
    workerState:
      description: The state of the Worker.
      # Every permitted value is a string; the type is declared explicitly so
      # validators and code generators need not infer it from the enum.
      type: string
      enum:
        - IDLE
        - RUNNING
        - PAUSING
        - PAUSED
        - HALTING
        - STOPPING
        - ABORTING
        - SUSPENDING
        - PANICKED
        - UNKNOWN
    # Status of a single task; all three fields are mandatory.
    taskStatus:
      description: Current state of a task the worker is running.
      type: object
      required:
        - taskName
        - taskComplete
        - taskFailed
      properties:
        taskName:
          description: Unique id of the task, returned when it was originally submitted
          type: string
        taskComplete:
          description: Indication whether the task reached the end of its execution
          type: boolean
        taskFailed:
          # Worded so that `true` agrees with the field name (the task failed).
          description: Indication whether the task failed to achieve its expected outcome
          type: boolean
    # One progress indicator. The name, unit, precision and done flag are
    # mandatory; the numeric progress and timing fields are optional.
    statusView:
      type: object
      required: [displayName, unit, precision, done]
      properties:
        displayName:
          type: string
          description: Human-readable name indicating what this status describes

        # Progress values
        current:
          type: number
          format: float
          description: Current value of operation progress, if known
        initial:
          type: number
          format: float
          description: Initial value of operation progress, if known
        target:
          type: number
          format: float
          description: Target value of operation progress, if known

        # Display hints and completion flag
        unit:
          type: string
          default: 'Units'
          description: Units of progress
        precision:
          type: integer
          default: 3
          description: Sensible precision of progress to display
        done:
          type: boolean
          default: false
          description: Whether the operation this status describes is complete

        # Derived completion and timing figures
        percentage:
          type: number
          format: float
          description: Percentage of status completion, if known
        timeElapsed:
          type: number
          format: float
          description: Time elapsed since status operation beginning, if known
        timeRemaining:
          type: number
          format: float
          description: Estimated time remaining until operation completion, if known