Instrument Server (AbstractInstrumentServer)#
A TCP server framework designed for interfacing with scientific instruments. This base class handles the network management, socket lifecycles, and command buffering, allowing you to focus exclusively on hardware-specific logic.
Features#
Connection Lifecycle Management: Automatic handling of client connections and disconnections via Python context managers.
Command Dispatcher: Built-in registry for mapping byte-string commands (e.g.,
b"move") to Python methods.Buffered Parsing: Correctly handles TCP fragmentation by buffering incoming data until a newline (
\n) is reached.Timeout Safety: Includes a deadline-based timeout context to prevent hardware hangs from locking the server loop.
Standardized Logging: Integrated with
sm_bluesky.logfor consistent tracking of server events and errors.
Communication Protocol#
The server communicates using a simple Tab-Separated Value (TSV) format over a raw TCP stream.
Request Format (Client → Server)#
Commands must be newline-terminated.
COMMAND + \t + ARG1 + \t + ARG2... + \n
Response Format (Server → Client)#
Success:
1+\t+[Optional Data]+\nError:
0+\t+[Error Message]+\n
Default Methods#
Command |
Arguments |
Description |
|---|---|---|
|
None |
Returns |
|
None |
Re-establishes connection to hardware server. |
|
None |
Safely disconnects from hardware. |
|
None |
Stops the server and disconnects hardware. |
Implementation Guide#
To use this framework, create a subclass and implement the mandatory abstract methods.
1. Define your Hardware Class#
from sm_bluesky.servers import AbstractInstrumentServer
class MyMotorServer(AbstractInstrumentServer):
def __init__(self, host, port):
super().__init__(host, port)
# Add custom hardware commands to the registry
self._command_registry[b"move_abs"] = self._move_absolute
def connect_hardware(self) -> bool:
# Logic to initialize your physical device
print("Initializing Motor...")
return True
def disconnect_hardware(self) -> None:
# Logic to safely shut down hardware
print("Parking Motor...")
def _move_absolute(self, position: bytes):
# Hardware logic: convert bytes arg to float
pos_mm = float(position)
print(f"Moving to {pos_mm}mm")
# Periodic timeout check for long operations
self._check_timeout("Motor Move")
self._send_response(b"Moved to " + position)
if __name__ == "__main__":
# Initialize and start the server
server = MyInstrumentServer("127.0.0.1", 5000)
try:
server.start()
except KeyboardInterrupt:
server.stop()