Runtime Struct

Two changes that simplify the Reducer pattern:

  1. Runtime struct: extract the 4 duplicated fields (state, services, cmd_tx, cmd_rx) into Runtime<R: Reducer>, which uses the Reducer's associated types. The Reducer trait shrinks from 5 required accessors to 2.
  2. Returned effects: reduce returns Effect<Svc, I, F> values instead of mutating &mut Effects. This follows the Elm/Iced model. reduce is pure in the sense that it only mutates the state it is handed and describes effects. Services are only accessible inside effect closures, not in reduce itself.

Additionally, Session gains SessionState<M> so ReduceIntent::reduce takes &mut SessionState<M> instead of &mut Session.

Every Reducer holds the same 4 fields and implements 5 identical accessor methods:

pub struct SomeModality {
state: State<Synced, Ephemeral>,
services: Arc<SomeServices>,
tx: CommandSender<Command<I, F>>,
rx: Mutex<mpsc::Receiver<Command<I, F>>>,
}
impl Reducer for SomeModality {
fn state(&self) -> &Self::State { &self.state }
fn state_mut(&mut self) -> &mut Self::State { &mut self.state }
fn services(&self) -> &Arc<Self::Services> { &self.services }
fn sender(&self) -> &CommandSender<...> { &self.tx }
fn receiver(&self) -> &Mutex<...> { &self.rx }
}

The current reduce takes &mut Effects and calls imperative methods:

fn reduce(cmd, state: &mut State, fx: &mut Effects<Svc, Cmd>) {
match cmd {
Intent::Load { id } => {
fx.spawn(|svc, sender| {
let data = svc.load(&id);
sender.send(Command::Feedback(Loaded(data)));
});
}
}
}

Problems:

  • Effects holds channels internally — reduce authors are writing to channels without knowing it
  • fx.services() provides service access inside reduce, but services are for doing work — they belong in effects, not in the pure reduce function
  • The dispatch loop batch-drains pending feedback at the start of each dispatch() call — feedback sits idle until the next explicit dispatch
  • fx.send() and fx.spawn() are fire-and-forget side effects — hard to test, hard to compose

Session<M, I> implements Reducer with type State = Self, so ReduceIntent::reduce takes &mut Session<M, Self>. Intent reducers have access to sinks, cache, and callbacks they shouldn’t touch.

Inspired by Iced’s Task. Effects are values returned from reduce, not side-effect calls on a mutable reference:

pub enum Effect<Svc, I, F> {
/// No effect.
None,
/// Synchronous follow-up — reduced in the same dispatch cycle.
Send(Command<I, F>),
/// Synchronous work with service access. Runs inline during execute().
Task(Box<dyn FnOnce(&Svc, &Tx<I, F>) + Send>),
/// Async work with service access. Runs as a tokio task.
Spawn(Box<dyn FnOnce(Arc<Svc>, Tx<I, F>) -> BoxFuture<'static, ()> + Send>),
/// Multiple effects.
Batch(Vec<Effect<Svc, I, F>>),
}
| Variant | Services | Tx | Threading | Use case |
| --- | --- | --- | --- | --- |
| Send | - | - | Inline | Follow-up command |
| Task | &Svc (borrowed) | &Tx (borrowed) | Inline | Synchronous service calls (agent notifications, LoroText reads) |
| Spawn | Arc<Svc> (owned) | Tx (owned) | Async (tokio) | Network I/O, file loading, AI calls |

Both Task and Spawn receive a Tx (typed sender) for sending feedback. Task borrows (synchronous, inline). Spawn owns (needs to move to async task).

CommandSender is renamed to Tx<I, F> with sugar methods that mirror the Fx:: constructors:

pub struct Tx<I, F>(mpsc::UnboundedSender<Command<I, F>>);
impl<I, F> Tx<I, F> {
/// Send a follow-up intent.
pub fn intent(&self, i: I) {
let _ = self.0.send(Command::Intent(i));
}
/// Send feedback.
pub fn feedback(&self, f: F) {
let _ = self.0.send(Command::Feedback(f));
}
/// Send a raw command.
pub fn send(&self, cmd: Command<I, F>) {
let _ = self.0.send(cmd);
}
}
impl<I, F> Clone for Tx<I, F> {
fn clone(&self) -> Self { Tx(self.0.clone()) }
}

Symmetry between reduce returns and closure sends:

| From reduce (return) | From closure (send) |
| --- | --- |
| Fx::intent(x) | tx.intent(x) |
| Fx::feedback(x) | tx.feedback(x) |
| Fx::none() | (don't send) |
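
As a rough illustration of this symmetry, the sketch below uses std::sync::mpsc::Sender as a stand-in for tokio's UnboundedSender and a cut-down Effect with only the None and Send variants. The symmetry_demo function and the &str / u32 payload types are hypothetical, chosen only so the example runs without tokio.

```rust
use std::sync::mpsc;

#[derive(Debug, PartialEq)]
enum Command<I, F> {
    Intent(I),
    Feedback(F),
}

// Cut-down Effect: only the variants needed to show the symmetry.
#[derive(Debug, PartialEq)]
enum Effect<I, F> {
    None,
    Send(Command<I, F>),
}

impl<I, F> Effect<I, F> {
    fn none() -> Self {
        Self::None
    }
    fn intent(i: I) -> Self {
        Self::Send(Command::Intent(i))
    }
    fn feedback(f: F) -> Self {
        Self::Send(Command::Feedback(f))
    }
}

// Stand-in for Tx: wraps std's Sender (an assumption of this sketch,
// the text wraps tokio's UnboundedSender).
struct Tx<I, F>(mpsc::Sender<Command<I, F>>);

impl<I, F> Tx<I, F> {
    fn intent(&self, i: I) {
        let _ = self.0.send(Command::Intent(i));
    }
    fn feedback(&self, f: F) {
        let _ = self.0.send(Command::Feedback(f));
    }
}

/// Returns (effect returned from a reduce, command sent from a closure).
fn symmetry_demo() -> (Effect<&'static str, u32>, Command<&'static str, u32>) {
    let (sender, receiver) = mpsc::channel();
    let tx = Tx(sender);
    tx.intent("recompile");
    let sent = receiver.recv().expect("tx is still alive");
    (Effect::intent("recompile"), sent)
}
```

Both paths end up carrying the same Command::Intent value; the only difference is whether it is returned or pushed through the channel.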

Each reducer defines a type alias that pins the generics and provides short constructors:

type Fx = Effect<WhiteboardServices, WhiteboardIntent, WhiteboardFeedback>;

Effect provides constructors that eliminate Command:: noise:

impl<Svc, I, F> Effect<Svc, I, F> {
/// No effect.
pub fn none() -> Self {
Self::None
}
/// Synchronous follow-up intent — reduced in the same dispatch cycle.
pub fn intent(intent: I) -> Self {
Self::Send(Command::Intent(intent))
}
/// Synchronous follow-up feedback.
pub fn feedback(fb: F) -> Self {
Self::Send(Command::Feedback(fb))
}
/// Synchronous work with service access. Runs inline during execute().
pub fn task(f: impl FnOnce(&Svc, &Tx<I, F>) + Send + 'static) -> Self {
Self::Task(Box::new(f))
}
/// Async work with service access. Runs as a tokio task.
pub fn spawn(f: impl FnOnce(Arc<Svc>, Tx<I, F>) -> BoxFuture<'static, ()> + Send + 'static) -> Self {
Self::Spawn(Box::new(f))
}
/// Multiple effects.
pub fn batch(effects: impl IntoIterator<Item = Self>) -> Self {
Self::Batch(effects.into_iter().collect())
}
/// Chain two effects.
pub fn and(self, other: Self) -> Self {
match self {
Self::Batch(mut v) => { v.push(other); Self::Batch(v) }
Self::None => other,
_ => Self::Batch(vec![self, other]),
}
}
}
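
To see how and() composes, here is a small sketch with a closure-free Effect (so it can derive PartialEq). The &'static str payload is a hypothetical stand-in for Command<I, F>; the and() body is copied from the constructor above.

```rust
#[derive(Debug, PartialEq)]
enum Effect {
    None,
    Send(&'static str),
    Batch(Vec<Effect>),
}

impl Effect {
    /// Same chaining logic as in the text.
    fn and(self, other: Self) -> Self {
        match self {
            // Left side is already a batch: append, keeping it flat.
            Self::Batch(mut v) => {
                v.push(other);
                Self::Batch(v)
            }
            // Left side is empty: the result is just the right side.
            Self::None => other,
            // Anything else: start a new two-element batch.
            _ => Self::Batch(vec![self, other]),
        }
    }
}

/// Chains three sends; the result is one flat batch.
fn chain_three() -> Effect {
    Effect::Send("a").and(Effect::Send("b")).and(Effect::Send("c"))
}
```

Two details follow from the match as written: only a left-hand None is elided (Send("a").and(None) yields a two-element batch containing None), and a right-hand Batch is pushed as a nested element rather than flattened. Both are harmless for execute(), which handles None and nested Batch recursively.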

Before:

fn reduce(
cmd: Command<WhiteboardIntent, WhiteboardFeedback>,
state: &mut State<WhiteboardSynced, WhiteboardEphemeral>,
fx: &mut Effects<ModalityServices, Command<WhiteboardIntent, WhiteboardFeedback>>,
) {
match cmd {
Command::Intent(AddElement(e)) => {
state.synced.elements.push(e);
}
Command::Intent(LoadImage { id }) => {
fx.spawn(|svc, sender| {
let data = svc.load(&id);
sender.send(Command::Feedback(ImageLoaded(data)));
});
}
Command::Intent(Complex) => {
state.synced.x = 1;
fx.send(Command::Intent(Recompile));
fx.services().agent.tool_output(json!({"ok": true}));
}
Command::Feedback(ImageLoaded(data)) => {
state.ephemeral.image = Some(data);
}
}
}

After:

type Fx = Effect<WhiteboardServices, WhiteboardIntent, WhiteboardFeedback>;
fn reduce(
cmd: Command<WhiteboardIntent, WhiteboardFeedback>,
state: &mut State<WhiteboardSynced, WhiteboardEphemeral>,
) -> Fx {
match cmd {
Command::Intent(AddElement(e)) => {
state.synced.elements.push(e);
Fx::none()
}
Command::Intent(LoadImage { id }) => {
Fx::spawn(|svc, tx| Box::pin(async move {
let data = svc.load(&id).await;
tx.feedback(ImageLoaded(data));
}))
}
Command::Intent(Complex) => {
state.synced.x = 1;
Fx::intent(Recompile)
.and(Fx::task(|svc, _tx| {
svc.agent.tool_output(json!({"ok": true}));
}))
}
Command::Feedback(ImageLoaded(data)) => {
state.ephemeral.image = Some(data);
Fx::none()
}
}
}

Key differences:

  • No fx parameter, no svc parameter — reduce is (cmd, state) -> Effect
  • Services are only accessible inside task and spawn closures — reduce itself is pure state mutation + effect description
  • Fx::none() for the common “just mutate state” case
  • Fx::intent(x) / tx.intent(x) — symmetric sugar for returns and sends
  • Fx::task(|svc, tx| ...) for synchronous service work (agent calls, reads)
  • Fx::spawn(|svc, tx| ...) for async work (network, AI)
  • .and() chains multiple effects

Runtime<R: Reducer> uses the Reducer's associated types, so there are no redundant generic parameters. Both channels use tokio::sync::mpsc::unbounded_channel: UnboundedSender::send() is synchronous (works in Task closures without .await), UnboundedReceiver::recv() is async (the spawn feedback loop can .await without blocking a thread), and try_recv() supports synchronous draining of task feedback.

Runtime also owns the spawn feedback loop machinery: a dispatch_fn callback for async dispatch, and an AtomicBool to ensure the loop starts at most once.

use tokio::sync::mpsc;
type Cmd<R> = Command<<R as Reducer>::Intent, <R as Reducer>::Feedback>;
pub struct Runtime<R: Reducer> {
pub state: R::State,
pub services: Arc<R::Services>,
// Synchronous: tasks send here, drained inline in execute() via try_recv()
task_tx: Tx<R::Intent, R::Feedback>,
task_rx: mpsc::UnboundedReceiver<Cmd<R>>,
// Async: spawns send here, background loop drains via recv().await
spawn_tx: Tx<R::Intent, R::Feedback>,
spawn_rx: mpsc::UnboundedReceiver<Cmd<R>>,
// Spawn feedback loop
dispatch_fn: Arc<dyn Fn(Cmd<R>) + Send + Sync>,
spawn_loop_active: AtomicBool,
}

Reducers are always used behind Arc<Mutex<R>>. into_arc uses Arc::new_cyclic to set dispatch_fn during construction — no Option, no two-phase init:

pub fn into_arc<R: Reducer>(mut reducer: R) -> Arc<Mutex<R>> {
Arc::new_cyclic(|weak: &Weak<Mutex<R>>| {
let weak = weak.clone();
let rt = reducer.runtime_mut();
rt.dispatch_fn = Arc::new(move |cmd| {
if let Some(this) = weak.upgrade() {
this.lock().dispatch(cmd);
}
});
Mutex::new(reducer)
})
}

dispatch_fn is always set — ensure_spawn_loop can rely on it unconditionally. The Weak reference means the loop exits cleanly if the Reducer is dropped.
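
The Arc::new_cyclic pattern can be tried in isolation. The following is a minimal std-only sketch, not the Runtime code itself: Counter, DispatchFn, and new_counter are hypothetical names, and std::sync::Mutex (with lock().unwrap()) is assumed here because the text does not say which Mutex it uses.

```rust
use std::sync::{Arc, Mutex, Weak};

type DispatchFn = Arc<dyn Fn(u32) + Send + Sync>;

// Hypothetical stand-in for a Reducer that owns a dispatch callback.
struct Counter {
    total: u32,
    dispatch_fn: DispatchFn,
}

fn into_arc(mut counter: Counter) -> Arc<Mutex<Counter>> {
    Arc::new_cyclic(|weak: &Weak<Mutex<Counter>>| {
        let weak = weak.clone();
        // The callback holds only a Weak, so it does not keep the Counter alive.
        counter.dispatch_fn = Arc::new(move |n: u32| {
            if let Some(this) = weak.upgrade() {
                this.lock().unwrap().total += n;
            }
        });
        Mutex::new(counter)
    })
}

/// Builds a Counter with a placeholder callback that into_arc replaces.
fn new_counter() -> Arc<Mutex<Counter>> {
    into_arc(Counter {
        total: 0,
        dispatch_fn: Arc::new(|_: u32| {}),
    })
}
```

Calling a cloned dispatch_fn while the Arc is alive mutates the counter; after the last strong reference is dropped, upgrade() returns None and the call becomes a no-op.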

Lives on Runtime — every Reducer gets it for free. ensure_spawn_loop is idempotent via AtomicBool:

impl<R: Reducer> Runtime<R> {
pub fn ensure_spawn_loop(&mut self) {
if self.spawn_loop_active.swap(true, Ordering::SeqCst) {
return; // already running
}
let dispatch = self.dispatch_fn.clone();
let mut rx = std::mem::replace(
&mut self.spawn_rx,
mpsc::unbounded_channel().1, // fresh dummy receiver
);
tokio::spawn(async move {
while let Some(cmd) = rx.recv().await {
dispatch(cmd);
}
// recv() returns None once every Tx clone is dropped, including the
// Runtime's own spawn_tx, so the loop exits after the Reducer is
// dropped and all in-flight spawns have finished
});
}
}
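
The idempotence guard can be sketched on its own with std atomics. LoopGuard, ensure_started, and race are hypothetical names, and the starts counter exists only so the behaviour can be checked; the real method would spawn the tokio loop where the counter is bumped.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

struct LoopGuard {
    active: AtomicBool,
    starts: AtomicUsize, // hypothetical: counts how often the "loop" was started
}

impl LoopGuard {
    fn new() -> Self {
        LoopGuard {
            active: AtomicBool::new(false),
            starts: AtomicUsize::new(0),
        }
    }

    /// Returns true only for the single call that flips the flag.
    fn ensure_started(&self) -> bool {
        // swap returns the previous value: true means someone got here first.
        if self.active.swap(true, Ordering::SeqCst) {
            return false;
        }
        self.starts.fetch_add(1, Ordering::SeqCst);
        true
    }
}

/// Calls ensure_started from several threads; returns the number of starts.
fn race(threads: usize) -> usize {
    let guard = LoopGuard::new();
    std::thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                guard.ensure_started();
            });
        }
    });
    guard.starts.load(Ordering::SeqCst)
}
```

Since ensure_spawn_loop in the text takes &mut self, a plain bool would presumably work there as well; the atomic swap matters once the guard is reachable through &self, as in this sketch.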

Parallel spawns: Each Effect::Spawn is an independent tokio task. Multiple spawns from a Batch or .and() chain all run concurrently. The feedback loop serializes dispatch (one at a time via the Mutex acquired inside dispatch_fn), but the async work itself is fully parallel.

const MAX_SEND_DEPTH: usize = 64;
// Sized is required because the trait's methods name Runtime<Self>.
pub trait Reducer: Sized + Send + Sync + 'static {
type State;
type Intent: Send + 'static;
type Feedback: Send + 'static;
type Services: Send + Sync + 'static;
fn runtime(&self) -> &Runtime<Self>;
fn runtime_mut(&mut self) -> &mut Runtime<Self>;
/// Pure reduce: process command, mutate state, return effects.
/// No service access — services live in Task/Spawn closures.
fn reduce(
cmd: Command<Self::Intent, Self::Feedback>,
state: &mut Self::State,
) -> Effect<Self::Services, Self::Intent, Self::Feedback>;
/// Execute a command: reduce, then process returned effects.
fn dispatch(&mut self, cmd: Command<Self::Intent, Self::Feedback>) {
let effect = Self::reduce(cmd, &mut self.runtime_mut().state);
self.execute(effect, 0);
}
/// Process returned effects. Recursive for Batch/Send.
fn execute(&mut self, effect: Effect<Self::Services, Self::Intent, Self::Feedback>, depth: usize) {
if depth > MAX_SEND_DEPTH {
log::error!("Effect::Send recursion depth exceeded {MAX_SEND_DEPTH}, dropping effect");
return;
}
match effect {
Effect::None => {}
Effect::Send(cmd) => {
let effect = Self::reduce(cmd, &mut self.runtime_mut().state);
self.execute(effect, depth + 1);
}
Effect::Task(f) => {
let rt = self.runtime();
f(&rt.services, &rt.task_tx);
// Drain feedback sent synchronously by the task.
// runtime_mut() is re-borrowed on each iteration so that the borrow
// ends before dispatch() needs &mut self.
while let Ok(cmd) = self.runtime_mut().task_rx.try_recv() {
self.dispatch(cmd);
}
}
Effect::Spawn(f) => {
let rt = self.runtime_mut();
let svc = rt.services.clone();
let tx = rt.spawn_tx.clone();
tokio::spawn(f(svc, tx));
rt.ensure_spawn_loop();
}
Effect::Batch(effects) => {
for e in effects {
self.execute(e, depth);
}
}
}
}
}

Task vs Spawn feedback: Task closures run inline — their feedback is drained from task_rx immediately after the closure returns. Spawn feedback arrives asynchronously on spawn_rx — Runtime’s ensure_spawn_loop() starts a background loop that calls dispatch_fn for each feedback, which re-enters through dispatch() on the Reducer.
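
To make the control flow of dispatch() and execute() concrete, here is a std-only miniature of the synchronous half. It is a sketch under several assumptions: Spawn is omitted because it needs an async runtime, std::sync::mpsc replaces the tokio channel, and Cmd, State, Services, and Mini are hypothetical types. Depth overflow is dropped silently instead of being logged.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

const MAX_SEND_DEPTH: usize = 64;

// Hypothetical commands; intents and feedback share one enum to keep this short.
enum Cmd {
    Inc,
    IncTwice,
    Forever,
    Report,
    Reported(u32),
}

#[derive(Default)]
struct State {
    count: u32,
    last_seen: Option<u32>,
}

struct Services {
    offset: u32,
}

enum Effect {
    None,
    Send(Cmd),
    Task(Box<dyn FnOnce(&Services, &Sender<Cmd>) + Send>),
    Batch(Vec<Effect>),
}

fn reduce(cmd: Cmd, state: &mut State) -> Effect {
    match cmd {
        Cmd::Inc => {
            state.count += 1;
            Effect::None
        }
        Cmd::IncTwice => Effect::Batch(vec![Effect::Send(Cmd::Inc), Effect::Send(Cmd::Inc)]),
        Cmd::Forever => {
            state.count += 1;
            Effect::Send(Cmd::Forever) // deliberately self-sending
        }
        Cmd::Report => {
            let count = state.count;
            Effect::Task(Box::new(move |svc: &Services, tx: &Sender<Cmd>| {
                let _ = tx.send(Cmd::Reported(count + svc.offset));
            }))
        }
        Cmd::Reported(n) => {
            state.last_seen = Some(n);
            Effect::None
        }
    }
}

struct Mini {
    state: State,
    services: Services,
    task_tx: Sender<Cmd>,
    task_rx: Receiver<Cmd>,
}

impl Mini {
    fn new(offset: u32) -> Self {
        let (task_tx, task_rx) = channel();
        Mini { state: State::default(), services: Services { offset }, task_tx, task_rx }
    }

    fn dispatch(&mut self, cmd: Cmd) {
        let effect = reduce(cmd, &mut self.state);
        self.execute(effect, 0);
    }

    fn execute(&mut self, effect: Effect, depth: usize) {
        if depth > MAX_SEND_DEPTH {
            return; // the text logs an error here; this sketch just drops the effect
        }
        match effect {
            Effect::None => {}
            Effect::Send(cmd) => {
                let next = reduce(cmd, &mut self.state);
                self.execute(next, depth + 1);
            }
            Effect::Task(f) => {
                f(&self.services, &self.task_tx);
                // Each try_recv() borrow ends before dispatch() needs &mut self.
                while let Ok(cmd) = self.task_rx.try_recv() {
                    self.dispatch(cmd);
                }
            }
            Effect::Batch(effects) => {
                for e in effects {
                    self.execute(e, depth);
                }
            }
        }
    }
}
```

In this sketch, dispatching Cmd::Report on a Mini with offset 10 and count 2 stores Some(12) in last_seen before dispatch() returns, because the task's feedback is drained inline. A self-sending command such as Cmd::Forever is cut off by the depth check rather than recursing without bound.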

Before:

pub struct Whiteboard {
state: State<WhiteboardSynced, WhiteboardEphemeral>,
services: Arc<ModalityServices>,
tx: CommandSender<Command<WhiteboardIntent, WhiteboardFeedback>>,
rx: Mutex<mpsc::Receiver<Command<WhiteboardIntent, WhiteboardFeedback>>>,
}
impl Reducer for Whiteboard {
type State = State<WhiteboardSynced, WhiteboardEphemeral>;
type Intent = WhiteboardIntent;
type Feedback = WhiteboardFeedback;
type Services = ModalityServices;
fn state(&self) -> &Self::State { &self.state }
fn state_mut(&mut self) -> &mut Self::State { &mut self.state }
fn services(&self) -> &Arc<ModalityServices> { &self.services }
fn sender(&self) -> &CommandSender<...> { &self.tx }
fn receiver(&self) -> &Mutex<...> { &self.rx }
fn reduce(cmd, state, fx) { /* ... */ }
}

After:

pub struct Whiteboard {
rt: Runtime<Self>,
}
type Fx = Effect<ModalityServices, WhiteboardIntent, WhiteboardFeedback>;
impl Reducer for Whiteboard {
type State = State<WhiteboardSynced, WhiteboardEphemeral>;
type Intent = WhiteboardIntent;
type Feedback = WhiteboardFeedback;
type Services = ModalityServices;
fn runtime(&self) -> &Runtime<Self> { &self.rt }
fn runtime_mut(&mut self) -> &mut Runtime<Self> { &mut self.rt }
fn reduce(cmd: Command<WhiteboardIntent, WhiteboardFeedback>, state: &mut Self::State) -> Fx {
match cmd {
Command::Intent(intent) => reduce_intent(intent, state),
Command::Feedback(fb) => reduce_feedback(fb, state),
}
}
}

4 fields → 1. 5 accessors → 2. Runtime<Self> — no redundant generics. Effects parameter → return value. No service access in reduce.

Prose replaces the 4 fields with Runtime but retains its custom dispatch() override for undo/redo lifecycle and effect delivery:

pub struct Prose {
rt: Runtime<Self>,
doc: LoroDoc,
undo_staging: Arc<Mutex<UndoStaging>>,
undo_manager: Mutex<UndoManager>,
sinks: Vec<Box<dyn Sink<ProseSnapshot>>>,
effect_sinks: Vec<Box<dyn Sink<ProseEffect>>>,
version: u64,
}
impl Reducer for Prose {
fn runtime(&self) -> &Runtime<Self> { &self.rt }
fn runtime_mut(&mut self) -> &mut Runtime<Self> { &mut self.rt }
fn reduce(cmd, state) -> ProseFx { /* returns effects */ }
fn dispatch(&mut self, cmd) {
// Custom: undo/redo interception, snapshot emission, effect delivery
// Calls Self::reduce() and self.execute() internally
}
}

Session — implements Reducer, overrides dispatch

The current problem: Session<M, I> implements Reducer with type State = Self. Intent reducers get &mut Session, which leaks sinks, cache, callbacks — infrastructure they shouldn’t touch.

The fix: extract SessionState<M> with only the fields reduce needs. Session implements Reducer with type State = SessionState<M> and overrides dispatch() to add lifecycle (recompile, commit, snapshot).

Before:

pub struct Session<M: Modality, I: ReduceIntent<M>> {
modality: M,
doc: Doc<M::Synced>,
sinks: Vec<Box<dyn Sink<M::Snapshot>>>,
on_change_callbacks: Vec<Box<dyn Fn(&[u8]) + Send + Sync>>,
session_services: Arc<SessionServices>,
cache: CompileCache<...>,
cmd_tx: CommandSender<Command<I, I::Feedback>>,
cmd_rx: Mutex<mpsc::Receiver<Command<I, I::Feedback>>>,
agent_rx: Mutex<mpsc::Receiver<AgentEvent>>,
ephemeral: SessionEphemeral,
version: u64,
self_ref: Option<Weak<Mutex<Self>>>,
agent_tx: Option<mpsc::Sender<String>>,
agent_config: Option<AgentConfig>,
}
impl<M, I> Reducer for Session<M, I> {
type State = Self; // <-- the problem: reduce sees everything
}

After:

/// Domain state that reduce can touch.
pub struct SessionState<M: Modality> {
pub modality: M,
pub doc: Doc<M::Synced>,
pub ephemeral: SessionEphemeral,
pub version: u64,
pub agent_tx: Option<mpsc::Sender<String>>,
pub agent_config: Option<AgentConfig>,
}
/// Infrastructure that only dispatch lifecycle touches.
pub struct Session<M: Modality, I: ReduceIntent<M>> {
rt: Runtime<Self>,
sinks: Vec<Box<dyn Sink<M::Snapshot>>>,
cache: CompileCache<...>,
on_change_callbacks: Vec<Box<dyn Fn(&[u8]) + Send + Sync>>,
agent_rx: Mutex<mpsc::Receiver<AgentEvent>>,
}
type SessionFx<I, F> = Effect<SessionServices, I, F>;
impl<M, I> Reducer for Session<M, I>
where
M: Modality,
I: ReduceIntent<M>,
{
type State = SessionState<M>;
type Intent = I;
type Feedback = I::Feedback;
type Services = SessionServices;
fn runtime(&self) -> &Runtime<Self> { &self.rt }
fn runtime_mut(&mut self) -> &mut Runtime<Self> { &mut self.rt }
fn reduce(
cmd: Command<I, I::Feedback>,
state: &mut SessionState<M>,
) -> SessionFx<I, I::Feedback> {
match cmd {
Command::Intent(i) => I::reduce(i, state),
Command::Feedback(f) => I::handle_feedback(f, state),
}
}
fn dispatch(&mut self, cmd: Command<I, I::Feedback>) {
let effect = Self::reduce(cmd, &mut self.rt.state);
self.execute(effect, 0); // default from Reducer trait
self.lifecycle(); // Session-specific
}
}
impl<M, I> Session<M, I> {
/// Recompile → commit if changed → version++ → emit snapshot → fire on_change.
fn lifecycle(&mut self) {
let state = &mut self.rt.state;
let output = state.modality.compile(/* via self.cache */);
if state.doc.commit_if_changed() {
state.version += 1;
}
let snapshot = state.modality.snapshot(
&state.ephemeral.ai,
&state.ephemeral.export,
state.version,
);
self.sinks.retain(|s| s.push(snapshot.clone()));
for cb in &self.on_change_callbacks {
cb(&state.doc.export_bytes());
}
}
}

No self_ref: dispatch_fn on Runtime handles async dispatch (set up via into_arc). execute() stays on the Reducer trait as a default method. Session only overrides dispatch() to add lifecycle. reduce gets &mut SessionState<M>, which gives a clean boundary. The spawn feedback loop is handled entirely by Runtime, so Session gets it for free.

pub trait ReduceIntent<M: Modality>: Send + 'static {
type Feedback: Send + 'static;
fn reduce(
self,
state: &mut SessionState<M>,
) -> SessionFx<Self, Self::Feedback>
where Self: Sized;
fn handle_feedback(
fb: Self::Feedback,
state: &mut SessionState<M>,
) -> SessionFx<Self, Self::Feedback>
where Self: Sized;
}

Intent reducers no longer have access to sinks, cache, or callbacks — only the domain state they should be mutating. No services parameter — service work goes in returned Fx::task() / Fx::spawn() effects.

The current Effects struct (channel holder with send()/spawn()/services()) is deleted. Its role is replaced by:

  • Effect enum: returned values describing effects
  • Fx type alias: per-reducer sugar
  • Tx<I, F>: typed sender for closures (renamed from CommandSender)
  • Reducer::execute(): default method that processes returned effects

  1. Add tokio dependency to modality_core
  2. Rename CommandSender to Tx<I, F>, add .intent() / .feedback() sugar
  3. Create Effect enum and constructors in crates/core/src/state/effect.rs
  4. Create Runtime<R: Reducer> struct with dispatch_fn, spawn_loop_active, ensure_spawn_loop(), into_arc()
  5. Update Reducer trait — reduce returns Effect, dispatch/execute with depth check, replace 5 accessors with 2 (&Runtime<Self>)
  6. Refactor modality reducers (Whiteboard, Worksheet, LessonPlan, Assessment) — replace fields with rt: Runtime<Self>, update reduce to return Fx, move service calls into Fx::task()
  7. Refactor Prose — same, keep custom dispatch()
  8. Extract SessionState<M> from Session<M, I>, remove self_ref
  9. Update ReduceIntent trait and all impls — return SessionFx, take &mut SessionState<M>
  10. Delete Effects struct
  11. Update session_api.rs — use into_arc() for all reducer construction
  12. cargo test --workspace — pure refactor, no behavioral changes

Keep &mut Effects (status quo) — Works but leaks channels into reduce, provides service access where it shouldn’t be, requires the drain step, and makes effects hard to test. The Elm/Iced community has thoroughly validated that returned effects are simpler.

Services as parameter to reduce (fn reduce(cmd, state, svc) -> Fx) — Rejected because services are for doing work, not for pure state reduction. Providing &Svc in reduce tempts authors to call service methods (agent notifications, I/O) inline as side effects. With Task/Spawn closures, service work is explicitly described as an effect.

Macro sugar (Layer 3) — A fx![send x, spawn |svc| ...] macro. Rejected because it breaks LSP — no autocomplete on DSL keywords, spotty type inference inside macro bodies. Regular function calls (Fx::intent(), Fx::spawn()) give full LSP support.

impl Into<Effect> return type — Would let () auto-convert to Effect::None. Rejected because Rust match arms must all return the same concrete type, so this doesn’t actually help. Fx::none() is short enough.

Spawn returning feedback directly (Fx::spawn(|svc| async { Feedback::X })) — Simpler for one-shot async work, but can’t send multiple feedbacks (e.g., streaming). Both Task and Spawn getting a Tx is uniform and handles all cases.

Keep type State = Self on Session — Gives intent reducers access to Session internals they shouldn’t touch. SessionState<M> enforces a cleaner boundary.

  1. Testing effects: Returned effects are values, so closure-free variants can be checked directly, for example with matches!(effect, Effect::None). Effect::Task and Effect::Spawn contain boxed closures, which are not PartialEq, so Effect cannot derive PartialEq and assert_eq!(effect, Fx::none()) would need a manual impl. This may need a test helper that executes the effect and asserts on the feedback produced.
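
One possible shape for such a helper, sketched with std only: it runs Task effects inline against a recording channel and returns the feedback in send order. Everything here is an assumption for illustration (Feedback, Services, load, collect_feedback, and the use of std::sync::mpsc::Sender in place of Tx); Spawn is left out because it would need an async runtime in the test.

```rust
use std::sync::mpsc::{channel, Sender};

#[derive(Debug, PartialEq)]
enum Feedback {
    Loaded(String),
}

// Hypothetical services; a real test would pass a fake or stub here.
struct Services {
    prefix: &'static str,
}

enum Effect {
    None,
    Task(Box<dyn FnOnce(&Services, &Sender<Feedback>) + Send>),
    Batch(Vec<Effect>),
}

/// Executes the effect inline and returns every feedback it sent, in order.
fn collect_feedback(effect: Effect, svc: &Services) -> Vec<Feedback> {
    let (tx, rx) = channel();
    run(effect, svc, &tx);
    // try_iter() yields what is already queued and never blocks.
    rx.try_iter().collect()
}

fn run(effect: Effect, svc: &Services, tx: &Sender<Feedback>) {
    match effect {
        Effect::None => {}
        Effect::Task(f) => f(svc, tx),
        Effect::Batch(effects) => {
            for e in effects {
                run(e, svc, tx);
            }
        }
    }
}

/// Hypothetical effect under test: "loads" an id and reports the result.
fn load(id: u32) -> Effect {
    Effect::Task(Box::new(move |svc: &Services, tx: &Sender<Feedback>| {
        let _ = tx.send(Feedback::Loaded(format!("{}{}", svc.prefix, id)));
    }))
}
```

A test can then combine both styles: matches! for the variant, and collect_feedback for what the closure actually does, for example comparing collect_feedback(load(7), &svc) against the expected Feedback list.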