diff --git a/README.md b/README.md index 3c0b4be..3b9145c 100644 --- a/README.md +++ b/README.md @@ -1,46 +1,121 @@ -// Found requests: -// To get the current queue -// GET ${host}/queue +# ComfyUI API SDK (Dart) -// To view a image -// GET ${host}/api/view?filename=ComfyUI_00006_.png +Light‑weight Dart client for interacting with a ComfyUI instance over HTTP + WebSocket. -// To get the history of the queue -// GET ${host}/api/history?max_items=64 +## Supported Endpoints -// To post a new image generation request to the queue -// POST ${host}/api/prompt -// Content-Type: application/json -// { ... } +(Original quick notes retained) -// To request a list of all the available models -// GET ${host}/api/experiment/models +- Queue: `GET {host}/queue` +- History: `GET {host}/api/history?max_items=64` +- Submit prompt: `POST {host}/api/prompt` +- Models (aggregate): `GET {host}/api/experiment/models` +- Checkpoints: + - List: `GET {host}/api/experiment/models/checkpoints` + - Metadata: `GET {host}/api/view_metadata/checkpoints?filename={pathAndFileName}` +- LoRAs: + - List: `GET {host}/api/experiment/models/loras` + - Metadata: `GET {host}/api/view_metadata/loras?filename={pathAndFileName}` +- VAE: + - List: `GET {host}/api/experiment/models/vae` + - Metadata: `GET {host}/api/view_metadata/vae?filename={pathAndFileName}` +- Upscale Models: + - List: `GET {host}/api/experiment/models/upscale_models` + - Metadata: `GET {host}/api/view_metadata/upscale_models?filename={pathAndFileName}` +- Embeddings: + - List: `GET {host}/api/experiment/models/embeddings` + - Metadata: `GET {host}/api/view_metadata/embeddings?filename={pathAndFileName}` +- Object / Node Info: `GET {host}/api/object_info` +- Image fetch: `GET {host}/api/view?filename=ComfyUI_00006_.png` +- WebSocket events: `ws://{host}/ws?clientId={clientId}` -// To request a list of checkpoints (Or details for a specific checkpoint) -// GET ${host}/api/experiment/models/checkpoints -// GET 
${host}/api/view_metadata/checkpoints?filename=${pathAndFileName} +## WebSocket Event Model -// To request a list of loras (Or details for a specific lora) -// GET ${host}/api/experiment/models/loras -// GET ${host}/api/view_metadata/loras?filename=${pathAndFileName} +Events are parsed into strongly typed objects: -// To request a list of VAEs (Or details for a specific VAE) -// GET ${host}/api/experiment/models/vae -// GET ${host}/api/view_metadata/vae?filename=${pathAndFileName} +- `WebSocketEvent` (generic envelope) +- `ProgressEvent` (value / max / promptId / node) +- `ExecutionEvent` (execution lifecycle + optional output image descriptors) -// To request a list of upscale models (Or details for a specific upscale model) -// GET ${host}/api/experiment/models/upscale_models -// GET ${host}/api/view_metadata/upscale_models?filename=${pathAndFileName} +Callback registration (all delegated through `ComfyUiApi` → `WebSocketManager`): -// To request a list of embeddings (Or details for a specific embedding) -// GET ${host}/api/experiment/models/embeddings -// GET ${host}/api/view_metadata/embeddings?filename=${pathAndFileName} +```dart +api.onEventType(WebSocketEventType.progress, (e) { ... }); +api.onProgressChanged((progress) { ... }); +api.onPromptFinished((promptId) { ... }); +api.onExecutionInterrupted(() { ... }); +``` -// To get object info (Checkpoints, models, loras etc) -// GET ${host}/api/object_info +(See [`comfyui_api.dart`](comfyui_api_sdk/lib/src/comfyui_api.dart:45) and [`websocket_manager.dart`](comfyui_api_sdk/lib/src/websocket_manager.dart:1)) -// WebSocket for progress updates -// ws://${host}/ws?clientId=${clientId} +## Binary / Mixed Frame Handling (New) -// Final question -// How do we figure out the clientId +Some ComfyUI deployments or reverse proxies may (now or in the future) emit non‑text WebSocket frames (e.g. experimental live previews). 
Previously this SDK assumed every frame was UTF‑8 JSON, which caused a runtime type error: + +``` +type 'Uint8List' is not a subtype of type 'String' +``` + +### Patch Summary (2025‑08‑11) + +Implemented in [`websocket_manager.dart`](comfyui_api_sdk/lib/src/websocket_manager.dart:1): + +1. Frame Type Discrimination + - Accepts `String` frames directly. + - If `List<int>`: attempts UTF‑8 decode (malformed‑tolerant). + - Heuristic: Only treats decoded text as JSON if it begins with `{` or `[`. + - Binary frames with JPEG (`FF D8`) or PNG (`89 50 4E 47 0D 0A 1A 0A`) signatures are classified as potential preview frames (currently ignored but counted). + - Other binary frames are ignored safely (no exception thrown). + +2. Defensive Parsing + - JSON decode wrapped in try/catch. + - Event construction wrapped in try/catch. + - Individual callback invocations isolated with try/catch so one faulty consumer does not break stream processing. + +3. Statistics & Future Extensibility + - Exposed `stats` getter with counters: `ignoredBinaryFrames`, `malformedFrames`, `previewFrames`, `textFrames`, `retryAttempts`. + - Added (future) `previewFrames` stream (currently not emitted—line left commented to avoid premature API surface commitment). + +4. Relative Imports + - Replaced `package:comfyui_api_sdk/...` self‑imports with relative imports to prevent duplicate type identities when the package is consumed via path or symlink (fixes “type X is not a subtype of type X” issues). + +### No Consumer Changes Required + +`PromptExecutionService` (in the parent app) already listens only for structured events; ignoring binary frames requires no modification. + +## Manual Test / Reproduction Steps + +1. Start the app against a ComfyUI server that (optionally) emits progress. +2. (Optional) Inject a synthetic binary frame using a WebSocket proxy or a small script to send raw JPEG bytes; verify no crash and counters increment: + ```dart + print(api.webSocketManager.stats); + ``` +3. 
Normal JSON events continue flowing; progress & execution callbacks fire unchanged. + +## Potential Future Preview Streaming + +To enable real-time preview frames: +- Uncomment the line pushing binary frames to `_previewFrameController`. +- Provide a small header (e.g., magic + length + node id) if the server supplies metadata. +- Add a consumer in the application layer to correlate each preview with the executing node id. + +## Changelog + +### 0.0.0-dev (Unreleased Internal) +- Added robust mixed text/binary WebSocket frame handling. +- Added frame classification & statistics. +- Added defensive callback isolation. +- Switched self-imports to relative imports to avoid duplicate symbol identity issues when using path dependencies. +- Introduced (inactive) preview frame stream infrastructure. + +## FAQ + +**Q: How is `clientId` determined?** +A: The higher-level app assigns a UUID (or reuses a deterministic per-session id) and passes it when constructing `ComfyUiApi`. The server side merely uses it to correlate the WebSocket connection with submitted prompts. + +**Q: Why ignore binary preview frames instead of emitting them now?** +A: Ignoring them keeps the API stable until the preview protocol (metadata framing) is formalized. + +## License + +Internal / TBD. 
diff --git a/lib/src/types/callback_types.dart b/lib/src/types/callback_types.dart index aa59c2c..2dab43e 100644 --- a/lib/src/types/callback_types.dart +++ b/lib/src/types/callback_types.dart @@ -1,4 +1,5 @@ -import 'package:comfyui_api_sdk/comfyui_api_sdk.dart'; +import '../models/websocket_event.dart'; +import '../models/progress_event.dart'; /// Callback function type for prompt events typedef PromptEventCallback = void Function(String promptId); diff --git a/lib/src/websocket_manager.dart b/lib/src/websocket_manager.dart index b400087..eb1e005 100644 --- a/lib/src/websocket_manager.dart +++ b/lib/src/websocket_manager.dart @@ -1,15 +1,18 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:typed_data'; import 'package:web_socket_channel/web_socket_channel.dart'; +// Use relative imports to avoid duplicate library instances when this package +// is consumed via a path or symlink (prevents distinct "same" types). import 'models/websocket_event.dart'; import 'models/progress_event.dart'; import 'models/execution_event.dart'; import 'types/callback_types.dart'; import 'utils/websocket_event_handler.dart'; -/// Enum representing the connection state of the WebSocket +/// Connection states for the ComfyUI WebSocket enum ConnectionState { connected, connecting, disconnected, failed } class WebSocketManager { @@ -41,17 +44,27 @@ class WebSocketManager { final StreamController _executionInterruptedController = StreamController.broadcast(); + // Optional future binary preview frames + final StreamController _previewFrameController = + StreamController.broadcast(); + // Event callbacks final Map> _typedEventCallbacks = { - for (var type in WebSocketEventType.values) type: [], + for (var type in WebSocketEventType.values) type: [], }; final List _progressEventCallbacks = []; final Map> _eventCallbacks = { - 'onPromptStart': [], - 'onPromptFinished': [], + 'onPromptStart': [], + 'onPromptFinished': [], }; + // Frame / parse statistics + int _ignoredBinaryFrames = 
0; + int _malformedFrames = 0; + int _previewFrames = 0; + int _textFrames = 0; + WebSocketManager({required this.host, required this.clientId}); /// Stream of typed WebSocket events @@ -91,6 +104,18 @@ class WebSocketManager { Stream get executionInterruptedStream => _executionInterruptedController.stream; + /// Stream of (future) binary preview frames + Stream get previewFrames => _previewFrameController.stream; + + /// Stats about received frames + Map get stats => { + 'ignoredBinaryFrames': _ignoredBinaryFrames, + 'malformedFrames': _malformedFrames, + 'previewFrames': _previewFrames, + 'textFrames': _textFrames, + 'retryAttempts': _retryAttempt, + }; + /// Register a callback for specific WebSocket event types void onEventType(WebSocketEventType type, WebSocketEventCallback callback) { _typedEventCallbacks[type]!.add(callback); @@ -122,18 +147,100 @@ class WebSocketManager { print('WebSocket connecting to $wsUrl'); - _wsChannel!.stream.listen((message) { + _wsChannel!.stream.listen((dynamic message) { // Successfully connected if (_connectionState != ConnectionState.connected) { _updateConnectionState(ConnectionState.connected); _updateRetryAttempt(0); // Reset retry counter on successful connection } - final jsonData = jsonDecode(message); - print('WebSocket message: $jsonData'); + // Determine frame type + String? 
textFrame; + if (message is String) { + textFrame = message; + _textFrames++; + } else if (message is List) { + // Attempt to interpret as UTF8 JSON text + try { + final decoded = utf8.decode(message, allowMalformed: true); + final trimmed = decoded.trimLeft(); + // Heuristic: treat as JSON if it starts with '{' or '[' + if (trimmed.startsWith('{') || trimmed.startsWith('[')) { + textFrame = decoded; + _textFrames++; + } else { + // Detect common image headers for potential future preview streaming + final isJpeg = + message.length >= 2 && message[0] == 0xFF && message[1] == 0xD8; + final isPng = message.length >= 8 && + message[0] == 0x89 && + message[1] == 0x50 && + message[2] == 0x4E && + message[3] == 0x47 && + message[4] == 0x0D && + message[5] == 0x0A && + message[6] == 0x1A && + message[7] == 0x0A; - // Create a typed event - final event = WebSocketEvent.fromJson(jsonData); + if (isJpeg || isPng) { + // Future: push to preview consumers + _previewFrames++; + // _previewFrameController.add(Uint8List.fromList(message)); + print( + 'WebSocket binary preview frame received (${message.length} bytes) ignored (preview streaming not enabled).'); + } else { + _ignoredBinaryFrames++; + print( + 'WebSocket non-JSON binary frame ignored (${message.length} bytes).'); + } + return; // Do not proceed to JSON decode + } + } catch (_) { + // Could not decode as UTF8 + _ignoredBinaryFrames++; + print( + 'WebSocket binary frame ignored (UTF8 decode failed, ${message.length} bytes).'); + return; + } + } else { + _ignoredBinaryFrames++; + print( + 'WebSocket unsupported frame type ignored (${message.runtimeType}).'); + return; + } + + if (textFrame == null) { + _malformedFrames++; + print('WebSocket frame had no decodable text content.'); + return; + } + + dynamic jsonData; + try { + jsonData = jsonDecode(textFrame); + } catch (e) { + _malformedFrames++; + print('WebSocket malformed JSON frame ignored: $e'); + return; + } + + if (jsonData is! 
Map) { + _malformedFrames++; + print( + 'WebSocket JSON root not object (type: ${jsonData.runtimeType}) ignored.'); + return; + } + + print('WebSocket message (event): $jsonData'); + + WebSocketEvent event; + try { + event = WebSocketEvent.fromJson(jsonData); + } catch (e) { + _malformedFrames++; + print('WebSocket event parse failed: $e'); + return; + } // Add to the typed event stream _eventController.add(event); @@ -142,55 +249,71 @@ class WebSocketManager { _progressController.add(jsonData); // Convert to more specific event types if possible - if (event.eventType == WebSocketEventType.progress) { - _tryCreateProgressEvent(event); - } else if ([ - WebSocketEventType.executionStart, - WebSocketEventType.executionSuccess, - WebSocketEventType.executionCached, - WebSocketEventType.executed, - WebSocketEventType.executing, - WebSocketEventType.executionInterrupted, - WebSocketEventType.status, // Add status event type - ].contains(event.eventType)) { - _tryCreateExecutionEvent(event); - } - - // Handle "execution_interrupted" event - if (event.eventType == WebSocketEventType.executionInterrupted) { - _executionInterruptedController.add(null); - } - - // Handle "executing" event - if (event.eventType == WebSocketEventType.executing && - event.data['node'] != null) { - final nodeId = int.tryParse(event.data['node'].toString()); - if (nodeId != null) { - _executingNodeController.add(nodeId); - } else { - print('Invalid node ID: ${event.data['node']}'); + try { + if (event.eventType == WebSocketEventType.progress) { + _tryCreateProgressEvent(event); + } else if ([ + WebSocketEventType.executionStart, + WebSocketEventType.executionSuccess, + WebSocketEventType.executionCached, + WebSocketEventType.executed, + WebSocketEventType.executing, + WebSocketEventType.executionInterrupted, + WebSocketEventType.status, // Add status event type + ].contains(event.eventType)) { + _tryCreateExecutionEvent(event); } - } - // Trigger event type specific callbacks - for (final callback 
in _typedEventCallbacks[event.eventType]!) { - callback(event); - } - - // Handle execution_success event (prompt finished) - if (event.eventType == WebSocketEventType.executionSuccess && - event.promptId != null) { - final promptId = event.promptId!; - for (final callback in _eventCallbacks['onPromptFinished']!) { - callback(promptId); + // Handle "execution_interrupted" event + if (event.eventType == WebSocketEventType.executionInterrupted) { + _executionInterruptedController.add(null); } - } - // Handle progress updates - if (event.eventType == WebSocketEventType.progress) { - for (final callback in _progressEventCallbacks) { - callback(ProgressEvent.fromJson(event.data)); + // Handle "executing" event + if (event.eventType == WebSocketEventType.executing && + event.data['node'] != null) { + final nodeId = int.tryParse(event.data['node'].toString()); + if (nodeId != null) { + _executingNodeController.add(nodeId); + } else { + print('Invalid node ID: ${event.data['node']}'); + } } + + // Trigger event type specific callbacks + for (final callback in _typedEventCallbacks[event.eventType]!) { + try { + callback(event); + } catch (e, st) { + print('WebSocket event callback error: $e\n$st'); + } + } + + // Handle execution_success event (prompt finished) + if (event.eventType == WebSocketEventType.executionSuccess && + event.promptId != null) { + final promptId = event.promptId!; + for (final callback in _eventCallbacks['onPromptFinished']!) 
{ + try { + callback(promptId); + } catch (e, st) { + print('WebSocket onPromptFinished callback error: $e\n$st'); + } + } + } + + // Handle progress updates + if (event.eventType == WebSocketEventType.progress) { + for (final callback in _progressEventCallbacks) { + try { + callback(ProgressEvent.fromJson(event.data)); + } catch (e, st) { + print('WebSocket progress callback error: $e\n$st'); + } + } + } + } catch (e, st) { + print('WebSocket event dispatch error: $e\n$st'); } }, onError: (error) { print('WebSocket error: $error'); @@ -211,7 +334,7 @@ class WebSocketManager { 'Attempting to reconnect WebSocket (attempt $_retryAttempt/$_maxRetryAttempts) in 5 seconds...'); _updateConnectionState(ConnectionState.connecting); - await Future.delayed(Duration(seconds: 5)); + await Future.delayed(const Duration(seconds: 5)); try { await connect(); print('WebSocket reconnected successfully'); @@ -302,6 +425,7 @@ class WebSocketManager { _executionEventController.close(); _executingNodeController.close(); _executionInterruptedController.close(); + _previewFrameController.close(); _connectionStateController.close(); _retryAttemptController.close(); }