行莫
Published 2025-09-08

ComfyUI API Integration Guide

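Overview

ComfyUI is a node-based workflow interface for running Stable Diffusion and other AI models visually. Its server exposes a REST-style HTTP API that lets developers integrate ComfyUI into their own applications. This article walks through the main API endpoints, how to call them, and how to integrate them into an existing system.

About ComfyUI

ComfyUI is a modular graphical user interface and backend for Stable Diffusion, with the following characteristics:

  • Node-based workflows: build complex image generation pipelines by dragging and connecting nodes
  • Highly customizable: supports custom nodes and extensions
  • API-driven: exposes its functionality through an HTTP API
  • High performance: supports batch processing and queue management
  • Free and open source: the code is fully open and can be used and modified freely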

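API Overview

ComfyUI provides the following main API endpoints:

Endpoint            Method  Path                  Purpose
System info         GET     /system_stats         Get system status and resource usage
Queue status        GET     /queue                Get the current queue state
History             GET     /history              Get the task execution history
History detail      GET     /history/{prompt_id}  Get the details of a specific task
Submit task         POST    /prompt               Submit a new workflow task
Cancel task         POST    /queue                Remove tasks from the queue
Upload file         POST    /upload/image         Upload an image file
Get image           GET     /view                 Fetch a generated image
Node/model info     GET     /object_info          Get the available node definitions and model lists
Interrupt task      POST    /interrupt            Interrupt the task that is currently executing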

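Endpoint Details

1. System Information Endpoint

Get system status

GET /system_stats

Response example (abbreviated; the exact fields vary between ComfyUI versions, and memory values are reported in bytes):

{
  "system": {
    "os": "posix",
    "python_version": "3.10.6"
  },
  "devices": [
    {
      "name": "NVIDIA GeForce RTX 4090",
      "type": "cuda",
      "index": 0,
      "vram_total": 25769803776,
      "vram_free": 21474836480
    }
  ]
}

Use cases:

  • Monitor system resource usage
  • Check GPU memory status
  • Judge overall load together with GET /queue, which reports the queue contents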
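As a small illustration of the GPU memory check, the sketch below computes the free-memory ratio per device from an already parsed /system_stats payload. The helper name free_vram_fraction is hypothetical, and the field names it looks for (vram_total/vram_free, with memory_total/memory_free as a fallback) are assumptions that should be checked against the response of your own ComfyUI version.

```python
from typing import Any, Dict, List


def free_vram_fraction(stats: Dict[str, Any]) -> List[float]:
    """Return the free/total memory ratio for each device in a /system_stats payload."""
    # Assumption: devices sit at the top level, or under "system" in other payload shapes.
    devices = stats.get("devices") or stats.get("system", {}).get("devices", [])
    ratios = []
    for device in devices:
        total = device.get("vram_total", device.get("memory_total", 0))
        free = device.get("vram_free", device.get("memory_free", 0))
        # Guard against a missing or zero total (for example CPU-only setups).
        ratios.append(free / total if total else 0.0)
    return ratios


sample = {"devices": [{"name": "gpu0", "vram_total": 24576, "vram_free": 20480}]}
print(free_vram_fraction(sample))
```

A caller could refuse to submit new work while the ratio is below some threshold; the threshold itself depends on the models being loaded.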

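2. Queue Management Endpoints

Get queue status

GET /queue

Response example (abbreviated). In current ComfyUI versions each queue entry is an array rather than an object: queue number, prompt ID, the workflow, extra data such as the client ID, and the list of output nodes to execute. The layout can differ between versions.

{
  "queue_running": [
    [0, "12345-67890", {...}, {...}, [...]]
  ],
  "queue_pending": [
    [1, "12345-67891", {...}, {...}, [...]]
  ]
}

Cancel tasks

POST /queue
Content-Type: application/json

{
  "delete": ["12345-67890"]
}

The delete list removes tasks that are still waiting in the queue. To stop the task that is already executing, use POST /interrupt (section 7).

Use cases:

  • Monitor task execution state
  • Manage the task queue
  • Cancel tasks that are no longer needed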
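To show how a /queue response can be turned into something easier to work with, here is a minimal sketch that extracts the prompt IDs of running and pending tasks. The helper name queued_prompt_ids is hypothetical. It accepts both array-style entries (prompt ID assumed at index 1) and object-style entries with a "prompt_id" key, since the layout may depend on the ComfyUI version.

```python
from typing import Any, Dict, List


def queued_prompt_ids(queue: Dict[str, Any]) -> Dict[str, List[str]]:
    """Collect prompt IDs from a parsed /queue response."""

    def prompt_id(item: Any) -> str:
        # Object-style entry: {"prompt_id": ...}; array-style entry: [number, prompt_id, ...]
        return item["prompt_id"] if isinstance(item, dict) else item[1]

    return {
        "running": [prompt_id(item) for item in queue.get("queue_running", [])],
        "pending": [prompt_id(item) for item in queue.get("queue_pending", [])],
    }


sample_queue = {
    "queue_running": [[0, "12345-67890", {}, {}, []]],
    "queue_pending": [{"prompt_id": "12345-67891"}],
}
print(queued_prompt_ids(sample_queue))
```

The length of the "pending" list gives a rough measure of backlog, which a caller can use to decide whether to accept more work.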

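3. History Endpoints

Get the history

GET /history

Response example (abbreviated). An entry is added to the history only after the task has finished executing. In current ComfyUI versions "status" is an object rather than a plain string; older versions may omit it.

{
  "12345-67890": {
    "prompt": [...],
    "outputs": {
      "3": {
        "images": [
          {
            "filename": "ComfyUI_00001_.png",
            "subfolder": "",
            "type": "output"
          }
        ]
      }
    },
    "status": {
      "status_str": "success",
      "completed": true,
      "messages": [...]
    }
  }
}

Get the details of a specific task

GET /history/{prompt_id}

The response uses the same structure, keyed by the prompt ID. It is an empty object while the task is still queued or running.

Use cases:

  • Check task execution results
  • Get information about the generated images
  • Analyze how a workflow executed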
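The image information in a history entry is nested per output node, so a small flattening step is usually the first thing a client needs. The sketch below does that for a single parsed history entry; the helper name output_images is hypothetical, and the field names follow the response example shown in this section.

```python
from typing import Any, Dict, List


def output_images(entry: Dict[str, Any]) -> List[Dict[str, str]]:
    """Flatten the images of one history entry into a list of file references."""
    images = []
    for node_id, output in entry.get("outputs", {}).items():
        # Nodes without an "images" key (for example text outputs) are skipped.
        for image in output.get("images", []):
            images.append({
                "node_id": node_id,
                "filename": image["filename"],
                "subfolder": image.get("subfolder", ""),
                "type": image.get("type", "output"),
            })
    return images


entry = {
    "outputs": {
        "3": {"images": [{"filename": "ComfyUI_00001_.png", "subfolder": "", "type": "output"}]}
    }
}
print(output_images(entry))
```

Each returned dictionary carries the three values (filename, subfolder, type) that the /view endpoint expects as query parameters.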

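4. Task Submission Endpoint

Submit a workflow task

The request body below is abbreviated to two nodes. A workflow that actually runs must contain at least one output node such as SaveImage; otherwise the server rejects it. Links between nodes are written as [source node ID, output index].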

POST /prompt
Content-Type: application/json

{
  "prompt": {
    "1": {
      "inputs": {
        "text": "a beautiful landscape",
        "clip": ["4", 1]
      },
      "class_type": "CLIPTextEncode"
    },
    "4": {
      "inputs": {
        "ckpt_name": "v1-5-pruned-emaonly.ckpt"
      },
      "class_type": "CheckpointLoaderSimple"
    }
  },
  "client_id": "your_client_id"
}

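Response example (if validation fails, the server instead answers with HTTP 400 and describes the problems in "error" and "node_errors"):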

{
  "prompt_id": "12345-67890",
  "number": 1,
  "node_errors": {}
}

Use cases:

  • Submit image generation tasks
  • Execute custom workflows
  • Process tasks in batches
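In practice a workflow is often prepared once (for example exported from the ComfyUI interface in API format) and then submitted many times with different inputs. The following sketch shows one way to do that without mutating the template. The helper name with_inputs and the node IDs in the sample are hypothetical.

```python
import copy
from typing import Any, Dict


def with_inputs(workflow: Dict[str, Any], node_id: str, **inputs: Any) -> Dict[str, Any]:
    """Return a copy of the workflow with some inputs of one node replaced."""
    if node_id not in workflow:
        raise KeyError(f"node {node_id!r} is not part of the workflow")
    patched = copy.deepcopy(workflow)  # keep the template reusable
    patched[node_id]["inputs"].update(inputs)
    return patched


template = {
    "1": {"inputs": {"text": "a beautiful landscape", "clip": ["4", 1]},
          "class_type": "CLIPTextEncode"},
    "4": {"inputs": {"ckpt_name": "v1-5-pruned-emaonly.ckpt"},
          "class_type": "CheckpointLoaderSimple"},
}
variant = with_inputs(template, "1", text="a snowy mountain at dawn")
print(variant["1"]["inputs"]["text"])
```

Each variant can then be sent as the "prompt" value of a POST /prompt request, which is one simple way to realize the batch use case above.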

5. File Management Endpoints

Upload an image

POST /upload/image
Content-Type: multipart/form-data

image: [binary data]
overwrite: true

The file must be sent in the form field named "image".

Response example:

{
  "name": "uploaded_image.png",
  "subfolder": "",
  "type": "input"
}

Get an image

GET /view?filename=ComfyUI_00001_.png&type=output&subfolder=

Use cases:

  • Upload input images
  • Download generated images
  • Manage file resources
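File names and subfolders can contain spaces or other characters that must be escaped in a query string, so it is safer to build /view URLs with a URL-encoding helper than by string concatenation. A minimal sketch follows; the function name view_url is hypothetical, while the three query parameters are the ones shown in the request above.

```python
from urllib.parse import urlencode


def view_url(base_url: str, filename: str, subfolder: str = "",
             image_type: str = "output") -> str:
    """Build a /view URL with properly escaped query parameters."""
    query = urlencode({"filename": filename, "subfolder": subfolder, "type": image_type})
    return f"{base_url.rstrip('/')}/view?{query}"


print(view_url("http://localhost:8188", "ComfyUI_00001_.png"))
print(view_url("http://localhost:8188/", "my image.png", subfolder="batch 1"))
```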

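6. Model Information Endpoint

Get the available nodes and models

GET /object_info

Response example (abbreviated; the selectable file names are typically wrapped in an outer list, and the exact layout can differ between versions):

{
  "CheckpointLoaderSimple": {
    "input": {
      "required": {
        "ckpt_name": [["v1-5-pruned-emaonly.ckpt", "v1-5-pruned.ckpt"]]
      }
    },
    "output": ["MODEL", "CLIP", "VAE"],
    "output_name": ["MODEL", "CLIP", "VAE"]
  }
}

Use cases:

  • Get the list of available models
  • Learn which parameters a node requires
  • Build workflow configurations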
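Before submitting a workflow it can be useful to confirm that the requested checkpoint is actually installed. The sketch below reads the checkpoint names out of a parsed /object_info response. The helper name checkpoint_names is hypothetical, and the code assumes one of two layouts for "ckpt_name": either a flat list of names, or a list whose first element is the list of names. Other layouts would need additional handling.

```python
from typing import Any, Dict, List


def checkpoint_names(object_info: Dict[str, Any]) -> List[str]:
    """Extract the selectable checkpoint file names from /object_info data."""
    spec = (
        object_info.get("CheckpointLoaderSimple", {})
        .get("input", {})
        .get("required", {})
        .get("ckpt_name", [])
    )
    # Assumed nested layout: [[name, name, ...], ...]
    if spec and isinstance(spec[0], list):
        return list(spec[0])
    # Assumed flat layout: [name, name, ...]
    return [item for item in spec if isinstance(item, str)]


nested = {"CheckpointLoaderSimple": {"input": {"required": {
    "ckpt_name": [["v1-5-pruned-emaonly.ckpt", "v1-5-pruned.ckpt"]]}}}}
print(checkpoint_names(nested))
```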

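7. Task Control Endpoint

Interrupt the current task

POST /interrupt

Use cases:

  • Stop the running task in an emergency
  • Free up system resources
  • Handle abnormal situations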

Integration Implementation

1. Python Client Implementation

import time
import uuid
from typing import Any, Dict, Optional

import requests


class ComfyUIClient:
    def __init__(self, base_url: str = "http://localhost:8188",
                 request_timeout: float = 30.0):
        self.base_url = base_url.rstrip('/')
        self.client_id = str(uuid.uuid4())
        # Per-request HTTP timeout in seconds, so a hung server cannot block forever
        self.request_timeout = request_timeout

    def _request(self, method: str, path: str, **kwargs) -> requests.Response:
        """Send a request and raise requests.HTTPError on 4xx/5xx responses."""
        response = requests.request(
            method, f"{self.base_url}{path}",
            timeout=self.request_timeout, **kwargs
        )
        response.raise_for_status()
        return response

    def get_system_stats(self) -> Dict[str, Any]:
        """Get the system status."""
        return self._request("GET", "/system_stats").json()

    def get_queue_status(self) -> Dict[str, Any]:
        """Get the queue status."""
        return self._request("GET", "/queue").json()

    def submit_workflow(self, workflow: Dict[str, Any]) -> str:
        """Submit a workflow and return its prompt ID."""
        payload = {
            "prompt": workflow,
            "client_id": self.client_id
        }
        result = self._request("POST", "/prompt", json=payload).json()
        return result["prompt_id"]

    def get_history(self, prompt_id: Optional[str] = None) -> Dict[str, Any]:
        """Get the full history, or the entry of one task."""
        path = f"/history/{prompt_id}" if prompt_id else "/history"
        return self._request("GET", path).json()

    def upload_image(self, image_path: str) -> Dict[str, Any]:
        """Upload an image; the form field must be named 'image'."""
        with open(image_path, 'rb') as f:
            files = {'image': f}
            data = {'overwrite': 'true'}
            response = self._request("POST", "/upload/image", files=files, data=data)
        return response.json()

    def download_image(self, filename: str, subfolder: str = "",
                       image_type: str = "output") -> bytes:
        """Download an image and return its raw bytes."""
        params = {
            'filename': filename,
            'subfolder': subfolder,
            'type': image_type
        }
        return self._request("GET", "/view", params=params).content

    def wait_for_completion(self, prompt_id: str, timeout: int = 300) -> Dict[str, Any]:
        """Poll the history until the task has finished and return its entry.

        An entry appears in the history only after execution has ended, so its
        presence is the completion signal. Whether the task succeeded is
        reported in entry["status"]["status_str"] in current ComfyUI versions.
        """
        start_time = time.time()

        while time.time() - start_time < timeout:
            history = self.get_history(prompt_id)

            if prompt_id in history:
                return history[prompt_id]

            time.sleep(1)

        raise TimeoutError(f"Task {prompt_id} did not complete within {timeout} seconds")

    def cancel_task(self, prompt_id: str) -> bool:
        """Remove a pending task from the queue."""
        payload = {"delete": [prompt_id]}
        response = requests.post(f"{self.base_url}/queue", json=payload,
                                 timeout=self.request_timeout)
        return response.status_code == 200

2. Workflow Builder

from typing import Any, Dict


class WorkflowBuilder:
    def __init__(self):
        self.workflow: Dict[str, Any] = {}
        self.node_counter = 1

    def _add_node(self, class_type: str, inputs: Dict[str, Any]) -> int:
        """Register a node under the next free ID and return that ID."""
        node_id = self.node_counter
        self.workflow[str(node_id)] = {"inputs": inputs, "class_type": class_type}
        self.node_counter += 1
        return node_id

    def add_checkpoint_loader(self, ckpt_name: str) -> int:
        """Add a checkpoint loader (outputs: 0 = MODEL, 1 = CLIP, 2 = VAE)."""
        return self._add_node("CheckpointLoaderSimple", {"ckpt_name": ckpt_name})

    def add_text_encoder(self, text: str, clip_node: int) -> int:
        """Add a text encoder that uses the CLIP output (index 1) of the loader."""
        return self._add_node("CLIPTextEncode", {
            "text": text,
            "clip": [str(clip_node), 1]
        })

    def add_ksampler(self, model_node: int, positive_node: int,
                     negative_node: int, latent_node: int,
                     seed: int = 123456, steps: int = 20, cfg: float = 7.0) -> int:
        """Add a KSampler node."""
        return self._add_node("KSampler", {
            "seed": seed,
            "steps": steps,
            "cfg": cfg,
            "sampler_name": "euler",
            "scheduler": "normal",
            "denoise": 1.0,
            "model": [str(model_node), 0],
            "positive": [str(positive_node), 0],
            "negative": [str(negative_node), 0],
            "latent_image": [str(latent_node), 0]
        })

    def add_vae_decode(self, samples_node: int, vae_node: int) -> int:
        """Add a VAE decoder that uses the VAE output (index 2) of the loader."""
        return self._add_node("VAEDecode", {
            "samples": [str(samples_node), 0],
            "vae": [str(vae_node), 2]
        })

    def add_save_image(self, image_node: int) -> int:
        """Add an image save node (the output node of the workflow)."""
        return self._add_node("SaveImage", {
            "filename_prefix": "ComfyUI",
            "images": [str(image_node), 0]
        })

    def build(self) -> Dict[str, Any]:
        """Return the workflow in the format expected by POST /prompt."""
        return self.workflow
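Because workflows built in code reference other nodes by ID, a typo in an ID only surfaces when the server rejects the request. A cheap local check can catch this earlier. The sketch below is an illustrative helper (the name dangling_links is hypothetical) that treats any two-element [string, integer] input as a link and reports links whose source node does not exist.

```python
from typing import Any, Dict, List


def dangling_links(workflow: Dict[str, Any]) -> List[str]:
    """Return descriptions of inputs that link to node IDs missing from the workflow."""
    problems = []
    for node_id, node in workflow.items():
        for name, value in node.get("inputs", {}).items():
            # A link is written as [source_node_id, output_index].
            is_link = (
                isinstance(value, list)
                and len(value) == 2
                and isinstance(value[0], str)
                and isinstance(value[1], int)
            )
            if is_link and value[0] not in workflow:
                problems.append(f"{node_id}.{name} -> {value[0]}")
    return problems


broken = {
    "1": {"inputs": {"ckpt_name": "v1-5-pruned-emaonly.ckpt"},
          "class_type": "CheckpointLoaderSimple"},
    "2": {"inputs": {"text": "a cat", "clip": ["7", 1]}, "class_type": "CLIPTextEncode"},
}
print(dangling_links(broken))
```

This check only covers link targets; the server still validates node types, required inputs, and value ranges.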

3. Usage Example

from typing import Optional


def generate_image(prompt: str, negative_prompt: str = "") -> Optional[str]:
    """End-to-end example: build a workflow, run it, and save the first image."""

    # Create the client
    client = ComfyUIClient("http://localhost:8188")

    # Build the workflow
    builder = WorkflowBuilder()

    # Add the loader and the two text encoders
    checkpoint_node = builder.add_checkpoint_loader("v1-5-pruned-emaonly.ckpt")
    positive_node = builder.add_text_encoder(prompt, checkpoint_node)
    negative_node = builder.add_text_encoder(negative_prompt, checkpoint_node)

    # Add an empty latent image
    empty_latent_node = builder.node_counter
    builder.workflow[str(empty_latent_node)] = {
        "inputs": {
            "width": 512,
            "height": 512,
            "batch_size": 1
        },
        "class_type": "EmptyLatentImage"
    }
    builder.node_counter += 1

    # Add the sampler
    sampler_node = builder.add_ksampler(
        checkpoint_node, positive_node, negative_node, empty_latent_node
    )

    # Add the VAE decoder
    vae_decode_node = builder.add_vae_decode(sampler_node, checkpoint_node)

    # Add the save node
    builder.add_save_image(vae_decode_node)

    # Submit the task
    workflow = builder.build()
    prompt_id = client.submit_workflow(workflow)

    print(f"Task submitted, ID: {prompt_id}")

    # Wait for completion
    result = client.wait_for_completion(prompt_id)

    # In current ComfyUI versions "status" is an object with a "status_str" field
    status = result.get("status", {})
    if status.get("status_str") != "success":
        print(f"Task failed: {result}")
        return None

    # Download and save the first generated image
    for node_id, output in result.get("outputs", {}).items():
        for image_info in output.get("images", []):
            filename = image_info["filename"]
            image_data = client.download_image(
                filename,
                subfolder=image_info.get("subfolder", ""),
                image_type=image_info.get("type", "output"),
            )

            with open(f"generated_{filename}", "wb") as f:
                f.write(image_data)

            print(f"Image saved: generated_{filename}")
            return f"generated_{filename}"

    print("Task finished without producing an image")
    return None

# Example usage
if __name__ == "__main__":
    image_path = generate_image("a beautiful sunset over mountains")
    if image_path:
        print(f"Image generated successfully: {image_path}")

4. Asynchronous Implementation

import asyncio
import uuid
from typing import Any, AsyncGenerator, Dict

import aiohttp


class AsyncComfyUIClient:
    def __init__(self, base_url: str = "http://localhost:8188"):
        self.base_url = base_url.rstrip('/')
        self.client_id = str(uuid.uuid4())

    async def submit_workflow_async(self, workflow: Dict[str, Any]) -> str:
        """Submit a workflow asynchronously and return its prompt ID."""
        async with aiohttp.ClientSession() as session:
            payload = {
                "prompt": workflow,
                "client_id": self.client_id
            }
            async with session.post(
                f"{self.base_url}/prompt",
                json=payload
            ) as response:
                response.raise_for_status()
                result = await response.json()
                return result["prompt_id"]

    async def poll_completion(self, prompt_id: str) -> AsyncGenerator[Dict[str, Any], None]:
        """Poll the task and yield {"status": ...} updates until it has finished.

        A history entry exists only once execution has ended, so "pending" is
        yielded until the entry appears. The final update also carries the
        entry itself under "entry".
        """
        async with aiohttp.ClientSession() as session:
            while True:
                async with session.get(f"{self.base_url}/history/{prompt_id}") as response:
                    response.raise_for_status()
                    history = await response.json()

                if prompt_id in history:
                    entry = history[prompt_id]
                    # Older versions may omit "status"; treat a finished entry as success
                    status_str = entry.get("status", {}).get("status_str", "success")
                    yield {"status": status_str, "entry": entry}
                    return

                yield {"status": "pending"}
                await asyncio.sleep(1)

# Asynchronous usage example
async def generate_image_async(prompt: str):
    client = AsyncComfyUIClient()

    # Build the workflow (using the WorkflowBuilder from above)
    builder = WorkflowBuilder()
    # ... build the workflow ...
    workflow = builder.build()

    # Submit the task
    prompt_id = await client.submit_workflow_async(workflow)
    print(f"Task submitted: {prompt_id}")

    # Poll the status
    async for status in client.poll_completion(prompt_id):
        print(f"Current status: {status['status']}")

        if status["status"] == "success":
            # Handle the successful result
            print("Task completed!")
            break
        elif status["status"] == "error":
            print("Task failed!")
            break

# Run the asynchronous task
if __name__ == "__main__":
    asyncio.run(generate_image_async("a beautiful landscape"))
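When many prompts are processed asynchronously, it helps to cap how many requests are in flight at once so the server is not flooded. The sketch below shows the general pattern with asyncio.Semaphore and asyncio.gather. The names run_bounded and fake_generate are hypothetical, and fake_generate only simulates work with a short sleep; in a real integration it would be replaced by a coroutine that submits a workflow and waits for its result.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def run_bounded(jobs: List[Callable[[], Awaitable[T]]], limit: int) -> List[T]:
    """Run the jobs concurrently, with at most `limit` of them active at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(job: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:
            return await job()

    # gather returns the results in the same order as the jobs were given
    return await asyncio.gather(*(guarded(job) for job in jobs))


async def fake_generate(prompt: str) -> str:
    """Stand-in for a real generation call (assumption: simulated with a sleep)."""
    await asyncio.sleep(0.01)
    return f"done:{prompt}"


if __name__ == "__main__":
    prompts = ["a cat", "a dog", "a bird"]
    jobs = [lambda p=p: fake_generate(p) for p in prompts]
    print(asyncio.run(run_bounded(jobs, limit=2)))
```

Binding the loop variable with `p=p` in the lambda matters here: without it, every job would see the last prompt in the list.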

Integrating into an Existing System

1. Microservice Integration

import time
from typing import Any, Dict

import redis
from celery import Celery
from flask import Flask, request, jsonify

app = Flask(__name__)
# The Celery app is declared for offloading work to background workers;
# this minimal example does not dispatch any Celery tasks yet.
celery = Celery('comfyui_service', broker='redis://localhost:6379')
# decode_responses=True makes Redis return str instead of bytes,
# so stored task info can be passed to jsonify directly.
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

class ComfyUIService:
    def __init__(self):
        self.client = ComfyUIClient()

    def generate_image_task(self, prompt: str, user_id: str) -> str:
        """Submit an image generation task and record it in Redis."""
        try:
            # Build the workflow
            builder = WorkflowBuilder()
            # ... build the workflow ...
            workflow = builder.build()

            # Submit the task
            prompt_id = self.client.submit_workflow(workflow)

            # Store the task information
            redis_client.hset(f"task:{prompt_id}", mapping={
                "user_id": user_id,
                "prompt": prompt,
                "status": "pending",
                "created_at": str(time.time())
            })

            return prompt_id
        except Exception as e:
            print(f"Task submission failed: {e}")
            raise

    def get_task_status(self, prompt_id: str) -> Dict[str, Any]:
        """Get the task status."""
        task_info = redis_client.hgetall(f"task:{prompt_id}")
        if not task_info:
            return {"error": "Task not found"}

        # Add the ComfyUI status; a history entry exists only for finished tasks
        history = self.client.get_history(prompt_id)
        if prompt_id in history:
            status = history[prompt_id].get("status", {})
            task_info["comfyui_status"] = status.get("status_str", "unknown")

        return task_info

@app.route('/api/generate', methods=['POST'])
def generate_image():
    """Image generation API."""
    # silent=True returns None instead of raising when the body is not valid JSON
    data = request.get_json(silent=True) or {}
    prompt = data.get('prompt')
    user_id = data.get('user_id')

    if not prompt or not user_id:
        return jsonify({"error": "Missing required parameters"}), 400

    try:
        service = ComfyUIService()
        prompt_id = service.generate_image_task(prompt, user_id)

        return jsonify({
            "success": True,
            "task_id": prompt_id,
            "message": "Task submitted"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/status/<prompt_id>', methods=['GET'])
def get_status(prompt_id):
    """Task status API."""
    try:
        service = ComfyUIService()
        status = service.get_task_status(prompt_id)
        return jsonify(status)
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)

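2. Queue Management System

import queue
import threading
import uuid
from datetime import datetime
from typing import Any, Dict, Optional


class TaskQueueManager:
    def __init__(self, max_workers: int = 3):
        self.task_queue = queue.Queue()
        # task_id -> task record, so callers can look up status and results
        self.tasks: Dict[str, Dict[str, Any]] = {}
        # A single ComfyUI instance executes one workflow at a time, so extra
        # workers mainly overlap submission and waiting, not generation itself.
        self.max_workers = max_workers
        self.workers = []
        self.running = False
        self.client = ComfyUIClient()

    def start(self):
        """Start the queue manager."""
        self.running = True
        for i in range(self.max_workers):
            worker = threading.Thread(target=self._worker, daemon=True)
            worker.start()
            self.workers.append(worker)

    def stop(self):
        """Stop the queue manager and wait for the workers to exit."""
        self.running = False
        for worker in self.workers:
            worker.join()

    def submit_task(self, task_data: Dict[str, Any]) -> str:
        """Put a task on the queue and return its ID."""
        task_id = str(uuid.uuid4())
        task = {
            "id": task_id,
            "data": task_data,
            "status": "queued",
            "created_at": datetime.now(),
            "result": None
        }

        self.tasks[task_id] = task
        self.task_queue.put(task)
        return task_id

    def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the task record, or None if the ID is unknown."""
        return self.tasks.get(task_id)

    def _worker(self):
        """Worker thread: take tasks from the queue until stopped."""
        while self.running:
            try:
                task = self.task_queue.get(timeout=1)
                self._process_task(task)
                self.task_queue.task_done()
            except queue.Empty:
                continue

    def _process_task(self, task: Dict[str, Any]):
        """Process a single task."""
        try:
            task["status"] = "processing"

            # Build the workflow
            builder = WorkflowBuilder()
            # ... build the workflow from the task data ...
            workflow = builder.build()

            # Submit to ComfyUI
            prompt_id = self.client.submit_workflow(workflow)

            # Wait for completion
            result = self.client.wait_for_completion(prompt_id)

            # "status" is an object with a "status_str" field in current versions
            status = result.get("status", {})
            if status.get("status_str") == "success":
                task["status"] = "completed"
                task["result"] = result
            else:
                task["status"] = "failed"
                task["error"] = status.get("messages", "Unknown error")

        except Exception as e:
            task["status"] = "failed"
            task["error"] = str(e)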

3. Monitoring and Logging

import logging
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Prometheus metrics
TASK_COUNTER = Counter('comfyui_tasks_total', 'Total tasks processed', ['status'])
TASK_DURATION = Histogram('comfyui_task_duration_seconds', 'Task duration')
QUEUE_SIZE = Gauge('comfyui_queue_size', 'Current queue size')

class ComfyUIMonitor:
    def __init__(self):
        self.logger = logging.getLogger('comfyui')
        self.setup_logging()

    def setup_logging(self):
        """Configure logging to a file and to the console."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('comfyui.log'),
                logging.StreamHandler()
            ]
        )

    def log_task_start(self, task_id: str, prompt: str):
        """Record the start of a task."""
        self.logger.info(f"Task started: {task_id}, prompt: {prompt}")
        TASK_COUNTER.labels(status='started').inc()

    def log_task_complete(self, task_id: str, duration: float, success: bool):
        """Record the completion of a task."""
        status = 'success' if success else 'failed'
        self.logger.info(f"Task finished: {task_id}, duration: {duration:.2f}s, status: {status}")
        TASK_COUNTER.labels(status=status).inc()
        TASK_DURATION.observe(duration)

    def log_error(self, task_id: str, error: str):
        """Record an error."""
        self.logger.error(f"Task error: {task_id}, error: {error}")
        TASK_COUNTER.labels(status='error').inc()

    def update_queue_size(self, size: int):
        """Update the queue size gauge."""
        QUEUE_SIZE.set(size)

# Start the monitoring service
def start_monitoring(port: int = 8000):
    """Start the Prometheus metrics HTTP server."""
    start_http_server(port)
    print(f"Monitoring service started on port {port}")

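Best Practices and Caveats

1. Error Handling

import time
from typing import Any, Dict

import requests


class ComfyUIError(Exception):
    """Error raised for ComfyUI-related failures."""
    pass

# Subclass the client from the previous section instead of redefining it,
# so the existing methods (including submit_workflow) remain available.
class RetryingComfyUIClient(ComfyUIClient):
    def submit_workflow_with_retry(self, workflow: Dict[str, Any],
                                   max_retries: int = 3) -> str:
        """Submit a workflow, retrying on request errors."""
        for attempt in range(max_retries):
            try:
                return self.submit_workflow(workflow)
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    raise ComfyUIError(
                        f"Submission failed after {max_retries} attempts: {e}"
                    ) from e
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
        raise ComfyUIError("max_retries must be at least 1")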
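The retry loop above doubles the delay on every attempt. Two common refinements are capping the delay and adding random jitter, so that many clients recovering from the same outage do not retry in lockstep. The sketch below computes such a delay schedule as a pure function, which also makes it easy to test. The function name backoff_delays and its parameters are hypothetical.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   jitter: float = 0.0, rng: Optional[random.Random] = None) -> List[float]:
    """Return the delay before each retry: exponential growth, capped, plus jitter.

    `jitter` is a fraction of the delay: 0.25 adds up to 25% extra at random.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(cap, base * 2 ** attempt)
        delay += rng.uniform(0, jitter * delay)
        delays.append(delay)
    return delays


print(backoff_delays(5, cap=8.0))
print(backoff_delays(3, jitter=0.25, rng=random.Random(0)))
```

With jitter set to 0 the schedule is deterministic, which matches the behavior of the plain exponential backoff in the retry loop up to the cap.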

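2. Resource Management

import threading
import time


class ResourceManager:
    def __init__(self, max_concurrent_tasks: int = 5):
        self.semaphore = threading.Semaphore(max_concurrent_tasks)
        self.active_tasks = {}
        # Protects active_tasks against concurrent modification
        self._lock = threading.Lock()

    def acquire_task_slot(self, task_id: str) -> bool:
        """Try to reserve a task slot without blocking."""
        if self.semaphore.acquire(blocking=False):
            with self._lock:
                self.active_tasks[task_id] = time.time()
            return True
        return False

    def release_task_slot(self, task_id: str):
        """Release the slot held by the task, if it holds one."""
        with self._lock:
            released = self.active_tasks.pop(task_id, None) is not None
        if released:
            self.semaphore.release()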
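A slot that is acquired but never released (for example because the task raised an exception) permanently reduces capacity. One way to make the release automatic is a context manager around the semaphore, sketched below. The name task_slot is hypothetical; the sketch works with any threading.Semaphore and is independent of the ResourceManager class.

```python
import threading
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def task_slot(semaphore: threading.Semaphore, timeout: float = 0.0) -> Iterator[bool]:
    """Yield True if a slot was acquired; always release it again on exit."""
    if timeout > 0:
        acquired = semaphore.acquire(timeout=timeout)
    else:
        acquired = semaphore.acquire(blocking=False)
    try:
        yield acquired
    finally:
        # Release only what was actually acquired, even if the body raised.
        if acquired:
            semaphore.release()


slots = threading.Semaphore(1)
with task_slot(slots) as ok:
    print("got slot:", ok)
    with task_slot(slots) as second:
        print("second slot while busy:", second)
```

The body still has to check the yielded flag, because a failed acquisition does not raise; it simply yields False.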

3. Configuration Management

import yaml
from dataclasses import asdict, dataclass, fields

@dataclass
class ComfyUIConfig:
    base_url: str = "http://localhost:8188"
    max_retries: int = 3
    timeout: int = 300
    max_concurrent_tasks: int = 5
    default_model: str = "v1-5-pruned-emaonly.ckpt"

    @classmethod
    def from_file(cls, config_path: str) -> 'ComfyUIConfig':
        """Load the configuration from a YAML file."""
        with open(config_path, 'r', encoding='utf-8') as f:
            # An empty file parses to None; fall back to the defaults
            config_data = yaml.safe_load(f) or {}
        # Ignore unknown keys instead of failing with a TypeError
        known = {field.name for field in fields(cls)}
        return cls(**{k: v for k, v in config_data.items() if k in known})

    def save_to_file(self, config_path: str):
        """Save the configuration to a YAML file."""
        with open(config_path, 'w', encoding='utf-8') as f:
            yaml.dump(asdict(self), f, default_flow_style=False)
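In containerized deployments, values such as the server URL are often supplied through environment variables rather than a file. The sketch below layers environment overrides on top of file-style values using only the standard library. ClientSettings is a reduced, hypothetical stand-in for the configuration class, and the COMFYUI_ prefix for variable names is an assumption of this example, not a ComfyUI convention.

```python
import os
from dataclasses import dataclass, fields
from typing import Any, Mapping, Optional


@dataclass
class ClientSettings:
    base_url: str = "http://localhost:8188"
    max_retries: int = 3
    timeout: int = 300


def load_settings(raw: Optional[Mapping[str, Any]],
                  env: Mapping[str, str] = os.environ) -> ClientSettings:
    """Merge file values with COMFYUI_<FIELD> environment overrides."""
    known = {field.name: field.type for field in fields(ClientSettings)}
    # Drop keys that are not fields of the dataclass
    values = {k: v for k, v in (raw or {}).items() if k in known}
    for name, field_type in known.items():
        override = env.get(f"COMFYUI_{name.upper()}")
        if override is not None:
            # Environment values are strings; convert to the declared type (str or int)
            values[name] = field_type(override)
    return ClientSettings(**values)


settings = load_settings({"timeout": 60, "bogus": 1}, env={"COMFYUI_MAX_RETRIES": "5"})
print(settings)
```

The type conversion relies on the field annotations being real classes (str, int); it would need adjusting for more complex field types.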

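Summary

ComfyUI provides a powerful and flexible API that can be integrated into existing applications. By using these endpoints appropriately, developers can:

  1. Build image generation services: integrate AI image generation into an application
  2. Implement batch processing: handle large numbers of image generation tasks efficiently
  3. Create custom workflows: build complex image processing pipelines as needed
  4. Monitor and manage: track task status and system resources in real time

Key Takeaways

  • Choose the right endpoint: pick the API endpoint that matches the requirement
  • Handle errors: add thorough error handling and a retry mechanism
  • Manage resources: keep concurrent tasks and system resources under control
  • Monitor and log: set up monitoring and logging
  • Manage configuration: keep parameters in a configuration file

With the guidance and example code in this article, developers can get started with ComfyUI API integration quickly and build a capable, stable image generation service.


This document is based on the latest ComfyUI version at the time of writing. The API may change between versions, so consult the official documentation for up-to-date information.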

