Overview

ComfyUI’s execution system is a sophisticated asynchronous engine that manages workflow execution, node processing, caching, and resource management. It handles everything from input validation to output delivery with support for interrupts, dynamic graphs, and efficient memory usage.

Execution Pipeline

The execution pipeline consists of several key stages:
1. Prompt Validation: validate workflow structure and inputs using validate_prompt()
2. Queue Addition: add validated prompts to the execution queue with priority
3. Cache Initialization: set up the caching system and check for cached results
4. Node Execution: execute nodes in dependency order with async support
5. Result Collection: gather outputs and send them to the client
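The stages above can be sketched end to end. This is a simplified illustration, not ComfyUI's actual API: `execute_workflow` and its node-graph shape are hypothetical stand-ins for validate_prompt(), the cache check, and the dependency-ordered execution loop.

```python
def execute_workflow(nodes, outputs_requested, cache=None):
    """Minimal pipeline sketch: validate, then execute nodes in dependency
    order, consulting a cache before each node runs.
    `nodes` maps node_id -> (fn, [input_node_ids]); all names are hypothetical."""
    if cache is None:
        cache = {}
    # Stage 1: validation -- every referenced input must exist.
    for node_id, (_, deps) in nodes.items():
        for dep in deps:
            if dep not in nodes:
                raise ValueError(f"node {node_id} references unknown node {dep}")

    results = {}

    def run(node_id):
        # Stage 3: check the cache before executing.
        if node_id in cache:
            results[node_id] = cache[node_id]
            return results[node_id]
        if node_id in results:
            return results[node_id]
        fn, deps = nodes[node_id]
        # Stage 4: execute dependencies first (dependency order).
        args = [run(dep) for dep in deps]
        value = fn(*args)
        cache[node_id] = value
        results[node_id] = value
        return value

    # Stage 5: collect only the requested outputs.
    return {node_id: run(node_id) for node_id in outputs_requested}
```

Running the same graph twice with a shared `cache` dict skips re-execution entirely, which is the behavior the cache stages below refine.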

Prompt Executor

The PromptExecutor class orchestrates workflow execution:
class PromptExecutor:
    def __init__(self, server, cache_type=False, cache_args=None):
        self.cache_args = cache_args
        self.cache_type = cache_type
        self.server = server
        self.reset()
    
    def reset(self):
        self.caches = CacheSet(cache_type=self.cache_type, 
                              cache_args=self.cache_args)
        self.status_messages = []
        self.success = True
    
    async def execute_async(self, prompt, prompt_id, extra_data={}, 
                           execute_outputs=[]):
        set_preview_method(extra_data.get("preview_method"))
        nodes.interrupt_processing(False)
        
        if "client_id" in extra_data:
            self.server.client_id = extra_data["client_id"]
        
        self.status_messages = []
        self.add_message("execution_start", 
                        {"prompt_id": prompt_id}, 
                        broadcast=False)
        
        with torch.inference_mode():
            dynamic_prompt = DynamicPrompt(prompt)
            reset_progress_state(prompt_id, dynamic_prompt)
            add_progress_handler(WebUIProgressHandler(self.server))
            
            is_changed_cache = IsChangedCache(
                prompt_id, dynamic_prompt, self.caches.outputs
            )
            
            for cache in self.caches.all:
                await cache.set_prompt(
                    dynamic_prompt, prompt.keys(), is_changed_cache
                )
                cache.clean_unused()
            
            # ... execution loop ...

Execution Loop

From execution.py:730-750:
execution_list = ExecutionList(dynamic_prompt, self.caches.outputs)
current_outputs = self.caches.outputs.all_node_ids()

for node_id in list(execute_outputs):
    execution_list.add_node(node_id)

while not execution_list.is_empty():
    node_id, error, ex = await execution_list.stage_node_execution()
    
    if error is not None:
        self.handle_execution_error(
            prompt_id, dynamic_prompt.original_prompt, 
            current_outputs, executed, error, ex
        )
        break
    
    result, error, ex = await execute(
        self.server, dynamic_prompt, self.caches, 
        node_id, extra_data, executed, prompt_id, 
        execution_list, pending_subgraph_results, 
        pending_async_nodes, ui_node_outputs
    )
    
    if result == ExecutionResult.FAILURE:
        self.handle_execution_error(...)
        break
    elif result == ExecutionResult.PENDING:
        execution_list.unstage_node_execution()
    else:  # SUCCESS
        execution_list.complete_node_execution()
    
    self.caches.outputs.poll(ram_headroom=self.cache_args["ram"])

Execution Results

Three possible execution results:
class ExecutionResult(Enum):
    SUCCESS = 0   # Node completed successfully
    FAILURE = 1   # Node failed with error
    PENDING = 2   # Node waiting for async/lazy inputs

Cache System

Cache Types

ComfyUI supports multiple caching strategies:
class CacheType(Enum):
    CLASSIC = 0      # Aggressive dumping
    LRU = 1          # Least Recently Used
    NONE = 2         # No caching
    RAM_PRESSURE = 3 # Based on available RAM
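To make the LRU strategy concrete, here is a minimal eviction policy built on OrderedDict. This illustrates the policy only; ComfyUI's actual LRUCache keys entries by input signature and tracks generations rather than wrapping a plain dict.

```python
from collections import OrderedDict

class TinyLRU:
    """Minimal LRU cache: on overflow, evict the least recently used entry."""
    def __init__(self, max_size):
        self.max_size = max_size
        self.data = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        self.data.move_to_end(key)  # mark as most recently used
        return self.data[key]

    def put(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        while len(self.data) > self.max_size:
            self.data.popitem(last=False)  # drop the least recently used entry
```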

CacheSet Implementation

From execution.py:109-143:
class CacheSet:
    def __init__(self, cache_type=None, cache_args={}):
        if cache_type == CacheType.NONE:
            self.init_null_cache()
            logging.info("Disabling intermediate node cache.")
        elif cache_type == CacheType.RAM_PRESSURE:
            cache_ram = cache_args.get("ram", 16.0)
            self.init_ram_cache(cache_ram)
            logging.info("Using RAM pressure cache.")
        elif cache_type == CacheType.LRU:
            cache_size = cache_args.get("lru", 0)
            self.init_lru_cache(cache_size)
            logging.info("Using LRU cache")
        else:
            self.init_classic_cache()
        
        self.all = [self.outputs, self.objects]
    
    def init_classic_cache(self):
        self.outputs = HierarchicalCache(CacheKeySetInputSignature)
        self.objects = HierarchicalCache(CacheKeySetID)
    
    def init_lru_cache(self, cache_size):
        self.outputs = LRUCache(CacheKeySetInputSignature, max_size=cache_size)
        self.objects = HierarchicalCache(CacheKeySetID)
    
    def init_ram_cache(self, min_headroom):
        self.outputs = RAMPressureCache(CacheKeySetInputSignature)
        self.objects = HierarchicalCache(CacheKeySetID)
Two-Level Caching: The system maintains separate caches for outputs (keyed by input signature) and objects (keyed by node ID), so long-lived objects such as loaded models survive input changes that would invalidate the cached outputs.
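A sketch of the two-level idea: outputs are keyed by an input signature (reused only when inputs match), while objects are keyed by node ID alone. The `signature` function here is a deliberate simplification of CacheKeySetInputSignature.

```python
import json

class TwoLevelCache:
    """Outputs keyed by (node_id, input signature); objects keyed by node_id only."""
    def __init__(self):
        self.outputs = {}  # invalidated whenever inputs change
        self.objects = {}  # survives input changes (e.g. a loaded model)

    @staticmethod
    def signature(node_id, inputs):
        # Simplified stand-in for CacheKeySetInputSignature.
        return (node_id, json.dumps(inputs, sort_keys=True))

    def get_output(self, node_id, inputs):
        return self.outputs.get(self.signature(node_id, inputs))

    def set_output(self, node_id, inputs, value):
        self.outputs[self.signature(node_id, inputs)] = value
```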

Cache Entry Structure

class CacheEntry(NamedTuple):
    ui: dict          # UI-specific data
    outputs: list     # Node output values

IS_CHANGED Cache

Tracks whether node inputs have changed:
class IsChangedCache:
    def __init__(self, prompt_id: str, dynprompt: DynamicPrompt, 
                 outputs_cache: BasicCache):
        self.prompt_id = prompt_id
        self.dynprompt = dynprompt
        self.outputs_cache = outputs_cache
        self.is_changed = {}
    
    async def get(self, node_id):
        if node_id in self.is_changed:
            return self.is_changed[node_id]
        
        node = self.dynprompt.get_node(node_id)
        class_type = node["class_type"]
        class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
        
        # Check if node has IS_CHANGED method
        if hasattr(class_def, "IS_CHANGED"):
            input_data_all, _, v3_data = get_input_data(
                node["inputs"], class_def, node_id, None
            )
            
            is_changed = await _async_map_node_over_list(
                self.prompt_id, node_id, class_def, 
                input_data_all, "IS_CHANGED", v3_data=v3_data
            )
            is_changed = await resolve_map_node_over_list_results(is_changed)
            node["is_changed"] = [x for x in is_changed]
            self.is_changed[node_id] = node["is_changed"]
        else:
            self.is_changed[node_id] = False
        
        return self.is_changed[node_id]
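As a usage example, a custom node can define IS_CHANGED to invalidate its cached output when external state changes: ComfyUI compares the hook's return value between runs, so returning a different value marks the node as stale. The hook name is real ComfyUI API; the node below is a hypothetical sketch that watches a file.

```python
import hashlib
import os

class WatchFileNode:
    """Hypothetical node that re-executes when the watched file changes."""

    @classmethod
    def IS_CHANGED(cls, path):
        # Return a digest of the file contents; a return value that differs
        # from the previous run marks the cached output as stale.
        if not os.path.exists(path):
            return float("NaN")  # NaN != NaN, so the node always re-runs
        with open(path, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()
```

Returning float("NaN") is a common trick for "always re-run", since NaN never compares equal to itself.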

Node Execution Function

The core execute() function handles individual node execution:
async def execute(server, dynprompt, caches, current_item, extra_data, 
                 executed, prompt_id, execution_list, 
                 pending_subgraph_results, pending_async_nodes, ui_outputs):
    unique_id = current_item
    real_node_id = dynprompt.get_real_node_id(unique_id)
    display_node_id = dynprompt.get_display_node_id(unique_id)
    parent_node_id = dynprompt.get_parent_node_id(unique_id)
    
    inputs = dynprompt.get_node(unique_id)['inputs']
    class_type = dynprompt.get_node(unique_id)['class_type']
    class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
    
    # Check cache first
    cached = caches.outputs.get(unique_id)
    if cached is not None:
        if server.client_id is not None:
            server.send_sync("executed", {
                "node": unique_id, 
                "display_node": display_node_id,
                "output": cached.ui.get("output", None),
                "prompt_id": prompt_id
            }, server.client_id)
        
        get_progress_state().finish_progress(unique_id)
        execution_list.cache_update(unique_id, cached)
        return (ExecutionResult.SUCCESS, None, None)
    
    # ... execution logic ...

Lazy Evaluation

From execution.py:486-499:
if lazy_status_present:
    required_inputs = await _async_map_node_over_list(
        prompt_id, unique_id, obj, input_data_all, 
        "check_lazy_status", allow_interrupt=True, v3_data=v3_data_lazy
    )
    required_inputs = await resolve_map_node_over_list_results(required_inputs)
    required_inputs = set(sum([r for r in required_inputs if isinstance(r, list)], []))
    required_inputs = [x for x in required_inputs 
                      if x not in input_data_all or x in missing_keys]
    
    if len(required_inputs) > 0:
        for i in required_inputs:
            execution_list.make_input_strong_link(unique_id, i)
        return (ExecutionResult.PENDING, None, None)
Lazy Loading: Nodes with check_lazy_status can defer loading inputs until actually needed, saving memory and computation.
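For instance, a blend node can skip evaluating one image entirely when the blend factor makes it irrelevant. check_lazy_status receives the inputs evaluated so far (lazy inputs arrive as None) and returns the names of inputs it still needs; the node class below is a hypothetical sketch of that contract, not a real ComfyUI node.

```python
class BlendImages:
    """Hypothetical node with two lazy image inputs; only the input names
    returned from check_lazy_status get evaluated upstream."""

    def check_lazy_status(self, factor, image_a=None, image_b=None):
        needed = []
        # image_a only matters if it contributes to the blend at all.
        if factor < 1.0 and image_a is None:
            needed.append("image_a")
        # Mirror logic for image_b.
        if factor > 0.0 and image_b is None:
            needed.append("image_b")
        return needed
```

When the returned list is non-empty, the executor strengthens those input links and returns ExecutionResult.PENDING, as shown in the excerpt above.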

Async Task Handling

From execution.py:532-540:
if has_pending_tasks:
    pending_async_nodes[unique_id] = output_data
    unblock = execution_list.add_external_block(unique_id)
    
    async def await_completion():
        tasks = [x for x in output_data if isinstance(x, asyncio.Task)]
        await asyncio.gather(*tasks, return_exceptions=True)
        unblock()
    
    asyncio.create_task(await_completion())
    return (ExecutionResult.PENDING, None, None)
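The pattern above — register an external block, gather the pending tasks in a side task, then call the unblock callback — can be reproduced standalone. A minimal sketch, with `add_external_block` replaced by an asyncio.Event:

```python
import asyncio

async def demo():
    done = []
    blocked = asyncio.Event()  # stand-in for execution_list.add_external_block()

    async def node_work(name, delay):
        await asyncio.sleep(delay)
        done.append(name)
        return name

    tasks = [asyncio.create_task(node_work("a", 0.01)),
             asyncio.create_task(node_work("b", 0.02))]

    async def await_completion():
        # Mirror of ComfyUI's await_completion(): wait for every pending
        # task, then unblock the node so the scheduler can re-stage it.
        await asyncio.gather(*tasks, return_exceptions=True)
        blocked.set()  # the "unblock()" callback

    asyncio.create_task(await_completion())
    await blocked.wait()  # scheduler resumes only once everything finished
    return done

results = asyncio.run(demo())
```

`return_exceptions=True` matters: a failed task should still unblock the node so the executor can surface the error, rather than deadlocking the queue.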

Execution Queue

The PromptQueue manages pending executions:
class PromptQueue:
    def __init__(self, server):
        self.server = server
        self.mutex = threading.RLock()
        self.not_empty = threading.Condition(self.mutex)
        self.task_counter = 0
        self.queue = []  # Heap for priority
        self.currently_running = {}
        self.history = {}
        self.flags = {}
    
    def put(self, item):
        with self.mutex:
            heapq.heappush(self.queue, item)
            self.server.queue_updated()
            self.not_empty.notify()
    
    def get(self, timeout=None):
        with self.not_empty:
            while len(self.queue) == 0:
                self.not_empty.wait(timeout=timeout)
                if timeout is not None and len(self.queue) == 0:
                    return None
            
            item = heapq.heappop(self.queue)
            i = self.task_counter
            self.currently_running[i] = copy.deepcopy(item)
            self.task_counter += 1
            self.server.queue_updated()
            return (item, i)
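Queue items are heap-ordered, so the smallest priority number pops first. A monotonically increasing counter in the tuple breaks ties and preserves FIFO order within a priority level; the tuple layout below is illustrative, not ComfyUI's exact item shape.

```python
import heapq

queue = []
# (priority, counter, prompt_id): lower priority number runs first,
# and the counter keeps submission order within a priority level.
for counter, (priority, prompt_id) in enumerate([(1, "p1"), (0, "p2"), (1, "p3")]):
    heapq.heappush(queue, (priority, counter, prompt_id))

order = [heapq.heappop(queue)[2] for _ in range(len(queue))]
```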

Task Completion

class ExecutionStatus(NamedTuple):
    status_str: Literal['success', 'error']
    completed: bool
    messages: List[str]

def task_done(self, item_id, history_result, status, process_item=None):
    with self.mutex:
        prompt = self.currently_running.pop(item_id)
        
        # Limit history size
        if len(self.history) > MAXIMUM_HISTORY_SIZE:
            self.history.pop(next(iter(self.history)))
        
        status_dict = None
        if status is not None:
            status_dict = copy.deepcopy(status._asdict())
        
        if process_item is not None:
            prompt = process_item(prompt)
        
        self.history[prompt[1]] = {
            "prompt": prompt,
            "outputs": {},
            'status': status_dict,
        }
        self.history[prompt[1]].update(history_result)
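The history bound relies on dict insertion order: `next(iter(self.history))` is the oldest key, so popping it gives FIFO eviction without any extra bookkeeping. A minimal reproduction of that trick (the cap of 3 is arbitrary for the demo):

```python
MAX_HISTORY = 3

def record(history, prompt_id, entry):
    """Append to history, evicting the oldest entries past the size bound."""
    while len(history) >= MAX_HISTORY:
        history.pop(next(iter(history)))  # dicts iterate in insertion order
    history[prompt_id] = entry
    return history
```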

Interrupt Handling

Executions can be interrupted gracefully:
def interrupt_processing(value=True):
    comfy.model_management.interrupt_current_processing(value)

def before_node_execution():
    comfy.model_management.throw_exception_if_processing_interrupted()
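The interrupt is cooperative: a flag is set, and execution raises at the next checkpoint rather than killing the worker mid-node. A standalone sketch of the same flag-and-checkpoint pattern (ComfyUI's real flag lives inside comfy.model_management; these module-level names are stand-ins):

```python
class InterruptProcessingException(Exception):
    pass

_interrupted = False

def interrupt_processing(value=True):
    global _interrupted
    _interrupted = value

def throw_if_interrupted():
    # Checkpoint called before each node runs.
    global _interrupted
    if _interrupted:
        _interrupted = False  # one-shot: clear the flag once it fires
        raise InterruptProcessingException()

def run_nodes(node_fns):
    executed = []
    for fn in node_fns:
        throw_if_interrupted()
        executed.append(fn())
    return executed
```

Because the flag is only checked at checkpoints, a long-running node finishes (or must check internally) before the interrupt takes effect, which is why the executor can report a clean list of already-executed nodes.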
From execution.py:593-601, interrupts are handled specially:
except comfy.model_management.InterruptProcessingException as iex:
    logging.info("Processing interrupted")
    
    error_details = {
        "node_id": real_node_id,
    }
    
    return (ExecutionResult.FAILURE, error_details, iex)
When interrupted:
if isinstance(ex, comfy.model_management.InterruptProcessingException):
    mes = {
        "prompt_id": prompt_id,
        "node_id": node_id,
        "node_type": class_type,
        "executed": list(executed),
    }
    self.add_message("execution_interrupted", mes, broadcast=True)

Error Handling

Exception Processing

From execution.py:602-631:
except Exception as ex:
    typ, _, tb = sys.exc_info()
    exception_type = full_type_name(typ)
    
    input_data_formatted = {}
    if input_data_all is not None:
        for name, inputs in input_data_all.items():
            input_data_formatted[name] = [format_value(x) for x in inputs]
    
    logging.error(f"!!! Exception during processing !!! {ex}")
    logging.error(traceback.format_exc())
    
    tips = ""
    if isinstance(ex, comfy.model_management.OOM_EXCEPTION):
        tips = ("This error means you ran out of memory on your GPU.\n\n"
               "TIPS: If the workflow worked before you might have "
               "accidentally set the batch_size to a large number.")
        logging.info(f"Memory summary: {comfy.model_management.debug_memory_summary()}")
        comfy.model_management.unload_all_models()
    
    error_details = {
        "node_id": real_node_id,
        "exception_message": f"{ex}\n{tips}",
        "exception_type": exception_type,
        "traceback": traceback.format_tb(tb),
        "current_inputs": input_data_formatted
    }
    
    return (ExecutionResult.FAILURE, error_details, ex)
OOM Handling: Out-of-memory errors trigger automatic model unloading to free VRAM.

Progress Tracking

The execution system tracks and reports progress:
get_progress_state().start_progress(unique_id)

# ... node execution ...

get_progress_state().finish_progress(unique_id)

Message Broadcasting

Events are broadcast to clients:
def add_message(self, event, data: dict, broadcast: bool):
    data = {
        **data,
        "timestamp": int(time.time() * 1000),
    }
    self.status_messages.append((event, data))
    
    if self.server.client_id is not None or broadcast:
        self.server.send_sync(event, data, self.server.client_id)
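A message is delivered to the socket when either a specific client is attached or the event is marked broadcast; otherwise it is only recorded in status_messages. The delivery rule in isolation (a simplified stand-in for add_message, without the server object):

```python
import time

def build_messages(events, client_id, broadcast):
    """Stamp each event and decide whether it would be sent to the socket."""
    status_messages, sent = [], []
    for event, data in events:
        data = {**data, "timestamp": int(time.time() * 1000)}
        status_messages.append((event, data))
        if client_id is not None or broadcast:
            sent.append(event)
    return status_messages, sent
```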

Event Types

  • execution_start: execution begins
  • execution_cached: cached nodes identified
  • executing: node started execution
  • executed: node completed with output
  • execution_error: error occurred during execution
  • execution_interrupted: user interrupted execution
  • execution_success: workflow completed successfully

Performance Optimization

Choose the right cache type for your use case:
  • Classic: Fast but uses more memory
  • LRU: Bounded memory usage
  • RAM Pressure: Adaptive based on available RAM
  • None: Minimal memory for large batches
Use async nodes for:
  • Network requests
  • File I/O operations
  • Long-running computations
  • Parallel processing
Implement check_lazy_status to:
  • Defer expensive inputs
  • Save memory on unused branches
  • Enable conditional execution
Use INPUT_IS_LIST and OUTPUT_IS_LIST for:
  • Processing multiple images
  • Aggregating results
  • Parallel GPU operations
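A node opts into list processing through class attributes: with INPUT_IS_LIST, the function receives each input as a whole list in one call instead of being mapped element-wise, and OUTPUT_IS_LIST declares which outputs are lists. The attribute names are real ComfyUI conventions; the node below is a hypothetical sketch.

```python
class BatchStats:
    """Hypothetical node that aggregates a whole batch in a single call."""
    INPUT_IS_LIST = True       # receive inputs as lists, not element-wise
    OUTPUT_IS_LIST = (False,)  # the single output is a plain value

    def run(self, values):
        # With INPUT_IS_LIST, `values` is the full batch at once.
        return (sum(values) / len(values),)
```

Without INPUT_IS_LIST, the executor would call run() once per element, which makes batch-wide aggregation like this impossible.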

See Also

  • Workflows: workflow structure and format
  • Nodes: node system and implementation
  • Models: model loading and caching
