diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 9adb1bd9f..be09b1f1a 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -8,7 +8,7 @@ StartOS is an open-source Linux distribution for running personal servers. It ma - Frontend: Angular 21 + TypeScript + Taiga UI 5 - Container runtime: Node.js/TypeScript with LXC - Database/State: Patch-DB (git submodule) - storage layer with reactive frontend sync -- API: JSON-RPC via rpc-toolkit (see `core/rpc-toolkit.md`) +- API: JSON-RPC via rpc-toolkit (see `core/rpc-toolkit.md`), MCP for LLM agents (see `core/mcp/ARCHITECTURE.md`) - Auth: Password + session cookie, public/private key signatures, local authcookie (see `core/src/middleware/auth/`) ## Project Structure @@ -28,7 +28,7 @@ StartOS is an open-source Linux distribution for running personal servers. It ma ## Components -- **`core/`** — Rust backend daemon. Produces a single binary `startbox` that is symlinked as `startd` (main daemon), `start-cli` (CLI), `start-container` (runs inside LXC containers), `registrybox` (package registry), and `tunnelbox` (VPN/tunnel). Handles all backend logic: RPC API, service lifecycle, networking (DNS, ACME, WiFi, Tor, WireGuard), backups, and database state management. See [core/ARCHITECTURE.md](core/ARCHITECTURE.md). +- **`core/`** — Rust backend daemon. Produces a single binary `startbox` that is symlinked as `startd` (main daemon), `start-cli` (CLI), `start-container` (runs inside LXC containers), `registrybox` (package registry), and `tunnelbox` (VPN/tunnel). Handles all backend logic: RPC API, MCP server for LLM agents, service lifecycle, networking (DNS, ACME, WiFi, Tor, WireGuard), backups, and database state management. See [core/ARCHITECTURE.md](core/ARCHITECTURE.md). - **`web/`** — Angular 21 + TypeScript workspace using Taiga UI 5. Contains three applications (admin UI, setup wizard, VPN management) and two shared libraries (common components/services, marketplace). 
Communicates with the backend exclusively via JSON-RPC. See [web/ARCHITECTURE.md](web/ARCHITECTURE.md). @@ -53,13 +53,13 @@ Rust (core/) Key make targets along this chain: -| Step | Command | What it does | -|---|---|---| -| 1 | `cargo check -p start-os` | Verify Rust compiles | -| 2 | `make ts-bindings` | Export ts-rs types → rsync to SDK | -| 3 | `cd sdk && make baseDist dist` | Build SDK packages | -| 4 | `cd web && npm run check` | Type-check Angular projects | -| 5 | `cd container-runtime && npm run check` | Type-check runtime | +| Step | Command | What it does | +| ---- | --------------------------------------- | --------------------------------- | +| 1 | `cargo check -p start-os` | Verify Rust compiles | +| 2 | `make ts-bindings` | Export ts-rs types → rsync to SDK | +| 3 | `cd sdk && make baseDist dist` | Build SDK packages | +| 4 | `cd web && npm run check` | Type-check Angular projects | +| 5 | `cd container-runtime && npm run check` | Type-check runtime | **Important**: Editing `sdk/base/lib/osBindings/*.ts` alone is NOT sufficient — you must rebuild the SDK bundle (step 3) before web/container-runtime can see the changes. @@ -90,6 +90,17 @@ StartOS uses Patch-DB for reactive state synchronization: This means the UI is always eventually consistent with the backend — after any mutating API call, the frontend waits for the corresponding PatchDB diff before resolving, so the UI reflects the result immediately. +## MCP Server (LLM Agent Interface) + +StartOS includes an [MCP](https://modelcontextprotocol.io/) (Model Context Protocol) server at `/mcp`, enabling LLM agents to discover and invoke the same operations available through the UI and CLI. The MCP server runs inside the StartOS server process alongside the RPC API. + +- **Tools**: Every RPC method is exposed as an MCP tool with LLM-optimized descriptions and JSON Schema inputs. Agents call `tools/list` to discover what's available and `tools/call` to invoke operations. 
+- **Resources**: System state is exposed via MCP resources backed by Patch-DB. Agents subscribe to `startos:///public` and receive debounced revision diffs over SSE, maintaining a local state cache without polling. +- **Auth**: Same session cookie auth as the UI — no separate credentials. +- **Transport**: MCP Streamable HTTP — POST for requests, GET for SSE notification stream, DELETE for session teardown. + +See [core/ARCHITECTURE.md](core/ARCHITECTURE.md#mcp-server) for implementation details. + ## Further Reading - [core/ARCHITECTURE.md](core/ARCHITECTURE.md) — Rust backend architecture diff --git a/core/ARCHITECTURE.md b/core/ARCHITECTURE.md index f895715b2..1acdb9d0b 100644 --- a/core/ARCHITECTURE.md +++ b/core/ARCHITECTURE.md @@ -23,6 +23,7 @@ The crate produces a single binary `startbox` that is symlinked under different - `src/context/` — Context types (RpcContext, CliContext, InitContext, DiagnosticContext) - `src/service/` — Service lifecycle management with actor pattern (`service_actor.rs`) - `src/db/model/` — Patch-DB models (`public.rs` synced to frontend, `private.rs` backend-only) +- `src/mcp/` — MCP server for LLM agents (see [MCP Server](#mcp-server) below) - `src/net/` — Networking (DNS, ACME, WiFi, Tor via Arti, WireGuard) - `src/s9pk/` — S9PK package format (merkle archive) - `src/registry/` — Package registry management @@ -38,16 +39,19 @@ See [rpc-toolkit.md](rpc-toolkit.md) for full handler patterns and configuration Patch-DB provides diff-based state synchronization. Changes to `db/model/public.rs` automatically sync to the frontend. **Key patterns:** + - `db.peek().await` — Get a read-only snapshot of the database state - `db.mutate(|db| { ... 
}).await` — Apply mutations atomically, returns `MutateResult` - `#[derive(HasModel)]` — Derive macro for types stored in the database, generates typed accessors **Generated accessor types** (from `HasModel` derive): + - `as_field()` — Immutable reference: `&Model` - `as_field_mut()` — Mutable reference: `&mut Model` - `into_field()` — Owned value: `Model` **`Model` APIs** (from `db/prelude.rs`): + - `.de()` — Deserialize to `T` - `.ser(&value)` — Serialize from `T` - `.mutate(|v| ...)` — Deserialize, mutate, reserialize @@ -63,6 +67,12 @@ See [i18n-patterns.md](i18n-patterns.md) for internationalization key convention See [core-rust-patterns.md](core-rust-patterns.md) for common utilities (Invoke trait, Guard pattern, mount guards, Apply trait, etc.). +## MCP Server + +The MCP (Model Context Protocol) server at `src/mcp/` exposes the StartOS RPC API to LLM agents via the Streamable HTTP transport at `/mcp`. Tools wrap the existing RPC handlers; resources expose Patch-DB state with debounced SSE subscriptions; auth reuses the UI session cookie. + +See [src/mcp/ARCHITECTURE.md](src/mcp/ARCHITECTURE.md) for transport details, session lifecycle, tool dispatch, resource subscriptions, CORS, and body size limits. + ## Related Documentation - [rpc-toolkit.md](rpc-toolkit.md) — JSON-RPC handler patterns diff --git a/core/src/db/mod.rs b/core/src/db/mod.rs index 67653b389..0cb73d22d 100644 --- a/core/src/db/mod.rs +++ b/core/src/db/mod.rs @@ -163,13 +163,13 @@ pub struct SubscribeRes { pub guid: Guid, } -struct DbSubscriber { - rev: u64, - sub: UnboundedReceiver, - sync_db: watch::Receiver, +pub(crate) struct DbSubscriber { + pub(crate) rev: u64, + pub(crate) sub: UnboundedReceiver, + pub(crate) sync_db: watch::Receiver, } impl DbSubscriber { - async fn recv(&mut self) -> Option { + pub(crate) async fn recv(&mut self) -> Option { loop { tokio::select! 
{ rev = self.sub.recv() => { diff --git a/core/src/lib.rs b/core/src/lib.rs index bf5805e88..904999d59 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -63,6 +63,7 @@ pub mod init; pub mod install; pub mod logs; pub mod lxc; +pub mod mcp; pub mod middleware; pub mod net; pub mod notifications; diff --git a/core/src/mcp/ARCHITECTURE.md b/core/src/mcp/ARCHITECTURE.md new file mode 100644 index 000000000..af687f964 --- /dev/null +++ b/core/src/mcp/ARCHITECTURE.md @@ -0,0 +1,96 @@ +# MCP Server Architecture + +The Model Context Protocol server embedded in StartOS (`core/src/mcp/`). + +## Transport: Streamable HTTP (MCP 2025-03-26) + +The server implements the **Streamable HTTP** transport from the MCP spec, not the older stdio or SSE-only transports. A single route (`/mcp`) handles all three HTTP methods: + +| Method | Purpose | +| ----------- | -------------------------------------------------------------------------------- | +| **POST** | JSON-RPC 2.0 requests from client (initialize, tools/call, resources/read, etc.) | +| **GET** | Opens an SSE stream for server→client notifications (resource change events) | +| **DELETE** | Explicitly ends a session | +| **OPTIONS** | CORS preflight | + +A discovery endpoint at `/.well-known/mcp` returns `{"mcp_endpoint":"/mcp"}`. + +## Authentication + +Every HTTP method (POST, GET, DELETE) validates the caller's session cookie via `ValidSessionToken::from_header` before processing. This reuses the same auth infrastructure as the main StartOS web UI — MCP clients must present a valid session cookie obtained through the normal login flow. Unauthenticated requests get a 401. + +## Session Lifecycle + +1. **Create**: Client sends `initialize` via POST. Server generates a UUID session ID, creates an `McpSession` with a bounded mpsc channel (256 messages), and returns the ID in the `Mcp-Session-Id` response header. + +2. **Connect SSE**: Client opens a GET with the session ID header. 
The server takes the receiver half of the notification channel (`take_notification_rx`) and streams it as SSE events. Only one GET connection per session is allowed (the rx is moved, not cloned). + +3. **Use**: Client sends tool calls, resource reads, and subscriptions via POST. All POST requests must include a valid session ID header — the server validates it against the session map before processing. + +4. **Teardown**: Three paths: + - Client sends DELETE → session is removed, subscription tasks are aborted. + - SSE stream disconnects → `CleanupStream`'s `PinnedDrop` impl removes the session. + - Session is never connected → background sweep task (every 30s) removes sessions older than 60s that never had a GET stream attached. + +## Module Structure + +``` +core/src/mcp/ +├── mod.rs — HTTP handlers, routing, MCP method dispatch, shell execution, CORS +├── protocol.rs — JSON-RPC 2.0 types, MCP request/response structs, error codes +├── session.rs — Session map, create/remove/sweep, resource subscriptions with debounce +└── tools.rs — Tool registry (67 tools), HashMap mapping names → RPC methods + schemas +``` + +## Tool Dispatch + +`tool_registry()` returns a `HashMap` of tool entries; each entry specifies: + +- An MCP tool name (e.g. `"package.start"`) +- A JSON Schema for input validation (sent to clients via `tools/list`) +- A backing RPC method name (usually identical to the tool name) +- Flags: `sync_db` (whether to flush the DB sequence after success), `needs_session` (whether to inject `__Auth_session`) + +When `tools/call` arrives: + +1. Look up the tool by name (an O(1) `HashMap` lookup). +2. Convert arguments from `serde_json::Value` to `imbl_value::Value`. +3. **Special-case**: If `rpc_method` is `"__shell__"` or `"__package_shell__"`, dispatch to `handle_shell_exec` / `handle_package_shell_exec` directly (no RPC handler). Both set `kill_on_drop(true)` to ensure timed-out processes are terminated. +4.
Otherwise, optionally inject `__Auth_session` into params, then call `server.handle_command(rpc_method, params)`. +5. On success: if `sync_db` is true, flush the DB sequence. Return the result pretty-printed as a text content block. +6. On error: return the error as a text content block with `is_error: true`, using `McpResponse::ok` (per the MCP spec, tool errors are results, not JSON-RPC errors). + +## Shell Execution + +Two shell tools bypass the RPC layer entirely: + +- **`system.shell`** (`__shell__`): Runs `/bin/bash -c <command>` on the host with `kill_on_drop(true)`. 30s default timeout, 300s max. +- **`package.shell`** (`__package_shell__`): Resolves the target package's subcontainer via `Service::resolve_subcontainer`, then runs `/bin/sh -c <command>` inside it via `lxc-attach` (also with `kill_on_drop(true)`). Same timeout behavior. + +## Resource Subscriptions + +Four resources are exposed: + +- `startos:///public` — full public DB tree +- `startos:///public/serverInfo` — server metadata +- `startos:///public/packageData` — installed packages +- `startos:///mcp/system-prompt` — curated AI assistant context (text/plain) + +Resource URIs are validated to allow only `/public/**` subtrees and the special `/mcp/system-prompt` path. Attempts to access non-public paths (e.g. `startos:///private/...`) are rejected. + +`resources/read` parses the URI into a `JsonPointer`, calls `ctx.db.dump(&pointer)`, and returns the JSON. The system prompt resource is handled as a special case, returning server info and version. + +`resources/subscribe` creates a `DbSubscriber` that watches the patch-db for changes at the given pointer. Changes are **debounced** (500ms window): the subscriber collects multiple revisions and merges their `DiffPatch`es before sending a single `notifications/resources/updated` notification over the SSE channel. The subscription task runs as a spawned tokio task; its `JoinHandle` is stored in the session so it can be aborted on unsubscribe or session teardown.
Re-subscribing to the same URI aborts the prior subscription first. + +## CORS + +- Preflight (OPTIONS): reflects the request's `Origin`, `Access-Control-Request-Method`, and `Access-Control-Request-Headers` back. Sets `Allow-Credentials: true` and caches for 24h. +- Normal responses (`apply_cors`): reflects the request's `Origin` header when present, falls back to `*` when absent. Exposes the `Mcp-Session-Id` header. This matches the behavior of the rpc-toolkit `Cors` middleware used by the main UI. +- CORS headers are applied to all response types: POST JSON-RPC, GET SSE, DELETE, and error responses. + +## Body Size Limits + +POST request bodies are limited to 1 MiB: + +1. `Content-Length` header is checked **before** reading the body (rejects oversized requests immediately). +2. After reading, the actual body size is re-checked as defense-in-depth for chunked transfers that lack `Content-Length`. diff --git a/core/src/mcp/mod.rs b/core/src/mcp/mod.rs new file mode 100644 index 000000000..2001d3de5 --- /dev/null +++ b/core/src/mcp/mod.rs @@ -0,0 +1,1212 @@ +pub mod protocol; +pub mod session; +pub mod tools; + +use std::collections::HashMap; +use std::convert::Infallible; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use axum::body::Body; +use axum::extract::Request; +use axum::response::sse::{Event, KeepAlive, Sse}; +use axum::response::{IntoResponse, Response}; +use axum::routing::MethodRouter; +use futures::future::ready; +use http::header::{CONTENT_LENGTH, CONTENT_TYPE}; +use http::{HeaderValue, StatusCode}; +use patch_db::Dump; +use pin_project::pin_project; +use rpc_toolkit::Server; +use serde_json::Value as JsonValue; +use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::StreamExt as _; + +use crate::context::RpcContext; +use crate::middleware::auth::session::ValidSessionToken; +use crate::net::static_server::unauthorized; +use crate::prelude::*; +use imbl_value::InternedString; + +use self::protocol::*; +use self::session::{ + 
SessionMap, create_session, remove_session, session_exists, sweep_stale_sessions_if_needed, + take_notification_rx, +}; + +use self::tools::{ToolEntry, tool_registry}; + +/// Maximum request body size (1 MiB). +const MAX_BODY_SIZE: u64 = 1024 * 1024; + +pub fn mcp_router(ctx: RpcContext) -> MethodRouter { + let rpc_server = Server::new( + { + let ctx = ctx.clone(); + move || { + let ctx = ctx.clone(); + ready(Ok(ctx)) + } + }, + crate::main_api::(), + ); + + let registry: Arc> = Arc::new(tool_registry()); + let sessions: SessionMap = Arc::new(Default::default()); + + // Background task: sweep stale MCP sessions every 30 seconds. + // This task runs for the lifetime of the process. mcp_router() is called once + // at server startup, so this does not leak. + { + let sessions = sessions.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + sweep_stale_sessions_if_needed(&sessions); + } + }); + } + + let post_handler = { + let ctx = ctx.clone(); + let server = rpc_server.clone(); + let registry = registry.clone(); + let sessions = sessions.clone(); + axum::routing::post(move |request: Request| { + let ctx = ctx.clone(); + let server = server.clone(); + let registry = registry.clone(); + let sessions = sessions.clone(); + async move { handle_post(ctx, server, ®istry, &sessions, request).await } + }) + }; + + let get_handler = { + let ctx = ctx.clone(); + let sessions = sessions.clone(); + axum::routing::get(move |request: Request| { + let ctx = ctx.clone(); + let sessions = sessions.clone(); + async move { handle_get(ctx, &sessions, request).await } + }) + }; + + let delete_handler = { + let sessions = sessions.clone(); + axum::routing::delete(move |request: Request| { + let ctx = ctx.clone(); + let sessions = sessions.clone(); + async move { handle_delete(ctx, &sessions, request).await } + }) + }; + + let options_handler = axum::routing::options(|request: Request| async move { + cors_preflight(&request) + }); + + 
post_handler + .merge(get_handler) + .merge(delete_handler) + .merge(options_handler) +} + +// ============================================================================= +// POST /mcp — JSON-RPC requests (initialize, tools/*, resources/*) +// ============================================================================= + +async fn handle_post( + ctx: RpcContext, + server: Server, + registry: &HashMap, + sessions: &SessionMap, + request: Request, +) -> Response { + // Extract origin for CORS before consuming the request + let origin = extract_origin(&request); + + // Auth check — preserve the session hash for tools that need __Auth_session + let session_hash = match ValidSessionToken::from_header( + request.headers().get(http::header::COOKIE), + &ctx, + ) + .await + { + Ok(valid) => valid.0.hashed().clone(), + Err(e) => return unauthorized(e, "/mcp"), + }; + + // Content-Type check + if let Some(ct) = request.headers().get(CONTENT_TYPE) { + if let Ok(ct_str) = ct.to_str() { + if !ct_str.starts_with("application/json") { + return bad_request("Content-Type must be application/json", origin.as_ref()); + } + } + } + + let session_id = extract_session_id(&request); + + // Pre-check Content-Length before reading the body (C2) + if let Some(cl) = request.headers().get(CONTENT_LENGTH) { + if let Ok(len) = cl.to_str().unwrap_or("0").parse::() { + if len > MAX_BODY_SIZE { + return bad_request("Request body too large (max 1 MiB)", origin.as_ref()); + } + } + } + + // Read body + let body_bytes = { + use http_body_util::BodyExt; + let body = request.into_body(); + match body.collect().await { + Ok(collected) => collected.to_bytes(), + Err(_) => return bad_request("Failed to read request body", origin.as_ref()), + } + }; + + // Defense-in-depth: also check after read (for chunked transfers without Content-Length) + if body_bytes.len() as u64 > MAX_BODY_SIZE { + return bad_request("Request body too large (max 1 MiB)", origin.as_ref()); + } + + // Parse JSON-RPC request + let 
mcp_req: McpRequest = match serde_json::from_slice(&body_bytes) { + Ok(r) => r, + Err(e) => { + return json_response( + &McpResponse::error(None, PARSE_ERROR, format!("Parse error: {e}"), None), + None, + origin.as_ref(), + ); + } + }; + + if mcp_req.jsonrpc != "2.0" { + return json_response( + &McpResponse::error( + mcp_req.id, + INVALID_REQUEST, + "jsonrpc must be \"2.0\"".into(), + None, + ), + None, + origin.as_ref(), + ); + } + + // Session ID validation: all methods except initialize and notifications/initialized + // require a valid session ID (N8) + if mcp_req.method != "initialize" && mcp_req.method != "notifications/initialized" { + match session_id.as_deref() { + Some(id) if session_exists(sessions, id) => {} // valid + Some(_) => { + return json_response( + &McpResponse::error( + mcp_req.id, + INVALID_REQUEST, + "Invalid Mcp-Session-Id".into(), + None, + ), + None, + origin.as_ref(), + ); + } + None => { + return json_response( + &McpResponse::error( + mcp_req.id, + INVALID_REQUEST, + "Missing Mcp-Session-Id header".into(), + None, + ), + None, + origin.as_ref(), + ); + } + } + } + + // Dispatch by MCP method + match mcp_req.method.as_str() { + "initialize" => { + let new_session_id = create_session(sessions); + let response = handle_initialize(mcp_req.id, mcp_req.params); + json_response(&response, Some(&new_session_id), origin.as_ref()) + } + "notifications/initialized" => apply_cors( + Response::builder() + .status(StatusCode::ACCEPTED) + .body(Body::empty()) + .unwrap(), + origin.as_ref(), + ), + "tools/list" => json_response( + &handle_tools_list(mcp_req.id, registry), + session_id.as_deref(), + origin.as_ref(), + ), + "tools/call" => { + let response = handle_tools_call( + &ctx, + &server, + registry, + mcp_req.id, + mcp_req.params, + &session_hash, + ) + .await; + json_response(&response, session_id.as_deref(), origin.as_ref()) + } + "resources/list" => json_response( + &handle_resources_list(mcp_req.id), + session_id.as_deref(), + 
origin.as_ref(), + ), + "resources/read" => { + let response = handle_resources_read(&ctx, mcp_req.id, mcp_req.params).await; + json_response(&response, session_id.as_deref(), origin.as_ref()) + } + "resources/subscribe" => { + let response = handle_resources_subscribe( + &ctx, + sessions, + session_id.as_deref(), + mcp_req.id, + mcp_req.params, + ) + .await; + json_response(&response, session_id.as_deref(), origin.as_ref()) + } + "resources/unsubscribe" => { + let response = handle_resources_unsubscribe( + sessions, + session_id.as_deref(), + mcp_req.id, + mcp_req.params, + ); + json_response(&response, session_id.as_deref(), origin.as_ref()) + } + _ => json_response( + &McpResponse::error( + mcp_req.id, + METHOD_NOT_FOUND, + format!("Unknown method: {}", mcp_req.method), + None, + ), + session_id.as_deref(), + origin.as_ref(), + ), + } +} + +// ============================================================================= +// GET /mcp — SSE stream for server→client notifications +// ============================================================================= + +async fn handle_get(ctx: RpcContext, sessions: &SessionMap, request: Request) -> Response { + let origin = extract_origin(&request); + + // Auth check + if let Err(e) = + ValidSessionToken::from_header(request.headers().get(http::header::COOKIE), &ctx).await + { + return unauthorized(e, "/mcp"); + } + + let session_id = match extract_session_id(&request) { + Some(id) => id, + None => return bad_request("Missing Mcp-Session-Id header", origin.as_ref()), + }; + + // Take the notification receiver from the session + let rx = match take_notification_rx(sessions, &session_id) { + Some(rx) => rx, + None => return bad_request("Invalid or already-connected session", origin.as_ref()), + }; + + let sessions_cleanup = sessions.clone(); + let session_id_cleanup = session_id.clone(); + + // Convert bounded receiver to a Stream of SSE Events + let stream = ReceiverStream::new(rx).map(move |notification| { + let data = 
serde_json::to_string(¬ification).unwrap_or_default(); + Ok::<_, Infallible>(Event::default().event("message").data(data)) + }); + + // Wrap in a stream that cleans up the session on drop + let stream = CleanupStream { + inner: stream, + sessions: Some(sessions_cleanup), + session_id: Some(session_id_cleanup), + }; + + apply_cors( + Sse::new(stream) + .keep_alive(KeepAlive::default()) + .into_response(), + origin.as_ref(), + ) +} + +/// A stream wrapper that removes the MCP session when dropped (client disconnect). +#[pin_project(PinnedDrop)] +struct CleanupStream { + #[pin] + inner: S, + sessions: Option, + session_id: Option, +} + +#[pin_project::pinned_drop] +impl PinnedDrop for CleanupStream { + fn drop(self: std::pin::Pin<&mut Self>) { + let this = self.project(); + if let (Some(sessions), Some(id)) = (this.sessions.take(), this.session_id.take()) { + tracing::info!(target: "mcp_audit", session_id = %id, "MCP SSE stream disconnected, cleaning up session"); + remove_session(&sessions, &id); + } + } +} + +impl futures::Stream for CleanupStream { + type Item = S::Item; + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().inner.poll_next(cx) + } +} + +// ============================================================================= +// DELETE /mcp — end session +// ============================================================================= + +async fn handle_delete(ctx: RpcContext, sessions: &SessionMap, request: Request) -> Response { + let origin = extract_origin(&request); + + if let Err(e) = + ValidSessionToken::from_header(request.headers().get(http::header::COOKIE), &ctx).await + { + return unauthorized(e, "/mcp"); + } + + if let Some(session_id) = extract_session_id(&request) { + remove_session(sessions, &session_id); + } + + apply_cors( + Response::builder() + .status(StatusCode::OK) + .body(Body::empty()) + .unwrap(), + origin.as_ref(), + ) +} + +// 
============================================================================= +// Method handlers +// ============================================================================= + +fn handle_initialize(id: Option, params: Option) -> McpResponse { + if let Some(params) = params { + match serde_json::from_value::(params) { + Ok(p) => { + // Validate protocol version (N6) + if p.protocol_version != PROTOCOL_VERSION { + return McpResponse::error( + id, + INVALID_PARAMS, + format!( + "Unsupported protocol version: {}. Server supports: {}", + p.protocol_version, PROTOCOL_VERSION + ), + None, + ); + } + if let Some(ref info) = p.client_info { + tracing::info!( + target: "mcp_audit", + client_name = %info.name, + client_version = info.version.as_deref().unwrap_or("unknown"), + "MCP client connected" + ); + } + } + Err(e) => { + return McpResponse::error(id, INVALID_PARAMS, format!("{e}"), None); + } + } + } + + McpResponse::ok( + id, + serde_json::to_value(&InitializeResult { + protocol_version: PROTOCOL_VERSION, + capabilities: ServerCapabilities { + tools: ToolsCapability {}, + resources: ResourcesCapability { subscribe: true }, + }, + server_info: ServerInfo { + name: "StartOS", + version: env!("CARGO_PKG_VERSION").into(), + }, + }) + .unwrap(), + ) +} + +fn handle_tools_list(id: Option, registry: &HashMap) -> McpResponse { + let tools: Vec<_> = registry.values().map(|t| t.definition.clone()).collect(); + McpResponse::ok( + id, + serde_json::to_value(&ToolsListResult { tools }).unwrap(), + ) +} + +async fn handle_tools_call( + ctx: &RpcContext, + server: &Server, + registry: &HashMap, + id: Option, + params: Option, + session_hash: &InternedString, +) -> McpResponse { + let call_params: ToolsCallParams = match params.map(serde_json::from_value).transpose() { + Ok(Some(p)) => p, + Ok(None) => { + return McpResponse::error(id, INVALID_PARAMS, "Missing params".into(), None) + } + Err(e) => return McpResponse::error(id, INVALID_PARAMS, format!("{e}"), None), + }; + + let 
tool = match registry.get(&call_params.name) { + Some(t) => t, + None => { + return McpResponse::error( + id, + INVALID_PARAMS, + format!("Unknown tool: {}", call_params.name), + None, + ) + } + }; + + let start = Instant::now(); + + tracing::info!( + target: "mcp_audit", + tool = %call_params.name, + rpc_method = %tool.rpc_method, + "MCP tool invoked" + ); + + let mut imbl_params = match imbl_value::to_value(&call_params.arguments) { + Ok(v) => v, + Err(e) => { + return McpResponse::error( + id, + INVALID_PARAMS, + format!("Failed to convert params: {e}"), + None, + ); + } + }; + + // Special-case: shell execution (no RPC handler for these) + if tool.rpc_method == "__shell__" { + return handle_shell_exec(id, call_params.arguments, start).await; + } + if tool.rpc_method == "__package_shell__" { + return handle_package_shell_exec(ctx, id, call_params.arguments, start).await; + } + + // Inject __Auth_session for handlers that need it + if tool.needs_session { + if let imbl_value::Value::Object(ref mut map) = imbl_params { + map.insert( + imbl_value::InternedString::intern("__Auth_session"), + imbl_value::to_value(session_hash).unwrap(), + ); + } + } + + match server.handle_command(tool.rpc_method, imbl_params).await { + Ok(result) => { + if tool.sync_db { + let seq = ctx.db.sequence().await; + ctx.sync_db.send_replace(seq); + } + + let duration = start.elapsed(); + tracing::info!( + target: "mcp_audit", + tool = %call_params.name, + duration_ms = duration.as_millis() as u64, + "MCP tool completed" + ); + + let json_result = serde_json::to_value(&result).unwrap_or(JsonValue::Null); + let text = serde_json::to_string_pretty(&json_result).unwrap_or_default(); + + McpResponse::ok( + id, + serde_json::to_value(&ToolsCallResult { + content: vec![ContentBlock::Text { text }], + is_error: None, + }) + .unwrap(), + ) + } + Err(rpc_err) => { + let duration = start.elapsed(); + tracing::warn!( + target: "mcp_audit", + tool = %call_params.name, + error_code = rpc_err.code, + 
error_msg = %rpc_err.message, + duration_ms = duration.as_millis() as u64, + "MCP tool call failed" + ); + + McpResponse::ok( + id, + serde_json::to_value(&ToolsCallResult { + content: vec![ContentBlock::Text { + text: format!("Error ({}): {}", rpc_err.code, rpc_err.message), + }], + is_error: Some(true), + }) + .unwrap(), + ) + } + } +} + +async fn handle_shell_exec( + id: Option, + arguments: JsonValue, + start: Instant, +) -> McpResponse { + let command = match arguments.get("command").and_then(|v| v.as_str()) { + Some(c) => c.to_string(), + None => { + return McpResponse::error( + id, + INVALID_PARAMS, + "Missing required parameter: command".into(), + None, + ) + } + }; + + let timeout_secs = arguments + .get("timeout") + .and_then(|v| v.as_u64()) + .unwrap_or(30) + .min(300); + + tracing::info!( + target: "mcp_audit", + command = %command, + timeout_secs = timeout_secs, + "MCP shell command executing" + ); + + let result = tokio::time::timeout( + Duration::from_secs(timeout_secs), + tokio::process::Command::new("/bin/bash") + .arg("-c") + .arg(&command) + .kill_on_drop(true) // N10: ensure timed-out processes are killed + .output(), + ) + .await; + + let duration = start.elapsed(); + + match result { + Ok(Ok(output)) => { + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + let exit_code = output.status.code().unwrap_or(-1); + + tracing::info!( + target: "mcp_audit", + command = %command, + exit_code = exit_code, + stdout_len = output.stdout.len(), + stderr_len = output.stderr.len(), + duration_ms = duration.as_millis() as u64, + "MCP shell command completed" + ); + + let mut text = String::new(); + if !stdout.is_empty() { + text.push_str(&stdout); + } + if !stderr.is_empty() { + if !text.is_empty() { + text.push('\n'); + } + text.push_str("[stderr]\n"); + text.push_str(&stderr); + } + if text.is_empty() { + text.push_str("(no output)"); + } + text.push_str(&format!("\n[exit code: {}]", exit_code)); 
+ + McpResponse::ok( + id, + serde_json::to_value(&ToolsCallResult { + content: vec![ContentBlock::Text { text }], + is_error: if exit_code != 0 { Some(true) } else { None }, + }) + .unwrap(), + ) + } + Ok(Err(e)) => { + tracing::warn!( + target: "mcp_audit", + command = %command, + error = %e, + "MCP shell command failed to execute" + ); + McpResponse::ok( + id, + serde_json::to_value(&ToolsCallResult { + content: vec![ContentBlock::Text { + text: format!("Failed to execute command: {e}"), + }], + is_error: Some(true), + }) + .unwrap(), + ) + } + Err(_) => { + tracing::warn!( + target: "mcp_audit", + command = %command, + timeout_secs = timeout_secs, + "MCP shell command timed out" + ); + McpResponse::ok( + id, + serde_json::to_value(&ToolsCallResult { + content: vec![ContentBlock::Text { + text: format!("Command timed out after {timeout_secs} seconds"), + }], + is_error: Some(true), + }) + .unwrap(), + ) + } + } +} + +async fn handle_package_shell_exec( + ctx: &RpcContext, + id: Option, + arguments: JsonValue, + start: Instant, +) -> McpResponse { + let pkg_id_str = match arguments.get("id").and_then(|v| v.as_str()) { + Some(id) => id.to_string(), + None => { + return McpResponse::error( + id, + INVALID_PARAMS, + "Missing required parameter: id".into(), + None, + ) + } + }; + + let command = match arguments.get("command").and_then(|v| v.as_str()) { + Some(c) => c.to_string(), + None => { + return McpResponse::error( + id, + INVALID_PARAMS, + "Missing required parameter: command".into(), + None, + ) + } + }; + + let timeout_secs = arguments + .get("timeout") + .and_then(|v| v.as_u64()) + .unwrap_or(30) + .min(300); + + let subcontainer_filter = arguments + .get("subcontainer") + .and_then(|v| v.as_str()) + .map(InternedString::intern); + let name_filter = arguments + .get("name") + .and_then(|v| v.as_str()) + .map(InternedString::intern); + + let pkg_id: crate::PackageId = match pkg_id_str.parse() { + Ok(id) => id, + Err(e) => { + return McpResponse::error( + id, 
+ INVALID_PARAMS, + format!("Invalid package ID: {e}"), + None, + ) + } + }; + + // Resolve the subcontainer using the shared logic from service/mod.rs + let resolved = { + let service = ctx.services.get(&pkg_id).await; + let service_ref = match service.as_ref() { + Some(s) => s, + None => { + return McpResponse::ok( + id, + serde_json::to_value(&ToolsCallResult { + content: vec![ContentBlock::Text { + text: format!("Package '{pkg_id}' is not installed or not running"), + }], + is_error: Some(true), + }) + .unwrap(), + ) + } + }; + match service_ref + .resolve_subcontainer(subcontainer_filter, name_filter, None, None) + .await + { + Ok(r) => r, + Err(e) => { + return McpResponse::ok( + id, + serde_json::to_value(&ToolsCallResult { + content: vec![ContentBlock::Text { + text: format!("{e}"), + }], + is_error: Some(true), + }) + .unwrap(), + ) + } + } + }; + + tracing::info!( + target: "mcp_audit", + package = %pkg_id_str, + command = %command, + subcontainer = %resolved.subcontainer_id, + timeout_secs = timeout_secs, + "MCP package shell command executing" + ); + + // Build the command using shared logic (kill_on_drop already set by build_subcontainer_command) + let mut cmd = crate::service::Service::build_subcontainer_command( + &resolved, + &["/bin/sh", "-c", &command], + ); + + let result = tokio::time::timeout(Duration::from_secs(timeout_secs), cmd.output()).await; + + let duration = start.elapsed(); + + match result { + Ok(Ok(output)) => { + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + let exit_code = output.status.code().unwrap_or(-1); + + tracing::info!( + target: "mcp_audit", + package = %pkg_id_str, + command = %command, + exit_code = exit_code, + duration_ms = duration.as_millis() as u64, + "MCP package shell command completed" + ); + + let mut text = String::new(); + if !stdout.is_empty() { + text.push_str(&stdout); + } + if !stderr.is_empty() { + if !text.is_empty() { + text.push('\n'); + 
}
+                text.push_str("[stderr]\n");
+                text.push_str(&stderr);
+            }
+            if text.is_empty() {
+                text.push_str("(no output)");
+            }
+            text.push_str(&format!("\n[exit code: {}]", exit_code));
+
+            McpResponse::ok(
+                id,
+                serde_json::to_value(&ToolsCallResult {
+                    content: vec![ContentBlock::Text { text }],
+                    is_error: if exit_code != 0 { Some(true) } else { None },
+                })
+                .unwrap(),
+            )
+        }
+        Ok(Err(e)) => McpResponse::ok(
+            id,
+            serde_json::to_value(&ToolsCallResult {
+                content: vec![ContentBlock::Text {
+                    text: format!("Failed to execute in container: {e}"),
+                }],
+                is_error: Some(true),
+            })
+            .unwrap(),
+        ),
+        Err(_) => McpResponse::ok(
+            id,
+            serde_json::to_value(&ToolsCallResult {
+                content: vec![ContentBlock::Text {
+                    text: format!("Command timed out after {timeout_secs} seconds"),
+                }],
+                is_error: Some(true),
+            })
+            .unwrap(),
+        ),
+    }
+}
+
+fn handle_resources_list(id: Option<JsonValue>) -> McpResponse {
+    let resources = vec![
+        ResourceDefinition {
+            uri: "startos:///public".into(),
+            name: "System State".into(),
+            description: "The full public database tree. Contains server info, package data, \
+                network configuration, and all other system state. Subscribe to this for \
+                real-time updates."
+                .into(),
+            mime_type: Some("application/json".into()),
+        },
+        ResourceDefinition {
+            uri: "startos:///public/serverInfo".into(),
+            name: "Server Info".into(),
+            description: "Server metadata: hostname, version, network addresses, resource usage."
+                .into(),
+            mime_type: Some("application/json".into()),
+        },
+        ResourceDefinition {
+            uri: "startos:///public/packageData".into(),
+            name: "Package Data".into(),
+            description: "All installed packages with their status, version, and configuration."
+                .into(),
+            mime_type: Some("application/json".into()),
+        },
+        ResourceDefinition {
+            uri: "startos:///mcp/system-prompt".into(),
+            name: "System Prompt".into(),
+            description: "A curated system context for AI assistants: server identity, version, \
+                and installed packages summary."
+                .into(),
+            mime_type: Some("text/plain".into()),
+        },
+    ];
+
+    McpResponse::ok(
+        id,
+        serde_json::to_value(&ResourcesListResult { resources }).unwrap(),
+    )
+}
+
+async fn handle_resources_read(
+    ctx: &RpcContext,
+    id: Option<JsonValue>,
+    params: Option<JsonValue>,
+) -> McpResponse {
+    let read_params: ResourcesReadParams = match params.map(serde_json::from_value).transpose() {
+        Ok(Some(p)) => p,
+        Ok(None) => {
+            return McpResponse::error(id, INVALID_PARAMS, "Missing params".into(), None)
+        }
+        Err(e) => return McpResponse::error(id, INVALID_PARAMS, format!("{e}"), None),
+    };
+
+    // Validate resource URI (N7)
+    if let Err(msg) = validate_resource_uri(&read_params.uri) {
+        return McpResponse::error(id, INVALID_PARAMS, msg, None);
+    }
+
+    // Special-case: system prompt resource (S-B)
+    if read_params.uri == "startos:///mcp/system-prompt" {
+        let prompt = build_system_prompt(ctx).await;
+        return McpResponse::ok(
+            id,
+            serde_json::to_value(&ResourcesReadResult {
+                contents: vec![ResourceContent {
+                    uri: read_params.uri,
+                    mime_type: Some("text/plain".into()),
+                    text: Some(prompt),
+                }],
+            })
+            .unwrap(),
+        );
+    }
+
+    let pointer = match read_params
+        .uri
+        .strip_prefix("startos://")
+        .and_then(|p| p.parse::<JsonPointer>().ok())
+    {
+        Some(p) => p,
+        None => {
+            return McpResponse::error(
+                id,
+                INVALID_PARAMS,
+                format!("Invalid resource URI: {}", read_params.uri),
+                None,
+            )
+        }
+    };
+
+    let dump: Dump = ctx.db.dump(&pointer).await;
+    let json_value = serde_json::to_value(&dump.value).unwrap_or(JsonValue::Null);
+    let text = serde_json::to_string_pretty(&json_value).unwrap_or_default();
+
+    McpResponse::ok(
+        id,
+        serde_json::to_value(&ResourcesReadResult {
+            contents: vec![ResourceContent {
+                uri: read_params.uri,
+                mime_type: Some("application/json".into()),
+                text: Some(text),
+            }],
+        })
+        .unwrap(),
+    )
+}
+
+async fn handle_resources_subscribe(
+    ctx: &RpcContext,
+    sessions: &SessionMap,
+    session_id: Option<&str>,
+    id: Option<JsonValue>,
+    params: Option<JsonValue>,
+) -> McpResponse {
+    let 
session_id = match session_id {
+        Some(s) => s,
+        None => {
+            return McpResponse::error(
+                id,
+                INVALID_REQUEST,
+                "Missing Mcp-Session-Id header".into(),
+                None,
+            )
+        }
+    };
+
+    let sub_params: ResourcesSubscribeParams =
+        match params.map(serde_json::from_value).transpose() {
+            Ok(Some(p)) => p,
+            Ok(None) => {
+                return McpResponse::error(id, INVALID_PARAMS, "Missing params".into(), None)
+            }
+            Err(e) => return McpResponse::error(id, INVALID_PARAMS, format!("{e}"), None),
+        };
+
+    // Validate resource URI (N7)
+    if let Err(msg) = validate_resource_uri(&sub_params.uri) {
+        return McpResponse::error(id, INVALID_PARAMS, msg, None);
+    }
+
+    if let Err(e) =
+        session::subscribe_resource(ctx, sessions, session_id, &sub_params.uri).await
+    {
+        return McpResponse::error(id, INTERNAL_ERROR, format!("{e}"), None);
+    }
+
+    tracing::info!(
+        target: "mcp_audit",
+        uri = %sub_params.uri,
+        session_id = %session_id,
+        "MCP resource subscription started"
+    );
+
+    McpResponse::ok(id, serde_json::json!({}))
+}
+
+fn handle_resources_unsubscribe(
+    sessions: &SessionMap,
+    session_id: Option<&str>,
+    id: Option<JsonValue>,
+    params: Option<JsonValue>,
+) -> McpResponse {
+    let session_id = match session_id {
+        Some(s) => s,
+        None => {
+            return McpResponse::error(
+                id,
+                INVALID_REQUEST,
+                "Missing Mcp-Session-Id header".into(),
+                None,
+            )
+        }
+    };
+
+    let unsub_params: ResourcesUnsubscribeParams =
+        match params.map(serde_json::from_value).transpose() {
+            Ok(Some(p)) => p,
+            Ok(None) => {
+                return McpResponse::error(id, INVALID_PARAMS, "Missing params".into(), None)
+            }
+            Err(e) => return McpResponse::error(id, INVALID_PARAMS, format!("{e}"), None),
+        };
+
+    session::unsubscribe_resource(sessions, session_id, &unsub_params.uri);
+
+    tracing::info!(
+        target: "mcp_audit",
+        uri = %unsub_params.uri,
+        session_id = %session_id,
+        "MCP resource subscription stopped"
+    );
+
+    McpResponse::ok(id, serde_json::json!({}))
+}
+
+// 
=============================================================================
+// Helpers
+// =============================================================================
+
+/// Extract the Origin header value from a request for CORS reflection.
+fn extract_origin(request: &Request<Body>) -> Option<HeaderValue> {
+    request.headers().get("Origin").cloned()
+}
+
+fn extract_session_id(request: &Request<Body>) -> Option<String> {
+    request
+        .headers()
+        .get("Mcp-Session-Id")
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.to_string())
+}
+
+/// Validate that a resource URI is within the allowed set.
+/// Only `startos:///public/**` subtrees and the special `startos:///mcp/system-prompt`
+/// resource are accessible.
+fn validate_resource_uri(uri: &str) -> Result<(), String> {
+    match uri.strip_prefix("startos://") {
+        Some("/mcp/system-prompt") => Ok(()),
+        Some(path) if path.starts_with("/public") => Ok(()),
+        Some(path) => Err(format!(
+            "Access denied: resource URI must start with startos:///public, got path: {path}"
+        )),
+        None => Err(format!(
+            "Invalid resource URI: must start with startos://, got: {uri}"
+        )),
+    }
+}
+
+/// Build a curated system prompt for AI assistants (S-B).
+async fn build_system_prompt(ctx: &RpcContext) -> String {
+    let dump = ctx
+        .db
+        .dump(&"/public/serverInfo".parse().unwrap())
+        .await;
+    let server_info = serde_json::to_string_pretty(&dump.value).unwrap_or_default();
+    format!(
+        "You are managing a StartOS server.\n\
+        \n\
+        Server info:\n{server_info}\n\
+        \n\
+        StartOS version: {}\n\
+        \n\
+        Use the available MCP tools to inspect, configure, and manage this server. 
\
+        Always confirm destructive operations with the user before executing them.",
+        env!("CARGO_PKG_VERSION"),
+    )
+}
+
+fn json_response(
+    response: &McpResponse,
+    session_id: Option<&str>,
+    origin: Option<&HeaderValue>,
+) -> Response<Body> {
+    let body = serde_json::to_vec(response).unwrap_or_else(|_| {
+        br#"{"jsonrpc":"2.0","error":{"code":-32603,"message":"Internal error"}}"#.to_vec()
+    });
+    let mut builder = Response::builder()
+        .status(StatusCode::OK)
+        .header(CONTENT_TYPE, "application/json");
+    if let Some(id) = session_id {
+        builder = builder.header("Mcp-Session-Id", id);
+    }
+    apply_cors(builder.body(Body::from(body)).unwrap(), origin)
+}
+
+fn bad_request(msg: &str, origin: Option<&HeaderValue>) -> Response<Body> {
+    apply_cors(
+        Response::builder()
+            .status(StatusCode::BAD_REQUEST)
+            .header(CONTENT_TYPE, "text/plain")
+            .body(Body::from(msg.to_string()))
+            .unwrap(),
+        origin,
+    )
+}
+
+/// Apply CORS headers to any response. Reflects the origin when present,
+/// falls back to `*` when absent (matching the rpc-toolkit Cors middleware).
+fn apply_cors(mut response: Response<Body>, origin: Option<&HeaderValue>) -> Response<Body> {
+    let headers = response.headers_mut();
+    headers.insert(
+        "Access-Control-Allow-Origin",
+        origin
+            .cloned()
+            .unwrap_or_else(|| HeaderValue::from_static("*")),
+    );
+    headers.insert(
+        "Access-Control-Allow-Credentials",
+        HeaderValue::from_static("true"),
+    );
+    headers.insert(
+        "Access-Control-Expose-Headers",
+        HeaderValue::from_static("Mcp-Session-Id"),
+    );
+    response
+}
+
+fn cors_preflight(request: &Request<Body>) -> Response<Body> {
+    let origin = request
+        .headers()
+        .get("Origin")
+        .cloned()
+        .unwrap_or_else(|| HeaderValue::from_static("*"));
+    let methods = request
+        .headers()
+        .get("Access-Control-Request-Method")
+        .cloned()
+        .unwrap_or_else(|| HeaderValue::from_static("POST, GET, DELETE, OPTIONS"));
+    let req_headers = request
+        .headers()
+        .get("Access-Control-Request-Headers")
+        .cloned()
+        .unwrap_or_else(|| {
+            HeaderValue::from_static("Content-Type, Mcp-Session-Id, Cookie")
+        });
+
+    Response::builder()
+        .status(StatusCode::OK)
+        .header("Access-Control-Allow-Origin", origin)
+        .header("Access-Control-Allow-Methods", methods)
+        .header("Access-Control-Allow-Headers", req_headers)
+        .header("Access-Control-Allow-Credentials", "true")
+        .header("Access-Control-Max-Age", "86400")
+        .body(Body::empty())
+        .unwrap()
+}
diff --git a/core/src/mcp/protocol.rs b/core/src/mcp/protocol.rs
new file mode 100644
index 000000000..35d2ac38d
--- /dev/null
+++ b/core/src/mcp/protocol.rs
@@ -0,0 +1,211 @@
+use serde::{Deserialize, Serialize};
+use serde_json::Value as JsonValue;
+
+// JSON-RPC 2.0 error codes
+pub const PARSE_ERROR: i32 = -32700;
+pub const INVALID_REQUEST: i32 = -32600;
+pub const METHOD_NOT_FOUND: i32 = -32601;
+pub const INVALID_PARAMS: i32 = -32602;
+pub const INTERNAL_ERROR: i32 = -32603;
+
+pub const PROTOCOL_VERSION: &str = "2025-03-26";
+
+// === JSON-RPC 2.0 envelope ===
+
+#[derive(Deserialize)]
+pub struct McpRequest {
+    pub jsonrpc: String,
+    pub id: 
Option<JsonValue>,
+    pub method: String,
+    #[serde(default)]
+    pub params: Option<JsonValue>,
+}
+
+#[derive(Serialize)]
+pub struct McpResponse {
+    pub jsonrpc: &'static str,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub id: Option<JsonValue>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub result: Option<JsonValue>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<McpError>,
+}
+
+impl McpResponse {
+    pub fn ok(id: Option<JsonValue>, result: JsonValue) -> Self {
+        Self {
+            jsonrpc: "2.0",
+            id,
+            result: Some(result),
+            error: None,
+        }
+    }
+
+    pub fn error(id: Option<JsonValue>, code: i32, message: String, data: Option<JsonValue>) -> Self {
+        Self {
+            jsonrpc: "2.0",
+            id,
+            result: None,
+            error: Some(McpError {
+                code,
+                message,
+                data,
+            }),
+        }
+    }
+}
+
+#[derive(Serialize)]
+pub struct McpError {
+    pub code: i32,
+    pub message: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub data: Option<JsonValue>,
+}
+
+// === initialize ===
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct InitializeParams {
+    pub protocol_version: String,
+    #[serde(default)]
+    pub capabilities: JsonValue,
+    #[serde(default)]
+    pub client_info: Option<ClientInfo>,
+}
+
+#[derive(Deserialize)]
+pub struct ClientInfo {
+    pub name: String,
+    #[serde(default)]
+    pub version: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct InitializeResult {
+    pub protocol_version: &'static str,
+    pub capabilities: ServerCapabilities,
+    pub server_info: ServerInfo,
+}
+
+#[derive(Serialize)]
+pub struct ServerCapabilities {
+    pub tools: ToolsCapability,
+    pub resources: ResourcesCapability,
+}
+
+#[derive(Serialize)]
+pub struct ToolsCapability {}
+
+#[derive(Serialize)]
+pub struct ResourcesCapability {
+    pub subscribe: bool,
+}
+
+#[derive(Serialize)]
+pub struct ServerInfo {
+    pub name: &'static str,
+    pub version: String,
+}
+
+// === tools/list ===
+
+#[derive(Serialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct ToolDefinition {
+    pub name: String,
+    pub description: String,
+    
pub input_schema: JsonValue,
+}
+
+#[derive(Serialize)]
+pub struct ToolsListResult {
+    pub tools: Vec<ToolDefinition>,
+}
+
+// === tools/call ===
+
+#[derive(Deserialize)]
+pub struct ToolsCallParams {
+    pub name: String,
+    #[serde(default)]
+    pub arguments: JsonValue,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ToolsCallResult {
+    pub content: Vec<ContentBlock>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub is_error: Option<bool>,
+}
+
+#[derive(Serialize)]
+#[serde(tag = "type")]
+pub enum ContentBlock {
+    #[serde(rename = "text")]
+    Text { text: String },
+}
+
+// === resources/list ===
+
+#[derive(Serialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct ResourceDefinition {
+    pub uri: String,
+    pub name: String,
+    pub description: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub mime_type: Option<String>,
+}
+
+#[derive(Serialize)]
+pub struct ResourcesListResult {
+    pub resources: Vec<ResourceDefinition>,
+}
+
+// === resources/read ===
+
+#[derive(Deserialize)]
+pub struct ResourcesReadParams {
+    pub uri: String,
+}
+
+#[derive(Serialize)]
+pub struct ResourcesReadResult {
+    pub contents: Vec<ResourceContent>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ResourceContent {
+    pub uri: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub mime_type: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub text: Option<String>,
+}
+
+// === resources/subscribe + unsubscribe ===
+
+#[derive(Deserialize)]
+pub struct ResourcesSubscribeParams {
+    pub uri: String,
+}
+
+#[derive(Deserialize)]
+pub struct ResourcesUnsubscribeParams {
+    pub uri: String,
+}
+
+// === Server→client notification ===
+
+#[derive(Serialize)]
+pub struct McpNotification {
+    pub jsonrpc: &'static str,
+    pub method: &'static str,
+    pub params: serde_json::Value,
+}
diff --git a/core/src/mcp/session.rs b/core/src/mcp/session.rs
new file mode 100644
index 000000000..1879e9539
--- /dev/null
+++ b/core/src/mcp/session.rs
@@ -0,0 +1,232 @@
+use 
std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use patch_db::json_ptr::JsonPointer;
+use patch_db::DiffPatch;
+use serde_json::Value as JsonValue;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+use uuid::Uuid;
+
+use crate::context::RpcContext;
+use crate::db::DbSubscriber;
+use crate::prelude::*;
+use crate::util::sync::SyncMutex;
+
+use super::protocol::McpNotification;
+
+pub(crate) type SessionMap = Arc<SyncMutex<HashMap<String, McpSession>>>;
+
+/// Maximum time a session can exist without a GET stream before being cleaned up.
+const SESSION_STALE_TIMEOUT: Duration = Duration::from_secs(60);
+
+/// Maximum buffered notifications before backpressure kicks in.
+const NOTIFICATION_CHANNEL_BOUND: usize = 256;
+
+pub(crate) struct McpSession {
+    pub notification_tx: mpsc::Sender<JsonValue>,
+    pub notification_rx: Option<mpsc::Receiver<JsonValue>>,
+    pub subscriptions: HashMap<String, JoinHandle<()>>,
+    pub created_at: Instant,
+}
+
+pub(crate) fn create_session(sessions: &SessionMap) -> String {
+    let id = Uuid::new_v4().to_string();
+    let (tx, rx) = mpsc::channel(NOTIFICATION_CHANNEL_BOUND);
+    sessions.mutate(|map| {
+        map.insert(
+            id.clone(),
+            McpSession {
+                notification_tx: tx,
+                notification_rx: Some(rx),
+                subscriptions: HashMap::new(),
+                created_at: Instant::now(),
+            },
+        );
+    });
+    id
+}
+
+/// Sweep stale sessions. Call this from any frequent code path (POST handler, create_session).
+pub(crate) fn sweep_stale_sessions_if_needed(sessions: &SessionMap) {
+    sessions.mutate(|map| sweep_stale_sessions(map));
+}
+
+/// Remove sessions that were created but never connected a GET stream within the timeout.
+fn sweep_stale_sessions(map: &mut HashMap<String, McpSession>) {
+    let stale: Vec<String> = map
+        .iter()
+        .filter(|(_, session)| {
+            // Session is stale if rx is still present (no GET connected) and it's old
+            session.notification_rx.is_some()
+                && session.created_at.elapsed() > SESSION_STALE_TIMEOUT
+        })
+        .map(|(id, _)| id.clone())
+        .collect();
+    for id in stale {
+        tracing::info!(
+            target: "mcp_audit",
+            session_id = %id,
+            "Sweeping stale MCP session (no GET stream connected)"
+        );
+        if let Some(session) = map.remove(&id) {
+            for (_, handle) in session.subscriptions {
+                handle.abort();
+            }
+        }
+    }
+}
+
+pub(crate) fn remove_session(sessions: &SessionMap, id: &str) {
+    sessions.mutate(|map| {
+        if let Some(session) = map.remove(id) {
+            for (_, handle) in session.subscriptions {
+                handle.abort();
+            }
+        }
+    });
+}
+
+/// Take the notification receiver from a session (for use by the SSE stream).
+/// Returns None if the session doesn't exist or the rx was already taken.
+pub(crate) fn take_notification_rx(
+    sessions: &SessionMap,
+    id: &str,
+) -> Option<mpsc::Receiver<JsonValue>> {
+    sessions.mutate(|map| map.get_mut(id)?.notification_rx.take())
+}
+
+/// Check whether the given session ID exists in the session map.
+pub(crate) fn session_exists(sessions: &SessionMap, id: &str) -> bool {
+    sessions.peek(|map| map.contains_key(id))
+}
+
+/// Parse a `startos:///...` URI into a JsonPointer.
+fn parse_resource_uri(uri: &str) -> Result<JsonPointer, Error> {
+    let path = uri.strip_prefix("startos://").ok_or_else(|| {
+        Error::new(
+            eyre!("Invalid resource URI: must start with startos://"),
+            ErrorKind::InvalidRequest,
+        )
+    })?;
+    path.parse::<JsonPointer>()
+        .with_kind(ErrorKind::InvalidRequest)
+}
+
+pub(crate) async fn subscribe_resource(
+    ctx: &RpcContext,
+    sessions: &SessionMap,
+    session_id: &str,
+    uri: &str,
+) -> Result<(), Error> {
+    let pointer = parse_resource_uri(uri)?;
+
+    let (dump, sub) = ctx.db.dump_and_sub(pointer).await;
+    let mut db_sub = DbSubscriber {
+        rev: dump.id,
+        sub,
+        sync_db: ctx.sync_db.subscribe(),
+    };
+
+    let tx = sessions
+        .peek(|map| map.get(session_id).map(|s| s.notification_tx.clone()))
+        .ok_or_else(|| Error::new(eyre!("Session not found"), ErrorKind::InvalidRequest))?;
+
+    let uri_owned = uri.to_string();
+
+    let handle = tokio::spawn(async move {
+        loop {
+            // Wait for first revision
+            let first = match db_sub.recv().await {
+                Some(rev) => rev,
+                None => break,
+            };
+
+            // Debounce: collect more revisions for up to 500ms
+            let mut merged_id = first.id;
+            let mut merged_patch = first.patch;
+
+            let debounce = tokio::time::sleep(Duration::from_millis(500));
+            tokio::pin!(debounce);
+
+            loop {
+                tokio::select! 
{
+                    _ = &mut debounce => break,
+                    rev = db_sub.recv() => {
+                        match rev {
+                            Some(rev) => {
+                                merged_id = rev.id;
+                                merged_patch.append(rev.patch);
+                            }
+                            None => {
+                                // Subscriber closed — send what we have and exit
+                                let _ = send_notification(&tx, &uri_owned, merged_id, &merged_patch);
+                                return;
+                            }
+                        }
+                    }
+                }
+            }
+
+            if send_notification(&tx, &uri_owned, merged_id, &merged_patch).is_err() {
+                break; // SSE stream closed or channel full
+            }
+        }
+    });
+
+    // Store the task handle, aborting any prior subscription for the same URI
+    sessions.mutate(|map| {
+        if let Some(session) = map.get_mut(session_id) {
+            if let Some(old_handle) = session.subscriptions.remove(uri) {
+                tracing::info!(
+                    target: "mcp_audit",
+                    uri = %uri,
+                    session_id = %session_id,
+                    "Aborting prior subscription for re-subscribed URI"
+                );
+                old_handle.abort();
+            }
+            session.subscriptions.insert(uri.to_string(), handle);
+        }
+    });
+
+    Ok(())
+}
+
+fn send_notification(
+    tx: &mpsc::Sender<JsonValue>,
+    uri: &str,
+    id: u64,
+    patch: &DiffPatch,
+) -> Result<(), ()> {
+    let notification = McpNotification {
+        jsonrpc: "2.0",
+        method: "notifications/resources/updated",
+        params: serde_json::json!({
+            "uri": uri,
+            "revision": {
+                "id": id,
+                "patch": patch,
+            }
+        }),
+    };
+    tx.try_send(serde_json::to_value(&notification).unwrap_or_default())
+        .map_err(|e| {
+            tracing::warn!(
+                target: "mcp_audit",
+                uri = %uri,
+                "Notification channel full or closed, dropping notification: {e}"
+            );
+        })
+}
+
+pub(crate) fn unsubscribe_resource(sessions: &SessionMap, session_id: &str, uri: &str) {
+    sessions.mutate(|map| {
+        if let Some(session) = map.get_mut(session_id) {
+            if let Some(handle) = session.subscriptions.remove(uri) {
+                handle.abort();
+            }
+        }
+    });
+}
diff --git a/core/src/mcp/tools.rs b/core/src/mcp/tools.rs
new file mode 100644
index 000000000..94e3e152f
--- /dev/null
+++ b/core/src/mcp/tools.rs
@@ -0,0 +1,1385 @@
+use std::collections::HashMap;
+
+use serde_json::{Value as JsonValue, json};
+
+use 
super::protocol::ToolDefinition;
+
+pub struct ToolEntry {
+    pub definition: ToolDefinition,
+    pub rpc_method: &'static str,
+    pub sync_db: bool,
+    /// If true, inject `__Auth_session` (hashed session token) into params before dispatch.
+    pub needs_session: bool,
+}
+
+fn tool(name: &str, description: &str, rpc_method: &'static str) -> ToolEntry {
+    ToolEntry {
+        definition: ToolDefinition {
+            name: name.into(),
+            description: description.into(),
+            input_schema: empty_schema(),
+        },
+        rpc_method,
+        sync_db: false,
+        needs_session: false,
+    }
+}
+
+fn empty_schema() -> JsonValue {
+    json!({ "type": "object", "properties": {}, "additionalProperties": false })
+}
+
+fn pkg_id_schema() -> JsonValue {
+    json!({
+        "type": "object",
+        "properties": {
+            "id": { "type": "string", "description": "Package identifier (e.g. 'bitcoind', 'lnd'). Read or subscribe to the startos:///public/packageData resource to see installed packages." }
+        },
+        "required": ["id"],
+        "additionalProperties": false
+    })
+}
+
+pub fn tool_registry() -> HashMap<String, ToolEntry> {
+    let tools = vec![
+        // =====================================================================
+        // Server management
+        // =====================================================================
+        tool(
+            "server.time",
+            "Get the current server time (UTC) and uptime. Use this to check if the server is responsive and how long it has been running since last reboot.",
+            "server.time",
+        ),
+        tool(
+            "server.device-info",
+            "Get hardware and software information about this StartOS server, including the device name, server ID, OS version, CPU architecture, platform, and network addresses (LAN, Tor). Use this to understand what device you are managing.",
+            "server.device-info",
+        ),
+        tool(
+            "server.metrics",
+            "Get current server resource usage: CPU load, memory usage, disk usage, and temperature. Returns null if metrics are not yet available (the server collects metrics periodically). 
Use this to diagnose performance issues or check available disk space.", + "server.metrics", + ), + ToolEntry { + definition: ToolDefinition { + name: "server.shutdown".into(), + description: "Shut down the StartOS server. THIS IS DESTRUCTIVE: the server will power off and all services will stop. The server must be physically restarted. Always confirm with the user before calling this.".into(), + input_schema: empty_schema(), + }, + rpc_method: "server.shutdown", + sync_db: false, + needs_session: false, + }, + ToolEntry { + definition: ToolDefinition { + name: "server.restart".into(), + description: "Restart the StartOS server. All services will stop and restart automatically. The server will be temporarily unreachable during the reboot. Confirm with the user before calling this.".into(), + input_schema: empty_schema(), + }, + rpc_method: "server.restart", + sync_db: false, + needs_session: false, + }, + ToolEntry { + definition: ToolDefinition { + name: "server.rebuild".into(), + description: "Tear down and rebuild all service containers. This restarts every installed service. Use this to fix issues caused by corrupted container state. Services will be temporarily unavailable. Confirm with the user before calling.".into(), + input_schema: empty_schema(), + }, + rpc_method: "server.rebuild", + sync_db: false, + needs_session: false, + }, + ToolEntry { + definition: ToolDefinition { + name: "server.set-hostname".into(), + description: "Change the server's display name and/or hostname. The name is a human-readable label. The hostname is the network identifier. 
Either can be provided independently.".into(), + input_schema: json!({ + "type": "object", + "properties": { + "name": { "type": "string", "description": "Human-readable display name for the server" }, + "hostname": { "type": "string", "description": "Network hostname (alphanumeric and hyphens only)" } + }, + "additionalProperties": false + }), + }, + rpc_method: "server.set-hostname", + sync_db: false, + needs_session: false, + }, + ToolEntry { + definition: ToolDefinition { + name: "server.update".into(), + description: "Check for and apply a StartOS system update from a registry. The update downloads and installs asynchronously — subscribe to startos:///public/serverStatus to monitor update progress. THIS WILL REBOOT THE SERVER if an update is applied. Confirm with the user before calling.".into(), + input_schema: json!({ + "type": "object", + "properties": { + "registry": { "type": "string", "description": "URL of the update registry (e.g. 'https://registry.start9.com')" }, + "target": { "type": "string", "description": "Target version range (optional, uses latest if omitted)" } + }, + "required": ["registry"], + "additionalProperties": false + }), + }, + rpc_method: "server.update", + sync_db: true, + needs_session: false, + }, + tool( + "server.update-firmware", + "Check for and apply a firmware update for the server hardware. Returns whether a reboot is required. Only applicable on supported hardware platforms.", + "server.update-firmware", + ), + ToolEntry { + definition: ToolDefinition { + name: "server.set-smtp".into(), + description: "Configure the server's SMTP settings for sending email notifications. Requires SMTP server details.".into(), + input_schema: json!({ + "type": "object", + "properties": { + "host": { "type": "string", "description": "SMTP server hostname" }, + "port": { "type": "integer", "description": "SMTP server port (e.g. 
587 for STARTTLS, 465 for TLS)" }, + "from": { "type": "string", "description": "Sender email address" }, + "username": { "type": "string", "description": "SMTP username" }, + "password": { "type": "string", "description": "SMTP password" }, + "security": { "type": "string", "enum": ["Starttls", "Tls"], "description": "Connection security. Default: Starttls" } + }, + "required": ["host", "port", "from", "username"], + "additionalProperties": false + }), + }, + rpc_method: "server.set-smtp", + sync_db: false, + needs_session: false, + }, + ToolEntry { + definition: ToolDefinition { + name: "server.test-smtp".into(), + description: "Send a test email to verify SMTP settings are working correctly. Does not save the settings.".into(), + input_schema: json!({ + "type": "object", + "properties": { + "host": { "type": "string", "description": "SMTP server hostname" }, + "port": { "type": "integer", "description": "SMTP server port" }, + "from": { "type": "string", "description": "Sender email address" }, + "to": { "type": "string", "description": "Recipient email address for the test" }, + "username": { "type": "string", "description": "SMTP username" }, + "password": { "type": "string", "description": "SMTP password" }, + "security": { "type": "string", "enum": ["Starttls", "Tls"], "description": "Connection security. Default: Starttls" } + }, + "required": ["host", "port", "from", "to", "username", "password"], + "additionalProperties": false + }), + }, + rpc_method: "server.test-smtp", + sync_db: false, + needs_session: false, + }, + tool( + "server.clear-smtp", + "Remove the server's SMTP configuration. Email notifications will no longer be sent.", + "server.clear-smtp", + ), + ToolEntry { + definition: ToolDefinition { + name: "server.set-echoip-urls".into(), + description: "Set the URLs used to determine the server's external IP address. 
These services are queried to detect the WAN IP.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "urls": {
+                            "type": "array",
+                            "items": { "type": "string" },
+                            "description": "List of echoip service URLs"
+                        }
+                    },
+                    "required": ["urls"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "server.set-echoip-urls",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "server.set-keyboard".into(),
+                description: "Set the keyboard layout for the server console (kiosk mode).".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "layout": { "type": "string", "description": "Keyboard layout code (e.g. 'us', 'de', 'fr')" },
+                        "keymap": { "type": "string", "description": "Keymap name (optional)" },
+                        "model": { "type": "string", "description": "Keyboard model (optional)" },
+                        "variant": { "type": "string", "description": "Layout variant (optional)" },
+                        "options": { "type": "array", "items": { "type": "string" }, "description": "Additional keyboard options (optional)" }
+                    },
+                    "required": ["layout"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "server.set-keyboard",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "server.set-language".into(),
+                description: "Set the system locale/language for the server.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "language": { "type": "string", "description": "Language code (e.g. 'en_US', 'de_DE', 'es_ES')" }
+                    },
+                    "required": ["language"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "server.set-language",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "server.experimental.zram".into(),
+                description: "Enable or disable ZRAM compressed swap. ZRAM uses compressed RAM as swap space, which can improve performance on memory-constrained systems.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "enable": { "type": "boolean", "description": "true to enable ZRAM, false to disable" }
+                    },
+                    "required": ["enable"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "server.experimental.zram",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "server.experimental.governor".into(),
+                description: "Get or set the CPU frequency governor. Returns current governor and available options. Pass 'set' to change the governor.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "set": { "type": "string", "description": "Governor to set (e.g. 'performance', 'powersave', 'ondemand'). Omit to just read current state." }
+                    },
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "server.experimental.governor",
+            sync_db: false,
+            needs_session: false,
+        },
+        // =====================================================================
+        // Package / service management
+        // =====================================================================
+        tool(
+            "package.list",
+            "List all installed packages (services) on this StartOS server. Returns each package's ID, version, and current status. For real-time status updates, subscribe to the startos:///public/packageData resource instead of polling this tool.",
+            "package.list",
+        ),
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.install".into(),
+                description: "Install a package (service) from a registry. The installation is asynchronous — it starts the download and setup, but the service will not be immediately available. Subscribe to startos:///public/packageData/ to monitor this package's installation progress in real time.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "registry": { "type": "string", "description": "URL of the package registry (e.g. 'https://registry.start9.com')" },
+                        "id": { "type": "string", "description": "Package identifier (e.g. 'bitcoind', 'lnd', 'nextcloud')" },
+                        "version": { "type": "string", "description": "Version to install (e.g. '0.27.0:0')" }
+                    },
+                    "required": ["registry", "id", "version"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "package.install",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.uninstall".into(),
+                description: "Uninstall a package (service). THIS IS DESTRUCTIVE: the service and its data will be removed unless 'soft' is true. Always confirm with the user.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "id": { "type": "string", "description": "Package identifier to uninstall" },
+                        "soft": { "type": "boolean", "description": "If true, keep package data on disk for potential reinstall. Default: false" },
+                        "force": { "type": "boolean", "description": "If true, force uninstall even if other packages depend on this one. Default: false" }
+                    },
+                    "required": ["id"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "package.uninstall",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.start".into(),
+                description: "Start an installed service. Starting is asynchronous — subscribe to startos:///public/packageData/ to monitor the state transition.".into(),
+                input_schema: pkg_id_schema(),
+            },
+            rpc_method: "package.start",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.stop".into(),
+                description: "Stop a running service. The service will be gracefully shut down. Subscribe to startos:///public/packageData/ to monitor the state transition. It can be restarted later with package.start.".into(),
+                input_schema: pkg_id_schema(),
+            },
+            rpc_method: "package.stop",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.restart".into(),
+                description: "Restart a running service. Equivalent to stop followed by start. Subscribe to startos:///public/packageData/ to monitor the state transition.".into(),
+                input_schema: pkg_id_schema(),
+            },
+            rpc_method: "package.restart",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.rebuild".into(),
+                description: "Rebuild a service's container. Subscribe to startos:///public/packageData/ to monitor the rebuild. Use this to fix container corruption for a specific service without affecting others.".into(),
+                input_schema: pkg_id_schema(),
+            },
+            rpc_method: "package.rebuild",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.cancel-install".into(),
+                description: "Cancel an in-progress package installation.".into(),
+                input_schema: pkg_id_schema(),
+            },
+            rpc_method: "package.cancel-install",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.installed-version".into(),
+                description: "Get the installed version of a specific package. Returns null if the package is not installed.".into(),
+                input_schema: pkg_id_schema(),
+            },
+            rpc_method: "package.installed-version",
+            sync_db: false,
+            needs_session: false,
+        },
+        tool(
+            "package.stats",
+            "Get resource usage statistics for all LXC service containers: memory usage, memory limit, and percentage. Use this to identify which services are using the most resources.",
+            "package.stats",
+        ),
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.set-outbound-gateway".into(),
+                description: "Set the outbound network gateway for a specific package. This controls which network interface the service uses for outbound connections. Set gateway to null to use the system default.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "package": { "type": "string", "description": "Package identifier" },
+                        "gateway": { "type": "string", "description": "Gateway ID to use for outbound traffic, or null for system default" }
+                    },
+                    "required": ["package"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "package.set-outbound-gateway",
+            sync_db: true,
+            needs_session: false,
+        },
+        // === Package actions ===
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.action.get-input".into(),
+                description: "Get the input specification for a package action. Returns the form schema that describes what input the action expects. Use this before running an action to understand what parameters it needs.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "packageId": { "type": "string", "description": "Package identifier" },
+                        "actionId": { "type": "string", "description": "Action identifier" },
+                        "prefill": { "type": "object", "description": "Optional prefill values for the action form" }
+                    },
+                    "required": ["packageId", "actionId"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "package.action.get-input",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.action.run".into(),
+                description: "Run a package action. Actions are service-specific operations defined by each package (e.g. reset password, create user, run maintenance). Use package.action.get-input first to understand what input is needed.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "packageId": { "type": "string", "description": "Package identifier" },
+                        "actionId": { "type": "string", "description": "Action identifier" },
+                        "input": { "description": "Input data for the action (schema varies per action)" }
+                    },
+                    "required": ["packageId", "actionId"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "package.action.run",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.action.clear-task".into(),
+                description: "Clear a completed or failed action task for a package.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "packageId": { "type": "string", "description": "Package identifier" },
+                        "replayId": { "type": "string", "description": "Replay/task ID to clear" },
+                        "force": { "type": "boolean", "description": "Force clear even if task is still running. Default: false" }
+                    },
+                    "required": ["packageId", "replayId"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "package.action.clear-task",
+            sync_db: false,
+            needs_session: false,
+        },
+        // === Package backup ===
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.backup.restore".into(),
+                description: "Restore packages from a backup. Requires a mounted backup target with a valid backup.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "targetId": { "type": "string", "description": "Backup target ID (must be mounted)" },
+                        "packageIds": {
+                            "type": "array",
+                            "items": { "type": "string" },
+                            "description": "Package IDs to restore. If omitted, all backed-up packages are restored."
+                        },
+                        "password": { "type": "string", "description": "Backup encryption password" }
+                    },
+                    "required": ["targetId", "password"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "package.backup.restore",
+            sync_db: false,
+            needs_session: false,
+        },
+        // =====================================================================
+        // Notifications
+        // =====================================================================
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "notification.list".into(),
+                description: "List recent notifications from the server. Notifications include system events, service status changes, backup results, and errors. Use 'before' to paginate and 'limit' to control page size.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "before": { "type": "integer", "description": "Return notifications with IDs before this value (for pagination)" },
+                        "limit": { "type": "integer", "description": "Maximum number of notifications to return" }
+                    },
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "notification.list",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "notification.remove".into(),
+                description: "Remove specific notifications by their IDs.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "ids": { "type": "array", "items": { "type": "integer" }, "description": "Notification IDs to remove" }
+                    },
+                    "required": ["ids"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "notification.remove",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "notification.remove-before".into(),
+                description: "Remove all notifications with IDs before the specified value.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "before": { "type": "integer", "description": "Remove notifications with IDs less than this value" }
+                    },
+                    "required": ["before"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "notification.remove-before",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "notification.mark-seen".into(),
+                description: "Mark specific notifications as seen/read.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "ids": { "type": "array", "items": { "type": "integer" }, "description": "Notification IDs to mark as seen" }
+                    },
+                    "required": ["ids"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "notification.mark-seen",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "notification.mark-seen-before".into(),
+                description: "Mark all notifications with IDs before the specified value as seen/read.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "before": { "type": "integer", "description": "Mark notifications with IDs less than this value as seen" }
+                    },
+                    "required": ["before"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "notification.mark-seen-before",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "notification.mark-unseen".into(),
+                description: "Mark specific notifications as unseen/unread.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "ids": { "type": "array", "items": { "type": "integer" }, "description": "Notification IDs to mark as unseen" }
+                    },
+                    "required": ["ids"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "notification.mark-unseen",
+            sync_db: false,
+            needs_session: false,
+        },
+        // =====================================================================
+        // SSH keys
+        // =====================================================================
+        tool(
+            "ssh.list",
+            "List all SSH public keys authorized to access this server. Returns each key's algorithm, fingerprint, hostname label, and creation date.",
+            "ssh.list",
+        ),
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "ssh.add".into(),
+                description: "Add an SSH public key to the server's authorized keys.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "key": { "type": "string", "description": "SSH public key in OpenSSH format (e.g. 'ssh-ed25519 AAAA... user@host')" }
+                    },
+                    "required": ["key"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "ssh.add",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "ssh.remove".into(),
+                description: "Remove an SSH public key from the server's authorized keys. Use ssh.list to find the fingerprint.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "fingerprint": { "type": "string", "description": "Fingerprint of the SSH key to remove" }
+                    },
+                    "required": ["fingerprint"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "ssh.remove",
+            sync_db: false,
+            needs_session: false,
+        },
+        // =====================================================================
+        // Backup
+        // =====================================================================
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "backup.create".into(),
+                description: "Create a backup of server data and installed packages to a backup target (USB drive, network share). The backup runs asynchronously — subscribe to startos:///public/serverStatus to monitor backup progress in real time.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "targetId": { "type": "string", "description": "Backup target ID. Use backup.target.list to see available targets." },
+                        "password": { "type": "string", "description": "Encryption password for the backup" },
+                        "oldPassword": { "type": "string", "description": "Previous backup password, if changing password" },
+                        "packageIds": { "type": "array", "items": { "type": "string" }, "description": "Specific package IDs to back up. If omitted, all packages are backed up." }
+                    },
+                    "required": ["targetId", "password"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "backup.create",
+            sync_db: false,
+            needs_session: false,
+        },
+        tool(
+            "backup.target.list",
+            "List all configured backup targets (USB drives, CIFS/network shares). Returns target IDs, types, and connection details.",
+            "backup.target.list",
+        ),
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "backup.target.info".into(),
+                description: "Get detailed information about a backup on a target, including which packages were backed up and when.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "targetId": { "type": "string", "description": "Backup target ID" },
+                        "serverId": { "type": "string", "description": "Server ID to get backup info for" },
+                        "password": { "type": "string", "description": "Backup encryption password" }
+                    },
+                    "required": ["targetId", "serverId", "password"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "backup.target.info",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "backup.target.mount".into(),
+                description: "Mount a backup target to make it accessible for backup or restore operations.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "targetId": { "type": "string", "description": "Backup target ID to mount" },
+                        "serverId": { "type": "string", "description": "Server ID (optional)" },
+                        "password": { "type": "string", "description": "Backup encryption password" },
+                        "allowPartial": { "type": "boolean", "description": "Allow mounting even if some data is incomplete. Default: false" }
+                    },
+                    "required": ["targetId", "password"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "backup.target.mount",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "backup.target.umount".into(),
+                description: "Unmount a backup target.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "targetId": { "type": "string", "description": "Backup target ID to unmount. If omitted, unmounts all." }
+                    },
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "backup.target.umount",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "backup.target.cifs.add".into(),
+                description: "Add a new CIFS/SMB network backup target (e.g. a NAS share).".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "hostname": { "type": "string", "description": "CIFS server hostname or IP" },
+                        "path": { "type": "string", "description": "Share path on the server (e.g. '/backups')" },
+                        "username": { "type": "string", "description": "CIFS username" },
+                        "password": { "type": "string", "description": "CIFS password (optional for guest access)" }
+                    },
+                    "required": ["hostname", "path", "username"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "backup.target.cifs.add",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "backup.target.cifs.update".into(),
+                description: "Update an existing CIFS/SMB network backup target's connection details.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "id": { "type": "string", "description": "Backup target ID to update" },
+                        "hostname": { "type": "string", "description": "CIFS server hostname or IP" },
+                        "path": { "type": "string", "description": "Share path on the server" },
+                        "username": { "type": "string", "description": "CIFS username" },
+                        "password": { "type": "string", "description": "CIFS password" }
+                    },
+                    "required": ["id", "hostname", "path", "username"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "backup.target.cifs.update",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "backup.target.cifs.remove".into(),
+                description: "Remove a CIFS/SMB network backup target.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "id": { "type": "string", "description": "Backup target ID to remove" }
+                    },
+                    "required": ["id"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "backup.target.cifs.remove",
+            sync_db: false,
+            needs_session: false,
+        },
+        // =====================================================================
+        // Network
+        // =====================================================================
+        tool(
+            "net.gateway.list",
+            "List all network gateways (interfaces) the server can listen on. Shows interface type, IP addresses, and WAN IP for each.",
+            "net.gateway.list",
+        ),
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.gateway.forget".into(),
+                description: "Remove a disconnected gateway from the server's list. Only works on gateways that are currently disconnected.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "gateway": { "type": "string", "description": "Gateway ID to forget" }
+                    },
+                    "required": ["gateway"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.gateway.forget",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.gateway.set-name".into(),
+                description: "Rename a network gateway for easier identification in the UI.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "id": { "type": "string", "description": "Gateway ID to rename" },
+                        "name": { "type": "string", "description": "New display name for the gateway" }
+                    },
+                    "required": ["id", "name"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.gateway.set-name",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.gateway.check-port".into(),
+                description: "Check if a port is reachable from the internet through a specific gateway. Tests both external and internal reachability, and hairpinning support.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "port": { "type": "integer", "description": "Port number to check" },
+                        "gateway": { "type": "string", "description": "Gateway ID to check the port through" }
+                    },
+                    "required": ["port", "gateway"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.gateway.check-port",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.gateway.check-dns".into(),
+                description: "Check if DNS resolution works through a specific gateway.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "gateway": { "type": "string", "description": "Gateway ID to check DNS through" }
+                    },
+                    "required": ["gateway"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.gateway.check-dns",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.dns.query".into(),
+                description: "Perform a DNS lookup for a fully qualified domain name using the server's DNS resolver.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "fqdn": { "type": "string", "description": "Fully qualified domain name to look up" }
+                    },
+                    "required": ["fqdn"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.dns.query",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.dns.set-static".into(),
+                description: "Set static DNS server addresses. If null/empty, the server uses DHCP-provided DNS.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "servers": { "type": "array", "items": { "type": "string" }, "description": "List of DNS server IP addresses (e.g. ['1.1.1.1', '8.8.8.8'])" }
+                    },
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.dns.set-static",
+            sync_db: false,
+            needs_session: false,
+        },
+        tool(
+            "net.dns.dump-table",
+            "Dump the server's internal DNS resolution table for debugging.",
+            "net.dns.dump-table",
+        ),
+        tool(
+            "net.forward.dump-table",
+            "Dump the server's port forwarding table for debugging.",
+            "net.forward.dump-table",
+        ),
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.acme.init".into(),
+                description: "Initialize an ACME certificate provider for automated SSL/TLS certificate management (e.g. Let's Encrypt).".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "provider": { "type": "string", "description": "ACME provider identifier" },
+                        "contact": { "type": "array", "items": { "type": "string" }, "description": "Contact email addresses for the ACME account" }
+                    },
+                    "required": ["provider", "contact"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.acme.init",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.acme.remove".into(),
+                description: "Remove an ACME certificate provider configuration.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "provider": { "type": "string", "description": "ACME provider identifier to remove" }
+                    },
+                    "required": ["provider"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.acme.remove",
+            sync_db: true,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.tunnel.add".into(),
+                description: "Add a VPN/tunnel gateway configuration. This allows the server to route traffic through VPN tunnels.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "name": { "type": "string", "description": "Display name for the tunnel" },
+                        "config": { "type": "string", "description": "WireGuard configuration content" },
+                        "type": { "type": "string", "description": "Gateway type (optional)" },
+                        "setAsDefaultOutbound": { "type": "boolean", "description": "Set this tunnel as the default outbound gateway. Default: false" }
+                    },
+                    "required": ["config"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.tunnel.add",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.tunnel.remove".into(),
+                description: "Remove a VPN/tunnel gateway configuration.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "id": { "type": "string", "description": "Gateway/tunnel ID to remove" }
+                    },
+                    "required": ["id"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.tunnel.remove",
+            sync_db: false,
+            needs_session: false,
+        },
+        tool(
+            "net.vhost.dump-table",
+            "Dump the SSL virtual host proxy table for debugging.",
+            "net.vhost.dump-table",
+        ),
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.vhost.add-passthrough".into(),
+                description: "Add an SSL passthrough rule. Incoming TLS connections matching the hostname will be forwarded directly to the backend without termination.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "hostname": { "type": "string", "description": "Hostname to match for passthrough" },
+                        "listenPort": { "type": "integer", "description": "Port to listen on" },
+                        "backend": { "type": "string", "description": "Backend address (IP:port)" },
+                        "publicGateway": { "type": "array", "items": { "type": "string" }, "description": "Gateway IDs on which to expose the rule publicly" },
+                        "privateIp": { "type": "array", "items": { "type": "string" }, "description": "Private IP addresses to listen on" }
+                    },
+                    "required": ["hostname", "listenPort", "backend"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.vhost.add-passthrough",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "net.vhost.remove-passthrough".into(),
+                description: "Remove an SSL passthrough rule.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "hostname": { "type": "string", "description": "Hostname of the passthrough to remove" },
+                        "listenPort": { "type": "integer", "description": "Listen port of the passthrough to remove" }
+                    },
+                    "required": ["hostname", "listenPort"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "net.vhost.remove-passthrough",
+            sync_db: false,
+            needs_session: false,
+        },
+        tool(
+            "net.vhost.list-passthrough",
+            "List all SSL passthrough rules currently configured.",
+            "net.vhost.list-passthrough",
+        ),
+        // =====================================================================
+        // WiFi
+        // =====================================================================
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "wifi.set-enabled".into(),
+                description: "Enable or disable the WiFi interface on the server.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "enabled": { "type": "boolean", "description": "true to enable WiFi, false to disable" }
+                    },
+                    "required": ["enabled"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "wifi.set-enabled",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "wifi.add".into(),
+                description: "Add a WiFi network configuration (SSID and password). Does not connect immediately — use wifi.connect to connect.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "ssid": { "type": "string", "description": "WiFi network name (SSID)" },
+                        "password": { "type": "string", "description": "WiFi password" }
+                    },
+                    "required": ["ssid", "password"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "wifi.add",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "wifi.connect".into(),
+                description: "Connect to a previously added WiFi network.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "ssid": { "type": "string", "description": "WiFi network name (SSID) to connect to" }
+                    },
+                    "required": ["ssid"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "wifi.connect",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "wifi.remove".into(),
+                description: "Remove a saved WiFi network configuration. If currently connected to this network, the server will disconnect.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "ssid": { "type": "string", "description": "WiFi network name (SSID) to remove" }
+                    },
+                    "required": ["ssid"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "wifi.remove",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "wifi.get".into(),
+                description: "Get details about a specific saved WiFi network configuration.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "ssid": { "type": "string", "description": "WiFi network name (SSID)" }
+                    },
+                    "required": ["ssid"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "wifi.get",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "wifi.country.set".into(),
+                description: "Set the WiFi regulatory country code. This affects which channels and power levels are available.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "country": { "type": "string", "description": "ISO 3166-1 alpha-2 country code (e.g. 'US', 'DE', 'GB')" }
+                    },
+                    "required": ["country"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "wifi.country.set",
+            sync_db: false,
+            needs_session: false,
+        },
+        tool(
+            "wifi.available.get",
+            "Scan for and list available WiFi networks in range of the server.",
+            "wifi.available.get",
+        ),
+        // =====================================================================
+        // Auth
+        // =====================================================================
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "auth.reset-password".into(),
+                description: "Change the server's master password. THIS IS SENSITIVE: the master password controls all access to the server. Requires the old password for verification (unless first-time setup).".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "oldPassword": { "type": "string", "description": "Current password (required unless first-time setup)" },
+                        "newPassword": { "type": "string", "description": "New password to set" }
+                    },
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "auth.reset-password",
+            sync_db: false,
+            needs_session: false,
+        },
+        tool(
+            "auth.get-pubkey",
+            "Get the server's public key (JWK format). This is the server's identity key used for cryptographic verification.",
+            "auth.get-pubkey",
+        ),
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "auth.session.kill".into(),
+                description: "Terminate specific authenticated sessions by their IDs. Use this to revoke access for specific sessions.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "ids": { "type": "array", "items": { "type": "string" }, "description": "Session IDs to terminate" }
+                    },
+                    "required": ["ids"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "auth.session.kill",
+            sync_db: false,
+            needs_session: false,
+        },
+        // =====================================================================
+        // Database
+        // =====================================================================
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "db.dump".into(),
+                description: "Dump the current database state, optionally filtered to a specific path. Use this to inspect the full system state or a specific section. The pointer uses JSON Pointer syntax (e.g. '/public/serverInfo').".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "pointer": { "type": "string", "description": "JSON Pointer path to dump (e.g. '/public/serverInfo'). If omitted, dumps the entire database." }
+                    },
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "db.dump",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "db.put.ui".into(),
+                description: "Write a value to a specific path in the UI section of the database. This is for advanced use — incorrect values can break the UI state. The pointer uses JSON Pointer syntax.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "pointer": { "type": "string", "description": "JSON Pointer path to write to (e.g. '/ui/someSetting')" },
+                        "value": { "description": "Value to write at the specified path" }
+                    },
+                    "required": ["pointer", "value"],
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "db.put.ui",
+            sync_db: false,
+            needs_session: false,
+        },
+        // =====================================================================
+        // Disk
+        // =====================================================================
+        tool(
+            "disk.list",
+            "List all disks attached to the server, including their size, partitions, and mount status.",
+            "disk.list",
+        ),
+        tool(
+            "disk.repair",
+            "Run filesystem repair on the server's data partition. Use this if the server reports filesystem errors. The server may need to restart.",
+            "disk.repair",
+        ),
+        // =====================================================================
+        // Server host (network address & binding management for system UI)
+        // =====================================================================
+        tool(
+            "server.host.address.list",
+            "List all network addresses configured for the server's system UI. This includes LAN, Tor, and any custom domain addresses.",
+            "server.host.address.list",
+        ),
+        tool(
+            "server.host.binding.list",
+            "List all network bindings for the server's system UI. Shows which addresses and ports the UI is accessible on.",
+            "server.host.binding.list",
+        ),
+        // =====================================================================
+        // Package host (network address & binding management per package)
+        // =====================================================================
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "package.host.list".into(),
+                description: "List all network host configurations for a specific package.".into(),
+                input_schema: pkg_id_schema(),
+            },
+            rpc_method: "package.host.list",
+            sync_db: false,
+            needs_session: false,
+        },
+        // =====================================================================
+        // Logs
+        // =====================================================================
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "server.logs".into(),
+                description: "Fetch recent OS/system log entries from journald. Returns log lines \
+                    with timestamps. Use 'limit' to control how many entries to return (default: \
+                    all available). Use 'cursor' to paginate from a specific position. Use 'before' \
+                    to get entries before the cursor instead of after.".into(),
+                input_schema: json!({
+                    "type": "object",
+                    "properties": {
+                        "limit": { "type": "integer", "description": "Maximum number of log entries to return" },
+                        "cursor": { "type": "string", "description": "Journald cursor for pagination. Returned in previous log responses." },
+                        "before": { "type": "boolean", "description": "If true and cursor is set, return entries before the cursor. Default: false" },
+                        "boot": { "type": "string", "description": "Boot identifier to filter logs by (e.g. '0' for current boot, '-1' for previous)" }
+                    },
+                    "additionalProperties": false
+                }),
+            },
+            rpc_method: "server.logs",
+            sync_db: false,
+            needs_session: false,
+        },
+        ToolEntry {
+            definition: ToolDefinition {
+                name: "server.kernel-logs".into(),
+                description: "Fetch recent kernel log entries (dmesg equivalent). Returns kernel \
+                    messages with timestamps. 
Useful for diagnosing hardware issues, driver \ + problems, or boot errors.".into(), + input_schema: json!({ + "type": "object", + "properties": { + "limit": { "type": "integer", "description": "Maximum number of log entries to return" }, + "cursor": { "type": "string", "description": "Journald cursor for pagination" }, + "before": { "type": "boolean", "description": "If true, return entries before the cursor. Default: false" }, + "boot": { "type": "string", "description": "Boot identifier to filter by" } + }, + "additionalProperties": false + }), + }, + rpc_method: "server.kernel-logs", + sync_db: false, + needs_session: false, + }, + ToolEntry { + definition: ToolDefinition { + name: "package.logs".into(), + description: "Fetch recent log entries for a specific installed service/package. \ + Returns the service's stdout/stderr output with timestamps.".into(), + input_schema: json!({ + "type": "object", + "properties": { + "id": { "type": "string", "description": "Package identifier to get logs for. Read or subscribe to startos:///public/packageData to see installed packages." }, + "limit": { "type": "integer", "description": "Maximum number of log entries to return" }, + "cursor": { "type": "string", "description": "Journald cursor for pagination" }, + "before": { "type": "boolean", "description": "If true, return entries before the cursor. Default: false" }, + "boot": { "type": "string", "description": "Boot identifier to filter by" } + }, + "required": ["id"], + "additionalProperties": false + }), + }, + rpc_method: "package.logs", + sync_db: false, + needs_session: false, + }, + // ===================================================================== + // Shell execution + // ===================================================================== + ToolEntry { + definition: ToolDefinition { + name: "system.shell".into(), + description: "Execute a shell command on the StartOS server as the start9 user \ + with passwordless sudo. 
Returns stdout, stderr, and exit code. Use this for \ + diagnostics, inspecting files, checking processes, or any operation not \ + covered by other tools. THIS IS POWERFUL: the command runs with full system \ + access. Always be careful with destructive commands. Commands have a 30-second \ + timeout by default.".into(), + input_schema: json!({ + "type": "object", + "properties": { + "command": { "type": "string", "description": "Shell command to execute (passed to /bin/bash -c)" }, + "timeout": { "type": "integer", "description": "Timeout in seconds. Default: 30. Max: 300." } + }, + "required": ["command"], + "additionalProperties": false + }), + }, + rpc_method: "__shell__", + sync_db: false, + needs_session: false, + }, + ToolEntry { + definition: ToolDefinition { + name: "package.shell".into(), + description: "Execute a command inside a package's subcontainer (where the actual \ + service runs). Returns stdout, stderr, and exit code. Use this to inspect \ + the service filesystem, check running processes, read config files, or run \ + service-specific CLI tools. If the package has exactly one subcontainer, it \ + is selected automatically. If there are multiple, provide 'subcontainer' or \ + 'name' to filter. Commands have a 30-second timeout by default.".into(), + input_schema: json!({ + "type": "object", + "properties": { + "id": { "type": "string", "description": "Package identifier. Read or subscribe to startos:///public/packageData to see installed packages." }, + "command": { "type": "string", "description": "Command to execute inside the subcontainer (passed to /bin/sh -c)" }, + "subcontainer": { "type": "string", "description": "Subcontainer ID filter (partial match). Only needed if the package has multiple subcontainers." }, + "name": { "type": "string", "description": "Subcontainer name filter (partial match). Alternative to 'subcontainer'." }, + "timeout": { "type": "integer", "description": "Timeout in seconds. Default: 30. Max: 300." 
} + }, + "required": ["id", "command"], + "additionalProperties": false + }), + }, + rpc_method: "__package_shell__", + sync_db: false, + needs_session: false, + }, + // ===================================================================== + // Session management (requires __Auth_session injection) + // ===================================================================== + ToolEntry { + definition: ToolDefinition { + name: "auth.session.list".into(), + description: "List all active authenticated sessions on this server. Returns \ + session IDs, last active timestamps, and which session is the current one. \ + Use this to audit who is connected to the server.".into(), + input_schema: empty_schema(), + }, + rpc_method: "auth.session.list", + sync_db: false, + needs_session: true, + }, + ]; + tools + .into_iter() + .map(|t| (t.definition.name.clone(), t)) + .collect() +} diff --git a/core/src/net/static_server.rs b/core/src/net/static_server.rs index 0d1fb458c..2223bf814 100644 --- a/core/src/net/static_server.rs +++ b/core/src/net/static_server.rs @@ -100,6 +100,17 @@ impl UiContext for RpcContext { }) }) .nest("/s9pk", s9pk_router(self.clone())) + .route("/mcp", crate::mcp::mcp_router(self.clone())) + .route( + "/.well-known/mcp", + get(|| async { + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "application/json") + .body(Body::from(r#"{"mcp_endpoint":"/mcp"}"#)) + .unwrap() + }), + ) .route( "/static/local-root-ca.crt", get(move || { diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs index d904e77c9..aa1b396a4 100644 --- a/core/src/service/mod.rs +++ b/core/src/service/mod.rs @@ -94,7 +94,17 @@ pub async fn get_data_version(id: &PackageId) -> Result, Error> { maybe_read_file_to_string(&path).await } -struct RootCommand(pub String); +pub(crate) struct RootCommand(pub String); + +/// Resolved subcontainer info, ready for command construction. 
+pub(crate) struct ResolvedSubcontainer {
+    pub container_id: ContainerId,
+    pub subcontainer_id: Guid,
+    pub image_id: ImageId,
+    pub user: InternedString,
+    pub workdir: Option<String>,
+    pub root_command: RootCommand,
+}
 
 #[derive(Clone, Debug, Serialize, Deserialize, Default, TS)]
 pub struct MiB(pub u64);
@@ -706,6 +716,158 @@ impl Service {
             .clone();
         Ok(container_id)
     }
+
+    /// Resolve a subcontainer by optional filters (guid, name, or imageId).
+    /// If no filter is provided and there is exactly one subcontainer, it is returned.
+    /// Errors if no match found or multiple matches found (with the list in error info).
+    pub(crate) async fn resolve_subcontainer(
+        &self,
+        subcontainer: Option<InternedString>,
+        name: Option<InternedString>,
+        image_id: Option<ImageId>,
+        user: Option<InternedString>,
+    ) -> Result<ResolvedSubcontainer, Error> {
+        let id = &self.seed.id;
+        let container = &self.seed.persistent_container;
+        let root_dir = container
+            .lxc_container
+            .get()
+            .map(|x| x.rootfs_dir().to_owned())
+            .or_not_found(format!("container for {id}"))?;
+
+        let subcontainer_upper = subcontainer.as_ref().map(|x| AsRef::<str>::as_ref(x).to_uppercase());
+        let name_upper = name.as_ref().map(|x| AsRef::<str>::as_ref(x).to_uppercase());
+        let image_id_upper = image_id.as_ref().map(|x| AsRef::<Path>::as_ref(x).to_string_lossy().to_uppercase());
+
+        let subcontainers = container.subcontainers.lock().await;
+        let matches: Vec<_> = subcontainers
+            .iter()
+            .filter(|(x, wrapper)| {
+                if let Some(sc) = subcontainer_upper.as_ref() {
+                    AsRef::<str>::as_ref(x).contains(sc.as_str())
+                } else if let Some(n) = name_upper.as_ref() {
+                    AsRef::<str>::as_ref(&wrapper.name)
+                        .to_uppercase()
+                        .contains(n.as_str())
+                } else if let Some(img) = image_id_upper.as_ref() {
+                    let Some(wrapper_image_id) = AsRef::<Path>::as_ref(&wrapper.image_id).to_str()
+                    else {
+                        return false;
+                    };
+                    wrapper_image_id.to_uppercase().contains(img.as_str())
+                } else {
+                    true
+                }
+            })
+            .collect();
+
+        let Some((subcontainer_id, matched_image_id)) = matches
+            .first()
+            .map::<(Guid, ImageId), _>(|&x| (x.0.clone(), x.1.image_id.clone()))
+        else {
+            drop(subcontainers);
+            let info = container
+                .subcontainers
+                .lock()
+                .await
+                .iter()
+                .map(|(g, s)| SubcontainerInfo {
+                    id: g.clone(),
+                    name: s.name.clone(),
+                    image_id: s.image_id.clone(),
+                })
+                .collect::<Vec<_>>();
+            return Err(Error::new(
+                eyre!("{}", t!("service.mod.no-matching-subcontainers", id = id)),
+                ErrorKind::NotFound,
+            )
+            .with_info(to_value(&info)?));
+        };
+
+        if matches.len() > 1 {
+            let info = matches
+                .into_iter()
+                .map(|(g, s)| SubcontainerInfo {
+                    id: g.clone(),
+                    name: s.name.clone(),
+                    image_id: s.image_id.clone(),
+                })
+                .collect::<Vec<_>>();
+            return Err(Error::new(
+                eyre!("{}", t!("service.mod.multiple-subcontainers-found", id = id,)),
+                ErrorKind::InvalidRequest,
+            )
+            .with_info(to_value(&info)?));
+        }
+
+        let passwd = root_dir
+            .join("media/startos/subcontainers")
+            .join(subcontainer_id.as_ref())
+            .join("etc")
+            .join("passwd");
+
+        let image_meta = serde_json::from_str::<serde_json::Value>(
+            &tokio::fs::read_to_string(
+                root_dir
+                    .join("media/startos/images/")
+                    .join(&matched_image_id)
+                    .with_extension("json"),
+            )
+            .await?,
+        )
+        .with_kind(ErrorKind::Deserialization)?;
+
+        let resolved_user = user
+            .or_else(|| image_meta["user"].as_str().map(InternedString::intern))
+            .unwrap_or_else(|| InternedString::intern("root"));
+
+        let root_command = get_passwd_command(passwd, &*resolved_user).await;
+        let workdir = image_meta["workdir"].as_str().map(|s| s.to_owned());
+
+        Ok(ResolvedSubcontainer {
+            container_id: self.container_id()?,
+            subcontainer_id,
+            image_id: matched_image_id,
+            user: resolved_user,
+            workdir,
+            root_command,
+        })
+    }
+
+    /// Build a `Command` for executing inside a resolved subcontainer (non-interactive).
+    pub(crate) fn build_subcontainer_command(
+        resolved: &ResolvedSubcontainer,
+        command: &[&str],
+    ) -> Command {
+        let root_path =
+            Path::new("/media/startos/subcontainers").join(resolved.subcontainer_id.as_ref());
+        let mut cmd = Command::new("lxc-attach");
+        cmd.kill_on_drop(true);
+        cmd.arg(&*resolved.container_id)
+            .arg("--")
+            .arg("start-container")
+            .arg("subcontainer")
+            .arg("exec")
+            .arg("--env-file")
+            .arg(
+                Path::new("/media/startos/images")
+                    .join(&resolved.image_id)
+                    .with_extension("env"),
+            )
+            .arg("--user")
+            .arg(&*resolved.user);
+        if let Some(ref workdir) = resolved.workdir {
+            cmd.arg("--workdir").arg(workdir);
+        }
+        cmd.arg(&root_path).arg("--");
+        if command.is_empty() {
+            cmd.arg(&resolved.root_command.0);
+        } else {
+            cmd.args(command);
+        }
+        cmd
+    }
+
     #[instrument(skip_all)]
     pub async fn stats(&self) -> Result<ServiceStats, Error> {
         let container = &self.seed.persistent_container;
@@ -799,124 +961,26 @@ pub async fn attach(
         user,
     }: AttachParams,
 ) -> Result<Guid, Error> {
-    let (container_id, subcontainer_id, image_id, user, workdir, root_command) = {
-        let id = &id;
-
-        let service = ctx.services.get(id).await;
-
-        let service_ref = service.as_ref().or_not_found(id)?;
-
-        let container = &service_ref.seed.persistent_container;
-        let root_dir = container
-            .lxc_container
-            .get()
-            .map(|x| x.rootfs_dir().to_owned())
-            .or_not_found(format!("container for {id}"))?;
-
-        let subcontainer = subcontainer.map(|x| AsRef::<str>::as_ref(&x).to_uppercase());
-        let name = name.map(|x| AsRef::<str>::as_ref(&x).to_uppercase());
-        let image_id = image_id.map(|x| AsRef::<Path>::as_ref(&x).to_string_lossy().to_uppercase());
-
-        let subcontainers = container.subcontainers.lock().await;
-        let subcontainer_ids: Vec<_> = subcontainers
-            .iter()
-            .filter(|(x, wrapper)| {
-                if let Some(subcontainer) = subcontainer.as_ref() {
-                    AsRef::<str>::as_ref(x).contains(AsRef::<str>::as_ref(subcontainer))
-                } else if let Some(name) = name.as_ref() {
-                    AsRef::<str>::as_ref(&wrapper.name)
-                        .to_uppercase()
-                        .contains(AsRef::<str>::as_ref(name))
-                } else if let Some(image_id) = image_id.as_ref() {
-                    let Some(wrapper_image_id) = AsRef::<Path>::as_ref(&wrapper.image_id).to_str()
-                    else {
-                        return false;
-                    };
-                    wrapper_image_id
-                        .to_uppercase()
-                        .contains(AsRef::<str>::as_ref(&image_id))
-                } else {
-                    true
-                }
-            })
-            .collect();
-        let Some((subcontainer_id, image_id)) = subcontainer_ids
-            .first()
-            .map::<(Guid, ImageId), _>(|&x| (x.0.clone(), x.1.image_id.clone()))
-        else {
-            drop(subcontainers);
-            let subcontainers = container
-                .subcontainers
-                .lock()
-                .await
-                .iter()
-                .map(|(g, s)| SubcontainerInfo {
-                    id: g.clone(),
-                    name: s.name.clone(),
-                    image_id: s.image_id.clone(),
-                })
-                .collect::<Vec<_>>();
-            return Err(Error::new(
-                eyre!("{}", t!("service.mod.no-matching-subcontainers", id = id)),
-                ErrorKind::NotFound,
+    let resolved = {
+        let service = ctx.services.get(&id).await;
+        let service_ref = service.as_ref().or_not_found(&id)?;
+        service_ref
+            .resolve_subcontainer(
+                subcontainer.map(|g| InternedString::intern(g.as_ref())),
+                name,
+                image_id,
+                user,
             )
-            .with_info(to_value(&subcontainers)?));
-        };
-
-        let passwd = root_dir
-            .join("media/startos/subcontainers")
-            .join(subcontainer_id.as_ref())
-            .join("etc")
-            .join("passwd");
-
-        let image_meta = serde_json::from_str::<serde_json::Value>(
-            &tokio::fs::read_to_string(
-                root_dir
-                    .join("media/startos/images/")
-                    .join(&image_id)
-                    .with_extension("json"),
-            )
-            .await?,
-        )
-        .with_kind(ErrorKind::Deserialization)?;
-
-        let user = user
-            .clone()
-            .or_else(|| image_meta["user"].as_str().map(InternedString::intern))
-            .unwrap_or_else(|| InternedString::intern("root"));
-
-        let root_command = get_passwd_command(passwd, &*user).await;
-
-        let workdir = image_meta["workdir"].as_str().map(|s| s.to_owned());
-
-        if subcontainer_ids.len() > 1 {
-            let subcontainers = subcontainer_ids
-                .into_iter()
-                .map(|(g, s)| SubcontainerInfo {
-                    id: g.clone(),
-                    name: s.name.clone(),
-                    image_id: s.image_id.clone(),
-                })
-                .collect::<Vec<_>>();
-            return Err(Error::new(
-                eyre!(
-                    "{}",
-                    t!("service.mod.multiple-subcontainers-found", id = id,)
-                ),
-                ErrorKind::InvalidRequest,
-            )
-            .with_info(to_value(&subcontainers)?));
-        }
-
-        (
-            service_ref.container_id()?,
-            subcontainer_id,
-            image_id,
-            user.into(),
-            workdir,
-            root_command,
-        )
+            .await?
     };
+    let ResolvedSubcontainer {
+        container_id,
+        subcontainer_id,
+        image_id,
+        user,
+        workdir,
+        root_command,
+    } = resolved;
     let guid = Guid::new();
 
     async fn handler(