Matt Hill
2026-03-15 23:35:32 -06:00
parent ebb7916ecd
commit 2b70b67824
11 changed files with 3364 additions and 131 deletions


@@ -8,7 +8,7 @@ StartOS is an open-source Linux distribution for running personal servers. It ma
- Frontend: Angular 21 + TypeScript + Taiga UI 5
- Container runtime: Node.js/TypeScript with LXC
- Database/State: Patch-DB (git submodule) - storage layer with reactive frontend sync
- API: JSON-RPC via rpc-toolkit (see `core/rpc-toolkit.md`), MCP for LLM agents (see `core/mcp/ARCHITECTURE.md`)
- Auth: Password + session cookie, public/private key signatures, local authcookie (see `core/src/middleware/auth/`)
## Project Structure
@@ -28,7 +28,7 @@ StartOS is an open-source Linux distribution for running personal servers. It ma
## Components
- **`core/`** — Rust backend daemon. Produces a single binary `startbox` that is symlinked as `startd` (main daemon), `start-cli` (CLI), `start-container` (runs inside LXC containers), `registrybox` (package registry), and `tunnelbox` (VPN/tunnel). Handles all backend logic: RPC API, MCP server for LLM agents, service lifecycle, networking (DNS, ACME, WiFi, Tor, WireGuard), backups, and database state management. See [core/ARCHITECTURE.md](core/ARCHITECTURE.md).
- **`web/`** — Angular 21 + TypeScript workspace using Taiga UI 5. Contains three applications (admin UI, setup wizard, VPN management) and two shared libraries (common components/services, marketplace). Communicates with the backend exclusively via JSON-RPC. See [web/ARCHITECTURE.md](web/ARCHITECTURE.md).
@@ -54,7 +54,7 @@ Rust (core/)
Key make targets along this chain:
| Step | Command | What it does |
| ---- | --------------------------------------- | --------------------------------- |
| 1 | `cargo check -p start-os` | Verify Rust compiles |
| 2 | `make ts-bindings` | Export ts-rs types → rsync to SDK |
| 3 | `cd sdk && make baseDist dist` | Build SDK packages |
@@ -90,6 +90,17 @@ StartOS uses Patch-DB for reactive state synchronization:
This means the UI is always eventually consistent with the backend — after any mutating API call, the frontend waits for the corresponding PatchDB diff before resolving, so the UI reflects the result immediately.
## MCP Server (LLM Agent Interface)
StartOS includes an [MCP](https://modelcontextprotocol.io/) (Model Context Protocol) server at `/mcp`, enabling LLM agents to discover and invoke the same operations available through the UI and CLI. The MCP server runs inside the StartOS server process alongside the RPC API.
- **Tools**: Every RPC method is exposed as an MCP tool with LLM-optimized descriptions and JSON Schema inputs. Agents call `tools/list` to discover what's available and `tools/call` to invoke operations.
- **Resources**: System state is exposed via MCP resources backed by Patch-DB. Agents subscribe to `startos:///public` and receive debounced revision diffs over SSE, maintaining a local state cache without polling.
- **Auth**: Same session cookie auth as the UI — no separate credentials.
- **Transport**: MCP Streamable HTTP — POST for requests, GET for SSE notification stream, DELETE for session teardown.
See [core/ARCHITECTURE.md](core/ARCHITECTURE.md#mcp-server) for implementation details.
## Further Reading
- [core/ARCHITECTURE.md](core/ARCHITECTURE.md) — Rust backend architecture


@@ -23,6 +23,7 @@ The crate produces a single binary `startbox` that is symlinked under different
- `src/context/` — Context types (RpcContext, CliContext, InitContext, DiagnosticContext)
- `src/service/` — Service lifecycle management with actor pattern (`service_actor.rs`)
- `src/db/model/` — Patch-DB models (`public.rs` synced to frontend, `private.rs` backend-only)
- `src/mcp/` — MCP server for LLM agents (see [MCP Server](#mcp-server) below)
- `src/net/` — Networking (DNS, ACME, WiFi, Tor via Arti, WireGuard)
- `src/s9pk/` — S9PK package format (merkle archive)
- `src/registry/` — Package registry management
@@ -38,16 +39,19 @@ See [rpc-toolkit.md](rpc-toolkit.md) for full handler patterns and configuration
Patch-DB provides diff-based state synchronization. Changes to `db/model/public.rs` automatically sync to the frontend.
**Key patterns:**
- `db.peek().await` — Get a read-only snapshot of the database state
- `db.mutate(|db| { ... }).await` — Apply mutations atomically, returns `MutateResult`
- `#[derive(HasModel)]` — Derive macro for types stored in the database, generates typed accessors
**Generated accessor types** (from `HasModel` derive):
- `as_field()` — Immutable reference: `&Model<T>`
- `as_field_mut()` — Mutable reference: `&mut Model<T>`
- `into_field()` — Owned value: `Model<T>`
**`Model<T>` APIs** (from `db/prelude.rs`):
- `.de()` — Deserialize to `T`
- `.ser(&value)` — Serialize from `T`
- `.mutate(|v| ...)` — Deserialize, mutate, reserialize
@@ -63,6 +67,12 @@ See [i18n-patterns.md](i18n-patterns.md) for internationalization key convention
See [core-rust-patterns.md](core-rust-patterns.md) for common utilities (Invoke trait, Guard pattern, mount guards, Apply trait, etc.).
## MCP Server
The MCP (Model Context Protocol) server at `src/mcp/` exposes the StartOS RPC API to LLM agents via the Streamable HTTP transport at `/mcp`. Tools wrap the existing RPC handlers; resources expose Patch-DB state with debounced SSE subscriptions; auth reuses the UI session cookie.
See [src/mcp/ARCHITECTURE.md](src/mcp/ARCHITECTURE.md) for transport details, session lifecycle, tool dispatch, resource subscriptions, CORS, and body size limits.
## Related Documentation
- [rpc-toolkit.md](rpc-toolkit.md) — JSON-RPC handler patterns


@@ -163,13 +163,13 @@ pub struct SubscribeRes {
pub guid: Guid,
}
pub(crate) struct DbSubscriber {
pub(crate) rev: u64,
pub(crate) sub: UnboundedReceiver<Revision>,
pub(crate) sync_db: watch::Receiver<u64>,
}
impl DbSubscriber {
pub(crate) async fn recv(&mut self) -> Option<Revision> {
loop {
tokio::select! {
rev = self.sub.recv() => {


@@ -63,6 +63,7 @@ pub mod init;
pub mod install;
pub mod logs;
pub mod lxc;
pub mod mcp;
pub mod middleware;
pub mod net;
pub mod notifications;


@@ -0,0 +1,96 @@
# MCP Server Architecture
The Model Context Protocol server embedded in StartOS (`core/src/mcp/`).
## Transport: Streamable HTTP (MCP 2025-03-26)
The server implements the **Streamable HTTP** transport from the MCP spec, not the older stdio or SSE-only transports. A single route (`/mcp`) handles every HTTP method the transport uses:
| Method | Purpose |
| ----------- | -------------------------------------------------------------------------------- |
| **POST** | JSON-RPC 2.0 requests from client (initialize, tools/call, resources/read, etc.) |
| **GET** | Opens an SSE stream for server→client notifications (resource change events) |
| **DELETE** | Explicitly ends a session |
| **OPTIONS** | CORS preflight |
A discovery endpoint at `/.well-known/mcp` returns `{"mcp_endpoint":"/mcp"}`.
## Authentication
Every HTTP method (POST, GET, DELETE) validates the caller's session cookie via `ValidSessionToken::from_header` before processing. This reuses the same auth infrastructure as the main StartOS web UI — MCP clients must present a valid session cookie obtained through the normal login flow. Unauthenticated requests get a 401.
## Session Lifecycle
1. **Create**: Client sends `initialize` via POST. Server generates a UUID session ID, creates an `McpSession` with a bounded mpsc channel (256 messages), and returns the ID in the `Mcp-Session-Id` response header.
2. **Connect SSE**: Client opens a GET with the session ID header. The server takes the receiver half of the notification channel (`take_notification_rx`) and streams it as SSE events. Only one GET connection per session is allowed (the rx is moved, not cloned).
3. **Use**: Client sends tool calls, resource reads, subscriptions via POST. All POST requests must include a valid session ID header — the server validates it against the session map before processing.
4. **Teardown**: Three paths:
- Client sends DELETE → session is removed, subscription tasks are aborted.
- SSE stream disconnects → `CleanupStream`'s `PinnedDrop` impl removes the session.
- Session is never connected → stale-session sweep (60s threshold) removes sessions that never had a GET stream attached.
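The stale-session sweep can be sketched with std-only types. This is a simplified stand-in for the real `McpSession` (field names here are illustrative): a session counts as "never connected" while its notification receiver is still unclaimed.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Simplified stand-in for `McpSession`: `rx_taken` becomes true once a
/// GET stream has claimed the notification receiver.
struct Session {
    rx_taken: bool,
    created_at: Instant,
}

const STALE_TIMEOUT: Duration = Duration::from_secs(60);

/// Drop sessions that never attached a GET stream within the timeout.
fn sweep(map: &mut HashMap<String, Session>, now: Instant) {
    map.retain(|_, s| s.rx_taken || now.duration_since(s.created_at) <= STALE_TIMEOUT);
}

fn main() {
    let t0 = Instant::now();
    let mut map = HashMap::new();
    map.insert("never-connected".to_string(), Session { rx_taken: false, created_at: t0 });
    map.insert("connected".to_string(), Session { rx_taken: true, created_at: t0 });
    // Simulate 61 seconds elapsing: only the connected session survives.
    sweep(&mut map, t0 + Duration::from_secs(61));
    assert!(!map.contains_key("never-connected"));
    assert!(map.contains_key("connected"));
}
```

Connected sessions are exempt from the sweep because their lifetime is tied to the SSE stream instead (the `PinnedDrop` path above).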
## Module Structure
```
core/src/mcp/
├── mod.rs — HTTP handlers, routing, MCP method dispatch, shell execution, CORS
├── protocol.rs — JSON-RPC 2.0 types, MCP request/response structs, error codes
├── session.rs — Session map, create/remove/sweep, resource subscriptions with debounce
└── tools.rs — Tool registry (67 tools), HashMap<String, ToolEntry> mapping names → RPC methods + schemas
```
## Tool Dispatch
`tool_registry()` returns a `HashMap<String, ToolEntry>`, each mapping:
- An MCP tool name (e.g. `"package.start"`)
- A JSON Schema for input validation (sent to clients via `tools/list`)
- A backing RPC method name (usually identical to the tool name)
- Flags: `sync_db` (whether to flush DB sequence after success), `needs_session` (whether to inject `__Auth_session`)
When `tools/call` arrives:
1. Look up the tool by name via HashMap O(1) lookup.
2. Convert arguments from `serde_json::Value` to `imbl_value::Value`.
3. **Special-case**: If `rpc_method` is `"__shell__"` or `"__package_shell__"`, dispatch to `handle_shell_exec` / `handle_package_shell_exec` directly (no RPC handler). Both set `kill_on_drop(true)` to ensure timed-out processes are terminated.
4. Otherwise, optionally inject `__Auth_session` into params, then call `server.handle_command(rpc_method, params)`.
5. On success: if `sync_db` is true, flush the DB sequence. Return the result pretty-printed as a text content block.
6. On error: return the error as a text content block with `is_error: true`, using `McpResponse::ok` (MCP spec: tool errors are results, not JSON-RPC errors).
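The dispatch path above can be sketched with a std-only registry. The `ToolEntry` here is a simplified subset — real entries also carry a JSON Schema and a `needs_session` flag, and non-shell calls go through `server.handle_command`:

```rust
use std::collections::HashMap;

/// Simplified ToolEntry (schema and needs_session omitted).
struct ToolEntry {
    rpc_method: &'static str,
    sync_db: bool,
}

/// Per the MCP spec, tool failures are *results* with is_error = true,
/// not JSON-RPC errors.
struct ToolResult {
    text: String,
    is_error: bool,
}

fn tool_registry() -> HashMap<&'static str, ToolEntry> {
    let mut m = HashMap::new();
    m.insert("package.start", ToolEntry { rpc_method: "package.start", sync_db: true });
    m.insert("system.shell", ToolEntry { rpc_method: "__shell__", sync_db: false });
    m
}

fn dispatch(registry: &HashMap<&'static str, ToolEntry>, name: &str) -> ToolResult {
    match registry.get(name) {
        // Unknown tool: still an ordinary result, flagged as an error.
        None => ToolResult { text: format!("unknown tool: {name}"), is_error: true },
        // Special-cased shell tools bypass the RPC layer entirely.
        Some(entry) if entry.rpc_method.starts_with("__") => {
            ToolResult { text: format!("shell path: {}", entry.rpc_method), is_error: false }
        }
        Some(entry) => ToolResult { text: format!("rpc: {}", entry.rpc_method), is_error: false },
    }
}

fn main() {
    let reg = tool_registry();
    assert!(!dispatch(&reg, "package.start").is_error);
    assert_eq!(dispatch(&reg, "system.shell").text, "shell path: __shell__");
    assert!(dispatch(&reg, "nope").is_error);
}
```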
## Shell Execution
Two shell tools bypass the RPC layer entirely:
- **`system.shell`** (`__shell__`): Runs `/bin/bash -c <command>` on the host with `kill_on_drop(true)`. 30s default timeout, 300s max.
- **`package.shell`** (`__package_shell__`): Resolves the target package's subcontainer via `Service::resolve_subcontainer`, then runs `/bin/sh -c <command>` inside it via `lxc-attach` (also `kill_on_drop(true)`). Same timeout behavior.
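A minimal sketch of the timeout policy: a missing value falls back to the 30s default, and anything above 300s is clamped (the exact clamping behavior is an assumption; only the two bounds come from the text above).

```rust
use std::time::Duration;

const DEFAULT_TIMEOUT_SECS: u64 = 30;
const MAX_TIMEOUT_SECS: u64 = 300;

/// Resolve the effective shell timeout from an optional caller-supplied value.
fn effective_timeout(requested: Option<u64>) -> Duration {
    Duration::from_secs(requested.unwrap_or(DEFAULT_TIMEOUT_SECS).min(MAX_TIMEOUT_SECS))
}

fn main() {
    assert_eq!(effective_timeout(None), Duration::from_secs(30));
    assert_eq!(effective_timeout(Some(10)), Duration::from_secs(10));
    assert_eq!(effective_timeout(Some(9999)), Duration::from_secs(300));
}
```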
## Resource Subscriptions
Four resources are exposed:
- `startos:///public` — full public DB tree
- `startos:///public/serverInfo` — server metadata
- `startos:///public/packageData` — installed packages
- `startos:///mcp/system-prompt` — curated AI assistant context (text/plain)
Resource URIs are validated to only allow `/public/**` subtrees and the special `/mcp/system-prompt` path. Attempts to access non-public paths (e.g. `startos:///private/...`) are rejected.
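The validation rule can be sketched as a pure function. This only encodes the rule as stated above; the actual checks in `mod.rs` are assumed:

```rust
/// Allow only the public subtree and the special system-prompt path.
fn is_allowed_resource(uri: &str) -> bool {
    match uri.strip_prefix("startos://") {
        Some(path) => {
            path == "/mcp/system-prompt" || path == "/public" || path.starts_with("/public/")
        }
        None => false, // wrong scheme
    }
}

fn main() {
    assert!(is_allowed_resource("startos:///public/serverInfo"));
    assert!(is_allowed_resource("startos:///mcp/system-prompt"));
    assert!(!is_allowed_resource("startos:///private/secrets"));
    assert!(!is_allowed_resource("https://example.com/public"));
}
```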
`resources/read` parses the URI into a `JsonPointer`, calls `ctx.db.dump(&pointer)`, and returns the JSON. The system prompt resource is handled as a special case, returning server info and version.
`resources/subscribe` creates a `DbSubscriber` that watches the patch-db for changes at the given pointer. Changes are **debounced** (500ms window): the subscriber collects multiple revisions and merges their `DiffPatch`es before sending a single `notifications/resources/updated` notification over the SSE channel. The subscription task runs as a spawned tokio task; its `JoinHandle` is stored in the session so it can be aborted on unsubscribe or session teardown. Re-subscribing to the same URI aborts the prior subscription first.
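The merge step of the debounce window can be sketched without tokio. A `Vec` of operation labels stands in for the real `DiffPatch`; the invariants are that the last revision's id wins and patches append in arrival order:

```rust
/// Simplified Revision: the real patch is a JSON-patch diff.
struct Revision {
    id: u64,
    patch: Vec<&'static str>,
}

/// Merge every revision that arrived within one debounce window into the
/// single notification payload that gets sent over SSE.
fn merge_window(revisions: Vec<Revision>) -> Option<Revision> {
    let mut iter = revisions.into_iter();
    let first = iter.next()?;
    let (mut id, mut patch) = (first.id, first.patch);
    for rev in iter {
        id = rev.id;          // newest revision id wins
        patch.extend(rev.patch); // patches compose in order
    }
    Some(Revision { id, patch })
}

fn main() {
    let merged = merge_window(vec![
        Revision { id: 41, patch: vec!["replace /serverInfo/hostname"] },
        Revision { id: 42, patch: vec!["add /packageData/hello-world"] },
    ])
    .unwrap();
    assert_eq!(merged.id, 42);
    assert_eq!(merged.patch.len(), 2);
}
```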
## CORS
- Preflight (OPTIONS): reflects the request's `Origin`, `Access-Control-Request-Method`, and `Access-Control-Request-Headers` back. Sets `Allow-Credentials: true` and caches for 24h.
- Normal responses (`apply_cors`): reflects the request's `Origin` header when present, falls back to `*` when absent. Exposes the `Mcp-Session-Id` header. This matches the behavior of the rpc-toolkit `Cors` middleware used by the main UI.
- CORS headers are applied to all response types: POST JSON-RPC, GET SSE, DELETE, and error responses.
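A sketch of the reflection rule for normal responses, reduced to the two behaviors described above (header set simplified; this is not the full middleware):

```rust
use std::collections::HashMap;

/// CORS headers for a normal /mcp response: reflect Origin when present,
/// fall back to "*", and expose the session ID header to browser clients.
fn cors_headers(origin: Option<&str>) -> HashMap<&'static str, String> {
    let mut h = HashMap::new();
    h.insert("Access-Control-Allow-Origin", origin.unwrap_or("*").to_owned());
    h.insert("Access-Control-Expose-Headers", "Mcp-Session-Id".to_owned());
    h
}

fn main() {
    let h = cors_headers(Some("https://server.local"));
    assert_eq!(h["Access-Control-Allow-Origin"], "https://server.local");
    assert_eq!(cors_headers(None)["Access-Control-Allow-Origin"], "*");
}
```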
## Body Size Limits
POST request bodies are limited to 1 MiB:
1. `Content-Length` header is checked **before** reading the body (rejects oversized requests immediately).
2. After reading, the actual body size is re-checked as defense-in-depth for chunked transfers that lack `Content-Length`.
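Both gates reduce to two small checks (a std-only sketch; function names are illustrative):

```rust
const MAX_BODY_BYTES: usize = 1024 * 1024; // 1 MiB

/// First gate: reject from Content-Length alone, before reading the body.
/// A missing header (chunked transfer) passes this gate and is caught later.
fn reject_by_content_length(content_length: Option<usize>) -> bool {
    matches!(content_length, Some(len) if len > MAX_BODY_BYTES)
}

/// Second gate: re-check the bytes actually read, covering chunked bodies
/// that never declared a Content-Length.
fn reject_by_actual_size(body: &[u8]) -> bool {
    body.len() > MAX_BODY_BYTES
}

fn main() {
    assert!(reject_by_content_length(Some(2 * 1024 * 1024)));
    assert!(!reject_by_content_length(None)); // chunked: defer to second gate
    assert!(!reject_by_actual_size(&[0u8; 1024]));
}
```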

core/src/mcp/mod.rs (new file, 1212 lines)

File diff suppressed because it is too large.

core/src/mcp/protocol.rs (new file, 211 lines)

@@ -0,0 +1,211 @@
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
// JSON-RPC 2.0 error codes
pub const PARSE_ERROR: i32 = -32700;
pub const INVALID_REQUEST: i32 = -32600;
pub const METHOD_NOT_FOUND: i32 = -32601;
pub const INVALID_PARAMS: i32 = -32602;
pub const INTERNAL_ERROR: i32 = -32603;
pub const PROTOCOL_VERSION: &str = "2025-03-26";
// === JSON-RPC 2.0 envelope ===
#[derive(Deserialize)]
pub struct McpRequest {
pub jsonrpc: String,
pub id: Option<JsonValue>,
pub method: String,
#[serde(default)]
pub params: Option<JsonValue>,
}
#[derive(Serialize)]
pub struct McpResponse {
pub jsonrpc: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<JsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<McpError>,
}
impl McpResponse {
pub fn ok(id: Option<JsonValue>, result: JsonValue) -> Self {
Self {
jsonrpc: "2.0",
id,
result: Some(result),
error: None,
}
}
pub fn error(id: Option<JsonValue>, code: i32, message: String, data: Option<JsonValue>) -> Self {
Self {
jsonrpc: "2.0",
id,
result: None,
error: Some(McpError {
code,
message,
data,
}),
}
}
}
#[derive(Serialize)]
pub struct McpError {
pub code: i32,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<JsonValue>,
}
// === initialize ===
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeParams {
pub protocol_version: String,
#[serde(default)]
pub capabilities: JsonValue,
#[serde(default)]
pub client_info: Option<ClientInfo>,
}
#[derive(Deserialize)]
pub struct ClientInfo {
pub name: String,
#[serde(default)]
pub version: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeResult {
pub protocol_version: &'static str,
pub capabilities: ServerCapabilities,
pub server_info: ServerInfo,
}
#[derive(Serialize)]
pub struct ServerCapabilities {
pub tools: ToolsCapability,
pub resources: ResourcesCapability,
}
#[derive(Serialize)]
pub struct ToolsCapability {}
#[derive(Serialize)]
pub struct ResourcesCapability {
pub subscribe: bool,
}
#[derive(Serialize)]
pub struct ServerInfo {
pub name: &'static str,
pub version: String,
}
// === tools/list ===
#[derive(Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ToolDefinition {
pub name: String,
pub description: String,
pub input_schema: JsonValue,
}
#[derive(Serialize)]
pub struct ToolsListResult {
pub tools: Vec<ToolDefinition>,
}
// === tools/call ===
#[derive(Deserialize)]
pub struct ToolsCallParams {
pub name: String,
#[serde(default)]
pub arguments: JsonValue,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ToolsCallResult {
pub content: Vec<ContentBlock>,
#[serde(skip_serializing_if = "Option::is_none")]
pub is_error: Option<bool>,
}
#[derive(Serialize)]
#[serde(tag = "type")]
pub enum ContentBlock {
#[serde(rename = "text")]
Text { text: String },
}
// === resources/list ===
#[derive(Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ResourceDefinition {
pub uri: String,
pub name: String,
pub description: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub mime_type: Option<String>,
}
#[derive(Serialize)]
pub struct ResourcesListResult {
pub resources: Vec<ResourceDefinition>,
}
// === resources/read ===
#[derive(Deserialize)]
pub struct ResourcesReadParams {
pub uri: String,
}
#[derive(Serialize)]
pub struct ResourcesReadResult {
pub contents: Vec<ResourceContent>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ResourceContent {
pub uri: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub mime_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub text: Option<String>,
}
// === resources/subscribe + unsubscribe ===
#[derive(Deserialize)]
pub struct ResourcesSubscribeParams {
pub uri: String,
}
#[derive(Deserialize)]
pub struct ResourcesUnsubscribeParams {
pub uri: String,
}
// === Server→client notification ===
#[derive(Serialize)]
pub struct McpNotification {
pub jsonrpc: &'static str,
pub method: &'static str,
pub params: serde_json::Value,
}

core/src/mcp/session.rs (new file, 232 lines)

@@ -0,0 +1,232 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use patch_db::json_ptr::JsonPointer;
use patch_db::DiffPatch;
use serde_json::Value as JsonValue;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use uuid::Uuid;
use crate::context::RpcContext;
use crate::db::DbSubscriber;
use crate::prelude::*;
use crate::util::sync::SyncMutex;
use super::protocol::McpNotification;
pub(crate) type SessionMap = Arc<SyncMutex<HashMap<String, McpSession>>>;
/// Maximum time a session can exist without a GET stream before being cleaned up.
const SESSION_STALE_TIMEOUT: Duration = Duration::from_secs(60);
/// Maximum buffered notifications before backpressure kicks in.
const NOTIFICATION_CHANNEL_BOUND: usize = 256;
pub(crate) struct McpSession {
pub notification_tx: mpsc::Sender<JsonValue>,
pub notification_rx: Option<mpsc::Receiver<JsonValue>>,
pub subscriptions: HashMap<String, JoinHandle<()>>,
pub created_at: Instant,
}
pub(crate) fn create_session(sessions: &SessionMap) -> String {
let id = Uuid::new_v4().to_string();
let (tx, rx) = mpsc::channel(NOTIFICATION_CHANNEL_BOUND);
sessions.mutate(|map| {
map.insert(
id.clone(),
McpSession {
notification_tx: tx,
notification_rx: Some(rx),
subscriptions: HashMap::new(),
created_at: Instant::now(),
},
);
});
id
}
/// Sweep stale sessions. Call this from any frequent code path (POST handler, create_session).
pub(crate) fn sweep_stale_sessions_if_needed(sessions: &SessionMap) {
sessions.mutate(|map| sweep_stale_sessions(map));
}
/// Remove sessions that were created but never connected a GET stream within the timeout.
fn sweep_stale_sessions(map: &mut HashMap<String, McpSession>) {
let stale: Vec<String> = map
.iter()
.filter(|(_, session)| {
// Session is stale if rx is still present (no GET connected) and it's old
session.notification_rx.is_some()
&& session.created_at.elapsed() > SESSION_STALE_TIMEOUT
})
.map(|(id, _)| id.clone())
.collect();
for id in stale {
tracing::info!(
target: "mcp_audit",
session_id = %id,
"Sweeping stale MCP session (no GET stream connected)"
);
if let Some(session) = map.remove(&id) {
for (_, handle) in session.subscriptions {
handle.abort();
}
}
}
}
pub(crate) fn remove_session(sessions: &SessionMap, id: &str) {
sessions.mutate(|map| {
if let Some(session) = map.remove(id) {
for (_, handle) in session.subscriptions {
handle.abort();
}
}
});
}
/// Take the notification receiver from a session (for use by the SSE stream).
/// Returns None if the session doesn't exist or the rx was already taken.
pub(crate) fn take_notification_rx(
sessions: &SessionMap,
id: &str,
) -> Option<mpsc::Receiver<JsonValue>> {
sessions.mutate(|map| map.get_mut(id)?.notification_rx.take())
}
/// Check whether the given session ID exists in the session map.
pub(crate) fn session_exists(sessions: &SessionMap, id: &str) -> bool {
sessions.peek(|map| map.contains_key(id))
}
/// Parse a `startos:///...` URI into a JsonPointer.
fn parse_resource_uri(uri: &str) -> Result<JsonPointer, Error> {
let path = uri.strip_prefix("startos://").ok_or_else(|| {
Error::new(
eyre!("Invalid resource URI: must start with startos://"),
ErrorKind::InvalidRequest,
)
})?;
path.parse::<JsonPointer>()
.with_kind(ErrorKind::InvalidRequest)
}
pub(crate) async fn subscribe_resource(
ctx: &RpcContext,
sessions: &SessionMap,
session_id: &str,
uri: &str,
) -> Result<(), Error> {
let pointer = parse_resource_uri(uri)?;
let (dump, sub) = ctx.db.dump_and_sub(pointer).await;
let mut db_sub = DbSubscriber {
rev: dump.id,
sub,
sync_db: ctx.sync_db.subscribe(),
};
let tx = sessions
.peek(|map| map.get(session_id).map(|s| s.notification_tx.clone()))
.ok_or_else(|| Error::new(eyre!("Session not found"), ErrorKind::InvalidRequest))?;
let uri_owned = uri.to_string();
let handle = tokio::spawn(async move {
loop {
// Wait for first revision
let first = match db_sub.recv().await {
Some(rev) => rev,
None => break,
};
// Debounce: collect more revisions for up to 500ms
let mut merged_id = first.id;
let mut merged_patch = first.patch;
let debounce = tokio::time::sleep(Duration::from_millis(500));
tokio::pin!(debounce);
loop {
tokio::select! {
_ = &mut debounce => break,
rev = db_sub.recv() => {
match rev {
Some(rev) => {
merged_id = rev.id;
merged_patch.append(rev.patch);
}
None => {
// Subscriber closed — send what we have and exit
let _ = send_notification(&tx, &uri_owned, merged_id, &merged_patch);
return;
}
}
}
}
}
if send_notification(&tx, &uri_owned, merged_id, &merged_patch).is_err() {
break; // SSE stream closed or channel full
}
}
});
// Store the task handle, aborting any prior subscription for the same URI
sessions.mutate(|map| {
if let Some(session) = map.get_mut(session_id) {
if let Some(old_handle) = session.subscriptions.remove(uri) {
tracing::info!(
target: "mcp_audit",
uri = %uri,
session_id = %session_id,
"Aborting prior subscription for re-subscribed URI"
);
old_handle.abort();
}
session.subscriptions.insert(uri.to_string(), handle);
}
});
Ok(())
}
fn send_notification(
tx: &mpsc::Sender<JsonValue>,
uri: &str,
id: u64,
patch: &DiffPatch,
) -> Result<(), ()> {
let notification = McpNotification {
jsonrpc: "2.0",
method: "notifications/resources/updated",
params: serde_json::json!({
"uri": uri,
"revision": {
"id": id,
"patch": patch,
}
}),
};
tx.try_send(serde_json::to_value(&notification).unwrap_or_default())
.map_err(|e| {
tracing::warn!(
target: "mcp_audit",
uri = %uri,
"Notification channel full or closed, dropping notification: {e}"
);
})
}
pub(crate) fn unsubscribe_resource(sessions: &SessionMap, session_id: &str, uri: &str) {
sessions.mutate(|map| {
if let Some(session) = map.get_mut(session_id) {
if let Some(handle) = session.subscriptions.remove(uri) {
handle.abort();
}
}
});
}

core/src/mcp/tools.rs (new file, 1385 lines)

File diff suppressed because it is too large.


@@ -100,6 +100,17 @@ impl UiContext for RpcContext {
})
})
.nest("/s9pk", s9pk_router(self.clone()))
.route("/mcp", crate::mcp::mcp_router(self.clone()))
.route(
"/.well-known/mcp",
get(|| async {
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(r#"{"mcp_endpoint":"/mcp"}"#))
.unwrap()
}),
)
.route(
"/static/local-root-ca.crt",
get(move || {


@@ -94,7 +94,17 @@ pub async fn get_data_version(id: &PackageId) -> Result<Option<String>, Error> {
maybe_read_file_to_string(&path).await
}
pub(crate) struct RootCommand(pub String);
/// Resolved subcontainer info, ready for command construction.
pub(crate) struct ResolvedSubcontainer {
pub container_id: ContainerId,
pub subcontainer_id: Guid,
pub image_id: ImageId,
pub user: InternedString,
pub workdir: Option<String>,
pub root_command: RootCommand,
}
#[derive(Clone, Debug, Serialize, Deserialize, Default, TS)]
pub struct MiB(pub u64);
@@ -706,6 +716,158 @@ impl Service {
.clone();
Ok(container_id)
}
/// Resolve a subcontainer by optional filters (guid, name, or imageId).
/// If no filter is provided and there is exactly one subcontainer, it is returned.
/// Errors if no match found or multiple matches found (with the list in error info).
pub(crate) async fn resolve_subcontainer(
&self,
subcontainer: Option<InternedString>,
name: Option<InternedString>,
image_id: Option<ImageId>,
user: Option<InternedString>,
) -> Result<ResolvedSubcontainer, Error> {
let id = &self.seed.id;
let container = &self.seed.persistent_container;
let root_dir = container
.lxc_container
.get()
.map(|x| x.rootfs_dir().to_owned())
.or_not_found(format!("container for {id}"))?;
let subcontainer_upper = subcontainer.as_ref().map(|x| AsRef::<str>::as_ref(x).to_uppercase());
let name_upper = name.as_ref().map(|x| AsRef::<str>::as_ref(x).to_uppercase());
let image_id_upper = image_id.as_ref().map(|x| AsRef::<Path>::as_ref(x).to_string_lossy().to_uppercase());
let subcontainers = container.subcontainers.lock().await;
let matches: Vec<_> = subcontainers
.iter()
.filter(|(x, wrapper)| {
if let Some(sc) = subcontainer_upper.as_ref() {
AsRef::<str>::as_ref(x).contains(sc.as_str())
} else if let Some(n) = name_upper.as_ref() {
AsRef::<str>::as_ref(&wrapper.name)
.to_uppercase()
.contains(n.as_str())
} else if let Some(img) = image_id_upper.as_ref() {
let Some(wrapper_image_id) = AsRef::<Path>::as_ref(&wrapper.image_id).to_str()
else {
return false;
};
wrapper_image_id.to_uppercase().contains(img.as_str())
} else {
true
}
})
.collect();
let Some((subcontainer_id, matched_image_id)) = matches
.first()
.map::<(Guid, ImageId), _>(|&x| (x.0.clone(), x.1.image_id.clone()))
else {
drop(subcontainers);
let info = container
.subcontainers
.lock()
.await
.iter()
.map(|(g, s)| SubcontainerInfo {
id: g.clone(),
name: s.name.clone(),
image_id: s.image_id.clone(),
})
.collect::<Vec<_>>();
return Err(Error::new(
eyre!("{}", t!("service.mod.no-matching-subcontainers", id = id)),
ErrorKind::NotFound,
)
.with_info(to_value(&info)?));
};
if matches.len() > 1 {
let info = matches
.into_iter()
.map(|(g, s)| SubcontainerInfo {
id: g.clone(),
name: s.name.clone(),
image_id: s.image_id.clone(),
})
.collect::<Vec<_>>();
return Err(Error::new(
eyre!("{}", t!("service.mod.multiple-subcontainers-found", id = id,)),
ErrorKind::InvalidRequest,
)
.with_info(to_value(&info)?));
}
let passwd = root_dir
.join("media/startos/subcontainers")
.join(subcontainer_id.as_ref())
.join("etc")
.join("passwd");
let image_meta = serde_json::from_str::<Value>(
&tokio::fs::read_to_string(
root_dir
.join("media/startos/images/")
.join(&matched_image_id)
.with_extension("json"),
)
.await?,
)
.with_kind(ErrorKind::Deserialization)?;
let resolved_user = user
.or_else(|| image_meta["user"].as_str().map(InternedString::intern))
.unwrap_or_else(|| InternedString::intern("root"));
let root_command = get_passwd_command(passwd, &*resolved_user).await;
let workdir = image_meta["workdir"].as_str().map(|s| s.to_owned());
Ok(ResolvedSubcontainer {
container_id: self.container_id()?,
subcontainer_id,
image_id: matched_image_id,
user: resolved_user,
workdir,
root_command,
})
}
/// Build a `Command` for executing inside a resolved subcontainer (non-interactive).
pub(crate) fn build_subcontainer_command(
resolved: &ResolvedSubcontainer,
command: &[&str],
) -> Command {
let root_path =
Path::new("/media/startos/subcontainers").join(resolved.subcontainer_id.as_ref());
let mut cmd = Command::new("lxc-attach");
cmd.kill_on_drop(true);
cmd.arg(&*resolved.container_id)
.arg("--")
.arg("start-container")
.arg("subcontainer")
.arg("exec")
.arg("--env-file")
.arg(
Path::new("/media/startos/images")
.join(&resolved.image_id)
.with_extension("env"),
)
.arg("--user")
.arg(&*resolved.user);
if let Some(ref workdir) = resolved.workdir {
cmd.arg("--workdir").arg(workdir);
}
cmd.arg(&root_path).arg("--");
if command.is_empty() {
cmd.arg(&resolved.root_command.0);
} else {
cmd.args(command);
}
cmd
}
#[instrument(skip_all)]
pub async fn stats(&self) -> Result<ServiceStats, Error> {
let container = &self.seed.persistent_container;
@@ -799,124 +961,26 @@ pub async fn attach(
user,
}: AttachParams,
) -> Result<Guid, Error> {
let resolved = {
let service = ctx.services.get(&id).await;
let service_ref = service.as_ref().or_not_found(&id)?;
service_ref
.resolve_subcontainer(
subcontainer.map(|g| InternedString::intern(g.as_ref())),
name,
image_id,
user,
)
.await?
};
let ResolvedSubcontainer {
container_id,
subcontainer_id,
image_id,
user,
workdir,
root_command,
} = resolved;
let guid = Guid::new();
async fn handler(