From 47562afde0c52b74a7ec2df1eae635188cbd4b4e Mon Sep 17 00:00:00 2001 From: Menno van Leeuwen Date: Sun, 30 Mar 2025 18:01:26 +0200 Subject: [PATCH] connection manager support --- lib/src/comfyui_api.dart | 3 ++ lib/src/websocket_manager.dart | 94 +++++++++++++++++++++++++++++++--- 2 files changed, 89 insertions(+), 8 deletions(-) diff --git a/lib/src/comfyui_api.dart b/lib/src/comfyui_api.dart index f6a15b6..b4edcfa 100644 --- a/lib/src/comfyui_api.dart +++ b/lib/src/comfyui_api.dart @@ -52,6 +52,9 @@ class ComfyUiApi { Stream get executionInterruptedStream => _webSocketManager.executionInterruptedStream; + /// Access to the WebSocketManager instance + WebSocketManager get webSocketManager => _webSocketManager; + void onExecutingNodeChanged(void Function(int nodeId) callback) { _webSocketManager.executingNodeStream.listen(callback); } diff --git a/lib/src/websocket_manager.dart b/lib/src/websocket_manager.dart index 75097a0..b400087 100644 --- a/lib/src/websocket_manager.dart +++ b/lib/src/websocket_manager.dart @@ -9,12 +9,24 @@ import 'models/execution_event.dart'; import 'types/callback_types.dart'; import 'utils/websocket_event_handler.dart'; +/// Enum representing the connection state of the WebSocket +enum ConnectionState { connected, connecting, disconnected, failed } + class WebSocketManager { final String host; final String clientId; WebSocketChannel? _wsChannel; + // Connection state tracking + ConnectionState _connectionState = ConnectionState.disconnected; + int _retryAttempt = 0; + static const int _maxRetryAttempts = 3; + final StreamController _connectionStateController = + StreamController.broadcast(); + final StreamController _retryAttemptController = + StreamController.broadcast(); + // Controllers for different event streams final StreamController _eventController = StreamController.broadcast(); @@ -45,6 +57,22 @@ class WebSocketManager { /// Stream of typed WebSocket events Stream get events => _eventController.stream; + /// Stream of connection state changes + Stream get connectionState => + _connectionStateController.stream; + + /// Stream of retry attempt changes + Stream get retryAttemptChanges => _retryAttemptController.stream; + + /// Current connection state + ConnectionState get currentConnectionState => _connectionState; + + /// Current retry attempt count + int get retryAttempt => _retryAttempt; + + /// Maximum number of retry attempts + static int get maxRetryAttempts => _maxRetryAttempts; + /// Stream of progress updates (legacy format) Stream> get progressUpdates => _progressController.stream; @@ -85,6 +113,9 @@ class WebSocketManager { /// Connects to the WebSocket for progress updates Future connect() async { + // Update connection state to connecting + _updateConnectionState(ConnectionState.connecting); + final wsUrl = 'ws://${host.replaceFirst(RegExp(r'^https?://'), '')}/ws?clientId=$clientId'; _wsChannel = WebSocketChannel.connect(Uri.parse(wsUrl)); @@ -92,6 +123,11 @@ class WebSocketManager { print('WebSocket connecting to $wsUrl'); _wsChannel!.stream.listen((message) { + // Successfully connected + if (_connectionState != ConnectionState.connected) { + _updateConnectionState(ConnectionState.connected); + _updateRetryAttempt(0); // Reset retry counter on successful connection + } final jsonData = jsonDecode(message); print('WebSocket message: $jsonData'); @@ -167,14 +203,54 @@ class WebSocketManager { /// Reconnects the WebSocket with a delay Future _reconnect() async { - print('Attempting to reconnect WebSocket in 5 seconds...'); - await Future.delayed(Duration(seconds: 5)); - try { + // Increment retry attempt and notify listeners + _updateRetryAttempt(_retryAttempt + 1); + + if (_retryAttempt <= _maxRetryAttempts) { + print( + 'Attempting to reconnect WebSocket (attempt $_retryAttempt/$_maxRetryAttempts) in 5 seconds...'); + _updateConnectionState(ConnectionState.connecting); + + await Future.delayed(Duration(seconds: 5)); + try { + await connect(); + print('WebSocket reconnected successfully'); + } catch (e) { + print('WebSocket reconnection failed: $e'); + if (_retryAttempt < _maxRetryAttempts) { + _reconnect(); // Retry again if under max attempts + } else { + // Max retries reached + print('Max reconnection attempts reached'); + _updateConnectionState(ConnectionState.failed); + } + } + } else { + // Max retries reached + print('Max reconnection attempts reached'); + _updateConnectionState(ConnectionState.failed); + } + } + + /// Updates the connection state and notifies listeners + void _updateConnectionState(ConnectionState state) { + _connectionState = state; + _connectionStateController.add(state); + } + + /// Updates the retry attempt count and notifies listeners + void _updateRetryAttempt(int attempt) { + _retryAttempt = attempt; + _retryAttemptController.add(attempt); + // Also re-emit the current state to ensure UI updates + _connectionStateController.add(_connectionState); + } + + /// Manually retry connection after failure + Future retryConnection() async { + if (_connectionState == ConnectionState.failed) { + _updateRetryAttempt(0); // Reset retry counter await connect(); - print('WebSocket reconnected successfully'); - } catch (e) { - print('WebSocket reconnection failed: $e'); - _reconnect(); // Retry again if reconnection fails } } @@ -225,6 +301,8 @@ class WebSocketManager { _progressEventController.close(); _executionEventController.close(); _executingNodeController.close(); - _executionInterruptedController.close(); // Close the new controller + _executionInterruptedController.close(); + _connectionStateController.close(); + _retryAttemptController.close(); } }