# Enabling Event-Streaming Operations
Server-sent events (SSE) is a core web feature that gives servers a low-overhead way to push real-time events to clients as they become available. SSE can be used to stream chat completions from a large language model, real-time stock prices, or sensor readings to clients.

SSE is similar to WebSockets in that it uses a persistent connection, but it differs in being unidirectional: only the server sends events. This makes SSE simpler to implement in many existing backend HTTP frameworks.
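As a sketch of that simplicity, a bare-bones SSE endpoint needs little more than the right response headers and `data:`-prefixed writes. The example below uses Node's built-in `http` module; the route, payload, and tick count are illustrative only:

```typescript
import { createServer } from "node:http";

// Minimal SSE endpoint: set the stream headers, then write
// `data: ...` lines, each event terminated by a blank line.
const server = createServer((req, res) => {
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  let tick = 0;
  const timer = setInterval(() => {
    tick += 1;
    res.write(`data: {"tick": ${tick}}\n\n`);
    if (tick === 3) {
      clearInterval(timer);
      res.end(); // close the stream after the last event
    }
  }, 50);

  // Stop writing if the client disconnects early.
  req.on("close", () => clearInterval(timer));
});

// Call server.listen(port) to start serving.
```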
Info
Speakeasy makes it easy to build SSE into generated SDKs without vendor extensions or heuristics. Leverage SSE by modeling SSE streams as `text/event-stream` responses with pure OpenAPI.
Here’s a short example of using an SDK to chat with an LLM and read its response as a stream:
```typescript
import { SDK } from '@speakeasy/sdk';

const sdk = new SDK();

const response = await sdk.chat.create({
  prompt: "What are the top 3 French cheeses by consumption?"
});

for await (const event of response.chatStream) {
  process.stdout.write(event.data);
}
```
Info
The SSE feature is currently supported in TypeScript, Python, Go, and Java. Let us know if you’d like to see support for other languages.
## Modeling SSE in OpenAPI
To implement SSE in a generated SDK, model an API endpoint that serves an event stream in an OpenAPI document. Each server-sent event can contain up to four types of fields: `id`, `event`, `data`, and `retry`.
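On the wire, each event is a block of `field: value` lines terminated by a blank line. A single event combining all four fields looks like this (the values are illustrative):

```
id: 42
event: completion
retry: 3000
data: {"content": "Hello"}

```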
### Basic Implementation
The example below illustrates an operation that streams events containing only a `data` field that holds string content:
```yaml
paths:
  /chat:
    post:
      summary: Create a chat completion from a prompt
      operationId: create
      tags: [chat]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ChatRequest"
      responses:
        "200":
          description: Chat completion created
          content:
            text/event-stream:
              schema:
                $ref: "#/components/schemas/ChatStream"
components:
  schemas:
    ChatRequest:
      type: object
      required: [prompt]
      properties:
        prompt:
          type: string
    ChatStream:
      description: A server-sent event containing chat completion content
      type: object
      required: [data]
      properties:
        data:
          type: string
```
### When `data` is a JSON Object
SSE implementations aren't limited to string data. If `data` is specified as an object instead of a string, SDKs assume the field contains JSON content. Raw data received from the server is deserialized into an object for the application code to consume.
```yaml
components:
  schemas:
    ChatStream:
      description: A server-sent event containing chat completion content
      type: object
      required: [data]
      properties:
        data:
          type: object
          properties:
            content:
              type: string
            model:
              type: string
              enum: ["foo-gpt-tiny", "foo-gpt-small"]
            created:
              type: integer
```
The Speakeasy-generated TypeScript SDK for the example above will allow users to access this object:
```typescript
for await (const event of response.chatStream) {
  const { content, model, created } = event.data;
  process.stdout.write(content);
}
```
### Handling Multiple Event Types
Other streaming APIs send multiple types of events with the `id` and `event` fields. These event types can be described as a union (`oneOf`) with the `event` field acting as a discriminator:
```yaml
components:
  schemas:
    ChatStream:
      oneOf:
        - $ref: "#/components/schemas/HeartbeatEvent"
        - $ref: "#/components/schemas/ChatEvent"
      discriminator:
        propertyName: event
        mapping:
          ping: "#/components/schemas/HeartbeatEvent"
          completion: "#/components/schemas/ChatEvent"
    HeartbeatEvent:
      description: A server-sent event indicating that the server is still processing the request
      type: object
      required: [event]
      properties:
        event:
          type: string
          const: "ping"
    ChatEvent:
      description: A server-sent event containing chat completion content
      type: object
      required: [id, event, data]
      properties:
        id:
          type: string
        event:
          type: string
          const: completion
        data:
          type: object
          required: [content]
          properties:
            content:
              type: string
```
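In application code, the union can then be narrowed on the `event` field. The sketch below stands in for the SDK's stream with a local async generator so the narrowing logic is self-contained; the event shapes mirror the `HeartbeatEvent` and `ChatEvent` schemas above, and `readCompletion` is a hypothetical helper:

```typescript
// Event shapes mirroring the HeartbeatEvent / ChatEvent schemas above.
type HeartbeatEvent = { event: "ping" };
type ChatEvent = { event: "completion"; id: string; data: { content: string } };
type ChatStreamEvent = HeartbeatEvent | ChatEvent;

// Stand-in for the SDK's event stream.
async function* demoStream(): AsyncGenerator<ChatStreamEvent> {
  yield { event: "ping" };
  yield { event: "completion", id: "1", data: { content: "Hello, " } };
  yield { event: "ping" };
  yield { event: "completion", id: "2", data: { content: "world" } };
}

async function readCompletion(): Promise<string> {
  let text = "";
  for await (const event of demoStream()) {
    switch (event.event) {
      case "ping":
        break; // heartbeat: the server is still working
      case "completion":
        text += event.data.content; // narrowed to ChatEvent here
        break;
    }
  }
  return text;
}

readCompletion().then((text) => console.log(text)); // logs "Hello, world"
```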
### Endpoints with Multiple Response Types
For APIs that handle both JSON responses and streaming events, use URL fragments to define separate paths for each response type. Each fragment maps to a specific behavior—either returning a complete JSON response or streaming data. This approach allows Speakeasy to generate distinct SDK methods with clear return types while maintaining API flexibility.
```yaml
paths:
  /chat:
    post:
      summary: >
        Create a chat completion from a prompt. The entire response is
        returned as a single JSON object.
      operationId: create
      tags: [chat]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ChatRequestJson"
      responses:
        "200":
          description: Chat completion created
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ChatResponse"
  /chat#streamed:
    post:
      summary: >
        Create a chat completion from a prompt. The response is streamed in
        chunks as it is generated.
      operationId: createStreamed
      tags: [chat]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ChatRequestStream"
      responses:
        "200":
          description: Chat completion created
          content:
            text/event-stream:
              schema:
                $ref: "#/components/schemas/ChatStream"
components:
  schemas:
    ChatRequest:
      # ...
    ChatRequestJson:
      allOf:
        - $ref: "#/components/schemas/ChatRequest"
        - type: object
          properties:
            stream:
              type: boolean
              enum: [false]
              default: false
    ChatRequestStream:
      allOf:
        - $ref: "#/components/schemas/ChatRequest"
        - type: object
          properties:
            stream:
              type: boolean
              enum: [true]
              default: true
    ChatResponse:
      # ...
    ChatStream:
      # ...
```
IMPORTANT
The `stream` properties in the `ChatRequestJson` and `ChatRequestStream` schemas are treated as constants, ensuring that each request type always has a fixed `stream` value (`false` for JSON responses and `true` for streamed responses). In OpenAPI 3.0, this is achieved using single-value enums. In OpenAPI 3.1, simplify the schema by using the `const` keyword instead of `enum`, which explicitly defines the property as having a constant value. This makes the specification more concise and easier to maintain.
See the Speakeasy OpenAPI reference on enums for more information.
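For example, under OpenAPI 3.1 the `stream` override in `ChatRequestStream` could be written with `const` instead of a single-value enum:

```yaml
ChatRequestStream:
  allOf:
    - $ref: "#/components/schemas/ChatRequest"
    - type: object
      properties:
        stream:
          type: boolean
          const: true
          default: true
```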
Use `create` for the non-streaming endpoint and `createStreamed` for the streaming endpoint:
```typescript
import { SDK } from '@speakeasy/sdk';

const sdk = new SDK();

// Non-streaming method
const jsonResponse = await sdk.chat.create({
  prompt: "What are the top 3 French cheeses by consumption?"
});
console.log(jsonResponse.content);

// Streaming method
const stream = await sdk.chat.createStreamed({
  prompt: "What are the top 3 French cheeses by consumption?"
});
for await (const event of stream.chatStream) {
  process.stdout.write(event.data);
}
```
### Alternative: Single method with overloads
Instead of using URL fragments to create separate methods, you can define a single endpoint that supports both response types. Since `inferSSEOverload` defaults to `true` in your `gen.yaml`, TypeScript and Python SDKs will automatically generate a single overloaded method that provides type safety based on the `stream` parameter value:
Note
The `inferSSEOverload` feature is currently supported in TypeScript and Python, with support for other languages planned for future releases.
```yaml
paths:
  /chat:
    post:
      summary: Create a chat completion from a prompt
      operationId: create
      tags: [chat]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/ChatRequest"
      responses:
        "200":
          description: Chat completion response
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ChatResponse"
            text/event-stream:
              schema:
                $ref: "#/components/schemas/ChatStream"
components:
  schemas:
    ChatRequest:
      type: object
      required: [prompt]
      properties:
        prompt:
          type: string
        stream:
          type: boolean
          description: Whether to stream the response
          # The presence of this boolean 'stream' field in the request body
          # is critical for inferSSEOverload to generate method overloads
    ChatResponse:
      # JSON response schema
    ChatStream:
      # SSE event schema
```
This generates type-safe methods in both TypeScript and Python:
```typescript
import { SDK } from '@speakeasy/sdk';

const sdk = new SDK();

// Non-streaming - returns ChatResponse
const response = await sdk.chat.create({ prompt: "Hello", stream: false });
console.log(response.content);

// Streaming - returns AsyncIterable<ChatStream>
const stream = await sdk.chat.create({ prompt: "Hello", stream: true });
for await (const event of stream) {
  process.stdout.write(event.data);
}
```
```python
from myapi import SDK

sdk = SDK()

# Non-streaming - returns ChatResponse
response = sdk.chat.create(prompt="Hello", stream=False)
print(response.content)

# Streaming - returns Iterator[ChatStream]
stream = sdk.chat.create(prompt="Hello", stream=True)
for event in stream:
    print(event.data)
```
Note
Across all of these examples, the schema for the events only ever specifies one or more of the four recognized fields. Adding other fields will trigger a validation error when generating an SDK with the Speakeasy CLI or GitHub action.
## Sentinel events
Some SSE APIs will terminate the stream by sending a final, special event. This sentinel event is only used to signal that there are no more events and is not intended for application code to handle.
In the example below, the final `data: [DONE]` event is the sentinel event:
```
HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8
Date: Fri, 12 Jul 2024 14:29:22 GMT
Keep-Alive: timeout=5, max=1000
Connection: Keep-Alive

data: {"content": "there"}

data: {"content": "are 7"}

data: {"content": "continents in the world"}

data: [DONE]
```
To hide this final event in generated SDK methods, use the `x-speakeasy-sse-sentinel: <string>` extension on a `text/event-stream` media type object:
```yaml
paths:
  /chat:
    post:
      summary: Create a chat completion from a prompt
      operationId: create
      tags: [chat]
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ChatRequest'
      responses:
        '200':
          description: Chat completion created
          content:
            text/event-stream:
              x-speakeasy-sse-sentinel: '[DONE]'
              schema:
                $ref: '#/components/schemas/ChatEvent'
components:
  schemas:
    ChatEvent:
      description: A server-sent event containing chat completion content
      type: object
      required: [data]
      properties:
        data:
          type: object
          required: [content]
          properties:
            content:
              type: string
```
Application code like the following TypeScript sample will behave as expected. The async iteration loop will finish when the sentinel event is encountered:
```typescript
const llm = new LLM();

const stream = await llm.chat.create({
  prompt: "How many continents are there?",
});

for await (const event of stream) {
  //             ^? ChatEvent
  process.stdout.write(event.data.content);
}
```