
Shared Object Channels

Shared Object Channels in ART provide real-time collaborative editing built on CRDTs (conflict-free replicated data types). These channels let multiple users edit the same data structure at the same time, with automatic conflict resolution and real-time synchronization.

Unlike regular messaging channels that send discrete messages, shared object channels maintain a synchronized data structure that multiple clients can edit simultaneously. Think of it like Google Docs for any object — changes made by one user are instantly reflected for all other users.

Key Features

  • CRDT-Based Synchronization: Each client holds a replica of the state, and conflict-free replicated data types keep the replicas consistent
  • Real-time Updates: Changes propagate instantly to all connected clients
  • Conflict Resolution: Automatic handling of concurrent edits without data loss
  • RGA Arrays: Replicated Growable Arrays for ordered list operations
  • Echo Suppression: Prevents infinite loops from your own changes
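
To make the echo suppression idea concrete, here is a minimal sketch of how a client could ignore operations it produced itself. The Op class, its fields, and dropEchoes are hypothetical names used for illustration only; they are not part of the ADK, which handles this internally.

```dart
/// A hypothetical wire-level operation. The field names are illustrative
/// and are not taken from the ADK.
class Op {
  final String replicaId; // replica that produced the operation
  final String path;
  final Object? value;
  const Op(this.replicaId, this.path, this.value);
}

/// Returns only the operations that originated on other replicas, so a
/// client never re-applies (and re-broadcasts) its own changes.
List<Op> dropEchoes(String localReplicaId, List<Op> incoming) {
  return incoming.where((op) => op.replicaId != localReplicaId).toList();
}

void main() {
  const incoming = [
    Op('replica-a', 'document.title', 'Draft'),
    Op('replica-b', 'document.title', 'Final'),
  ];
  final remote = dropEchoes('replica-a', incoming);
  print(remote.map((op) => op.replicaId).toList()); // [replica-b]
}
```

Tagging every operation with the replica ID of its author is what makes this filter possible; the same ID can also serve as a tie-breaker when two replicas edit concurrently.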

How Shared Object Channels Work

Shared Object Channels rely on a CRDT system that keeps state synchronized across multiple clients. When you subscribe to a shared object channel, ART creates a live data structure that automatically synchronizes changes between all connected users in real time.

Subscribing to Shared Object Channels

Subscribing to a shared object channel uses the same adk.subscribe call as any other channel:

final subscription = await adk.subscribe(channel: "YOUR_SHARED_OBJECT_CHANNEL");

if (subscription is LiveObjSubscription) {
  // CRDT-enabled subscription
} else {
  // Default or encrypted channel: no CRDT state
}

The system initializes with any existing state from the server and creates a unique replica ID for your client to handle conflict resolution.

Accessing the Shared State

The core of shared object channels is the state() method, which returns a CRDT Proxy object that behaves like a nested Map, with .set() / .delete() / array helpers that are automatically synchronised:

Set a value

// Get the shared state object
final sharedState = subscription.state();
// Now you can work with it much like a nested Dart Map
sharedState['document']['title'].set("New Document Title");
await subscription.flush();

Delete a key

// Safe delete (no error if the key doesn't exist)
sharedState[key].delete();
await subscription.flush();

Every set() / delete() is intercepted by the CRDT engine and converted into operations that are synchronized across all connected clients.
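
The interception step can be pictured with a small stand-alone sketch. It is not the ADK's implementation: RecordingProxy and PathOp are hypothetical names, and the real engine attaches CRDT metadata that is omitted here. The sketch only shows how chained index access can accumulate a path and turn set() / delete() into operation records instead of direct mutations.

```dart
/// Hypothetical operation record; not the ADK's wire format.
class PathOp {
  final String kind; // 'set' or 'delete'
  final List<String> path;
  final Object? value;
  PathOp(this.kind, this.path, [this.value]);

  @override
  String toString() {
    final target = path.join('.');
    return kind == 'set' ? 'set $target = $value' : 'delete $target';
  }
}

/// A toy proxy that remembers the path walked so far and logs an operation
/// instead of mutating local data directly.
class RecordingProxy {
  final List<PathOp> _log;
  final List<String> _path;
  RecordingProxy(this._log, [this._path = const []]);

  /// Each index access returns a new proxy one level deeper.
  RecordingProxy operator [](String key) =>
      RecordingProxy(_log, [..._path, key]);

  void set(Object? value) => _log.add(PathOp('set', _path, value));
  void delete() => _log.add(PathOp('delete', _path));
}

void main() {
  final log = <PathOp>[];
  final state = RecordingProxy(log);

  state['document']['title'].set('New Document Title');
  state['document']['draft'].delete();

  for (final op in log) {
    print(op);
  }
  // set document.title = New Document Title
  // delete document.draft
}
```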

Working with Arrays

Shared object channels provide full array support using RGA (Replicated Growable Array) semantics, which handle concurrent insertions and deletions correctly:

Push

final int count = sharedState['items'].push("First item");
await subscription.flush();

Pop

final dynamic removed = sharedState['items'].pop();
await subscription.flush();

Remove at index

final dynamic removed = sharedState['items'].removeAt(index);
await subscription.flush();

Splice (replace a range)

final List<String> removed = sharedState['items'].splice(
  start: start,
  deleteCount: deleteCount,
  insert: insert,
);
await subscription.flush();

The RGA implementation ensures that concurrent array operations from different users are resolved consistently, maintaining the intended order even when users edit simultaneously.
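
For readers unfamiliar with RGA, the following is a simplified textbook-style sketch of the idea, not the ADK's implementation. All names (Id, Rga, and so on) are hypothetical. It assumes every element carries a unique ID made of a Lamport-style counter plus the replica ID, and that an insert is only applied after the element it is anchored to has arrived.

```dart
/// Identifier for one element: a Lamport-style counter plus the replica ID.
class Id implements Comparable<Id> {
  final int counter;
  final String replica;
  const Id(this.counter, this.replica);

  @override
  int compareTo(Id other) => counter != other.counter
      ? counter.compareTo(other.counter)
      : replica.compareTo(other.replica);

  @override
  bool operator ==(Object other) =>
      other is Id && other.counter == counter && other.replica == replica;

  @override
  int get hashCode => Object.hash(counter, replica);
}

class _Node {
  final Id id;
  final String value;
  bool deleted = false;
  _Node(this.id, this.value);
}

class Rga {
  final List<_Node> _nodes = [];

  /// Inserts [value] after the element [after] (null means the list head).
  /// Assumes the anchor element has already been applied on this replica.
  void insert(Id id, Id? after, String value) {
    var i = after == null ? 0 : _nodes.indexWhere((n) => n.id == after) + 1;
    // Skip elements with a larger ID so every replica picks the same slot.
    while (i < _nodes.length && _nodes[i].id.compareTo(id) > 0) {
      i++;
    }
    _nodes.insert(i, _Node(id, value));
  }

  /// Deletes by marking a tombstone, so later inserts can still anchor to it.
  void remove(Id id) => _nodes.firstWhere((n) => n.id == id).deleted = true;

  /// The visible list: all elements that are not tombstoned.
  List<String> get values =>
      [for (final n in _nodes) if (!n.deleted) n.value];
}

void main() {
  const a = Id(1, 'A'), c = Id(2, 'A');
  const b = Id(3, 'A'), x = Id(3, 'B'); // concurrent inserts after 'a'

  Rga seed() => Rga()..insert(a, null, 'a')..insert(c, a, 'c');

  // The two replicas receive the concurrent inserts in opposite orders.
  final replicaA = seed()..insert(b, a, 'b')..insert(x, a, 'x');
  final replicaB = seed()..insert(x, a, 'x')..insert(b, a, 'b');

  print(replicaA.values); // [a, x, b, c]
  print(replicaB.values); // [a, x, b, c]
}
```

Both replicas end up with the same order even though they applied the concurrent inserts differently, which is the convergence property the paragraph above describes.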

Manual Synchronization Control

Changes are automatically synchronized, but you can control when pending operations are sent to the server using the flush() method:

final state = sharedState;

// Make multiple changes
state['title'].set('New Title');
state['content'].set('Updated content');

// Manually trigger synchronization —
// all pending changes are batched and sent as a single operation
await subscription.flush();

By default, the Flutter ADK batches operations with a 50 ms trailing delay to reduce network traffic; call flush() when pending operations need to be sent immediately.
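
As a rough illustration of trailing-delay batching, here is a stand-alone sketch built on dart:async. TrailingBatcher is a hypothetical class, not the ADK's internals, and it assumes one plausible reading of "trailing delay": the timer restarts on every new operation, and the batch goes out once no operation has arrived for the delay period.

```dart
import 'dart:async';

/// Collects operations and sends them as one batch after a quiet period.
class TrailingBatcher<T> {
  final Duration delay;
  final void Function(List<T> batch) send;
  final List<T> _pending = [];
  Timer? _timer;

  TrailingBatcher(this.send, {this.delay = const Duration(milliseconds: 50)});

  /// Queues [op] and restarts the trailing timer.
  void add(T op) {
    _pending.add(op);
    _timer?.cancel();
    _timer = Timer(delay, flush);
  }

  /// Sends whatever is pending right away and cancels the timer.
  void flush() {
    _timer?.cancel();
    _timer = null;
    if (_pending.isEmpty) return;
    final batch = List<T>.of(_pending);
    _pending.clear();
    send(batch);
  }
}

Future<void> main() async {
  final sent = <List<String>>[];
  final batcher = TrailingBatcher<String>(sent.add);

  batcher.add('set title');
  batcher.add('set content');
  // Wait longer than the delay so the timer fires and sends both together.
  await Future<void>.delayed(const Duration(milliseconds: 100));

  batcher.add('push item');
  batcher.flush(); // sent immediately, without waiting for the timer

  print(sent); // [[set title, set content], [push item]]
}
```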

Listening to Updates

The query() method allows you to observe specific paths of the shared object — useful for optimising updates in large data structures:

Get the current value

// Query specific path in the object
final query = subscription.query(path: 'user');

// Get current value without listening continuously
final dynamic initial = await query.execute();
// `initial` holds the current value at the path 'user'

Listen for changes

query.listen() fires immediately with the current value, then again on every update at (or under) that path:

await query.listen((dynamic data) {
  if (data == null) {
    // path was cleared / never existed
    return;
  }
  if (data is! Map) {
    // unexpected shape; handle defensively
    return;
  }

  // data is a Map<dynamic, dynamic> representing the sub-tree at 'user'
  for (final MapEntry<dynamic, dynamic> entry in data.entries) {
    final username = '${entry.key}';
    final userData = entry.value;
    // …consume userData…
  }
});
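
The "at (or under) that path" rule can be sketched as a simple prefix check. This is an illustration only: the affects function is hypothetical, and the dot-separated path syntax is an assumption, since the examples above only show the single-segment path 'user'.

```dart
/// True when [changed] is the watched path itself or lies beneath it.
/// Paths are assumed to be dot-separated, which is an assumption of this
/// sketch rather than documented ADK behavior.
bool affects(String watched, String changed) {
  final w = watched.split('.');
  final c = changed.split('.');
  if (c.length < w.length) return false;
  for (var i = 0; i < w.length; i++) {
    if (w[i] != c[i]) return false;
  }
  return true;
}

void main() {
  print(affects('user', 'user')); // true
  print(affects('user', 'user.alice.name')); // true
  print(affects('user', 'users')); // false
  print(affects('user', 'document.title')); // false
}
```

Comparing whole segments rather than raw string prefixes avoids a false match between 'user' and 'users'.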