## Overview

ComfyUI's execution system is an asynchronous engine that manages workflow execution, node processing, caching, and resource management. It handles everything from input validation to output delivery, with support for interrupts, dynamic graphs, and efficient memory usage.
## Execution Pipeline

The execution pipeline consists of several key stages:
1. **Prompt Validation**: validate workflow structure and inputs using `validate_prompt()`
2. **Queue Addition**: add validated prompts to the execution queue with priority
3. **Cache Initialization**: set up the caching system and check for cached results
4. **Node Execution**: execute nodes in dependency order with async support
5. **Result Collection**: gather outputs and send them to the client
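The five stages above can be sketched as a toy driver that validates a prompt, queues it, and executes its nodes against a cache. All names here are illustrative stand-ins, not ComfyUI's actual API:

```python
# Minimal sketch of the pipeline stages with toy stand-ins for
# validation, queuing, caching, execution, and result collection.
from collections import deque

def validate(prompt):
    # Stage 1: reject structurally invalid prompts (toy check).
    if not prompt.get("nodes"):
        raise ValueError("empty workflow")
    return prompt

queue = deque()

def enqueue(prompt):
    # Stage 2: add the validated prompt to the queue.
    queue.append(prompt)

def run_next(cache):
    # Stages 3-5: check the cache, execute each node, collect outputs.
    prompt = queue.popleft()
    results = {}
    for node_id, op in prompt["nodes"].items():
        if node_id in cache:                 # stage 3: cached result
            results[node_id] = cache[node_id]
        else:                                # stage 4: execute the node
            results[node_id] = cache[node_id] = op()
    return results                           # stage 5: deliver outputs

cache = {}
enqueue(validate({"nodes": {"1": lambda: 2 + 2}}))
print(run_next(cache))  # {'1': 4}
```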
## Prompt Executor

The `PromptExecutor` class orchestrates workflow execution:
```python
class PromptExecutor:
    def __init__(self, server, cache_type=False, cache_args=None):
        self.cache_args = cache_args
        self.cache_type = cache_type
        self.server = server
        self.reset()

    def reset(self):
        self.caches = CacheSet(cache_type=self.cache_type,
                               cache_args=self.cache_args)
        self.status_messages = []
        self.success = True

    async def execute_async(self, prompt, prompt_id, extra_data={},
                            execute_outputs=[]):
        set_preview_method(extra_data.get("preview_method"))
        nodes.interrupt_processing(False)

        if "client_id" in extra_data:
            self.server.client_id = extra_data["client_id"]

        self.status_messages = []
        self.add_message("execution_start",
                         {"prompt_id": prompt_id},
                         broadcast=False)

        with torch.inference_mode():
            dynamic_prompt = DynamicPrompt(prompt)
            reset_progress_state(prompt_id, dynamic_prompt)
            add_progress_handler(WebUIProgressHandler(self.server))

            is_changed_cache = IsChangedCache(
                prompt_id, dynamic_prompt, self.caches.outputs
            )
            for cache in self.caches.all:
                await cache.set_prompt(
                    dynamic_prompt, prompt.keys(), is_changed_cache
                )
                cache.clean_unused()

            # ... execution loop ...
```
## Execution Loop

From `execution.py:730-750`:
```python
execution_list = ExecutionList(dynamic_prompt, self.caches.outputs)
current_outputs = self.caches.outputs.all_node_ids()

for node_id in list(execute_outputs):
    execution_list.add_node(node_id)

while not execution_list.is_empty():
    node_id, error, ex = await execution_list.stage_node_execution()
    if error is not None:
        self.handle_execution_error(
            prompt_id, dynamic_prompt.original_prompt,
            current_outputs, executed, error, ex
        )
        break

    result, error, ex = await execute(
        self.server, dynamic_prompt, self.caches,
        node_id, extra_data, executed, prompt_id,
        execution_list, pending_subgraph_results,
        pending_async_nodes, ui_node_outputs
    )
    if result == ExecutionResult.FAILURE:
        self.handle_execution_error(...)
        break
    elif result == ExecutionResult.PENDING:
        execution_list.unstage_node_execution()
    else:  # SUCCESS
        execution_list.complete_node_execution()

self.caches.outputs.poll(ram_headroom=self.cache_args["ram"])
```
## Execution Results

Each node execution returns one of three possible results:
```python
class ExecutionResult(Enum):
    SUCCESS = 0  # Node completed successfully
    FAILURE = 1  # Node failed with an error
    PENDING = 2  # Node waiting for async/lazy inputs
```
## Cache System

### Cache Types

ComfyUI supports multiple caching strategies:
```python
class CacheType(Enum):
    CLASSIC = 0       # Aggressive caching
    LRU = 1           # Least Recently Used eviction
    NONE = 2          # No intermediate caching
    RAM_PRESSURE = 3  # Evict based on available RAM
```
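The LRU strategy can be illustrated with a minimal stand-alone cache. This is a sketch built on `OrderedDict`, not ComfyUI's `LRUCache` implementation:

```python
from collections import OrderedDict

class TinyLRUCache:
    """Minimal LRU sketch: evicts the least recently used entry
    once max_size is exceeded."""
    def __init__(self, max_size):
        self.max_size = max_size
        self.data = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        self.data.move_to_end(key)   # mark as most recently used
        return self.data[key]

    def set(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        if len(self.data) > self.max_size:
            self.data.popitem(last=False)  # evict least recently used

cache = TinyLRUCache(max_size=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")         # touch "a" so "b" becomes the eviction candidate
cache.set("c", 3)      # evicts "b"
print(cache.get("b"))  # None
print(cache.get("a"))  # 1
```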
### CacheSet Implementation

From `execution.py:109-143`:
```python
class CacheSet:
    def __init__(self, cache_type=None, cache_args={}):
        if cache_type == CacheType.NONE:
            self.init_null_cache()
            logging.info("Disabling intermediate node cache.")
        elif cache_type == CacheType.RAM_PRESSURE:
            cache_ram = cache_args.get("ram", 16.0)
            self.init_ram_cache(cache_ram)
            logging.info("Using RAM pressure cache.")
        elif cache_type == CacheType.LRU:
            cache_size = cache_args.get("lru", 0)
            self.init_lru_cache(cache_size)
            logging.info("Using LRU cache")
        else:
            self.init_classic_cache()

        self.all = [self.outputs, self.objects]

    def init_classic_cache(self):
        self.outputs = HierarchicalCache(CacheKeySetInputSignature)
        self.objects = HierarchicalCache(CacheKeySetID)

    def init_lru_cache(self, cache_size):
        self.outputs = LRUCache(CacheKeySetInputSignature, max_size=cache_size)
        self.objects = HierarchicalCache(CacheKeySetID)

    def init_ram_cache(self, min_headroom):
        self.outputs = RAMPressureCache(CacheKeySetInputSignature)
        self.objects = HierarchicalCache(CacheKeySetID)
```
**Two-Level Caching**: the system maintains separate caches for outputs (keyed by input signature) and objects (keyed by node ID) for optimal performance.
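The two keying schemes can be sketched with plain dicts. This is illustrative only; ComfyUI's cache key classes are more involved:

```python
import hashlib
import json

# Outputs cache: keyed by a signature of (class_type, inputs), so a node
# re-runs only when its effective inputs change.
outputs_cache = {}

# Objects cache: keyed by node ID, so heavyweight node instances
# (e.g. loaded models) survive input changes.
objects_cache = {}

def input_signature(class_type, inputs):
    """Stable hash over the node type and its input values."""
    payload = json.dumps([class_type, inputs], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

sig1 = input_signature("KSampler", {"seed": 1, "steps": 20})
sig2 = input_signature("KSampler", {"seed": 2, "steps": 20})
outputs_cache[sig1] = "latent-A"

print(sig1 == sig2)           # False: changing the seed yields a new signature
print(sig2 in outputs_cache)  # False: cache miss, the node must re-run
```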
### Cache Entry Structure

```python
class CacheEntry(NamedTuple):
    ui: dict       # UI-specific data
    outputs: list  # Node output values
```
### IS_CHANGED Cache

`IsChangedCache` tracks whether node inputs have changed between runs:
```python
class IsChangedCache:
    def __init__(self, prompt_id: str, dynprompt: DynamicPrompt,
                 outputs_cache: BasicCache):
        self.prompt_id = prompt_id
        self.dynprompt = dynprompt
        self.outputs_cache = outputs_cache
        self.is_changed = {}

    async def get(self, node_id):
        if node_id in self.is_changed:
            return self.is_changed[node_id]

        node = self.dynprompt.get_node(node_id)
        class_type = node["class_type"]
        class_def = nodes.NODE_CLASS_MAPPINGS[class_type]

        # Check if the node defines an IS_CHANGED method
        if hasattr(class_def, "IS_CHANGED"):
            input_data_all, _, v3_data = get_input_data(
                node["inputs"], class_def, node_id, None
            )
            is_changed = await _async_map_node_over_list(
                self.prompt_id, node_id, class_def,
                input_data_all, "IS_CHANGED", v3_data=v3_data
            )
            is_changed = await resolve_map_node_over_list_results(is_changed)
            node["is_changed"] = [x for x in is_changed]
            self.is_changed[node_id] = node["is_changed"]
        else:
            self.is_changed[node_id] = False
        return self.is_changed[node_id]
```
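Custom nodes opt into this check by defining an `IS_CHANGED` classmethod; returning a value different from the previous run forces re-execution. A common pattern is to hash the file a node reads so on-disk edits invalidate the cache. This is a sketch: the rest of the node definition (`RETURN_TYPES`, registration, and so on) is omitted:

```python
import hashlib

class LoadTextFile:
    """Sketch of a custom node whose cache entry tracks file contents."""

    @classmethod
    def IS_CHANGED(cls, path):
        # Hash the file so a changed file yields a changed value,
        # which marks the node as stale.
        with open(path, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()

    def load(self, path):
        with open(path, "r") as f:
            return (f.read(),)
```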
## Node Execution Function

The core `execute()` function handles individual node execution:
```python
async def execute(server, dynprompt, caches, current_item, extra_data,
                  executed, prompt_id, execution_list,
                  pending_subgraph_results, pending_async_nodes, ui_outputs):
    unique_id = current_item
    real_node_id = dynprompt.get_real_node_id(unique_id)
    display_node_id = dynprompt.get_display_node_id(unique_id)
    parent_node_id = dynprompt.get_parent_node_id(unique_id)
    inputs = dynprompt.get_node(unique_id)['inputs']
    class_type = dynprompt.get_node(unique_id)['class_type']
    class_def = nodes.NODE_CLASS_MAPPINGS[class_type]

    # Check the cache first
    cached = caches.outputs.get(unique_id)
    if cached is not None:
        if server.client_id is not None:
            server.send_sync("executed", {
                "node": unique_id,
                "display_node": display_node_id,
                "output": cached.ui.get("output", None),
                "prompt_id": prompt_id
            }, server.client_id)
        get_progress_state().finish_progress(unique_id)
        execution_list.cache_update(unique_id, cached)
        return (ExecutionResult.SUCCESS, None, None)

    # ... execution logic ...
```
## Lazy Evaluation

From `execution.py:486-499`:
```python
if lazy_status_present:
    required_inputs = await _async_map_node_over_list(
        prompt_id, unique_id, obj, input_data_all,
        "check_lazy_status", allow_interrupt=True, v3_data=v3_data_lazy
    )
    required_inputs = await resolve_map_node_over_list_results(required_inputs)
    required_inputs = set(sum([r for r in required_inputs if isinstance(r, list)], []))
    required_inputs = [x for x in required_inputs
                       if x not in input_data_all or x in missing_keys]
    if len(required_inputs) > 0:
        for i in required_inputs:
            execution_list.make_input_strong_link(unique_id, i)
        return (ExecutionResult.PENDING, None, None)
```
**Lazy Loading**: nodes that implement `check_lazy_status` can defer loading inputs until they are actually needed, saving memory and computation.
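A node opts in by declaring inputs with `"lazy": True` and implementing `check_lazy_status`, which returns the names of inputs it still needs; unlisted lazy inputs are never computed. A sketch of a conditional switch (registration boilerplate omitted):

```python
class LazySwitch:
    """Sketch: only the selected branch's input is ever evaluated."""

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {
            "use_b": ("BOOLEAN",),
            "a": ("IMAGE", {"lazy": True}),
            "b": ("IMAGE", {"lazy": True}),
        }}

    def check_lazy_status(self, use_b, a=None, b=None):
        # Return the names of inputs that must be resolved before
        # execute() can run; the unselected branch is skipped entirely.
        needed = "b" if use_b else "a"
        if (b if use_b else a) is None:
            return [needed]
        return []

    def execute(self, use_b, a=None, b=None):
        return (b if use_b else a,)
```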
## Async Task Handling

From `execution.py:532-540`:
```python
if has_pending_tasks:
    pending_async_nodes[unique_id] = output_data
    unblock = execution_list.add_external_block(unique_id)

    async def await_completion():
        tasks = [x for x in output_data if isinstance(x, asyncio.Task)]
        await asyncio.gather(*tasks, return_exceptions=True)
        unblock()

    asyncio.create_task(await_completion())
    return (ExecutionResult.PENDING, None, None)
```
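The external-block pattern boils down to handing the scheduler a callback that releases a waiting node once its tasks finish. A self-contained sketch with `asyncio.Event` standing in for the execution list's block:

```python
import asyncio

async def main():
    # An Event stands in for execution_list.add_external_block():
    # the "executor" waits on it, the background task sets it.
    blocked = asyncio.Event()
    unblock = blocked.set

    results = []

    async def slow_node():
        await asyncio.sleep(0.01)    # simulated async work
        results.append("node output")

    async def await_completion(tasks):
        await asyncio.gather(*tasks, return_exceptions=True)
        unblock()                    # release the blocked node

    tasks = [asyncio.create_task(slow_node())]
    asyncio.create_task(await_completion(tasks))

    await blocked.wait()             # executor resumes only after gather
    return results

print(asyncio.run(main()))  # ['node output']
```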
## Execution Queue

The `PromptQueue` manages pending executions:
```python
class PromptQueue:
    def __init__(self, server):
        self.server = server
        self.mutex = threading.RLock()
        self.not_empty = threading.Condition(self.mutex)
        self.task_counter = 0
        self.queue = []  # Heap ordered by priority
        self.currently_running = {}
        self.history = {}
        self.flags = {}

    def put(self, item):
        with self.mutex:
            heapq.heappush(self.queue, item)
            self.server.queue_updated()
            self.not_empty.notify()

    def get(self, timeout=None):
        with self.not_empty:
            while len(self.queue) == 0:
                self.not_empty.wait(timeout=timeout)
                if timeout is not None and len(self.queue) == 0:
                    return None
            item = heapq.heappop(self.queue)
            i = self.task_counter
            self.currently_running[i] = copy.deepcopy(item)
            self.task_counter += 1
            self.server.queue_updated()
            return (item, i)
```
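Because `self.queue` is a heap, queue items are tuples whose first element is the priority number, and `heapq` pops the smallest first, so lower numbers run sooner. The real item tuples carry more fields (prompt id, prompt, extra data), but the priority-first shape is the key point:

```python
import heapq

# Priority comes first in the tuple, so heapq orders by it.
queue = []
heapq.heappush(queue, (1, "low-priority prompt"))
heapq.heappush(queue, (0, "high-priority prompt"))

print(heapq.heappop(queue))  # (0, 'high-priority prompt')
```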
### Task Completion

```python
class ExecutionStatus(NamedTuple):
    status_str: Literal['success', 'error']
    completed: bool
    messages: List[str]

def task_done(self, item_id, history_result, status, process_item=None):
    with self.mutex:
        prompt = self.currently_running.pop(item_id)

        # Limit history size
        if len(self.history) > MAXIMUM_HISTORY_SIZE:
            self.history.pop(next(iter(self.history)))

        status_dict = None
        if status is not None:
            status_dict = copy.deepcopy(status._asdict())

        if process_item is not None:
            prompt = process_item(prompt)

        self.history[prompt[1]] = {
            "prompt": prompt,
            "outputs": {},
            'status': status_dict,
        }
        self.history[prompt[1]].update(history_result)
```
## Interrupt Handling

Executions can be interrupted gracefully:
```python
def interrupt_processing(value=True):
    comfy.model_management.interrupt_current_processing(value)

def before_node_execution():
    comfy.model_management.throw_exception_if_processing_interrupted()
```
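The interrupt is cooperative: long-running code periodically checks a flag and raises when it is set. A self-contained sketch of that mechanism, with a toy flag and exception standing in for `comfy.model_management`'s:

```python
class InterruptProcessingException(Exception):
    """Toy stand-in for comfy.model_management's exception."""

_interrupted = False

def interrupt_current_processing(value=True):
    global _interrupted
    _interrupted = value

def throw_exception_if_processing_interrupted():
    global _interrupted
    if _interrupted:
        _interrupted = False           # one-shot: reset after raising
        raise InterruptProcessingException()

def long_running_node(steps):
    done = 0
    for _ in range(steps):
        # Check between steps so interrupts land at a safe point.
        throw_exception_if_processing_interrupted()
        done += 1
        if done == 3:
            interrupt_current_processing()  # simulate the user pressing stop
    return done

try:
    long_running_node(10)
except InterruptProcessingException:
    print("Processing interrupted")
```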
From `execution.py:593-601`, interrupts are handled specially:

```python
except comfy.model_management.InterruptProcessingException as iex:
    logging.info("Processing interrupted")
    error_details = {
        "node_id": real_node_id,
    }
    return (ExecutionResult.FAILURE, error_details, iex)
```
When interrupted, the executor broadcasts an `execution_interrupted` message to all clients:

```python
if isinstance(ex, comfy.model_management.InterruptProcessingException):
    mes = {
        "prompt_id": prompt_id,
        "node_id": node_id,
        "node_type": class_type,
        "executed": list(executed),
    }
    self.add_message("execution_interrupted", mes, broadcast=True)
```
## Error Handling

### Exception Processing

From `execution.py:602-631`:
```python
except Exception as ex:
    typ, _, tb = sys.exc_info()
    exception_type = full_type_name(typ)
    input_data_formatted = {}
    if input_data_all is not None:
        for name, inputs in input_data_all.items():
            input_data_formatted[name] = [format_value(x) for x in inputs]

    logging.error(f"!!! Exception during processing !!! {ex}")
    logging.error(traceback.format_exc())

    tips = ""
    if isinstance(ex, comfy.model_management.OOM_EXCEPTION):
        tips = ("This error means you ran out of memory on your GPU.\n\n"
                "TIPS: If the workflow worked before you might have "
                "accidentally set the batch_size to a large number.")
        logging.info(f"Memory summary: {comfy.model_management.debug_memory_summary()}")
        comfy.model_management.unload_all_models()

    error_details = {
        "node_id": real_node_id,
        "exception_message": f"{ex}\n{tips}",
        "exception_type": exception_type,
        "traceback": traceback.format_tb(tb),
        "current_inputs": input_data_formatted
    }
    return (ExecutionResult.FAILURE, error_details, ex)
```
**OOM Handling**: out-of-memory errors trigger automatic model unloading to free VRAM.
## Progress Tracking

The execution system tracks and reports progress around each node:

```python
get_progress_state().start_progress(unique_id)
# ... node execution ...
get_progress_state().finish_progress(unique_id)
```
## Message Broadcasting

Events are broadcast to connected clients:

```python
def add_message(self, event, data: dict, broadcast: bool):
    data = {
        **data,
        "timestamp": int(time.time() * 1000),
    }
    self.status_messages.append((event, data))
    if self.server.client_id is not None or broadcast:
        self.server.send_sync(event, data, self.server.client_id)
```
### Event Types

| Event | Meaning |
| --- | --- |
| `execution_start` | Execution begins |
| `execution_cached` | Cached nodes identified |
| `executing` | Node started execution |
| `executed` | Node completed with output |
| `execution_error` | Error occurred during execution |
| `execution_interrupted` | User interrupted execution |
| `execution_success` | Workflow completed successfully |
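Clients typically consume these events as JSON messages of the form `{"type": ..., "data": ...}`. A sketch of a handler that folds such a stream into a workflow status summary; the exact `data` fields beyond those shown in the excerpts above are assumptions:

```python
def track_events(messages):
    """Fold a stream of execution events into a simple status summary."""
    state = {"executed": [], "status": "running", "errors": []}
    for msg in messages:
        event, data = msg["type"], msg["data"]
        if event == "executed":
            state["executed"].append(data["node"])
        elif event == "execution_error":
            state["errors"].append(data["exception_message"])
            state["status"] = "error"
        elif event == "execution_interrupted":
            state["status"] = "interrupted"
        elif event == "execution_success":
            state["status"] = "success"
    return state

stream = [
    {"type": "execution_start", "data": {"prompt_id": "p1"}},
    {"type": "executing", "data": {"node": "3"}},
    {"type": "executed", "data": {"node": "3", "output": {}}},
    {"type": "execution_success", "data": {"prompt_id": "p1"}},
]
print(track_events(stream))
# {'executed': ['3'], 'status': 'success', 'errors': []}
```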
## Best Practices

Choose the right cache type for your use case:

- **Classic**: fast, but uses more memory
- **LRU**: bounded memory usage
- **RAM Pressure**: adapts to available RAM
- **None**: minimal memory for large batches

Use async nodes for:

- Network requests
- File I/O operations
- Long-running computations
- Parallel processing

Implement `check_lazy_status` to:

- Defer expensive inputs
- Save memory on unused branches
- Enable conditional execution

Use `INPUT_IS_LIST` and `OUTPUT_IS_LIST` for:

- Processing multiple images
- Aggregating results
- Parallel GPU operations
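A node that aggregates a batch uses these flags like so. With `INPUT_IS_LIST = True` the node receives every input as a list in a single call, and `OUTPUT_IS_LIST` marks, per output, whether the returned value is a list. This is a sketch; registration boilerplate is omitted:

```python
class ConcatStrings:
    """Sketch: receives all queued string inputs at once and joins them."""
    INPUT_IS_LIST = True        # concat() gets lists instead of single values
    OUTPUT_IS_LIST = (False,)   # the single output is a plain value

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {"text": ("STRING",)}}

    RETURN_TYPES = ("STRING",)
    FUNCTION = "concat"

    def concat(self, text):
        # With INPUT_IS_LIST, even a non-list parameter arrives as a list.
        return ("".join(text),)
```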
## See Also

- **Workflows**: workflow structure and format
- **Nodes**: node system and implementation
- **Models**: model loading and caching