WebSocket Workflows
Automating ComfyUI workflows using the WebSocket API
Converting Workflows to WebSocket API
ComfyUI lets you export any workflow in API format, a JSON description of the node graph that a script can submit to a running ComfyUI server while tracking execution over the WebSocket connection. This enables automated execution and parameter adjustment without using the graphical interface, which is particularly useful for integration with external applications, batch processing, or automated generation pipelines.
Key Benefits
- Run workflows programmatically without the UI
- Dynamically adjust node parameters
- Chain multiple workflows together
- Integrate with external applications
- Automate batch processing
Workflow API Export
To convert a workflow to an API:
1. Save API Info
Right-click in the workflow area → "Save (API Format)" to get the workflow API specification.
2. Identify Node IDs
The saved API format includes node IDs and their connections, which you'll use to modify parameters.
3. Note Input Parameters
Document which node IDs correspond to adjustable parameters in your workflow.
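As a starting point for steps 2 and 3, the sketch below lists each node ID together with the inputs that hold literal values. It assumes the export is a flat mapping of node IDs to nodes and that inputs written as [source_node_id, output_index] lists are connections; if your file wraps the nodes under a 'prompt' key, pass that inner mapping instead. SAMPLE_EXPORT and adjustable_inputs are hypothetical names used only for illustration.

```python
import json
from typing import Dict, List

# Hypothetical, trimmed-down export; a real file comes from "Save (API Format)".
SAMPLE_EXPORT = """
{
  "3": {"class_type": "KSampler",
        "inputs": {"seed": 1, "steps": 20, "cfg": 7, "model": ["4", 0]}},
  "6": {"class_type": "CLIPTextEncode",
        "inputs": {"text": "a cat", "clip": ["4", 1]}}
}
"""


def adjustable_inputs(workflow: Dict[str, dict]) -> Dict[str, List[str]]:
    """Map 'node_id (class_type)' to the inputs that hold literal values.

    Inputs given as [source_node_id, output_index] lists are treated as
    connections to other nodes and skipped (an assumption about the format).
    """
    summary: Dict[str, List[str]] = {}
    for node_id, node in workflow.items():
        literals = [name for name, value in node.get("inputs", {}).items()
                    if not isinstance(value, list)]
        summary[f"{node_id} ({node.get('class_type', '?')})"] = literals
    return summary


if __name__ == "__main__":
    for node, inputs in adjustable_inputs(json.loads(SAMPLE_EXPORT)).items():
        print(node, "->", ", ".join(inputs))
```

The printed list can serve as the documentation of adjustable parameters that step 3 asks for.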
Python WebSocket Implementation
Creating a Python script to handle WebSocket connections and workflow configuration provides more robust control and better maintainability. Here's how to structure your implementation:
import asyncio
import json
import urllib.request
import uuid
from typing import Any, Dict, List

import websockets

SERVER = 'localhost:8188'  # host:port of the running ComfyUI server


class ComfyWorkflowManager:
    def __init__(self, workflow_path: str):
        self.workflow = self.load_workflow(workflow_path)
        self.node_map: Dict[str, List[str]] = {}
        self.build_node_map()

    def load_workflow(self, path: str) -> dict:
        with open(path, 'r') as f:
            data = json.load(f)
        # An API-format export is normally a flat {node_id: node} mapping;
        # wrap it so the rest of the class can rely on self.workflow['prompt'].
        return data if 'prompt' in data else {'prompt': data}

    @staticmethod
    def node_title(node_data: dict) -> str:
        """Return the node title (exports usually store it under '_meta')."""
        return node_data.get('_meta', {}).get('title') or node_data.get('title', '')

    def build_node_map(self):
        """Build maps of node types and prefixed nodes for easy lookup"""
        for node_id, node_data in self.workflow['prompt'].items():
            # Store by class type
            node_class = node_data.get('class_type', '')
            if node_class:
                self.node_map.setdefault(node_class, []).append(node_id)
            # Store prefixed nodes (e.g., SWITCH_, TEXT_)
            node_title = self.node_title(node_data)
            if '_' in node_title:
                prefix = node_title.split('_')[0] + '_'
                self.node_map.setdefault(prefix, []).append(node_id)

    def find_nodes(self, identifier: str) -> List[str]:
        """Find nodes by class type or prefix"""
        return self.node_map.get(identifier, [])

    def update_node_parameters(self, config: Dict[str, Any]):
        """Update node parameters based on configuration"""
        nodes = self.workflow['prompt']
        # Update text prompts (this sets every CLIPTextEncode node,
        # including any that hold a negative prompt)
        for text_node in self.find_nodes('CLIPTextEncode'):
            if 'prompt' in config:
                nodes[text_node]['inputs']['text'] = config['prompt']
        # Update all switch nodes
        for switch_node in self.find_nodes('SWITCH_'):
            # The switch number comes from the node title, e.g. SWITCH_1 -> "1"
            switch_id = self.node_title(nodes[switch_node]).split('_', 1)[1]
            if switch_id in config.get('switches', {}):
                nodes[switch_node]['inputs']['select'] = config['switches'][switch_id]
        # Update samplers by class type
        for sampler in self.find_nodes('KSampler'):
            if 'sampling' in config:
                node_inputs = nodes[sampler]['inputs']
                node_inputs.update({
                    k: v for k, v in config['sampling'].items()
                    if k in node_inputs  # Only update existing parameters
                })


def queue_prompt(prompt: dict, client_id: str) -> str:
    """Queue the workflow with an HTTP POST to /prompt and return its prompt_id.

    ComfyUI accepts prompts over HTTP; the WebSocket only reports progress.
    """
    payload = json.dumps({"prompt": prompt, "client_id": client_id}).encode('utf-8')
    request = urllib.request.Request(
        f'http://{SERVER}/prompt', data=payload,
        headers={'Content-Type': 'application/json'})
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())['prompt_id']


def handle_output(output: dict):
    """Placeholder: replace with your own handling of node outputs."""
    print(f"Output: {output}")


async def run_workflow(workflow_path: str, config: Dict[str, Any]):
    client_id = str(uuid.uuid4())
    manager = ComfyWorkflowManager(workflow_path)
    # Configure workflow based on provided config
    manager.update_node_parameters(config)
    # Connect first so that no status messages are missed
    async with websockets.connect(f'ws://{SERVER}/ws?clientId={client_id}') as websocket:
        # Queue the workflow (blocking call moved off the event loop)
        prompt_id = await asyncio.to_thread(
            queue_prompt, manager.workflow['prompt'], client_id)
        # Handle execution updates
        while True:
            raw = await websocket.recv()
            if isinstance(raw, bytes):
                continue  # binary frames carry preview images
            message = json.loads(raw)
            data = message.get('data', {})
            if data.get('prompt_id') != prompt_id:
                continue  # status updates or messages for other prompts
            if message['type'] == 'executing':
                if data['node'] is None:
                    print("Workflow finished")  # node is None once the prompt is done
                    break
                print(f"Executing node: {data['node']}")
            elif message['type'] == 'executed':
                print(f"Completed node: {data['node']}")
                if 'output' in data:
                    handle_output(data['output'])
            elif message['type'] == 'execution_error':
                print(f"Error: {data.get('exception_message')}")
                break


# Example usage
config = {
    "prompt": "a futuristic cityscape",
    "switches": {
        "1": 0,  # SWITCH_1 set to first option
        "2": 1   # SWITCH_2 set to second option
    },
    "sampling": {
        "seed": 123456789,
        "steps": 20,
        "cfg": 7,
        "sampler_name": "euler"
    }
}
asyncio.run(run_workflow('workflow.json', config))
Node Naming Best Practices
- Use prefixes like SWITCH_1, TEXT_positive for easily identifiable nodes
- Search for nodes by class type (e.g., 'KSampler', 'CLIPTextEncode') instead of node IDs
- Group related nodes with similar prefixes (e.g., MODEL_base, MODEL_refiner)
- Use descriptive suffixes for clarity (e.g., TEXT_positive, TEXT_negative)
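To illustrate how these naming conventions pay off, here is a minimal sketch that looks nodes up by title prefix and then distinguishes them by suffix. It assumes node titles are exported under a "_meta" entry of each node, which may differ between ComfyUI versions; nodes_with_prefix and sample_prompt are hypothetical names.

```python
from typing import Dict, List


def node_title(node: dict) -> str:
    """Read a node's title, assuming it is stored under '_meta' in the export."""
    return node.get("_meta", {}).get("title", "")


def nodes_with_prefix(prompt: Dict[str, dict], prefix: str) -> List[str]:
    """Return the IDs of all nodes whose title starts with the given prefix."""
    return [node_id for node_id, node in prompt.items()
            if node_title(node).startswith(prefix)]


# Hypothetical workflow fragment using the TEXT_ naming convention.
sample_prompt = {
    "6": {"class_type": "CLIPTextEncode",
          "_meta": {"title": "TEXT_positive"}, "inputs": {"text": ""}},
    "7": {"class_type": "CLIPTextEncode",
          "_meta": {"title": "TEXT_negative"}, "inputs": {"text": ""}},
    "12": {"class_type": "KSampler",
           "_meta": {"title": "KSampler"}, "inputs": {"seed": 0}},
}

# Set the positive and negative prompts separately by suffix.
for node_id in nodes_with_prefix(sample_prompt, "TEXT_"):
    suffix = node_title(sample_prompt[node_id]).split("_", 1)[1]
    text = "a futuristic cityscape" if suffix == "positive" else "blurry, low quality"
    sample_prompt[node_id]["inputs"]["text"] = text
```

Because the lookup never mentions "6" or "7", the script keeps working if the node IDs change after the workflow is edited and re-exported.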
Benefits of This Approach
- Workflows can be modified without breaking automation scripts
- Node IDs can change without affecting functionality
- Easy to find and update groups of related nodes
- More maintainable and readable code
- Supports workflow versioning and updates
Executing Workflows via WebSocket
Here's how to execute a workflow programmatically:
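// Connect to ComfyUI WebSocket; the clientId ties queued prompts to this connection
const clientId = 'my_client';
const ws = new WebSocket(`ws://localhost:8188/ws?clientId=${clientId}`);

// Load workflow from saved API format
// (loadWorkflowFromFile and the handler functions below are placeholders you supply)
const workflow = loadWorkflowFromFile('workflow.json');

// Modify parameters as needed (3 and 6 are node IDs from this example's export)
workflow.prompt[3].inputs.seed = Math.floor(Math.random() * 1000000000);
workflow.prompt[6].inputs.text = "a futuristic city at night";

// Queue the workflow with an HTTP POST once the socket is open;
// progress is then reported on the WebSocket
ws.addEventListener('open', async () => {
  await fetch('http://localhost:8188/prompt', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      "prompt": workflow.prompt,
      "client_id": clientId
    })
  });
});

// Handle execution updates
ws.addEventListener('message', (event) => {
  // Binary frames (e.g. preview images) are not JSON
  if (typeof event.data !== 'string') return;
  const message = JSON.parse(event.data);
  switch (message.type) {
    case 'execution_start':
      console.log('Workflow started');
      break;
    case 'execution_cached':
      handleCachedOutput(message.data);
      break;
    case 'executing':
      updateProgress(message.data.node);
      break;
    case 'executed':
      handleOutput(message.data);
      break;
    case 'execution_error':
      console.error('Workflow failed', message.data);
      break;
  }
});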
Chaining Workflows
You can chain multiple workflows together by using the output of one as input for another:
async function runGenerationPipeline() {
  // executeWorkflow, loadWorkflow and baseGenerationWorkflow are placeholders
  // you supply; node IDs 2 and 3 are specific to the example workflows.

  // First workflow: Generate base image
  const baseImage = await executeWorkflow(baseGenerationWorkflow);

  // Second workflow: Upscale the result
  const upscaleWorkflow = loadWorkflow('upscale.json');
  upscaleWorkflow.prompt[2].inputs.image = baseImage;
  const upscaledImage = await executeWorkflow(upscaleWorkflow);

  // Third workflow: Add details
  const detailWorkflow = loadWorkflow('detail.json');
  detailWorkflow.prompt[3].inputs.image = upscaledImage;
  const finalImage = await executeWorkflow(detailWorkflow);

  return finalImage;
}
Pipeline Benefits
- Break complex operations into manageable steps
- Reuse workflows in different combinations
- Handle errors at each stage independently
- Optimize parameters based on intermediate results
- Process batches efficiently
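The points about reusable stages and per-stage error handling can be sketched in Python as follows. The execute callable is injected and stands in for whatever function actually runs a workflow and returns its result; run_pipeline and the stage tuple layout are assumptions made for this illustration, not part of ComfyUI.

```python
import copy
from typing import Any, Callable, Dict, List, Optional, Tuple

# (stage name, workflow, node ID that receives the previous result, input name)
Stage = Tuple[str, Dict[str, dict], str, str]


def run_pipeline(stages: List[Stage],
                 execute: Callable[[Dict[str, dict]], Any],
                 initial: Optional[Any] = None) -> Any:
    """Run stages in order, feeding each result into the next stage's input."""
    result = initial
    for name, workflow, node_id, input_name in stages:
        # Work on a copy so the stage definition can be reused in other pipelines.
        prepared = copy.deepcopy(workflow)
        if result is not None:
            prepared[node_id]["inputs"][input_name] = result
        try:
            result = execute(prepared)
        except Exception as exc:
            # Report which stage failed so the caller can retry or skip it.
            raise RuntimeError(f"stage '{name}' failed: {exc}") from exc
    return result
```

Keeping the executor separate from the pipeline logic also makes the chain easy to test with a stub before pointing it at a real server.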
Workflow Event Handling
When executing workflows via WebSocket, you'll receive various event types to track progress and handle results:
Execution Events
{
  "type": "executing",
  "data": {
    "node": "5",
    "prompt_id": "12345"
  }
}
Execution Progress
{
  "type": "progress",
  "data": {
    "value": 50,
    "max": 100,
    "node": "5"
  }
}
Execution Completed
{
  "type": "executed",
  "data": {
    "node": "5",
    "output": {
      "images": [...],
      "metadata": {...}
    }
  }
}
Error Messages
{
  "type": "execution_error",
  "data": {
    "node_id": "3",
    "exception_message": "CUDA out of memory"
  }
}
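A small dispatcher can turn these events into log lines. The sketch below is deliberately tolerant about field names ("node" or "node_id", "exception_message" or "message"), since the exact payload may vary between ComfyUI versions. It also assumes that an "executing" event whose node is null signals the end of the run. describe_event is a hypothetical helper name.

```python
import json
from typing import Optional


def describe_event(raw: str) -> Optional[str]:
    """Summarise one JSON event message, or return None for unknown types."""
    message = json.loads(raw)
    kind = message.get("type")
    data = message.get("data", {})

    if kind == "executing":
        node = data.get("node")
        # Assumption: a null node means the whole prompt has finished.
        return "finished" if node is None else f"executing node {node}"
    if kind == "progress":
        percent = 100 * data["value"] // max(data["max"], 1)
        return f"progress {percent}%"
    if kind == "executed":
        node = data.get("node", data.get("node_id"))
        return f"node {node} produced {sorted(data.get('output', {}))}"
    if kind in ("execution_error", "error"):
        text = data.get("exception_message", data.get("message", "unknown error"))
        return f"error in node {data.get('node_id')}: {text}"
    return None
```

Returning a string instead of printing keeps the function easy to unit test and lets the caller decide where the message goes.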
Dynamic Parameter Adjustment
You can dynamically adjust workflow parameters based on external inputs or previous results:
// Helper function to modify workflow parameters
// (node IDs 3, 5, 6 and 7 are specific to this example workflow)
function adjustWorkflowParams(workflow, params) {
  // Adjust KSampler settings; compare against undefined so that 0 is accepted
  if (params.seed !== undefined) {
    workflow.prompt[3].inputs.seed = params.seed;
  }
  if (params.steps !== undefined) {
    workflow.prompt[3].inputs.steps = params.steps;
  }
  if (params.cfg !== undefined) {
    workflow.prompt[3].inputs.cfg = params.cfg;
  }

  // Modify prompt text
  if (params.prompt) {
    workflow.prompt[6].inputs.text = params.prompt;
  }
  if (params.negative_prompt) {
    workflow.prompt[7].inputs.text = params.negative_prompt;
  }

  // Adjust image dimensions
  if (params.width && params.height) {
    workflow.prompt[5].inputs.width = params.width;
    workflow.prompt[5].inputs.height = params.height;
  }

  return workflow;
}

// Example usage in a processing pipeline
// (loadWorkflow and executeWorkflow are placeholders you supply)
async function processImageRequest(request) {
  const workflow = loadWorkflow('base_workflow.json');

  // Adjust parameters based on request
  const adjustedWorkflow = adjustWorkflowParams(workflow, {
    seed: Math.floor(Math.random() * 1000000000),  // seeds must be integers
    prompt: request.prompt,
    negative_prompt: request.negative_prompt,
    steps: request.quality === 'high' ? 30 : 20,
    cfg: request.style === 'creative' ? 9 : 7,
    width: request.width || 512,
    height: request.height || 512
  });

  // Execute the workflow
  return await executeWorkflow(adjustedWorkflow);
}
Connection Best Practices
- Implement reconnection logic for connection drops
- Add error handling for malformed messages
- Include timeout handling for long-running operations
- Consider implementing a message queue for reliable delivery
- Handle connection state changes appropriately in the UI
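Reconnection and timeout handling from the list above might look like the following sketch. The session callable stands in for your own coroutine that opens the WebSocket and processes messages until the workflow finishes. The retry limits and the set of retried exceptions are assumptions to tune for your setup; with the websockets library you would likely add websockets.exceptions.ConnectionClosed to retry_on.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff: base, 2*base, 4*base, ... limited to cap seconds."""
    return min(cap, base * (2 ** attempt))


async def run_with_reconnect(
    session: Callable[[], Awaitable[None]],
    max_attempts: int = 5,
    timeout: float = 300.0,
    retry_on: Tuple[Type[BaseException], ...] = (OSError, asyncio.TimeoutError),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Run session() until it completes; return the number of attempts used."""
    for attempt in range(max_attempts):
        try:
            # Abort a session that runs longer than the allowed time.
            await asyncio.wait_for(session(), timeout=timeout)
            return attempt + 1
        except retry_on as exc:
            if attempt == max_attempts - 1:
                raise ConnectionError(
                    f"giving up after {max_attempts} attempts") from exc
            # Wait a little longer after each failure before reconnecting.
            await sleep(backoff_delay(attempt))
    raise ValueError("max_attempts must be at least 1")
```

The sleep parameter is injectable only so that the retry logic can be exercised in tests without real delays.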
Best Practices
Workflow Management
- Save workflows in a structured format for easy access
- Document node IDs and their purposes
- Version control your workflow JSON files
- Create reusable workflow components
- Test workflows with different parameter combinations
Error Handling
- Validate all input parameters before execution
- Implement proper error recovery in workflow chains
- Log execution details for debugging
- Handle resource constraints gracefully
- Implement retry mechanisms for failed steps
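Validating parameters before execution can be as simple as the sketch below, which checks a config dictionary shaped like the one in the Python example earlier on this page. The limits MAX_STEPS and MAX_SEED are hypothetical values chosen for illustration, not limits taken from ComfyUI.

```python
from typing import Any, Dict, List

# Hypothetical limits for illustration; adjust them to your own workflow.
MAX_STEPS = 150
MAX_SEED = 2**32 - 1


def _is_int(value: Any) -> bool:
    """True for integers, excluding booleans (which are ints in Python)."""
    return isinstance(value, int) and not isinstance(value, bool)


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config looks usable."""
    problems: List[str] = []

    prompt = config.get("prompt")
    if not isinstance(prompt, str) or not prompt.strip():
        problems.append("prompt must be a non-empty string")

    sampling = config.get("sampling", {})
    steps = sampling.get("steps")
    if steps is not None and not (_is_int(steps) and 1 <= steps <= MAX_STEPS):
        problems.append(f"steps must be an integer between 1 and {MAX_STEPS}")

    seed = sampling.get("seed")
    if seed is not None and not (_is_int(seed) and 0 <= seed <= MAX_SEED):
        problems.append(f"seed must be an integer between 0 and {MAX_SEED}")

    cfg = sampling.get("cfg")
    is_number = isinstance(cfg, (int, float)) and not isinstance(cfg, bool)
    if cfg is not None and not (is_number and cfg > 0):
        problems.append("cfg must be a positive number")

    return problems
```

Collecting all problems, rather than stopping at the first, lets a caller report every issue in a single response.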
Optimization Tips
- Reuse model loading nodes when possible
- Batch similar operations together
- Monitor GPU memory usage
- Cache intermediate results when appropriate
- Use appropriate batch sizes for your hardware
Example Use Cases
Batch Processing
Create a queue of image generation tasks with different prompts and parameters, processing them sequentially or in parallel based on available resources.
Perfect for bulk image generation or testing different parameter combinations.
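A batch of this kind can be described as a list of config dictionaries, one per job, before anything is sent to the server. The sketch below builds every combination of prompt and cfg value with a distinct seed; build_batch is a hypothetical helper, and the config shape follows the Python example earlier on this page.

```python
import itertools
from typing import Any, Dict, List, Sequence


def build_batch(prompts: Sequence[str],
                cfg_values: Sequence[float],
                base_seed: int = 0) -> List[Dict[str, Any]]:
    """Create one job config per (prompt, cfg) combination, each with its own seed."""
    jobs: List[Dict[str, Any]] = []
    combos = itertools.product(prompts, cfg_values)
    for index, (prompt, cfg) in enumerate(combos):
        jobs.append({
            "prompt": prompt,
            "sampling": {"seed": base_seed + index, "cfg": cfg},
        })
    return jobs


if __name__ == "__main__":
    batch = build_batch(["a futuristic cityscape", "a forest at dawn"],
                        [5, 7], base_seed=100)
    for job in batch:
        print(job)
```

Each resulting job can then be passed to whatever function runs a single workflow, sequentially or with a bounded number of parallel workers.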
API Integration
Expose ComfyUI capabilities through a REST API, using WebSocket workflows to handle the actual processing while providing a simple HTTP interface.
Ideal for integrating with web applications or services.
Dynamic Workflows
Create adaptive workflows that adjust parameters based on intermediate results or external inputs, enabling sophisticated generation pipelines.
Useful for quality-adaptive processing or style transfer applications.