Dispatch Lifecycle
Session overrides Reducer::dispatch() and Reducer::execute() to add capabilities beyond the default: feedback drain, spawn execution, deferred agent spawn with undo grouping, cache invalidation, and a lifecycle phase after all reduces.
Full Dispatch Flow
Section titled “Full Dispatch Flow”impl<M: Modality, I: ReduceIntent<M>> Reducer for Session<M, I> { fn dispatch(&mut self, cmd: Command<I, I::Feedback>) { // 1. Drain pending async feedback (from Runtime spawn channel) let pending: Vec<_> = self.rt.drain_feedback(); for fb in pending { let effect = Self::reduce(fb, &mut self.rt.state); self.execute(effect, 0); }
// 2. Reduce the current command + execute its effect let effect = Self::reduce(cmd, &mut self.rt.state); self.execute(effect, 0);
// 3a. Deferred agent spawn (pending_prompt set by reduce) // Begin undo group so all AI changes merge into one undo step. if let Some(prompt) = self.rt.state.ephemeral.ai.pending_prompt.take() { if !self.ai_undo_group_active { self.begin_undo_group(); self.ai_undo_group_active = true; } I::spawn_generate(&self.self_ref, self.rt.spawn_tx(), ...); }
// 3b. End AI undo group when streaming finishes if self.ai_undo_group_active && !self.rt.state.ephemeral.ai.is_streaming { self.end_undo_group(); self.ai_undo_group_active = false; }
// 4. Drain invalidated children → clear cache let invalid: Vec<_> = self.rt.state.invalidated_children.drain(..).collect(); for id in &invalid { self.cache.remove(id); }
// 5. Lifecycle — once, after ALL reduces self.recompile(); let synced_changed = self.commit_if_changed(); self.rt.state.version += 1; self.emit(); if synced_changed { self.fire_on_change(); } }}Sequence Diagram
Section titled “Sequence Diagram”sequenceDiagram participant Caller participant Session participant Reduce as Self::reduce participant Execute as self.execute participant Lifecycle participant Thread as Thread Pool
Caller->>Session: dispatch(cmd)
Note over Session: Step 1: Drain pending feedback loop rt.drain_feedback() Session->>Reduce: reduce(feedback, &mut rt.state) → Effect Session->>Execute: execute(effect, 0) end
Note over Session: Step 2: Reduce current command Session->>Reduce: reduce(cmd, &mut rt.state) → Effect Session->>Execute: execute(effect, 0)
Note over Session: Step 3a: Deferred agent spawn opt pending_prompt.take() Note over Session: begin_undo_group() if not already active Session->>Thread: I::spawn_generate(self_ref, ...) end
Note over Session: Step 3b: End AI undo group opt ai_undo_group_active && !is_streaming Note over Session: end_undo_group() end
Note over Session: Step 4: Cache invalidation opt invalidated_children.drain(..) Session->>Session: cache.remove(id) end
Note over Session: Step 5: Lifecycle (once) Session->>Lifecycle: recompile() Session->>Lifecycle: commit_if_changed() Session->>Lifecycle: version += 1 Session->>Lifecycle: emit() [snapshot → sinks] Lifecycle-->>Caller: Sink::on_snapshot(snapshot) opt synced_changed Session->>Lifecycle: fire_on_change() end
Note over Thread: Spawns already fired during execute().<br/>Feedback arrives on spawn channel,<br/>picked up in next dispatch()Session’s execute() Override
Section titled “Session’s execute() Override”Session overrides execute() to handle Effect::Spawn via the Runtime’s spawn infrastructure:
fn execute(&mut self, effect: Effect<SessionServices, I, I::Feedback>, depth: usize) { const MAX_SEND_DEPTH: usize = 64; match effect { Effect::None => {} Effect::Send(cmd) => { if depth < MAX_SEND_DEPTH { let e = Self::reduce(cmd, &mut self.rt.state); self.execute(e, depth + 1); } } Effect::Task(f) => { let svc = self.rt.services.clone(); f(&svc); } Effect::Spawn(f) => { let svc = self.rt.services.clone(); let tx = self.rt.spawn_tx(); std::thread::spawn(move || f(svc, tx)); } Effect::Batch(effects) => { for e in effects { self.execute(e, depth); } } }}Session uses self.rt.services and self.rt.spawn_tx() instead of the old self.session_services and self.cmd_tx. The spawn channel feedback is drained at the top of each dispatch via self.rt.drain_feedback().
The Five Steps
Section titled “The Five Steps”Step 1: Drain Pending Feedback
Section titled “Step 1: Drain Pending Feedback”let pending: Vec<_> = self.rt.drain_feedback();for fb in pending { let effect = Self::reduce(fb, &mut self.rt.state); self.execute(effect, 0);}Async effects spawned in previous dispatch() calls send feedback via Tx to the Runtime’s spawn channel. These are collected and reduced before the current command.
Step 2: Reduce Current Command + Execute
Section titled “Step 2: Reduce Current Command + Execute”let effect = Self::reduce(cmd, &mut self.rt.state);self.execute(effect, 0);Delegates to I::reduce() or I::handle_feedback() via the ReduceIntent trait. The returned Effect is immediately executed — Send recurses (depth-limited to 64), Task runs inline, Spawn fires a thread.
Step 3a: Deferred Agent Spawn + Undo Grouping
Section titled “Step 3a: Deferred Agent Spawn + Undo Grouping”if let Some(prompt) = self.rt.state.ephemeral.ai.pending_prompt.take() { if !self.ai_undo_group_active { self.begin_undo_group(); self.ai_undo_group_active = true; } I::spawn_generate(&self.self_ref, self.rt.spawn_tx(), ...);}Generate intents can’t spawn directly from reduce (needs self_ref, which lives on Session, not SessionState). Instead, reduce sets pending_prompt and dispatch handles the spawn. An undo group is opened so all AI-generated changes merge into a single undo step.
Step 3b: End AI Undo Group
Section titled “Step 3b: End AI Undo Group”if self.ai_undo_group_active && !self.rt.state.ephemeral.ai.is_streaming { self.end_undo_group(); self.ai_undo_group_active = false;}When the AI finishes streaming, the undo group is closed. This restores the default merge interval so subsequent user edits create separate undo entries.
Step 4: Cache Invalidation
Section titled “Step 4: Cache Invalidation”let invalid: Vec<_> = self.rt.state.invalidated_children.drain(..).collect();for id in &invalid { self.cache.remove(id); }Parent session child dispatch pushes placement IDs to invalidated_children. These are drained here before recompile so stale cache entries don’t mask updated child output.
Step 5: Lifecycle
Section titled “Step 5: Lifecycle”This is the key addition over the default Reducer::dispatch():
recompile()— runs the modality’scompile()vialayout()with a caching closure. The cache key is computed bymodality.compile_child_hash()— modalities override this when compile output depends on state beyond metadata and property values (e.g. Whiteboard includes LoroText content). Only placements whose hash changed are recompiled.commit_if_changed()— compares currentM::Syncedagainst the Doc’s last committed state viaPartialEq. If changed, commits to the LoroDoc. Returnsbool.version += 1— monotonic counter. Every dispatch increments, even if nothing changed.emit()— callsM::snapshot(ai, export, version)to build a typed snapshot, then pushes it to all sinks.fire_on_change()— only if synced state actually changed. Triggers CRDT sync outbound callbacks.
Two Paths for Follow-Up Commands
Section titled “Two Paths for Follow-Up Commands”| Method | Timing | Use Case |
|---|---|---|
Effect::Send / Effect::intent() | Synchronous. Reduced in the same dispatch() call via recursive execute(). | Same-cycle state machine transitions. |
Effect::Spawn / Effect::spawn() | Async. New thread. Feedback via Tx on next dispatch(). | I/O: export, child warming, AI streaming. |
graph LR A["Effect::Send"] -->|"same cycle"| B["execute() recurses"] C["Effect::Spawn"] -->|"new thread"| D[spawn channel] D -->|"next dispatch"| E["Step 1: drain pending"]No Recursion
Section titled “No Recursion”Effects are executed via recursive execute() calls (depth-limited), not recursive dispatch() calls. This guarantees that the lifecycle (step 5) runs exactly once per dispatch() call, after all reduces have completed.
For parent sessions with children, child dispatch happens inside ReduceIntent::reduce() at step 2. The child’s dispatch() runs its own full lifecycle independently. The parent’s lifecycle then runs after, recompiling with the child’s updated output.
Related Pages
Section titled “Related Pages”- Session Overview — the Session struct and ownership model
- ReduceIntent — how intent types reduce
- Snapshots — how emit() builds and pushes snapshots
- Persistence — how commit_if_changed() persists to LoroDoc