Skip to main content

SiafuDB Integration

NTL and SiafuDB are designed as companion layers — NTL moves signals, SiafuDB remembers them. Together they form a unified nervous system with memory.

The Relationship

Signal arrives → NTL processes → State deposits to SiafuDB

State changes in SiafuDB → SiafuDB emits → NTL signal propagates
This bidirectional integration means:
  • Signals can write state as they propagate (deposit)
  • State changes can trigger new signals (emit)
  • The synapse topology and the graph storage share structure

Enabling SiafuDB

[siafu]
enabled = true
data_dir = "~/.ntl/siafu"
sync_mode = "eventual"    # eventual, strong, local-only

[siafu.graph]
max_nodes = 1_000_000
max_edges = 10_000_000

Depositing State from Signals

Signal handlers can deposit data into the local SiafuDB instance:
use ntl::{Signal, SignalHandler};
use ntl::siafu::{Graph, Node as GraphNode, Edge};

struct UserEventHandler {
    graph: Graph,
}

impl SignalHandler for UserEventHandler {
    fn signal_type(&self) -> SignalType {
        SignalType::Event
    }

    fn tags(&self) -> Vec<&str> {
        vec!["user", "activity"]
    }

    async fn handle(&self, signal: Signal) -> Result<Option<Signal>, ntl::Error> {
        let user_id = signal.payload.get("user_id").unwrap();
        let action = signal.payload.get("action").unwrap();
        let timestamp = signal.timestamp;

        // Deposit into graph
        let user_node = self.graph
            .get_or_create_node("user", user_id)
            .await?;

        let action_node = self.graph
            .create_node("action", &serde_json::json!({
                "type": action,
                "timestamp": timestamp,
                "signal_id": signal.id.to_string(),
            }))
            .await?;

        self.graph
            .create_edge(&user_node, &action_node, "performed")
            .await?;

        Ok(None) // No response signal needed
    }
}

Emitting Signals from State Changes

SiafuDB can watch for graph mutations and emit signals:
use ntl::siafu::{Graph, WatchEvent};

// Watch for new edges of type "alert"
let mut watcher = graph.watch_edges("alert").await?;

while let Some(event) = watcher.next().await {
    match event {
        WatchEvent::Created(edge) => {
            Signal::event("alert-created")
                .with_payload(serde_json::json!({
                    "from": edge.from_id,
                    "to": edge.to_id,
                    "data": edge.data,
                }))
                .with_weight(0.8)
                .emit(&node)
                .await?;
        }
        _ => {}
    }
}

Shared Topology

The synapse topology can be stored in SiafuDB, creating a persistent map of network structure:
[siafu.topology]
persist_synapses = true          # Store synapse state in graph
persist_signal_traces = true     # Store signal paths
trace_retention_hours = 168      # 7 days
This enables:
  • Recovery — A node that restarts can rebuild its synapse state from SiafuDB
  • Analytics — Query the graph to understand signal flow patterns
  • Optimization — The propagation engine can use historical paths to improve routing

Sync Modes

ModeDescriptionUse Case
local-onlyState stays on this nodeDevelopment, isolated nodes
eventualState syncs across nodes via CRDTProduction default
strongState requires quorum confirmationFinancial, critical data
Eventual consistency uses CRDTs (Conflict-free Replicated Data Types), meaning nodes can operate independently during network partitions and reconcile state when connectivity restores — critical for intermittent-connectivity environments.

Querying the Graph

Signal handlers can query SiafuDB to enrich their processing:
async fn handle(&self, signal: Signal) -> Result<Option<Signal>, ntl::Error> {
    let user_id = signal.payload.get("user_id").unwrap();

    // Query the graph for user context
    let recent_actions = self.graph
        .query("user", user_id)
        .traverse("performed")
        .filter(|node| node.data["timestamp"] > one_hour_ago())
        .limit(10)
        .execute()
        .await?;

    // Use context to enrich response
    Ok(Some(
        Signal::data("user-context")
            .with_correlation(signal.id)
            .with_payload(serde_json::json!({
                "user_id": user_id,
                "recent_actions": recent_actions,
                "action_count": recent_actions.len(),
            }))
    ))
}

The Nervous System Pattern

When NTL + SiafuDB are fully integrated, you get a nervous system:
  1. Sensory input — Adapters ingest external data as signals
  2. Signal propagation — NTL routes signals through the neural graph
  3. Processing — Handlers process signals and make decisions
  4. Memory formation — State deposits into SiafuDB’s graph
  5. Memory recall — Future signal processing queries past state
  6. Reflexive output — State changes trigger new signals automatically
This is not a metaphor — it’s the actual architecture. The system perceives, processes, remembers, and responds.