WebSocket Workflows

Automating ComfyUI workflows using WebSocket API

Converting Workflows to WebSocket API

ComfyUI lets you export any workflow in an API format that can be queued and monitored programmatically, enabling automated execution and parameter adjustment without the graphical interface. This is particularly useful for integrating with external applications, batch processing, or building automated generation pipelines.

Key Benefits

  • Run workflows programmatically without the UI
  • Dynamically adjust node parameters
  • Chain multiple workflows together
  • Integrate with external applications
  • Automate batch processing

Workflow API Export

To convert a workflow to an API:

1. Save API Info

Use the "Save (API Format)" menu option (in older UIs you may first need to enable "Dev mode Options" in the settings) to export the workflow's API specification.

2. Identify Node IDs

The saved API format includes node IDs and their connections, which you'll use to modify parameters.

3. Note Input Parameters

Document which node IDs correspond to adjustable parameters in your workflow.
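For reference, the exported file is a flat JSON object mapping node IDs to their class types, inputs, and titles (under `_meta`). The node IDs, titles, and values in this sketch are made up; your own export will differ:

```python
import json

# A minimal excerpt of what "Save (API Format)" produces: a flat mapping of
# node ID -> {class_type, inputs, _meta}. Inputs that reference another node
# take the form [node_id, output_index].
api_format = """
{
  "3": {
    "class_type": "KSampler",
    "inputs": {"seed": 42, "steps": 20, "cfg": 7.0, "model": ["4", 0]},
    "_meta": {"title": "KSampler"}
  },
  "6": {
    "class_type": "CLIPTextEncode",
    "inputs": {"text": "a scenic landscape", "clip": ["4", 1]},
    "_meta": {"title": "TEXT_positive"}
  }
}
"""

workflow = json.loads(api_format)

# List node IDs alongside their class types and titles -- a quick way to
# document which IDs correspond to adjustable parameters.
for node_id, node in workflow.items():
    print(node_id, node["class_type"], node["_meta"]["title"])
```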

Python WebSocket Implementation

Creating a Python script to handle WebSocket connections and workflow configuration provides more robust control and better maintainability. Here's how to structure your implementation:

import asyncio
import json
from typing import Any, Dict, List

import websockets  # third-party: pip install websockets

class ComfyWorkflowManager:
    def __init__(self, workflow_path: str):
        self.workflow = self.load_workflow(workflow_path)
        self.node_map: Dict[str, List[str]] = {}
        self.build_node_map()
    
    def load_workflow(self, path: str) -> dict:
        # "Save (API Format)" produces a flat {node_id: node} mapping; wrap it
        # under a 'prompt' key so the rest of the code can address it uniformly
        with open(path, 'r') as f:
            return {'prompt': json.load(f)}
    
    def build_node_map(self):
        """Index nodes by class type and by title prefix for easy lookup"""
        for node_id, node_data in self.workflow['prompt'].items():
            # Store by class type
            node_class = node_data.get('class_type', '')
            if node_class:
                self.node_map.setdefault(node_class, []).append(node_id)
            
            # Store prefixed nodes (e.g., SWITCH_, TEXT_); in API-format
            # files the user-visible title lives under the _meta key
            node_title = node_data.get('_meta', {}).get('title', '')
            if '_' in node_title:
                prefix = node_title.split('_')[0] + '_'
                self.node_map.setdefault(prefix, []).append(node_id)
    
    def find_nodes(self, identifier: str) -> List[str]:
        """Find node IDs by class type or title prefix"""
        return self.node_map.get(identifier, [])
    
    def node_title(self, node_id: str) -> str:
        return self.workflow['prompt'][node_id].get('_meta', {}).get('title', '')
    
    def update_node_parameters(self, config: Dict[str, Any]):
        """Update node parameters based on configuration"""
        # Update text prompts
        if 'prompt' in config:
            for text_node in self.find_nodes('CLIPTextEncode'):
                self.workflow['prompt'][text_node]['inputs']['text'] = config['prompt']
        
        # Update switch nodes; the switch number comes from the node's
        # title (e.g. SWITCH_1 -> "1"), not from its numeric node ID
        for switch_node in self.find_nodes('SWITCH_'):
            switch_id = self.node_title(switch_node).split('_', 1)[1]
            if switch_id in config.get('switches', {}):
                self.workflow['prompt'][switch_node]['inputs']['select'] = \
                    config['switches'][switch_id]
        
        # Update samplers by class type
        if 'sampling' in config:
            for sampler in self.find_nodes('KSampler'):
                node_inputs = self.workflow['prompt'][sampler]['inputs']
                node_inputs.update({
                    k: v for k, v in config['sampling'].items()
                    if k in node_inputs  # Only update parameters the node has
                })

async def run_workflow(workflow_path: str, config: Dict[str, Any]):
    # Stdlib imports kept local so this function is self-contained
    import urllib.request
    import uuid
    
    manager = ComfyWorkflowManager(workflow_path)
    
    # Configure workflow based on provided config
    manager.update_node_parameters(config)
    
    client_id = str(uuid.uuid4())
    async with websockets.connect(f'ws://localhost:8188/ws?clientId={client_id}') as websocket:
        # Workflows are queued over HTTP (POST /prompt); the WebSocket
        # connection only streams status updates for our client_id
        body = json.dumps({
            "prompt": manager.workflow['prompt'],
            "client_id": client_id
        }).encode('utf-8')
        request = urllib.request.Request(
            'http://localhost:8188/prompt', data=body,
            headers={'Content-Type': 'application/json'})
        urllib.request.urlopen(request)
        
        # Handle execution updates
        while True:
            raw = await websocket.recv()
            if isinstance(raw, bytes):
                continue  # binary frames carry preview images; skip them
            message = json.loads(raw)
            
            if message['type'] == 'executing':
                # node is None once the whole prompt has finished
                if message['data']['node'] is None:
                    print("Workflow finished")
                    break
                print(f"Executing node: {message['data']['node']}")
            elif message['type'] == 'executed':
                print(f"Completed node: {message['data']['node']}")
            elif message['type'] == 'execution_error':
                print(f"Error: {message['data'].get('exception_message')}")
                break

# Example usage
config = {
    "prompt": "a futuristic cityscape",
    "switches": {
        "1": 0,  # SWITCH_1 set to first option
        "2": 1   # SWITCH_2 set to second option
    },
    "sampling": {
        "seed": 123456789,
        "steps": 20,
        "cfg": 7,
        "sampler_name": "euler"
    }
}

asyncio.run(run_workflow('workflow.json', config))

Node Naming Best Practices

  • Use prefixes like SWITCH_1, TEXT_positive for easily identifiable nodes
  • Search for nodes by class type (e.g., 'KSampler', 'CLIPTextEncode') instead of node IDs
  • Group related nodes with similar prefixes (e.g., MODEL_base, MODEL_refiner)
  • Use descriptive suffixes for clarity (e.g., TEXT_positive, TEXT_negative)

Benefits of This Approach

  • Workflows can be modified without breaking automation scripts
  • Node IDs can change without affecting functionality
  • Easy to find and update groups of related nodes
  • More maintainable and readable code
  • Supports workflow versioning and updates

Executing Workflows via WebSocket

Here's how to queue a workflow and track its execution. The workflow itself is queued over HTTP (POST /prompt); the WebSocket connection streams progress and result events back:

// Connect to ComfyUI's WebSocket; the clientId query parameter ties
// status updates to the prompts this client queues
const clientId = 'my_client';
const ws = new WebSocket(`ws://localhost:8188/ws?clientId=${clientId}`);

// Load workflow from saved API format (the exported file is a flat map
// of node ID -> node; loadWorkflowFromFile is assumed to wrap it as
// { prompt: ... })
const workflow = loadWorkflowFromFile('workflow.json');

// Modify parameters as needed (use the node IDs from your own file)
workflow.prompt[3].inputs.seed = Math.floor(Math.random() * 1000000000);
workflow.prompt[6].inputs.text = "a futuristic city at night";

// Queue the workflow once the status socket is open
ws.addEventListener('open', () => {
    fetch('http://localhost:8188/prompt', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ prompt: workflow.prompt, client_id: clientId })
    });
});

// Handle execution updates
ws.addEventListener('message', (event) => {
    const message = JSON.parse(event.data);
    
    switch (message.type) {
        case 'execution_start':
            console.log('Workflow started');
            break;
        case 'execution_cached':
            handleCachedOutput(message.data);
            break;
        case 'executing':
            updateProgress(message.data.node);
            break;
        case 'executed':
            handleOutput(message.data);
            break;
        case 'execution_error':
            console.error('Execution failed:', message.data);
            break;
    }
});

Chaining Workflows

You can chain multiple workflows together by using the output of one as input for another:

// executeWorkflow is a helper (not shown) that queues a prompt, waits
// for completion, and returns the saved image's filename
async function runGenerationPipeline() {
    // First workflow: generate the base image
    const baseImage = await executeWorkflow(baseGenerationWorkflow);
    
    // Second workflow: upscale the result (stages exchange images by
    // filename, fed into the next workflow's LoadImage node)
    const upscaleWorkflow = loadWorkflow('upscale.json');
    upscaleWorkflow.prompt[2].inputs.image = baseImage;
    const upscaledImage = await executeWorkflow(upscaleWorkflow);
    
    // Third workflow: add details
    const detailWorkflow = loadWorkflow('detail.json');
    detailWorkflow.prompt[3].inputs.image = upscaledImage;
    const finalImage = await executeWorkflow(detailWorkflow);
    
    return finalImage;
}
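The same pattern in Python: since ComfyUI stages typically exchange images by filename (a SaveImage output fed into the next stage's LoadImage node), chaining reduces to rewriting the next workflow's image input. `execute_workflow` is a hypothetical helper; `feed_image` below is a pure function you can test in isolation:

```python
from typing import Any, Dict

def feed_image(workflow: Dict[str, Any], image_filename: str) -> Dict[str, Any]:
    """Point every LoadImage node in a workflow at a previously saved image.

    ComfyUI workflows pass images between stages by filename (relative to the
    server's input directory), so chaining means rewriting the LoadImage
    input of the next stage to the file the previous stage produced.
    """
    for node in workflow.values():
        if node.get("class_type") == "LoadImage":
            node["inputs"]["image"] = image_filename
    return workflow

# Illustrative two-stage chain; execute_workflow is a hypothetical helper
# that queues the prompt, waits, and returns the saved image's filename:
#
#   base_file = execute_workflow(load_workflow("base.json"))
#   upscale = feed_image(load_workflow("upscale.json"), base_file)
#   final_file = execute_workflow(upscale)

# Quick demonstration on a minimal in-memory workflow:
demo = {"2": {"class_type": "LoadImage", "inputs": {"image": "placeholder.png"}}}
feed_image(demo, "base_0001.png")
print(demo["2"]["inputs"]["image"])  # base_0001.png
```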

Pipeline Benefits

  • Break complex operations into manageable steps
  • Reuse workflows in different combinations
  • Handle errors at each stage independently
  • Optimize parameters based on intermediate results
  • Process batches efficiently

Workflow Event Handling

When executing workflows via WebSocket, you'll receive various event types to track progress and handle results:

Execution Events

{
  "type": "executing",
  "data": {
    "node": "5",
    "prompt_id": "12345"
  }
}

Execution Progress

{
  "type": "progress",
  "data": {
    "value": 50,
    "max": 100,
    "prompt_id": "12345",
    "node": "3"
  }
}

Execution Completed

{
  "type": "executed",
  "data": {
    "node": "5",
    "prompt_id": "12345",
    "output": {
      "images": [...]
    }
  }
}

Error Messages

{
  "type": "execution_error",
  "data": {
    "prompt_id": "12345",
    "node_id": "3",
    "node_type": "KSampler",
    "exception_message": "CUDA out of memory"
  }
}
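One way to keep a client tidy as the number of event types grows is a small dispatch table mapping message types to handler functions. A minimal sketch (the handler bodies are illustrative):

```python
import json
from typing import Any, Callable, Dict

def handle_progress(data: Dict[str, Any]) -> str:
    return f"progress {data['value']}/{data['max']}"

def handle_executed(data: Dict[str, Any]) -> str:
    return f"node {data['node']} done"

def handle_error(data: Dict[str, Any]) -> str:
    return f"error in node {data.get('node_id')}: {data.get('exception_message')}"

# Map message types to handlers; unknown types fall through to a no-op.
HANDLERS: Dict[str, Callable[[Dict[str, Any]], str]] = {
    "progress": handle_progress,
    "executed": handle_executed,
    "execution_error": handle_error,
}

def dispatch(raw: str) -> str:
    message = json.loads(raw)
    handler = HANDLERS.get(message["type"], lambda data: "")
    return handler(message.get("data", {}))

print(dispatch('{"type": "progress", "data": {"value": 50, "max": 100}}'))
# progress 50/100
```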

Dynamic Parameter Adjustment

You can dynamically adjust workflow parameters based on external inputs or previous results:

// Helper function to modify workflow parameters
// (node IDs 3, 5, 6, and 7 are examples; substitute the IDs from your
// own API-format file)
function adjustWorkflowParams(workflow, params) {
    // Adjust KSampler settings; compare against undefined so that
    // legitimate falsy values (e.g. seed 0) are still applied
    if (params.seed !== undefined) {
        workflow.prompt[3].inputs.seed = params.seed;
    }
    if (params.steps !== undefined) {
        workflow.prompt[3].inputs.steps = params.steps;
    }
    if (params.cfg !== undefined) {
        workflow.prompt[3].inputs.cfg = params.cfg;
    }
    
    // Modify prompt text
    if (params.prompt) {
        workflow.prompt[6].inputs.text = params.prompt;
    }
    if (params.negative_prompt) {
        workflow.prompt[7].inputs.text = params.negative_prompt;
    }
    
    // Adjust image dimensions
    if (params.width && params.height) {
        workflow.prompt[5].inputs.width = params.width;
        workflow.prompt[5].inputs.height = params.height;
    }
    
    return workflow;
}

// Example usage in a processing pipeline
async function processImageRequest(request) {
    const workflow = loadWorkflow('base_workflow.json');
    
    // Adjust parameters based on request
    const adjustedWorkflow = adjustWorkflowParams(workflow, {
        seed: Math.floor(Math.random() * 1000000000),
        prompt: request.prompt,
        negative_prompt: request.negative_prompt,
        steps: request.quality === 'high' ? 30 : 20,
        cfg: request.style === 'creative' ? 9 : 7,
        width: request.width || 512,
        height: request.height || 512
    });
    
    // Execute the workflow
    return await executeWorkflow(adjustedWorkflow);
}

Connection Best Practices

  • Implement reconnection logic for connection drops
  • Add error handling for malformed messages
  • Include timeout handling for long-running operations
  • Consider implementing a message queue for reliable delivery
  • Handle connection state changes appropriately in the UI
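As a sketch of the first point, a jittered exponential backoff loop can wrap the listening connection. `listen_with_reconnect` and its error handling are illustrative, not part of ComfyUI:

```python
import asyncio
import random

def next_backoff(current: float, max_backoff: float = 30.0) -> float:
    """Double the delay after each failure, capped at max_backoff seconds."""
    return min(current * 2, max_backoff)

async def listen_with_reconnect(url: str):
    # websockets is third-party (pip install websockets); imported lazily
    # here so this sketch stays importable without it installed
    import websockets

    backoff = 1.0
    while True:
        try:
            async with websockets.connect(url) as ws:
                backoff = 1.0  # reset after a successful connection
                async for raw in ws:
                    pass  # dispatch incoming messages here
        except (OSError, websockets.WebSocketException):
            # Jitter avoids thundering-herd reconnects from many clients
            delay = backoff + random.uniform(0, backoff)
            print(f"Connection lost; retrying in {delay:.1f}s")
            await asyncio.sleep(delay)
            backoff = next_backoff(backoff)
```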

Best Practices

Workflow Management

  • Save workflows in a structured format for easy access
  • Document node IDs and their purposes
  • Version control your workflow JSON files
  • Create reusable workflow components
  • Test workflows with different parameter combinations

Error Handling

  • Validate all input parameters before execution
  • Implement proper error recovery in workflow chains
  • Log execution details for debugging
  • Handle resource constraints gracefully
  • Implement retry mechanisms for failed steps
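A retry wrapper is one simple way to implement the last point; this sketch (names are illustrative) retries a pipeline step a fixed number of times before re-raising the final error:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retries(step: Callable[[], T], attempts: int = 3, delay: float = 0.0) -> T:
    """Run a pipeline step, retrying on failure and re-raising the last error."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return step()
        except Exception as exc:  # narrow this to your real failure types
            last_error = exc
            print(f"Attempt {attempt} failed: {exc}")
            time.sleep(delay)
    raise last_error

# Demonstration with a step that fails twice before succeeding.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = with_retries(flaky)
print(result)  # ok
```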

Optimization Tips

  • Reuse model loading nodes when possible
  • Batch similar operations together
  • Monitor GPU memory usage
  • Cache intermediate results when appropriate
  • Use appropriate batch sizes for your hardware

Example Use Cases

Batch Processing

Create a queue of image generation tasks with different prompts and parameters, processing them sequentially or in parallel based on available resources.

Perfect for bulk image generation or testing different parameter combinations.
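A minimal batch driver might stamp each job's prompt and seed onto an independent copy of one base workflow before queueing. The node IDs and helper name below are illustrative; use the IDs from your own API-format file:

```python
import copy
from typing import Any, Dict, Iterable, List

def build_batch(base_prompt: Dict[str, Any],
                jobs: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Produce one independent workflow copy per job.

    Each job dict may override the positive prompt text and the sampler
    seed; node IDs "6" (CLIPTextEncode) and "3" (KSampler) are examples.
    """
    batch = []
    for job in jobs:
        wf = copy.deepcopy(base_prompt)  # keep the base workflow untouched
        if "prompt" in job:
            wf["6"]["inputs"]["text"] = job["prompt"]
        if "seed" in job:
            wf["3"]["inputs"]["seed"] = job["seed"]
        batch.append(wf)
    return batch

base = {
    "3": {"class_type": "KSampler", "inputs": {"seed": 0, "steps": 20}},
    "6": {"class_type": "CLIPTextEncode", "inputs": {"text": ""}},
}
batch = build_batch(base, [{"prompt": "a castle", "seed": 1},
                           {"prompt": "a forest", "seed": 2}])
print(len(batch), batch[1]["6"]["inputs"]["text"])  # 2 a forest
```

Each copy can then be queued sequentially or in parallel, subject to available GPU memory.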

API Integration

Expose ComfyUI capabilities through a REST API, using WebSocket workflows to handle the actual processing while providing a simple HTTP interface.

Ideal for integrating with web applications or services.

Dynamic Workflows

Create adaptive workflows that adjust parameters based on intermediate results or external inputs, enabling sophisticated generation pipelines.

Useful for quality-adaptive processing or style transfer applications.