Messaging Specification
The Blueapi service publishes Bluesky documents and other events to the message bus, allowing subscribers to keep track of the status of plans, as well as other status changes. This page documents the channels to which clients can subscribe to receive these messages and their structure.
# AsyncAPI document header: spec version, document id and service metadata.
asyncapi: "2.6.0"
id: https://github.com/DiamondLightSource/blueapi
info:
  title: Bluesky Worker Service
  # Quoted so the version is always read as a string, like `asyncapi` above.
  version: "0.0.2"
  description: Service for controlling access to and running scans based on Bluesky Plans and Ophyd Devices
  contact:
    name: Callum Forrester
    email: callum.forrester@diamond.ac.uk
  license:
    name: Apache 2.0
    url: https://www.apache.org/licenses/LICENSE-2.0.html
# Content type applied to messages that do not declare their own.
defaultContentType: application/json
channels:
  # Channel on which the worker publishes its state/progress events and the
  # Bluesky documents listed below.
  public.worker.event:
    description: Produces events relating to worker operation and supplying data produced by runs associated with a plan
    subscribe:
      operationId: dataEvent
      summary: Events representing collection of data
      description: Data documents produced by running plans
      message:
        # A message on this channel matches exactly one of these definitions,
        # all of which live under components.messages.
        oneOf:
          - $ref: "#/components/messages/workerStateEvent"
          - $ref: "#/components/messages/workerProgressEvent"
          - $ref: "#/components/messages/taggedStartDocument"
          - $ref: "#/components/messages/taggedDescriptorDocument"
          - $ref: "#/components/messages/taggedEventDocument"
          - $ref: "#/components/messages/taggedStopDocument"
          - $ref: "#/components/messages/taggedResourceDocument"
          - $ref: "#/components/messages/taggedDatumDocument"
          - $ref: "#/components/messages/taggedResourceStream"
          - $ref: "#/components/messages/taggedDatumStream"
components:
  messages:
    # Defined as Components to allow referencing from other APIs.
    # TODO: Define protocol specific correlationId bindings
    #
    # The tagged* messages share one shape: `name` is a constant tag naming the
    # Bluesky document type and `doc` is validated against the matching
    # event-model JSON schema, pinned to the v1.22.1 tag.
    taggedStartDocument:
      messageId: runStart
      summary: Indicates the start of a Bluesky run based on a previously specified Plan; describes the initial position/metadata of a scan and its involved devices
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        properties:
          name:
            const: "start"
          doc:
            $ref: "https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/run_start.json"
    taggedStopDocument:
      messageId: runStop
      summary: Indicates the completion of a Bluesky run based on a previously specified Plan; describes end conditions and metadata
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        properties:
          name:
            const: "stop"
          doc:
            $ref: "https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/run_stop.json"
    taggedDescriptorDocument:
      messageId: eventStreamDescriptor
      summary: Describes the devices to be within a scientifically related stream of measurements
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        properties:
          name:
            const: "descriptor"
          doc:
            $ref: "https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/event_descriptor.json"
    taggedEventDocument:
      messageId: dataEvent
      summary: Describes a point measurement for a number of scientifically related devices
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        properties:
          name:
            const: "event"
          doc:
            $ref: "https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/event.json"
    taggedResourceDocument:
      messageId: resource
      summary: Describes an external resource (file, database entry etc.) that is to be referenced by later datum
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        properties:
          name:
            const: "resource"
          doc:
            $ref: "https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/resource.json"
    taggedDatumDocument:
      messageId: datum
      summary: Describes how to access a point measurement within an external resource
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        properties:
          name:
            const: "datum"
          doc:
            $ref: "https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/datum.json"
    taggedResourceStream:
      messageId: streamResource
      summary: Describes an external resource (file, database entry etc.) that is to be referenced by later Stream Datum
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        properties:
          name:
            const: "stream_resource"
          doc:
            $ref: "https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/stream_resource.json"
    taggedDatumStream:
      messageId: streamDatum
      summary: Describes how to access a slice of an external resource
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        properties:
          name:
            const: "stream_datum"
          doc:
            $ref: "https://raw.githubusercontent.com/bluesky/event-model/refs/tags/v1.22.1/src/event_model/schemas/stream_datum.json"
    # Worker state change; taskStatus is optional, the other fields are required.
    workerStateEvent:
      messageId: stateEvent
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        required:
          - state
          - errors
          - warnings
        properties:
          state:
            $ref: "#/components/schemas/workerState"
          taskStatus:
            $ref: "#/components/schemas/taskStatus"
          errors:
            description: A list of any errors generated during the execution of a task
            type: array
            items:
              type: string
          warnings:
            description: A list of any warnings generated during the execution of a task
            type: array
            items:
              type: string
    # Progress report for a task: a map whose values are statusView objects.
    workerProgressEvent:
      messageId: progressEvent
      headers:
        $ref: "#/components/schemas/contextHeaders"
      payload:
        type: object
        required:
          - taskName
          - statuses
        properties:
          taskName:
            description: Unique id of the task, returned when it was originally submitted
            type: string
          statuses:
            type: object
            description: Status object providing various indicators for the task
            additionalProperties:
              $ref: "#/components/schemas/statusView"
  schemas:
    # Headers attached to every message defined above; only destination is required.
    contextHeaders:
      type: object
      required:
        - destination
      properties:
        destination:
          description: Name of the channel on which the message is being sent
          type: string
        replyDestination:
          description: Name of the temporary queue, specified by the caller where they will listen for response messages to requests submitted on the worker.X channels
          type: string
        correlationId:
          description: Unique identifier of an exchange supplied by its originator
          type: string
    workerState:
      description: The state of the Worker.
      # Every member is a string; the type is stated explicitly so validators
      # and code generators do not have to infer it from the enum values.
      type: string
      enum:
        - IDLE
        - RUNNING
        - PAUSING
        - PAUSED
        - HALTING
        - STOPPING
        - ABORTING
        - SUSPENDING
        - PANICKED
        - UNKNOWN
    taskStatus:
      description: Current state of a task the worker is running.
      type: object
      required:
        - taskName
        - taskComplete
        - taskFailed
      properties:
        taskName:
          description: Unique id of the task, returned when it was originally submitted
          type: string
        taskComplete:
          description: Indication whether the task reached the end of its execution
          type: boolean
        taskFailed:
          description: Indication whether the task failed to achieve its expected outcome
          type: boolean
    # One progress indicator; used as the value type of workerProgressEvent.statuses.
    statusView:
      type: object
      required:
        - displayName
        - unit
        - precision
        - done
      properties:
        displayName:
          description: Human-readable name indicating what this status describes
          type: string
        current:
          description: Current value of operation progress, if known
          type: number
          format: float
        initial:
          description: Initial value of operation progress, if known
          type: number
          format: float
        target:
          description: Target value of operation progress, if known
          type: number
          format: float
        unit:
          description: Units of progress
          type: string
          default: Units
        precision:
          description: Sensible precision of progress to display
          type: integer
          default: 3
        done:
          description: Whether the operation this status describes is complete
          type: boolean
          default: false
        percentage:
          description: Percentage of status completion, if known
          type: number
          format: float
        timeElapsed:
          description: Time elapsed since status operation beginning, if known
          type: number
          format: float
        timeRemaining:
          description: Estimated time remaining until operation completion, if known
          type: number
          format: float