Enabling Event-Streaming Operations
Server-sent events (SSE) is a core web feature that provides servers with a low overhead solution to push real-time events to the client when they become available. SSE can be used to stream chat completions from a large language model, real-time stock prices, and sensor readings to clients.
SSE is similar to WebSockets in that it uses a persistent connection but differs in that it is unidirectional - only the server sends events. SSE is simpler to implement in many existing backend HTTP frameworks.
INFO
Speakeasy makes it easy to build SSE into your SDKs without vendor extensions or heuristics. Leverage SSE by modeling SSE streams as text/event-stream
responses with pure OpenAPI.
Here's a short example of using an SDK to chat with an LLM and read its response as a stream:
import { SDK } from '@speakeasy/sdk';const sdk = new SDK()const response = await sdk.chat.create({ prompt: "What are the top 3 French cheeses by consumption?"})for await (const event of response.chatStream) { process.stdout.write(event.data);}
INFO
The SSE feature is currently supported in TypeScript, Python, Go, and Java. Let us know if you'd like to see support for other languages.
Modeling SSE in OpenAPI
To implement SSE in your SDKs, model an API endpoint that serves an event stream in your OpenAPI document. Each server-sent event can contain up to four types of fields: id
, event
, data
, and retry
.
The example below illustrates an operation that streams events containing only a data
field that holds string content:
paths: /chat: post: summary: Create a chat completion from a prompt operationId: create tags: [chat] requestBody: required: true content: application/json: schema: $ref: '#/components/schemas/ChatRequest' responses: '200': description: Chat completion created content: text/event-stream: schema: $ref: '#/components/schemas/ChatStream'components: schemas: ChatRequest: type: object required: [prompt] properties: prompt: type: string ChatStream: description: A server-sent event containing chat completion content type: object required: [data] properties: data: type: string
SSE implementation isn't limited to string data. If you specify that data
is an object, then SDKs will assume the field will contain JSON content. Raw data received from the server will be deserialized into an object for the application code to consume.
components: schemas: ChatStream: description: A server-sent event containing chat completion content type: object required: [data] properties: data: type: object properties: content: type: string model: type: string enum: ["foo-gpt-tiny", "foo-gpt-small"] created: type: integer
The Speakeasy-generated TypeScript SDK for the example above will allow users to access this object:
for await (const event of response.chatStream) { const { content, model, created } = event.data; process.stdout.write(content);}
Other streaming APIs send multiple types of events with the id
and event
fields. These event types can be described as a union (oneOf
) with the event
field acting as a discriminator:
components: schemas: ChatStream: oneOf: - $ref: '#/components/schemas/HeartbeatEvent' - $ref: '#/components/schemas/ChatEvent' discriminator: propertyName: event mapping: ping: '#/components/schemas/HeartbeatEvent' completion: '#/components/schemas/ChatEvent' HeartbeatEvent: description: A server-sent event indicating that the server is still processing the request type: object required: [event] properties: event: type: string const: "ping" ChatEvent: description: A server-sent event containing chat completion content type: object required: [id, event, data] properties: id: type: string event: type: string const: completion data: type: object required: [content] properties: content: type: string
Note that across all these examples, the schema for the events only ever specifies one or more of the four recognized fields. Adding other fields will trigger a validation error when generating an SDK with the Speakeasy CLI or GitHub action.
Sentinel events
Some SSE APIs will terminate the stream by sending a final, special event. This sentinel event is only used to signal that there are no more events and is not intended for application code to handle.
In the example below, the final data: [DONE]
event is the sentinel event:
HTTP/1.1 200 OKContent-Type: text/event-stream; charset=utf-8Date: Fri, 12 Jul 2024 14:29:22 GMTKeep-Alive: timeout=5, max=1000Connection: Keep-Alivedata: {"content": "there"}data: {"content": "are 7"}data: {"content": "continents in the world"}data: [DONE]
To hide this final event in generated SDK methods, use the x-speakeasy-sse-sentinel: <string>
extension on a text/event-stream
media object:
paths: /chat: post: summary: Create a chat completion from a prompt operationId: create tags: [chat] requestBody: required: true content: application/json: schema: $ref: '#/components/schemas/ChatRequest' responses: '200': description: Chat completion created content: text/event-stream:+ x-speakeasy-sse-sentinel: '[DONE]' schema: $ref: '#/components/schemas/ChatEvent'components: schemas: ChatEvent: description: A server-sent event containing chat completion content type: object required: [data] properties: data: type: object required: [content] properties: content: type: string
Application code like the following TypeScript sample will behave as expected. The async iteration loop will finish when the sentinel event is encountered:
const llm = new LLM();const stream = await llm.chat.create({ prompt: "How many continents are there?",});for await (const event of stream) {// ^? ChatEvent process.stdout.write(event.data.content);}