Reactive programming with Subject and Source

react-declarative includes a self-contained reactive programming toolkit: Subject and BehaviorSubject for event streams, a Source class for composable MapReduce pipelines, and a set of React hooks that wire these primitives into your component tree cleanly. You do not need RxJS—the library's own implementations are enough for the patterns commonly needed alongside form and grid components.

Subject is a push-based event emitter. Call next to emit a value; call subscribe to listen. Subscribers receive only values emitted after they subscribed.

import { Subject } from "react-declarative";

const clickSubject = new Subject<MouseEvent>();

// Emit
document.addEventListener("click", (e) => clickSubject.next(e));

// Subscribe — returns an unsubscribe function
const unsub = clickSubject.subscribe((e) => {
console.log("clicked at", e.clientX, e.clientY);
});

// Later
unsub();

BehaviorSubject is like Subject but it stores the most recent value. New subscribers immediately receive the current value, then any subsequent emissions.

import { BehaviorSubject } from "react-declarative";

const userSubject = new BehaviorSubject<string | null>(null);

userSubject.next("alice");

// New subscriber gets "alice" immediately, then any future values
userSubject.subscribe((user) => console.log("current user:", user));

// Access current value directly
console.log(userSubject.data); // "alice"

EventEmitter provides named-event pub/sub when you need multiple separate event channels on a single object.

import { EventEmitter } from "react-declarative";

const emitter = new EventEmitter();

emitter.subscribe("login", (user) => console.log("logged in:", user));
emitter.subscribe("logout", () => console.log("logged out"));

emitter.emit("login", { name: "alice" });
emitter.emit("logout");

useSubject creates a Subject that is stable across renders. Pass an optional external subject to bridge an outer stream into a component-scoped one.

import { useSubject } from "react-declarative";

const SaveButton = () => {
const saveSubject = useSubject<void>();

const handleClick = () => saveSubject.next();

useEffect(
() => saveSubject.subscribe(() => console.log("save triggered")),
[]
);

return <button onClick={handleClick}>Save</button>;
};

useBehaviorSubject creates a stable BehaviorSubject with an optional initial value.

import { useBehaviorSubject } from "react-declarative";

const StatusBadge = () => {
const statusSubject = useBehaviorSubject<"idle" | "loading" | "done">("idle");

const handleFetch = async () => {
statusSubject.next("loading");
await fetchData();
statusSubject.next("done");
};

// Combine with useSubjectValue to read in JSX
return <button onClick={handleFetch}>Fetch</button>;
};

useChangeSubject turns a plain React value into a subject that emits whenever the value changes. This is useful for reacting to form data changes without adding a manual useEffect dependency array.

import { useChangeSubject, useSubscription } from "react-declarative";

const AutoSaveForm = ({ data }) => {
const dataChangeSubject = useChangeSubject(data);

useSubscription(() =>
dataChangeSubject.subscribe((newData) => {
// Called every time `data` changes
saveToServer(newData);
})
);

return <One fields={fields} handler={data} />;
};

useSubscription is a thin wrapper around useEffect for subscribing to subjects. It runs the setup function once on mount and calls the returned unsubscribe function on unmount—no dependency array needed.

import { useSubscription } from "react-declarative";
import { ioc } from "../ioc/ioc";

const NotificationBell = () => {
const [count, setCount] = useState(0);

useSubscription(() =>
ioc.notificationService.countSubject.subscribe(setCount)
);

return <span>{count} notifications</span>;
};

Source provides a fluent API for composing data pipelines across multiple streams. It is the right tool when you need to join, filter, and transform multiple subjects before consuming the result.

Method Purpose
Source.join Combines multiple observers; emits when any emits
Source.multicast Shares a single upstream subscription among multiple consumers
.reduce(fn, init) Accumulates values across emissions
.filter(fn) Drops emissions that do not pass the predicate
.tap(fn) Runs a side effect without changing the emitted value
.map(fn) Transforms each emitted value
Source.fromInterval(ms) Emits void on a fixed interval

This example is taken directly from a real face-verification feature. It joins a state stream with a timer, counts consecutive valid frames, starts/stops a media recorder, and fires only when enough consecutive frames pass.

import { Source } from "react-declarative";

const CC_SECONDS_TO_VERIFY = 3;

const verifyCompleteEmitter = Source.multicast(() =>
Source
.join([
captureStateEmitter, // emits { state: boolean } on each frame
Source.fromInterval(1_000), // ticks every second
])
.reduce((acm, [{ state: isValid }]) => {
// Reset counter to 0 on any invalid frame; otherwise increment
if (isValid) {
return acm + 1;
}
return 0;
}, 0)
.tap((ticker) => {
if (ticker === 1) {
// Side effect: start recording on first valid tick
mediaRecorderInstance.beginCapture();
}
})
.filter((ticker) => ticker === CC_SECONDS_TO_VERIFY)
.tap(() => {
// Side effect: stop recording when threshold is reached
mediaRecorderInstance.endCapture();
})
);

Source.multicast ensures only one upstream subscription is created regardless of how many consumers subscribe to verifyCompleteEmitter.

Use Subject when…

You need to fire a one-off event across components that are not in a parent/child relationship—for example, triggering a form save from a toolbar button, or forwarding a keyboard shortcut to a nested grid.

Use BehaviorSubject when…

You have shared mutable state that components need to read synchronously on mount—for example, the current user session, a feature flag, or a connection status. The .data getter lets you read it outside of a subscription.

Use useChangeSubject when…

You want to react to changes in a prop or local state value without writing a useEffect with a dependency array. It is especially useful for auto-save, debounced search, and analytics tracking tied to form data.

Use Source pipelines when…

You need to combine multiple streams with stateful logic—counters, timers joined with data streams, multi-step sequences. If you find yourself chaining several useEffect hooks with shared state variables, a Source pipeline is usually cleaner.

Prefer simpler hooks when…

The data flow is local and linear. useState + useEffect is still the right choice for straightforward async loading, conditional rendering, and single-component side effects. Reactive primitives add overhead that is only worthwhile when multiple components or streams are involved.

import {
Subject,
BehaviorSubject,
useSubject,
useSubscription,
useChangeSubject,
One,
FieldType,
TypedField,
} from "react-declarative";

interface IFormData {
query: string;
}

const fields: TypedField[] = [
{
type: FieldType.Text,
name: "query",
title: "Search",
placeholder: "Type to search…",
},
];

// Module-level subject shared across components
export const searchSubject = new BehaviorSubject<string>("");

export const SearchForm = () => {
const handleChange = (data: Partial<IFormData>) => {
if (data.query !== undefined) {
searchSubject.next(data.query);
}
};

return <One fields={fields} onChange={handleChange} />;
};

export const SearchResults = () => {
const [results, setResults] = useState<string[]>([]);

useSubscription(() =>
searchSubject.subscribe(async (query) => {
if (query.length > 2) {
const data = await fetchResults(query);
setResults(data);
}
})
);

return (
<ul>
{results.map((r) => <li key={r}>{r}</li>)}
</ul>
);
};