connection manager support

This commit is contained in:
2025-03-30 18:01:26 +02:00
parent c2065afbc4
commit 47562afde0
2 changed files with 89 additions and 8 deletions

View File

@@ -52,6 +52,9 @@ class ComfyUiApi {
Stream<void> get executionInterruptedStream =>
_webSocketManager.executionInterruptedStream;
/// Access to the WebSocketManager instance
WebSocketManager get webSocketManager => _webSocketManager;
void onExecutingNodeChanged(void Function(int nodeId) callback) {
_webSocketManager.executingNodeStream.listen(callback);
}

View File

@@ -9,12 +9,24 @@ import 'models/execution_event.dart';
import 'types/callback_types.dart';
import 'utils/websocket_event_handler.dart';
/// Enum representing the connection state of the WebSocket
enum ConnectionState { connected, connecting, disconnected, failed }
class WebSocketManager {
final String host;
final String clientId;
WebSocketChannel? _wsChannel;
// Connection state tracking
ConnectionState _connectionState = ConnectionState.disconnected;
int _retryAttempt = 0;
static const int _maxRetryAttempts = 3;
final StreamController<ConnectionState> _connectionStateController =
StreamController.broadcast();
final StreamController<int> _retryAttemptController =
StreamController.broadcast();
// Controllers for different event streams
final StreamController<WebSocketEvent> _eventController =
StreamController.broadcast();
@@ -45,6 +57,22 @@ class WebSocketManager {
/// Stream of typed WebSocket events
Stream<WebSocketEvent> get events => _eventController.stream;
/// Stream of connection state changes
Stream<ConnectionState> get connectionState =>
_connectionStateController.stream;
/// Stream of retry attempt changes
Stream<int> get retryAttemptChanges => _retryAttemptController.stream;
/// Current connection state
ConnectionState get currentConnectionState => _connectionState;
/// Current retry attempt count
int get retryAttempt => _retryAttempt;
/// Maximum number of retry attempts
static int get maxRetryAttempts => _maxRetryAttempts;
/// Stream of progress updates (legacy format)
Stream<Map<String, dynamic>> get progressUpdates =>
_progressController.stream;
@@ -85,6 +113,9 @@ class WebSocketManager {
/// Connects to the WebSocket for progress updates
Future<void> connect() async {
// Update connection state to connecting
_updateConnectionState(ConnectionState.connecting);
final wsUrl =
'ws://${host.replaceFirst(RegExp(r'^https?://'), '')}/ws?clientId=$clientId';
_wsChannel = WebSocketChannel.connect(Uri.parse(wsUrl));
@@ -92,6 +123,11 @@ class WebSocketManager {
print('WebSocket connecting to $wsUrl');
_wsChannel!.stream.listen((message) {
// Successfully connected
if (_connectionState != ConnectionState.connected) {
_updateConnectionState(ConnectionState.connected);
_updateRetryAttempt(0); // Reset retry counter on successful connection
}
final jsonData = jsonDecode(message);
print('WebSocket message: $jsonData');
@@ -167,14 +203,54 @@ class WebSocketManager {
/// Reconnects the WebSocket with a delay
Future<void> _reconnect() async {
print('Attempting to reconnect WebSocket in 5 seconds...');
await Future.delayed(Duration(seconds: 5));
try {
// Increment retry attempt and notify listeners
_updateRetryAttempt(_retryAttempt + 1);
if (_retryAttempt <= _maxRetryAttempts) {
print(
'Attempting to reconnect WebSocket (attempt $_retryAttempt/$_maxRetryAttempts) in 5 seconds...');
_updateConnectionState(ConnectionState.connecting);
await Future.delayed(Duration(seconds: 5));
try {
await connect();
print('WebSocket reconnected successfully');
} catch (e) {
print('WebSocket reconnection failed: $e');
if (_retryAttempt < _maxRetryAttempts) {
_reconnect(); // Retry again if under max attempts
} else {
// Max retries reached
print('Max reconnection attempts reached');
_updateConnectionState(ConnectionState.failed);
}
}
} else {
// Max retries reached
print('Max reconnection attempts reached');
_updateConnectionState(ConnectionState.failed);
}
}
/// Updates the connection state and notifies listeners
void _updateConnectionState(ConnectionState state) {
_connectionState = state;
_connectionStateController.add(state);
}
/// Updates the retry attempt count and notifies listeners
void _updateRetryAttempt(int attempt) {
_retryAttempt = attempt;
_retryAttemptController.add(attempt);
// Also re-emit the current state to ensure UI updates
_connectionStateController.add(_connectionState);
}
/// Manually retry connection after failure
Future<void> retryConnection() async {
if (_connectionState == ConnectionState.failed) {
_updateRetryAttempt(0); // Reset retry counter
await connect();
print('WebSocket reconnected successfully');
} catch (e) {
print('WebSocket reconnection failed: $e');
_reconnect(); // Retry again if reconnection fails
}
}
@@ -225,6 +301,8 @@ class WebSocketManager {
_progressEventController.close();
_executionEventController.close();
_executingNodeController.close();
_executionInterruptedController.close(); // Close the new controller
_executionInterruptedController.close();
_connectionStateController.close();
_retryAttemptController.close();
}
}