ComfyUI API 接入指南
概述
ComfyUI 是一个基于节点的工作流界面,用于 Stable Diffusion 和其他 AI 模型的可视化操作。它提供了强大的 REST API 接口,允许开发者将 ComfyUI 的功能集成到自己的应用程序中。本文将详细介绍 ComfyUI 的 API 接口、使用方法以及如何接入到现有系统中。
ComfyUI 简介
ComfyUI 是一个模块化的 Stable Diffusion 图形用户界面和后端,具有以下特点:
- 节点化工作流: 通过拖拽节点构建复杂的图像生成流程
- 高度可定制: 支持自定义节点和扩展
- API 驱动: 提供完整的 REST API 接口
- 高性能: 支持批处理和队列管理
- 开源免费: 完全开源,可自由使用和修改
API 接口概览
ComfyUI 提供了以下主要的 API 接口:
接口 | 方法 | 路径 | 作用 |
---|---|---|---|
系统信息 | GET | /system_stats | 获取系统状态和资源使用情况 |
队列状态 | GET | /queue | 获取当前队列状态 |
历史记录 | GET | /history | 获取任务执行历史 |
历史记录详情 | GET | /history/{prompt_id} | 获取特定任务的详细信息 |
提交任务 | POST | /prompt | 提交新的工作流任务 |
取消任务 | POST | /queue | 取消队列中的任务 |
上传文件 | POST | /upload/image | 上传图片文件 |
获取图片 | GET | /view | 获取生成的图片 |
获取模型列表 | GET | /object_info | 获取可用的模型和节点信息 |
中断任务 | POST | /interrupt | 中断当前正在执行的任务 |
详细接口说明
1. 系统信息接口
获取系统状态
GET /system_stats
响应示例:
{
"system": {
"os": "linux",
"python_version": "3.10.6",
"devices": [
{
"name": "NVIDIA GeForce RTX 4090",
"type": "cuda",
"index": 0,
"memory_total": 24576,
"memory_free": 20480
}
]
},
"queue": {
"running": 1,
"pending": 3
}
}
用途:
- 监控系统资源使用情况
- 检查 GPU 内存状态
- 了解队列负载情况
2. 队列管理接口
获取队列状态
GET /queue
响应示例:
{
"queue_running": [
{
"prompt_id": "12345-67890",
"status": "running",
"workflow": {...}
}
],
"queue_pending": [
{
"prompt_id": "12345-67891",
"status": "pending",
"workflow": {...}
}
]
}
取消任务
POST /queue
Content-Type: application/json
{
"delete": ["12345-67890"]
}
用途:
- 监控任务执行状态
- 管理任务队列
- 取消不需要的任务
3. 历史记录接口
获取历史记录
GET /history
响应示例:
{
"12345-67890": {
"status": "success",
"workflow": {...},
"outputs": {
"3": {
"images": [
{
"filename": "ComfyUI_00001_.png",
"subfolder": "",
"type": "output"
}
]
}
}
}
}
获取特定任务详情
GET /history/{prompt_id}
用途:
- 查看任务执行结果
- 获取生成的图片信息
- 分析工作流执行情况
4. 任务提交接口
提交工作流任务
POST /prompt
Content-Type: application/json
{
"prompt": {
"1": {
"inputs": {
"text": "a beautiful landscape",
"clip": ["4", 1]
},
"class_type": "CLIPTextEncode"
},
"4": {
"inputs": {
"ckpt_name": "v1-5-pruned-emaonly.ckpt"
},
"class_type": "CheckpointLoaderSimple"
}
},
"client_id": "your_client_id"
}
响应示例:
{
"prompt_id": "12345-67890",
"number": 1,
"node_errors": {}
}
用途:
- 提交图像生成任务
- 执行自定义工作流
- 批量处理任务
5. 文件管理接口
上传图片
POST /upload/image
Content-Type: multipart/form-data
file: [binary data]
overwrite: true
响应示例:
{
"name": "uploaded_image.png",
"subfolder": "",
"type": "input"
}
获取图片
GET /view?filename=ComfyUI_00001_.png&type=output&subfolder=
用途:
- 上传输入图片
- 下载生成的图片
- 管理文件资源
6. 模型信息接口
获取可用模型
GET /object_info
响应示例:
{
"CheckpointLoaderSimple": {
"input": {
"required": {
"ckpt_name": ["v1-5-pruned-emaonly.ckpt", "v1-5-pruned.ckpt"]
}
},
"output": ["MODEL", "CLIP", "VAE"],
"output_name": ["MODEL", "CLIP", "VAE"]
}
}
用途:
- 获取可用的模型列表
- 了解节点参数要求
- 构建工作流配置
7. 任务控制接口
中断当前任务
POST /interrupt
用途:
- 紧急停止当前任务
- 释放系统资源
- 处理异常情况
接入系统实现
1. Python 客户端实现
import requests
import json
import time
import uuid
from typing import Dict, Any, Optional
class ComfyUIClient:
def __init__(self, base_url: str = "http://localhost:8188"):
self.base_url = base_url.rstrip('/')
self.client_id = str(uuid.uuid4())
def get_system_stats(self) -> Dict[str, Any]:
"""获取系统状态"""
response = requests.get(f"{self.base_url}/system_stats")
return response.json()
def get_queue_status(self) -> Dict[str, Any]:
"""获取队列状态"""
response = requests.get(f"{self.base_url}/queue")
return response.json()
def submit_workflow(self, workflow: Dict[str, Any]) -> str:
"""提交工作流任务"""
payload = {
"prompt": workflow,
"client_id": self.client_id
}
response = requests.post(
f"{self.base_url}/prompt",
json=payload
)
result = response.json()
return result["prompt_id"]
def get_history(self, prompt_id: Optional[str] = None) -> Dict[str, Any]:
"""获取历史记录"""
if prompt_id:
response = requests.get(f"{self.base_url}/history/{prompt_id}")
else:
response = requests.get(f"{self.base_url}/history")
return response.json()
def upload_image(self, image_path: str) -> Dict[str, Any]:
"""上传图片"""
with open(image_path, 'rb') as f:
files = {'image': f}
data = {'overwrite': 'true'}
response = requests.post(
f"{self.base_url}/upload/image",
files=files,
data=data
)
return response.json()
def download_image(self, filename: str, subfolder: str = "",
image_type: str = "output") -> bytes:
"""下载图片"""
params = {
'filename': filename,
'subfolder': subfolder,
'type': image_type
}
response = requests.get(f"{self.base_url}/view", params=params)
return response.content
def wait_for_completion(self, prompt_id: str, timeout: int = 300) -> Dict[str, Any]:
"""等待任务完成"""
start_time = time.time()
while time.time() - start_time < timeout:
history = self.get_history(prompt_id)
if prompt_id in history:
status = history[prompt_id]["status"]
if status in ["success", "error"]:
return history[prompt_id]
time.sleep(1)
raise TimeoutError(f"Task {prompt_id} did not complete within {timeout} seconds")
def cancel_task(self, prompt_id: str) -> bool:
"""取消任务"""
payload = {"delete": [prompt_id]}
response = requests.post(f"{self.base_url}/queue", json=payload)
return response.status_code == 200
2. 工作流构建器
class WorkflowBuilder:
def __init__(self):
self.workflow = {}
self.node_counter = 1
def add_checkpoint_loader(self, ckpt_name: str) -> int:
"""添加检查点加载器"""
node_id = self.node_counter
self.workflow[str(node_id)] = {
"inputs": {"ckpt_name": ckpt_name},
"class_type": "CheckpointLoaderSimple"
}
self.node_counter += 1
return node_id
def add_text_encoder(self, text: str, clip_node: int) -> int:
"""添加文本编码器"""
node_id = self.node_counter
self.workflow[str(node_id)] = {
"inputs": {
"text": text,
"clip": [str(clip_node), 1]
},
"class_type": "CLIPTextEncode"
}
self.node_counter += 1
return node_id
def add_ksampler(self, model_node: int, positive_node: int,
negative_node: int, latent_node: int) -> int:
"""添加 KSampler"""
node_id = self.node_counter
self.workflow[str(node_id)] = {
"inputs": {
"seed": 123456,
"steps": 20,
"cfg": 7.0,
"sampler_name": "euler",
"scheduler": "normal",
"denoise": 1.0,
"model": [str(model_node), 0],
"positive": [str(positive_node), 0],
"negative": [str(negative_node), 0],
"latent_image": [str(latent_node), 0]
},
"class_type": "KSampler"
}
self.node_counter += 1
return node_id
def add_vae_decode(self, samples_node: int, vae_node: int) -> int:
"""添加 VAE 解码器"""
node_id = self.node_counter
self.workflow[str(node_id)] = {
"inputs": {
"samples": [str(samples_node), 0],
"vae": [str(vae_node), 2]
},
"class_type": "VAEDecode"
}
self.node_counter += 1
return node_id
def add_save_image(self, image_node: int) -> int:
"""添加图片保存节点"""
node_id = self.node_counter
self.workflow[str(node_id)] = {
"inputs": {
"filename_prefix": "ComfyUI",
"images": [str(image_node), 0]
},
"class_type": "SaveImage"
}
self.node_counter += 1
return node_id
def build(self) -> Dict[str, Any]:
"""构建工作流"""
return self.workflow
3. 使用示例
def generate_image(prompt: str, negative_prompt: str = "") -> str:
"""生成图片的完整示例"""
# 创建客户端
client = ComfyUIClient("http://localhost:8188")
# 构建工作流
builder = WorkflowBuilder()
# 添加节点
checkpoint_node = builder.add_checkpoint_loader("v1-5-pruned-emaonly.ckpt")
positive_node = builder.add_text_encoder(prompt, checkpoint_node)
negative_node = builder.add_text_encoder(negative_prompt, checkpoint_node)
# 添加空潜在图像
empty_latent_node = builder.node_counter
builder.workflow[str(empty_latent_node)] = {
"inputs": {
"width": 512,
"height": 512,
"batch_size": 1
},
"class_type": "EmptyLatentImage"
}
builder.node_counter += 1
# 添加采样器
sampler_node = builder.add_ksampler(
checkpoint_node, positive_node, negative_node, empty_latent_node
)
# 添加 VAE 解码
vae_decode_node = builder.add_vae_decode(sampler_node, checkpoint_node)
# 添加保存节点
save_node = builder.add_save_image(vae_decode_node)
# 提交任务
workflow = builder.build()
prompt_id = client.submit_workflow(workflow)
print(f"任务已提交,ID: {prompt_id}")
# 等待完成
result = client.wait_for_completion(prompt_id)
if result["status"] == "success":
# 获取生成的图片
outputs = result["outputs"]
for node_id, output in outputs.items():
if "images" in output:
for image_info in output["images"]:
filename = image_info["filename"]
image_data = client.download_image(filename)
# 保存图片
with open(f"generated_{filename}", "wb") as f:
f.write(image_data)
print(f"图片已保存: generated_{filename}")
return f"generated_{filename}"
else:
print(f"任务失败: {result}")
return None
# 使用示例
if __name__ == "__main__":
image_path = generate_image("a beautiful sunset over mountains")
if image_path:
print(f"图片生成成功: {image_path}")
4. 异步处理实现
import asyncio
import aiohttp
from typing import AsyncGenerator
class AsyncComfyUIClient:
def __init__(self, base_url: str = "http://localhost:8188"):
self.base_url = base_url.rstrip('/')
self.client_id = str(uuid.uuid4())
async def submit_workflow_async(self, workflow: Dict[str, Any]) -> str:
"""异步提交工作流"""
async with aiohttp.ClientSession() as session:
payload = {
"prompt": workflow,
"client_id": self.client_id
}
async with session.post(
f"{self.base_url}/prompt",
json=payload
) as response:
result = await response.json()
return result["prompt_id"]
async def poll_completion(self, prompt_id: str) -> AsyncGenerator[Dict[str, Any], None]:
"""轮询任务状态"""
async with aiohttp.ClientSession() as session:
while True:
async with session.get(f"{self.base_url}/history/{prompt_id}") as response:
history = await response.json()
if prompt_id in history:
status = history[prompt_id]["status"]
yield history[prompt_id]
if status in ["success", "error"]:
break
await asyncio.sleep(1)
# 异步使用示例
async def generate_image_async(prompt: str):
client = AsyncComfyUIClient()
# 构建工作流(使用之前的 WorkflowBuilder)
builder = WorkflowBuilder()
# ... 构建工作流 ...
workflow = builder.build()
# 提交任务
prompt_id = await client.submit_workflow_async(workflow)
print(f"任务已提交: {prompt_id}")
# 轮询状态
async for status in client.poll_completion(prompt_id):
print(f"当前状态: {status['status']}")
if status["status"] == "success":
# 处理成功结果
print("任务完成!")
break
elif status["status"] == "error":
print("任务失败!")
break
# 运行异步任务
asyncio.run(generate_image_async("a beautiful landscape"))
集成到现有系统
1. 微服务架构集成
from flask import Flask, request, jsonify
from celery import Celery
import redis
app = Flask(__name__)
celery = Celery('comfyui_service', broker='redis://localhost:6379')
redis_client = redis.Redis(host='localhost', port=6379, db=0)
class ComfyUIService:
def __init__(self):
self.client = ComfyUIClient()
def generate_image_task(self, prompt: str, user_id: str) -> str:
"""生成图片任务"""
try:
# 构建工作流
builder = WorkflowBuilder()
# ... 构建工作流 ...
workflow = builder.build()
# 提交任务
prompt_id = self.client.submit_workflow(workflow)
# 存储任务信息
redis_client.hset(f"task:{prompt_id}", mapping={
"user_id": user_id,
"prompt": prompt,
"status": "pending",
"created_at": str(time.time())
})
return prompt_id
except Exception as e:
print(f"任务提交失败: {e}")
raise
def get_task_status(self, prompt_id: str) -> Dict[str, Any]:
"""获取任务状态"""
task_info = redis_client.hgetall(f"task:{prompt_id}")
if not task_info:
return {"error": "任务不存在"}
# 获取 ComfyUI 状态
history = self.client.get_history(prompt_id)
if prompt_id in history:
task_info["comfyui_status"] = history[prompt_id]["status"]
return task_info
@app.route('/api/generate', methods=['POST'])
def generate_image():
"""生成图片 API"""
data = request.get_json()
prompt = data.get('prompt')
user_id = data.get('user_id')
if not prompt or not user_id:
return jsonify({"error": "缺少必要参数"}), 400
try:
service = ComfyUIService()
prompt_id = service.generate_image_task(prompt, user_id)
return jsonify({
"success": True,
"task_id": prompt_id,
"message": "任务已提交"
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/status/<prompt_id>', methods=['GET'])
def get_status(prompt_id):
"""获取任务状态 API"""
try:
service = ComfyUIService()
status = service.get_task_status(prompt_id)
return jsonify(status)
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
app.run(debug=True)
2. 队列管理系统
import threading
import queue
from datetime import datetime
class TaskQueueManager:
def __init__(self, max_workers: int = 3):
self.task_queue = queue.Queue()
self.max_workers = max_workers
self.workers = []
self.running = False
self.client = ComfyUIClient()
def start(self):
"""启动队列管理器"""
self.running = True
for i in range(self.max_workers):
worker = threading.Thread(target=self._worker, daemon=True)
worker.start()
self.workers.append(worker)
def stop(self):
"""停止队列管理器"""
self.running = False
for worker in self.workers:
worker.join()
def submit_task(self, task_data: Dict[str, Any]) -> str:
"""提交任务到队列"""
task_id = str(uuid.uuid4())
task = {
"id": task_id,
"data": task_data,
"status": "queued",
"created_at": datetime.now(),
"result": None
}
self.task_queue.put(task)
return task_id
def _worker(self):
"""工作线程"""
while self.running:
try:
task = self.task_queue.get(timeout=1)
self._process_task(task)
self.task_queue.task_done()
except queue.Empty:
continue
def _process_task(self, task: Dict[str, Any]):
"""处理任务"""
try:
task["status"] = "processing"
# 构建工作流
builder = WorkflowBuilder()
# ... 根据任务数据构建工作流 ...
workflow = builder.build()
# 提交到 ComfyUI
prompt_id = self.client.submit_workflow(workflow)
# 等待完成
result = self.client.wait_for_completion(prompt_id)
if result["status"] == "success":
task["status"] = "completed"
task["result"] = result
else:
task["status"] = "failed"
task["error"] = result.get("error", "未知错误")
except Exception as e:
task["status"] = "failed"
task["error"] = str(e)
3. 监控和日志系统
import logging
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Prometheus 指标
TASK_COUNTER = Counter('comfyui_tasks_total', 'Total tasks processed', ['status'])
TASK_DURATION = Histogram('comfyui_task_duration_seconds', 'Task duration')
QUEUE_SIZE = Gauge('comfyui_queue_size', 'Current queue size')
class ComfyUIMonitor:
def __init__(self):
self.logger = logging.getLogger('comfyui')
self.setup_logging()
def setup_logging(self):
"""设置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('comfyui.log'),
logging.StreamHandler()
]
)
def log_task_start(self, task_id: str, prompt: str):
"""记录任务开始"""
self.logger.info(f"任务开始: {task_id}, 提示词: {prompt}")
TASK_COUNTER.labels(status='started').inc()
def log_task_complete(self, task_id: str, duration: float, success: bool):
"""记录任务完成"""
status = 'success' if success else 'failed'
self.logger.info(f"任务完成: {task_id}, 耗时: {duration:.2f}s, 状态: {status}")
TASK_COUNTER.labels(status=status).inc()
TASK_DURATION.observe(duration)
def log_error(self, task_id: str, error: str):
"""记录错误"""
self.logger.error(f"任务错误: {task_id}, 错误: {error}")
TASK_COUNTER.labels(status='error').inc()
def update_queue_size(self, size: int):
"""更新队列大小"""
QUEUE_SIZE.set(size)
# 启动监控服务
def start_monitoring(port: int = 8000):
"""启动 Prometheus 监控"""
start_http_server(port)
print(f"监控服务启动在端口 {port}")
最佳实践和注意事项
1. 错误处理
class ComfyUIError(Exception):
"""ComfyUI 相关错误"""
pass
class ComfyUIClient:
def submit_workflow_with_retry(self, workflow: Dict[str, Any],
max_retries: int = 3) -> str:
"""带重试的提交工作流"""
for attempt in range(max_retries):
try:
return self.submit_workflow(workflow)
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise ComfyUIError(f"提交失败,已重试 {max_retries} 次: {e}")
time.sleep(2 ** attempt) # 指数退避
2. 资源管理
class ResourceManager:
def __init__(self, max_concurrent_tasks: int = 5):
self.semaphore = threading.Semaphore(max_concurrent_tasks)
self.active_tasks = {}
def acquire_task_slot(self, task_id: str) -> bool:
"""获取任务槽位"""
if self.semaphore.acquire(blocking=False):
self.active_tasks[task_id] = time.time()
return True
return False
def release_task_slot(self, task_id: str):
"""释放任务槽位"""
if task_id in self.active_tasks:
del self.active_tasks[task_id]
self.semaphore.release()
3. 配置管理
import yaml
from dataclasses import dataclass
@dataclass
class ComfyUIConfig:
base_url: str = "http://localhost:8188"
max_retries: int = 3
timeout: int = 300
max_concurrent_tasks: int = 5
default_model: str = "v1-5-pruned-emaonly.ckpt"
@classmethod
def from_file(cls, config_path: str) -> 'ComfyUIConfig':
"""从配置文件加载"""
with open(config_path, 'r', encoding='utf-8') as f:
config_data = yaml.safe_load(f)
return cls(**config_data)
def save_to_file(self, config_path: str):
"""保存到配置文件"""
config_dict = {
'base_url': self.base_url,
'max_retries': self.max_retries,
'timeout': self.timeout,
'max_concurrent_tasks': self.max_concurrent_tasks,
'default_model': self.default_model
}
with open(config_path, 'w', encoding='utf-8') as f:
yaml.dump(config_dict, f, default_flow_style=False)
总结
ComfyUI 提供了强大而灵活的 API 接口,可以轻松集成到现有的应用程序中。通过合理使用这些接口,开发者可以:
- 构建图像生成服务: 将 AI 图像生成能力集成到应用中
- 实现批量处理: 高效处理大量图像生成任务
- 创建自定义工作流: 根据需求构建复杂的图像处理流程
- 监控和管理: 实时监控任务状态和系统资源
关键要点
- 选择合适的接口: 根据需求选择相应的 API 接口
- 实现错误处理: 添加完善的错误处理和重试机制
- 资源管理: 合理管理并发任务和系统资源
- 监控和日志: 实现完整的监控和日志系统
- 配置管理: 使用配置文件管理各种参数
通过本文提供的指南和示例代码,开发者可以快速上手 ComfyUI API 的集成工作,构建出功能强大、稳定可靠的图像生成服务。
本文档基于 ComfyUI 最新版本编写,API 接口可能会随版本更新而变化。建议参考官方文档获取最新信息。