
Enabling Event-Streaming Operations

Server-sent events (SSE) is a core web feature that provides servers with a low overhead solution to push real-time events to the client when they become available. SSE can be used to stream chat completions from a large language model, real-time stock prices, and sensor readings to clients.

SSE is similar to WebSockets in that it uses a persistent connection but differs in that it is unidirectional - only the server sends events. SSE is simpler to implement in many existing backend HTTP frameworks.

Here’s a short example of using an SDK to chat with an LLM and read its response as a stream:

    import { SDK } from '@speakeasy/sdk';

    const sdk = new SDK();

    const response = await sdk.chat.create({
      prompt: "What are the top 3 French cheeses by consumption?",
    });

    for await (const event of response.chatStream) {
      process.stdout.write(event.data);
    }

Modeling SSE in OpenAPI

To implement SSE in generated SDKs, model an API endpoint that serves an event stream in an OpenAPI document. Each server-sent event can contain up to four types of fields: id, event, data, and retry.
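To make the four fields concrete, here is a minimal sketch of how one event block on the wire could be split into id, event, data, and retry. The `SseEvent` interface and the `parseSseEvent` function are hypothetical names chosen for illustration; this is not the parser that generated SDKs use, only an approximation of the field rules in the SSE wire format.

```typescript
// Hypothetical shape for one parsed event; not a type from any SDK.
interface SseEvent {
  id?: string;
  event?: string;
  data: string;
  retry?: number;
}

// Parse a single event block (the lines found between two blank lines).
function parseSseEvent(block: string): SseEvent {
  const result: SseEvent = { data: "" };
  const dataLines: string[] = [];

  for (const line of block.split(/\r\n|\r|\n/)) {
    // Empty lines and comment lines (starting with ":") carry no fields.
    if (line === "" || line.charAt(0) === ":") continue;

    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    // A single leading space after the colon is not part of the value.
    if (value.charAt(0) === " ") value = value.slice(1);

    switch (field) {
      case "id":
        result.id = value;
        break;
      case "event":
        result.event = value;
        break;
      case "data":
        dataLines.push(value);
        break;
      case "retry":
        // retry is only meaningful when it consists solely of digits.
        if (/^\d+$/.test(value)) result.retry = Number(value);
        break;
    }
  }

  // Multiple data lines in one event are joined with newlines.
  result.data = dataLines.join("\n");
  return result;
}

const parsed = parseSseEvent(
  'id: 42\nevent: completion\ndata: {"content":"Brie"}\nretry: 3000'
);
```

In this sketch, `parsed` holds all four fields of the sample event, with `data` still a raw string at this stage.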

Basic Implementation

The example below illustrates an operation that streams events containing only a data field that holds string content:

    paths:
      /chat:
        post:
          summary: Create a chat completion from a prompt
          operationId: create
          tags: [chat]
          requestBody:
            required: true
            content:
              application/json:
                schema:
                  $ref: "#/components/schemas/ChatRequest"
          responses:
            "200":
              description: Chat completion created
              content:
                text/event-stream:
                  schema:
                    $ref: "#/components/schemas/ChatStream"
    components:
      schemas:
        ChatRequest:
          type: object
          required: [prompt]
          properties:
            prompt:
              type: string
        ChatStream:
          description: A server-sent event containing chat completion content
          type: object
          required: [data]
          properties:
            data:
              type: string

When data is a JSON Object

SSE implementation isn’t limited to string data. If data is specified as an object instead of a string, then SDKs will assume the field will contain JSON content. Raw data received from the server will be deserialized into an object for the application code to consume.

    components:
      schemas:
        ChatStream:
          description: A server-sent event containing chat completion content
          type: object
          required: [data]
          properties:
            data:
              type: object
              properties:
                content:
                  type: string
                model:
                  type: string
                  enum: ["foo-gpt-tiny", "foo-gpt-small"]
                created:
                  type: integer

The Speakeasy-generated TypeScript SDK for the example above will allow users to access this object:

    for await (const event of response.chatStream) {
      const { content, model, created } = event.data;
      process.stdout.write(content);
    }
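The deserialization step described in this section can be approximated as follows. This is a minimal sketch, assuming a hypothetical `ChatData` interface that mirrors the data object in the schema above and a hypothetical `decodeData` helper; the generated SDK's actual deserialization and validation logic is not shown in this document and may differ.

```typescript
// Hypothetical TypeScript shape mirroring the `data` object in the schema.
// No property is listed as required there, so all are optional here.
interface ChatData {
  content?: string;
  model?: "foo-gpt-tiny" | "foo-gpt-small";
  created?: number;
}

// Turn the raw text of an event's data field into an object.
function decodeData(raw: string): ChatData {
  const value: unknown = JSON.parse(raw);
  // Only a JSON object is acceptable; strings, numbers, arrays, and null are not.
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    throw new Error("expected the data field to hold a JSON object");
  }
  return value as ChatData;
}

const decoded = decodeData(
  '{"content":"Brie","model":"foo-gpt-tiny","created":1720794562}'
);
```

Application code then reads `decoded.content` directly instead of parsing the string itself.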

Handling Multiple Event Types

Some streaming APIs send multiple types of events and use the id and event fields to tell them apart. These event types can be described as a union (oneOf) with the event field acting as a discriminator:

    components:
      schemas:
        ChatStream:
          oneOf:
            - $ref: "#/components/schemas/HeartbeatEvent"
            - $ref: "#/components/schemas/ChatEvent"
          discriminator:
            propertyName: event
            mapping:
              ping: "#/components/schemas/HeartbeatEvent"
              completion: "#/components/schemas/ChatEvent"
        HeartbeatEvent:
          description: A server-sent event indicating that the server is still processing the request
          type: object
          required: [event]
          properties:
            event:
              type: string
              const: "ping"
        ChatEvent:
          description: A server-sent event containing chat completion content
          type: object
          required: [id, event, data]
          properties:
            id:
              type: string
            event:
              type: string
              const: completion
            data:
              type: object
              required: [content]
              properties:
                content:
                  type: string
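On the consuming side, a discriminated union like this lets application code branch on the event field. The sketch below hand-writes TypeScript types that mirror the schemas above; the exact types a generated SDK emits may differ, and `render` and `assertNever` are hypothetical helpers used only for illustration.

```typescript
// Hand-written types mirroring the HeartbeatEvent and ChatEvent schemas.
interface HeartbeatEvent {
  event: "ping";
}

interface ChatEvent {
  id: string;
  event: "completion";
  data: { content: string };
}

type ChatStream = HeartbeatEvent | ChatEvent;

// Fails at compile time if a new member of the union is left unhandled.
function assertNever(value: never): never {
  throw new Error("unexpected event type: " + JSON.stringify(value));
}

// Return the text to display for one event; heartbeats produce no output.
function render(item: ChatStream): string {
  switch (item.event) {
    case "ping":
      return "";
    case "completion":
      // Narrowed to ChatEvent here, so `data.content` is available.
      return item.data.content;
    default:
      return assertNever(item);
  }
}

const sample: ChatStream[] = [
  { event: "ping" },
  { id: "1", event: "completion", data: { content: "Hello" } },
  { id: "2", event: "completion", data: { content: " world" } },
];
const rendered = sample.map(render).join("");
```

Because event is a literal type on each member, the compiler narrows the union inside each case without any casts.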

Endpoints with Multiple Response Types

For APIs that handle both JSON responses and streaming events, use URL fragments to define separate paths for each response type. Each fragment maps to a specific behavior: either returning a complete JSON response or streaming data. This approach allows Speakeasy to generate distinct SDK methods with clear return types while maintaining API flexibility.

    paths:
      /chat:
        post:
          summary: >
            Create a chat completion from a prompt. The entire response is
            returned as a single JSON object.
          operationId: create
          tags: [chat]
          requestBody:
            required: true
            content:
              application/json:
                schema:
                  $ref: "#/components/schemas/ChatRequestJson"
          responses:
            "200":
              description: Chat completion created
              content:
                application/json:
                  schema:
                    $ref: "#/components/schemas/ChatResponse"
      /chat#streamed:
        post:
          summary: >
            Create a chat completion from a prompt. The response is streamed
            in chunks as it is generated.
          operationId: createStreamed
          tags: [chat]
          requestBody:
            required: true
            content:
              application/json:
                schema:
                  $ref: "#/components/schemas/ChatRequestStream"
          responses:
            "200":
              description: Chat completion created
              content:
                text/event-stream:
                  schema:
                    $ref: "#/components/schemas/ChatStream"
    components:
      schemas:
        ChatRequest: # ...
        ChatRequestJson:
          allOf:
            - $ref: "#/components/schemas/ChatRequest"
            - type: object
              properties:
                stream:
                  type: boolean
                  enum: [false]
                  default: false
        ChatRequestStream:
          allOf:
            - $ref: "#/components/schemas/ChatRequest"
            - type: object
              properties:
                stream:
                  type: boolean
                  enum: [true]
                  default: true
        ChatResponse: # ...
        ChatStream: # ...

Call create for the non-streaming endpoint and createStreamed for the streaming endpoint:

    import { SDK } from '@speakeasy/sdk';

    const sdk = new SDK();

    // Non-streaming method
    const jsonResponse = await sdk.chat.create({
      prompt: "What are the top 3 French cheeses by consumption?",
    });
    console.log(jsonResponse.content);

    // Streaming method
    const stream = await sdk.chat.createStreamed({
      prompt: "What are the top 3 French cheeses by consumption?",
    });
    for await (const event of stream.chatStream) {
      process.stdout.write(event.data);
    }

Alternative: Single method with overloads

Instead of using URL fragments to create separate methods, you can define a single endpoint that supports both response types. Since inferSSEOverload defaults to true in your gen.yaml, TypeScript and Python SDKs will automatically generate a single overloaded method that provides type safety based on the stream parameter value:

    paths:
      /chat:
        post:
          summary: Create a chat completion from a prompt
          operationId: create
          tags: [chat]
          requestBody:
            required: true
            content:
              application/json:
                schema:
                  $ref: "#/components/schemas/ChatRequest"
          responses:
            "200":
              description: Chat completion response
              content:
                application/json:
                  schema:
                    $ref: "#/components/schemas/ChatResponse"
                text/event-stream:
                  schema:
                    $ref: "#/components/schemas/ChatStream"
    components:
      schemas:
        ChatRequest:
          type: object
          required: [prompt]
          properties:
            prompt:
              type: string
            stream:
              type: boolean
              description: Whether to stream the response
              # The presence of this boolean 'stream' field in the request body
              # is critical for inferSSEOverload to generate method overloads
        ChatResponse:
          # JSON response schema
        ChatStream:
          # SSE event schema

This generates type-safe methods in both TypeScript and Python:

TypeScript:

    import { SDK } from '@speakeasy/sdk';

    const sdk = new SDK();

    // Non-streaming - returns ChatResponse
    const response = await sdk.chat.create({
      prompt: "Hello",
      stream: false,
    });
    console.log(response.content);

    // Streaming - returns AsyncIterable<ChatStream>
    const stream = await sdk.chat.create({
      prompt: "Hello",
      stream: true,
    });
    for await (const event of stream) {
      process.stdout.write(event.data);
    }

Python:

    from myapi import SDK

    sdk = SDK()

    # Non-streaming - returns ChatResponse
    response = sdk.chat.create(prompt="Hello", stream=False)
    print(response.content)

    # Streaming - returns Iterator[ChatStream]
    stream = sdk.chat.create(prompt="Hello", stream=True)
    for event in stream:
        print(event.data)
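To illustrate how a single method can return different types depending on the stream parameter, here is a simplified sketch of TypeScript overload signatures. It is not the code Speakeasy generates: the `create` function below is a synchronous, hypothetical stub that echoes the prompt, and a plain array stands in for the async iterable a real streaming call would return.

```typescript
interface ChatRequest {
  prompt: string;
  stream?: boolean;
}

interface ChatResponse {
  content: string;
}

interface ChatStream {
  data: string;
}

// Overload signatures: the literal type of `stream` selects the return type.
function create(request: ChatRequest & { stream: true }): ChatStream[];
function create(request: ChatRequest & { stream?: false }): ChatResponse;
// Implementation signature (not visible to callers). A real SDK would make
// an HTTP request here; this stub only echoes the prompt back.
function create(request: ChatRequest): ChatResponse | ChatStream[] {
  const words = ("Echo: " + request.prompt).split(" ");
  if (request.stream === true) {
    return words.map((word) => ({ data: word + " " }));
  }
  return { content: words.join(" ") };
}

// Inferred as ChatResponse: `stream` is omitted, so the second overload applies.
const whole = create({ prompt: "Hello" });

// Inferred as ChatStream[]: `stream: true` matches the first overload.
const chunks = create({ prompt: "Hello", stream: true });
let streamed = "";
for (const chunk of chunks) {
  streamed += chunk.data;
}
```

The caller never needs a cast or a runtime type check, because the compiler picks the return type from the literal value passed for stream.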

Sentinel events

Some SSE APIs will terminate the stream by sending a final, special event. This sentinel event is only used to signal that there are no more events and is not intended for application code to handle.

In the example below, the final data: [DONE] event is the sentinel event:

    HTTP/1.1 200 OK
    Content-Type: text/event-stream; charset=utf-8
    Date: Fri, 12 Jul 2024 14:29:22 GMT
    Keep-Alive: timeout=5, max=1000
    Connection: Keep-Alive

    data: {"content": "there"}

    data: {"content": "are 7"}

    data: {"content": "continents in the world"}

    data: [DONE]

To hide this final event in generated SDK methods, use the x-speakeasy-sse-sentinel: <string> extension on a text/event-stream media object:

    paths:
      /chat:
        post:
          summary: Create a chat completion from a prompt
          operationId: create
          tags: [chat]
          requestBody:
            required: true
            content:
              application/json:
                schema:
                  $ref: '#/components/schemas/ChatRequest'
          responses:
            '200':
              description: Chat completion created
              content:
                text/event-stream:
                  x-speakeasy-sse-sentinel: '[DONE]'
                  schema:
                    $ref: '#/components/schemas/ChatEvent'
    components:
      schemas:
        ChatEvent:
          description: A server-sent event containing chat completion content
          type: object
          required: [data]
          properties:
            data:
              type: object
              required: [content]
              properties:
                content:
                  type: string

Application code like the following TypeScript sample will behave as expected. The async iteration loop will finish when the sentinel event is encountered:

    const llm = new LLM();

    const stream = await llm.chat.create({
      prompt: "How many continents are there?",
    });

    for await (const event of stream) {
      // event is typed as ChatEvent
      process.stdout.write(event.data.content);
    }
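Conceptually, hiding the sentinel amounts to ending iteration as soon as an event's data equals the configured string. The following is a minimal sketch of that idea using a synchronous generator over raw data values; `untilSentinel` is a hypothetical helper, and the generated SDKs' internal handling (which works on an asynchronous stream) is not shown in this document.

```typescript
// Yield data values until the sentinel is seen; the sentinel itself is
// never yielded, and anything after it is ignored.
function* untilSentinel(
  dataValues: Iterable<string>,
  sentinel: string
): IterableIterator<string> {
  for (const value of dataValues) {
    if (value === sentinel) return;
    yield value;
  }
}

// The data fields from the HTTP response shown earlier in this section.
const raw = [
  '{"content": "there"}',
  '{"content": "are 7"}',
  '{"content": "continents in the world"}',
  "[DONE]",
];

const contents: string[] = [];
for (const value of untilSentinel(raw, "[DONE]")) {
  // Safe to parse: the non-JSON sentinel never reaches this point.
  contents.push(JSON.parse(value).content);
}
```

Without this filtering, application code would have to special-case `[DONE]` before calling `JSON.parse`, since the sentinel is not a JSON object.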
