Runtime Struct
Summary
Section titled “Summary”Two changes that simplify the Reducer pattern:
- Runtime struct — extract the 4 duplicated fields (state, services, cmd_tx, cmd_rx) into
Runtime<R: Reducer>(uses associated types). Reducer trait shrinks from 5 required accessors to 2. - Returned effects —
reducereturnsEffect<Svc, I, F>values instead of mutating&mut Effects. Follows the Elm/Iced model. Reduce is pure (state mutation + effect description). Services are only accessible inside effect closures, not in reduce itself.
Additionally, Session gains SessionState<M> so ReduceIntent::reduce takes &mut SessionState<M> instead of &mut Session.
Motivation
Section titled “Motivation”Boilerplate
Section titled “Boilerplate”Every Reducer holds the same 4 fields and implements 5 identical accessor methods:
pub struct SomeModality { state: State<Synced, Ephemeral>, services: Arc<SomeServices>, tx: CommandSender<Command<I, F>>, rx: Mutex<mpsc::Receiver<Command<I, F>>>,}
impl Reducer for SomeModality { fn state(&self) -> &Self::State { &self.state } fn state_mut(&mut self) -> &mut Self::State { &mut self.state } fn services(&self) -> &Arc<Self::Services> { &self.services } fn sender(&self) -> &CommandSender<...> { &self.tx } fn receiver(&self) -> &Mutex<...> { &self.rx }}Mutable effects are awkward
Section titled “Mutable effects are awkward”The current reduce takes &mut Effects and calls imperative methods:
fn reduce(cmd, state: &mut State, fx: &mut Effects<Svc, Cmd>) { match cmd { Intent::Load { id } => { fx.spawn(|svc, sender| { let data = svc.load(&id); sender.send(Command::Feedback(Loaded(data))); }); } }}Problems:
Effectsholds channels internally — reduce authors are writing to channels without knowing itfx.services()provides service access inside reduce, but services are for doing work — they belong in effects, not in the pure reduce function- The dispatch loop batch-drains pending feedback at the start of each
dispatch()call — feedback sits idle until the next explicit dispatch fx.send()andfx.spawn()are fire-and-forget side effects — hard to test, hard to compose
Session state leaks
Section titled “Session state leaks”Session<M, I> implements Reducer with type State = Self, so ReduceIntent::reduce takes &mut Session<M, Self>. Intent reducers have access to sinks, cache, and callbacks they shouldn’t touch.
Design
Section titled “Design”Effect enum — returned values
Section titled “Effect enum — returned values”Inspired by Iced’s Task. Effects are values returned from reduce, not side-effect calls on a mutable reference:
pub enum Effect<Svc, I, F> { /// No effect. None, /// Synchronous follow-up — reduced in the same dispatch cycle. Send(Command<I, F>), /// Synchronous work with service access. Runs inline during execute(). Task(Box<dyn FnOnce(&Svc, &Tx<I, F>) + Send>), /// Async work with service access. Runs as a tokio task. Spawn(Box<dyn FnOnce(Arc<Svc>, Tx<I, F>) -> BoxFuture<'static, ()> + Send>), /// Multiple effects. Batch(Vec<Effect<Svc, I, F>>),}| Variant | Services | Tx | Threading | Use case |
|---|---|---|---|---|
Send | — | — | Inline | Follow-up command |
Task | &Svc (borrowed) | &Tx (borrowed) | Inline | Synchronous service calls (agent notifications, LoroText reads) |
Spawn | Arc<Svc> (owned) | Tx (owned) | Async (tokio) | Network I/O, file loading, AI calls |
Both Task and Spawn receive a Tx (typed sender) for sending feedback. Task borrows (synchronous, inline). Spawn owns (needs to move to async task).
Tx — typed sender with sugar
Section titled “Tx — typed sender with sugar”CommandSender is renamed to Tx<I, F> with sugar methods that mirror the Fx:: constructors:
pub struct Tx<I, F>(mpsc::UnboundedSender<Command<I, F>>);
impl<I, F> Tx<I, F> { /// Send a follow-up intent. pub fn intent(&self, i: I) { let _ = self.0.send(Command::Intent(i)); }
/// Send feedback. pub fn feedback(&self, f: F) { let _ = self.0.send(Command::Feedback(f)); }
/// Send a raw command. pub fn send(&self, cmd: Command<I, F>) { let _ = self.0.send(cmd); }}
impl<I, F> Clone for Tx<I, F> { fn clone(&self) -> Self { Tx(self.0.clone()) }}Symmetry between reduce returns and closure sends:
| From reduce (return) | From closure (send) |
|---|---|
Fx::intent(x) | tx.intent(x) |
Fx::feedback(x) | tx.feedback(x) |
Fx::none() | (don’t send) |
Fx type alias — Layer 2 sugar
Section titled “Fx type alias — Layer 2 sugar”Each reducer defines a type alias that pins the generics and provides short constructors:
type Fx = Effect<WhiteboardServices, WhiteboardIntent, WhiteboardFeedback>;Effect provides constructors that eliminate Command:: noise:
impl<Svc, I, F> Effect<Svc, I, F> { /// No effect. pub fn none() -> Self { Self::None }
/// Synchronous follow-up intent — reduced in the same dispatch cycle. pub fn intent(intent: I) -> Self { Self::Send(Command::Intent(intent)) }
/// Synchronous follow-up feedback. pub fn feedback(fb: F) -> Self { Self::Send(Command::Feedback(fb)) }
/// Synchronous work with service access. Runs inline during execute(). pub fn task(f: impl FnOnce(&Svc, &Tx<I, F>) + Send + 'static) -> Self { Self::Task(Box::new(f)) }
/// Async work with service access. Runs as a tokio task. pub fn spawn(f: impl FnOnce(Arc<Svc>, Tx<I, F>) -> BoxFuture<'static, ()> + Send + 'static) -> Self { Self::Spawn(Box::new(f)) }
/// Multiple effects. pub fn batch(effects: impl IntoIterator<Item = Self>) -> Self { Self::Batch(effects.into_iter().collect()) }
/// Chain two effects. pub fn and(self, other: Self) -> Self { match self { Self::Batch(mut v) => { v.push(other); Self::Batch(v) } Self::None => other, _ => Self::Batch(vec![self, other]), } }}Reduce — before and after
Section titled “Reduce — before and after”Before:
fn reduce( cmd: Command<WhiteboardIntent, WhiteboardFeedback>, state: &mut State<WhiteboardSynced, WhiteboardEphemeral>, fx: &mut Effects<ModalityServices, Command<WhiteboardIntent, WhiteboardFeedback>>,) { match cmd { Command::Intent(AddElement(e)) => { state.synced.elements.push(e); } Command::Intent(LoadImage { id }) => { fx.spawn(|svc, sender| { let data = svc.load(&id); sender.send(Command::Feedback(ImageLoaded(data))); }); } Command::Intent(Complex) => { state.synced.x = 1; fx.send(Command::Intent(Recompile)); fx.services().agent.tool_output(json!({"ok": true})); } Command::Feedback(ImageLoaded(data)) => { state.ephemeral.image = Some(data); } }}After:
type Fx = Effect<WhiteboardServices, WhiteboardIntent, WhiteboardFeedback>;
fn reduce( cmd: Command<WhiteboardIntent, WhiteboardFeedback>, state: &mut State<WhiteboardSynced, WhiteboardEphemeral>,) -> Fx { match cmd { Command::Intent(AddElement(e)) => { state.synced.elements.push(e); Fx::none() } Command::Intent(LoadImage { id }) => { Fx::spawn(|svc, tx| Box::pin(async move { let data = svc.load(&id).await; tx.feedback(ImageLoaded(data)); })) } Command::Intent(Complex) => { state.synced.x = 1; Fx::intent(Recompile) .and(Fx::task(|svc, _tx| { svc.agent.tool_output(json!({"ok": true})); })) } Command::Feedback(ImageLoaded(data)) => { state.ephemeral.image = Some(data); Fx::none() } }}Key differences:
- No
fxparameter, nosvcparameter — reduce is(cmd, state) -> Effect - Services are only accessible inside
taskandspawnclosures — reduce itself is pure state mutation + effect description Fx::none()for the common “just mutate state” caseFx::intent(x)/tx.intent(x)— symmetric sugar for returns and sendsFx::task(|svc, tx| ...)for synchronous service work (agent calls, reads)Fx::spawn(|svc, tx| ...)for async work (network, AI).and()chains multiple effects
Runtime struct
Section titled “Runtime struct”Runtime<R: Reducer> uses the Reducer’s associated types — no redundant generic parameters. Both channels use tokio::sync::mpsc::unbounded_channel — UnboundedSender::send() is sync (works in Task closures without .await), UnboundedReceiver::recv() is async (spawn feedback loop can .await without blocking a thread), and try_recv() works for synchronous task feedback draining.
Runtime also owns the spawn feedback loop machinery: a dispatch_fn callback for async dispatch, and an AtomicBool to ensure the loop starts at most once.
use tokio::sync::mpsc;
type Cmd<R> = Command<<R as Reducer>::Intent, <R as Reducer>::Feedback>;
pub struct Runtime<R: Reducer> { pub state: R::State, pub services: Arc<R::Services>, // Synchronous: tasks send here, drained inline in execute() via try_recv() task_tx: Tx<R::Intent, R::Feedback>, task_rx: mpsc::UnboundedReceiver<Cmd<R>>, // Async: spawns send here, background loop drains via recv().await spawn_tx: Tx<R::Intent, R::Feedback>, spawn_rx: mpsc::UnboundedReceiver<Cmd<R>>, // Spawn feedback loop dispatch_fn: Arc<dyn Fn(Cmd<R>) + Send + Sync>, spawn_loop_active: AtomicBool,}Construction — into_arc
Section titled “Construction — into_arc”Reducers are always used behind Arc<Mutex<R>>. into_arc uses Arc::new_cyclic to set dispatch_fn during construction — no Option, no two-phase init:
pub fn into_arc<R: Reducer>(mut reducer: R) -> Arc<Mutex<R>> { Arc::new_cyclic(|weak: &Weak<Mutex<R>>| { let weak = weak.clone(); let rt = reducer.runtime_mut(); rt.dispatch_fn = Arc::new(move |cmd| { if let Some(this) = weak.upgrade() { this.lock().dispatch(cmd); } }); Mutex::new(reducer) })}dispatch_fn is always set — ensure_spawn_loop can rely on it unconditionally. The Weak reference means the loop exits cleanly if the Reducer is dropped.
Spawn feedback loop
Section titled “Spawn feedback loop”Lives on Runtime — every Reducer gets it for free. ensure_spawn_loop is idempotent via AtomicBool:
impl<R: Reducer> Runtime<R> { pub fn ensure_spawn_loop(&mut self) { if self.spawn_loop_active.swap(true, Ordering::SeqCst) { return; // already running } let dispatch = self.dispatch_fn.clone(); let mut rx = std::mem::replace( &mut self.spawn_rx, mpsc::unbounded_channel().1, // fresh dummy receiver ); tokio::spawn(async move { while let Some(cmd) = rx.recv().await { dispatch(cmd); } // recv() returns None when ALL spawn Tx clones are dropped // = all spawns finished, loop exits naturally }); }}Parallel spawns: Each Effect::Spawn is an independent tokio task. Multiple spawns from a Batch or .and() chain all run concurrently. The feedback loop serializes dispatch (one at a time via the Mutex acquired inside dispatch_fn), but the async work itself is fully parallel.
Reducer trait — updated
Section titled “Reducer trait — updated”const MAX_SEND_DEPTH: usize = 64;
pub trait Reducer: Send + Sync + 'static { type State; type Intent: Send + 'static; type Feedback: Send + 'static; type Services: Send + Sync + 'static;
fn runtime(&self) -> &Runtime<Self>; fn runtime_mut(&mut self) -> &mut Runtime<Self>;
/// Pure reduce: process command, mutate state, return effects. /// No service access — services live in Task/Spawn closures. fn reduce( cmd: Command<Self::Intent, Self::Feedback>, state: &mut Self::State, ) -> Effect<Self::Services, Self::Intent, Self::Feedback>;
/// Execute a command: reduce, then process returned effects. fn dispatch(&mut self, cmd: Command<Self::Intent, Self::Feedback>) { let effect = Self::reduce(cmd, &mut self.runtime_mut().state); self.execute(effect, 0); }
/// Process returned effects. Recursive for Batch/Send. fn execute(&mut self, effect: Effect<Self::Services, Self::Intent, Self::Feedback>, depth: usize) { if depth > MAX_SEND_DEPTH { log::error!("Effect::Send recursion depth exceeded {MAX_SEND_DEPTH}, dropping effect"); return; } match effect { Effect::None => {} Effect::Send(cmd) => { let effect = Self::reduce(cmd, &mut self.runtime_mut().state); self.execute(effect, depth + 1); } Effect::Task(f) => { let rt = self.runtime(); f(&rt.services, &rt.task_tx); // Drain feedback sent synchronously by the task let rt = self.runtime_mut(); while let Ok(cmd) = rt.task_rx.try_recv() { drop(rt); self.dispatch(cmd); } } Effect::Spawn(f) => { let rt = self.runtime_mut(); let svc = rt.services.clone(); let tx = rt.spawn_tx.clone(); tokio::spawn(f(svc, tx)); rt.ensure_spawn_loop(); } Effect::Batch(effects) => { for e in effects { self.execute(e, depth); } } } }}Task vs Spawn feedback: Task closures run inline — their feedback is drained from task_rx immediately after the closure returns. Spawn feedback arrives asynchronously on spawn_rx — Runtime’s ensure_spawn_loop() starts a background loop that calls dispatch_fn for each feedback, which re-enters through dispatch() on the Reducer.
Modality reducer — before and after
Section titled “Modality reducer — before and after”Before:
pub struct Whiteboard { state: State<WhiteboardSynced, WhiteboardEphemeral>, services: Arc<ModalityServices>, tx: CommandSender<Command<WhiteboardIntent, WhiteboardFeedback>>, rx: Mutex<mpsc::Receiver<Command<WhiteboardIntent, WhiteboardFeedback>>>,}
impl Reducer for Whiteboard { type State = State<WhiteboardSynced, WhiteboardEphemeral>; type Intent = WhiteboardIntent; type Feedback = WhiteboardFeedback; type Services = ModalityServices;
fn state(&self) -> &Self::State { &self.state } fn state_mut(&mut self) -> &mut Self::State { &mut self.state } fn services(&self) -> &Arc<ModalityServices> { &self.services } fn sender(&self) -> &CommandSender<...> { &self.tx } fn receiver(&self) -> &Mutex<...> { &self.rx }
fn reduce(cmd, state, fx) { /* ... */ }}After:
pub struct Whiteboard { rt: Runtime<Self>,}
type Fx = Effect<ModalityServices, WhiteboardIntent, WhiteboardFeedback>;
impl Reducer for Whiteboard { type State = State<WhiteboardSynced, WhiteboardEphemeral>; type Intent = WhiteboardIntent; type Feedback = WhiteboardFeedback; type Services = ModalityServices;
fn runtime(&self) -> &Runtime<Self> { &self.rt } fn runtime_mut(&mut self) -> &mut Runtime<Self> { &mut self.rt }
fn reduce(cmd: Command<WhiteboardIntent, WhiteboardFeedback>, state: &mut Self::State) -> Fx { match cmd { Command::Intent(intent) => reduce_intent(intent, state), Command::Feedback(fb) => reduce_feedback(fb, state), } }}4 fields → 1. 5 accessors → 2. Runtime<Self> — no redundant generics. Effects parameter → return value. No service access in reduce.
Prose — keeps custom dispatch
Section titled “Prose — keeps custom dispatch”Prose replaces the 4 fields with Runtime but retains its custom dispatch() override for undo/redo lifecycle and effect delivery:
pub struct Prose { rt: Runtime<Self>, doc: LoroDoc, undo_staging: Arc<Mutex<UndoStaging>>, undo_manager: Mutex<UndoManager>, sinks: Vec<Box<dyn Sink<ProseSnapshot>>>, effect_sinks: Vec<Box<dyn Sink<ProseEffect>>>, version: u64,}
impl Reducer for Prose { fn runtime(&self) -> &Runtime<Self> { &self.rt } fn runtime_mut(&mut self) -> &mut Runtime<Self> { &mut self.rt }
fn reduce(cmd, state) -> ProseFx { /* returns effects */ }
fn dispatch(&mut self, cmd) { // Custom: undo/redo interception, snapshot emission, effect delivery // Calls Self::reduce() and self.execute() internally }}Session — implements Reducer, overrides dispatch
Section titled “Session — implements Reducer, overrides dispatch”The current problem: Session<M, I> implements Reducer with type State = Self. Intent reducers get &mut Session, which leaks sinks, cache, callbacks — infrastructure they shouldn’t touch.
The fix: extract SessionState<M> with only the fields reduce needs. Session implements Reducer with type State = SessionState<M> and overrides dispatch() to add lifecycle (recompile, commit, snapshot).
Before:
pub struct Session<M: Modality, I: ReduceIntent<M>> { modality: M, doc: Doc<M::Synced>, sinks: Vec<Box<dyn Sink<M::Snapshot>>>, on_change_callbacks: Vec<Box<dyn Fn(&[u8]) + Send + Sync>>, session_services: Arc<SessionServices>, cache: CompileCache<...>, cmd_tx: CommandSender<Command<I, I::Feedback>>, cmd_rx: Mutex<mpsc::Receiver<Command<I, I::Feedback>>>, agent_rx: Mutex<mpsc::Receiver<AgentEvent>>, ephemeral: SessionEphemeral, version: u64, self_ref: Option<Weak<Mutex<Self>>>, agent_tx: Option<mpsc::Sender<String>>, agent_config: Option<AgentConfig>,}
impl<M, I> Reducer for Session<M, I> { type State = Self; // <-- the problem: reduce sees everything}After:
/// Domain state that reduce can touch.pub struct SessionState<M: Modality> { pub modality: M, pub doc: Doc<M::Synced>, pub ephemeral: SessionEphemeral, pub version: u64, pub agent_tx: Option<mpsc::Sender<String>>, pub agent_config: Option<AgentConfig>,}
/// Infrastructure that only dispatch lifecycle touches.pub struct Session<M: Modality, I: ReduceIntent<M>> { rt: Runtime<Self>, sinks: Vec<Box<dyn Sink<M::Snapshot>>>, cache: CompileCache<...>, on_change_callbacks: Vec<Box<dyn Fn(&[u8]) + Send + Sync>>, agent_rx: Mutex<mpsc::Receiver<AgentEvent>>,}
type SessionFx<I, F> = Effect<SessionServices, I, F>;
impl<M, I> Reducer for Session<M, I>where M: Modality, I: ReduceIntent<M>,{ type State = SessionState<M>; type Intent = I; type Feedback = I::Feedback; type Services = SessionServices;
fn runtime(&self) -> &Runtime<Self> { &self.rt } fn runtime_mut(&mut self) -> &mut Runtime<Self> { &mut self.rt }
fn reduce( cmd: Command<I, I::Feedback>, state: &mut SessionState<M>, ) -> SessionFx<I, I::Feedback> { match cmd { Command::Intent(i) => I::reduce(i, state), Command::Feedback(f) => I::handle_feedback(f, state), } }
fn dispatch(&mut self, cmd: Command<I, I::Feedback>) { let effect = Self::reduce(cmd, &mut self.rt.state); self.execute(effect, 0); // default from Reducer trait self.lifecycle(); // Session-specific }}
impl<M, I> Session<M, I> { /// Recompile → commit if changed → version++ → emit snapshot → fire on_change. fn lifecycle(&mut self) { let state = &mut self.rt.state; let output = state.modality.compile(/* via self.cache */); if state.doc.commit_if_changed() { state.version += 1; } let snapshot = state.modality.snapshot( &state.ephemeral.ai, &state.ephemeral.export, state.version, ); self.sinks.retain(|s| s.push(snapshot.clone())); for cb in &self.on_change_callbacks { cb(&state.doc.export_bytes()); } }}No self_ref — dispatch_fn on Runtime handles async dispatch (set up via into_arc). execute() stays on the Reducer trait as a default method. Session only overrides dispatch() to add lifecycle. Reduce gets &mut SessionState<M> — clean boundary. Spawn feedback loop is handled entirely by Runtime — Session gets it for free.
ReduceIntent — takes &mut SessionState
Section titled “ReduceIntent — takes &mut SessionState”pub trait ReduceIntent<M: Modality>: Send + 'static { type Feedback: Send + 'static;
fn reduce( self, state: &mut SessionState<M>, ) -> SessionFx<Self, Self::Feedback> where Self: Sized;
fn handle_feedback( fb: Self::Feedback, state: &mut SessionState<M>, ) -> SessionFx<Self, Self::Feedback> where Self: Sized;}Intent reducers no longer have access to sinks, cache, or callbacks — only the domain state they should be mutating. No services parameter — service work goes in returned Fx::task() / Fx::spawn() effects.
Effects struct — what happens to it
Section titled “Effects struct — what happens to it”The current Effects struct (channel holder with send()/spawn()/services()) is deleted. Its role is replaced by:
Effectenum — returned values describing effectsFxtype alias — per-reducer sugarTx<I, F>— typed sender for closures (renamed fromCommandSender)Reducer::execute()— default method that processes returned effects
Implementation Plan
Section titled “Implementation Plan”- Add
tokiodependency tomodality_core - Rename
CommandSendertoTx<I, F>, add.intent()/.feedback()sugar - Create
Effectenum and constructors incrates/core/src/state/effect.rs - Create
Runtime<R: Reducer>struct withdispatch_fn,spawn_loop_active,ensure_spawn_loop(),into_arc() - Update
Reducertrait —reducereturnsEffect,dispatch/executewith depth check, replace 5 accessors with 2 (&Runtime<Self>) - Refactor modality reducers (Whiteboard, Worksheet, LessonPlan, Assessment) — replace fields with
rt: Runtime<Self>, update reduce to returnFx, move service calls intoFx::task() - Refactor Prose — same, keep custom
dispatch() - Extract
SessionState<M>fromSession<M, I>, removeself_ref - Update
ReduceIntenttrait and all impls — returnSessionFx, take&mut SessionState<M> - Delete
Effectsstruct - Update
session_api.rs— useinto_arc()for all reducer construction cargo test --workspace— pure refactor, no behavioral changes
Alternatives Considered
Section titled “Alternatives Considered”Keep &mut Effects (status quo) — Works but leaks channels into reduce, provides service access where it shouldn’t be, requires the drain step, and makes effects hard to test. The Elm/Iced community has thoroughly validated that returned effects are simpler.
Services as parameter to reduce (fn reduce(cmd, state, svc) -> Fx) — Rejected because services are for doing work, not for pure state reduction. Providing &Svc in reduce tempts authors to call service methods (agent notifications, I/O) inline as side effects. With Task/Spawn closures, service work is explicitly described as an effect.
Macro sugar (Layer 3) — A fx![send x, spawn |svc| ...] macro. Rejected because it breaks LSP — no autocomplete on DSL keywords, spotty type inference inside macro bodies. Regular function calls (Fx::intent(), Fx::spawn()) give full LSP support.
impl Into<Effect> return type — Would let () auto-convert to Effect::None. Rejected because Rust match arms must all return the same concrete type, so this doesn’t actually help. Fx::none() is short enough.
Spawn returning feedback directly (Fx::spawn(|svc| async { Feedback::X })) — Simpler for one-shot async work, but can’t send multiple feedbacks (e.g., streaming). Both Task and Spawn getting a Tx is uniform and handles all cases.
Keep type State = Self on Session — Gives intent reducers access to Session internals they shouldn’t touch. SessionState<M> enforces a cleaner boundary.
Unresolved Questions
Section titled “Unresolved Questions”- Testing effects — Returned effects are values and can be asserted on directly (
assert_eq!(effect, Fx::none())). ButEffect::TaskandEffect::Spawncontain boxed closures which aren’tPartialEq. May need a test helper that executes the effect and asserts on the feedback produced.