Skip to main content

Building Adapters

Adapters are the bridge between NTL and the outside world. This guide walks through building a custom adapter that exposes NTL signal handlers via HTTP — useful for connecting web applications, mobile apps, and existing services.

The Adapter Trait

Every adapter implements the core Adapter trait:
use ntl::adapter::{Adapter, AdapterHealth, ExternalPayload, Protocol};
use ntl::Signal;

trait Adapter: Send + Sync {
    fn ingest(&self, external: ExternalPayload) -> Result<Signal>;
    fn emit(&self, signal: Signal) -> Result<ExternalPayload>;
    fn protocol(&self) -> Protocol;
    fn health(&self) -> AdapterHealth;
}

Example: HTTP Adapter

Let’s build a minimal HTTP adapter using axum:
use ntl::{Node, Signal, SignalType};
use ntl::adapter::{Adapter, AdapterHealth, ExternalPayload, Protocol};
use axum::{Router, Json, extract::State};
use std::sync::Arc;

struct HttpAdapter {
    node: Arc<Node>,
}

impl HttpAdapter {
    pub fn new(node: Arc<Node>) -> Self {
        Self { node }
    }

    pub fn router(self: Arc<Self>) -> Router {
        Router::new()
            .route("/signal", axum::routing::post(Self::handle_request))
            .with_state(self)
    }

    async fn handle_request(
        State(adapter): State<Arc<HttpAdapter>>,
        Json(body): Json<SignalRequest>,
    ) -> Json<SignalResponse> {
        // Translate HTTP request to NTL signal
        let signal = Signal::new(body.signal_type)
            .with_payload(body.payload)
            .with_weight(body.weight.unwrap_or(0.5))
            .with_tags(body.tags);

        // Emit into NTL network
        let signal_id = signal.emit(&adapter.node).await.unwrap();

        // Wait for correlated response
        match adapter.node
            .wait_correlation(signal_id, std::time::Duration::from_secs(30))
            .await
        {
            Ok(response) => Json(SignalResponse {
                status: "ok".into(),
                signal_id: signal_id.to_string(),
                data: Some(response.payload),
            }),
            Err(_) => Json(SignalResponse {
                status: "timeout".into(),
                signal_id: signal_id.to_string(),
                data: None,
            }),
        }
    }
}

impl Adapter for HttpAdapter {
    fn ingest(&self, external: ExternalPayload) -> Result<Signal> {
        let request: SignalRequest = serde_json::from_slice(&external.data)?;
        Ok(Signal::new(request.signal_type)
            .with_payload(request.payload)
            .with_weight(request.weight.unwrap_or(0.5)))
    }

    fn emit(&self, signal: Signal) -> Result<ExternalPayload> {
        let body = serde_json::to_vec(&signal.payload)?;
        Ok(ExternalPayload {
            data: body,
            content_type: "application/json".into(),
        })
    }

    fn protocol(&self) -> Protocol {
        Protocol::Http
    }

    fn health(&self) -> AdapterHealth {
        AdapterHealth::Healthy
    }
}

#[derive(serde::Deserialize)]
struct SignalRequest {
    signal_type: SignalType,
    payload: serde_json::Value,
    weight: Option<f32>,
    tags: Vec<String>,
}

#[derive(serde::Serialize)]
struct SignalResponse {
    status: String,
    signal_id: String,
    data: Option<serde_json::Value>,
}

Running the Adapter

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let node = Arc::new(
        Node::builder()
            .with_config_file("~/.ntl/config.toml")
            .build()
            .await?
    );

    let adapter = Arc::new(HttpAdapter::new(node.clone()));
    let router = adapter.router();

    // Run NTL node and HTTP server concurrently
    tokio::select! {
        _ = node.run_until_shutdown() => {},
        _ = axum::Server::bind(&"0.0.0.0:3000".parse()?)
            .serve(router.into_make_service()) => {},
    }

    Ok(())
}
Now any HTTP client can interact with the NTL network:
curl -X POST http://localhost:3000/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal_type": "query",
    "payload": {"key": "greeting"},
    "tags": ["kv", "get"]
  }'

Adapter Patterns

Stateless Translation

The simplest pattern — translate external format to signal and back. No state held in the adapter.

Correlation Bridge

Hold external connections open while waiting for NTL correlated responses. Essential for request-response protocols like HTTP.

Event Stream

For protocols that support streaming (WebSocket, SSE, gRPC streams), the adapter subscribes to signal types and pushes them to the external client as they arrive.

Batch Ingestion

For high-throughput sources, the adapter accumulates external payloads and emits them as weighted signal batches at intervals.

Registering Adapters

Adapters register with the node so they appear in health checks and topology:
node.register_adapter(adapter.clone()).await?;

// Now visible in:
// ntl status
// ntl adapters list

Testing Adapters

NTL provides a test harness for adapter development:
#[cfg(test)]
mod tests {
    use ntl::testing::MockNode;

    #[tokio::test]
    async fn test_http_adapter_ingest() {
        let node = MockNode::new();
        let adapter = HttpAdapter::new(Arc::new(node));

        let payload = ExternalPayload {
            data: serde_json::to_vec(&serde_json::json!({
                "signal_type": "query",
                "payload": {"key": "test"},
                "tags": ["kv"]
            })).unwrap(),
            content_type: "application/json".into(),
        };

        let signal = adapter.ingest(payload).unwrap();
        assert_eq!(signal.signal_type, SignalType::Query);
    }
}