Loom API Reference

Core Types (loom.types)

Role

class Role(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"

StopReason

class StopReason(str, Enum):
    STOP = "stop"
    TOOL_USE = "tool_use"
    LENGTH = "length"
    CONTENT_FILTER = "content_filter"
    UNKNOWN = "unknown"

ChatMessage

class ChatMessage(BaseModel):
    role: Role
    content: str | list[ContentPart] | None = None
    tool_calls: list[ToolCall] | None = None
    tool_call_id: str | None = None
    name: str | None = None

    @property
    def text_content(self) -> str | None:
        """Extract text regardless of content format. Backward-compatible."""
        ...

content accepts plain text (str) for backward compatibility, or a list of typed content parts for multimodal messages. Use text_content when you need the text portion only.

Content Parts

class TextPart(BaseModel):
    type: Literal["text"] = "text"
    text: str

class ImagePart(BaseModel):
    type: Literal["image"] = "image"
    source: str       # file path or URL
    media_type: str = ""  # e.g. "image/png"; inferred from extension if omitted

class VideoPart(BaseModel):
    type: Literal["video"] = "video"
    source: str
    media_type: str = ""  # e.g. "video/mp4"

class FilePart(BaseModel):
    type: Literal["file"] = "file"
    source: str
    media_type: str = ""  # e.g. "application/pdf"

ContentPart = Annotated[Union[TextPart, ImagePart, VideoPart, FilePart], Discriminator(...)]

Files are referenced by path or URL and loaded from disk at send-time. They are never stored as base64 blobs in memory or the database.

Usage:

from loom import ChatMessage, Role, TextPart, ImagePart

# Plain text (backward compatible)
msg = ChatMessage(role=Role.USER, content="Describe this image")

# Multimodal
msg = ChatMessage(role=Role.USER, content=[
    TextPart(text="What's in this image?"),
    ImagePart(source="/path/to/photo.png"),
])

ToolCall

class ToolCall(BaseModel):
    id: str
    name: str
    arguments: str

ToolSpec

class ToolSpec(BaseModel):
    name: str
    description: str
    parameters: dict  # JSON Schema

Usage

class Usage(BaseModel):
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0

ChatResponse

class ChatResponse(BaseModel):
    message: ChatMessage
    usage: Usage
    stop_reason: StopReason
    model: str

StreamEvent

Discriminated union:

  • ContentDeltaEvent(type="content_delta", delta: str)
  • ToolCallDeltaEvent(type="tool_call_delta", index: int, id: str | None, name: str | None, arguments_delta: str | None)
  • UsageEvent(type="usage", usage: Usage)
  • StopEvent(type="stop", stop_reason: StopReason)
  • ToolExecStartEvent(type="tool_exec_start", tool_call_id: str, name: str, arguments: str)
  • ToolExecResultEvent(type="tool_exec_result", tool_call_id: str, name: str, text: str, is_error: bool = False)
  • LimitReachedEvent(type="limit_reached", iterations: int)
  • ErrorEvent(type="error", message: str, reason: str | None, status_code: int | None, retryable: bool = False)
  • DoneEvent(type="done", context: dict = {})

  • Agentic Loop (loom.loop)

    AgentConfig

    class AgentConfig:
        max_iterations: int = 32
        model: str | None = None
        system_preamble: str = ""
        extra_tools: list[ToolHandler] = []
        on_before_turn: Callable[[list[ChatMessage]], list[ChatMessage]] | None
        on_after_turn: Callable[[AgentTurn], None] | None
        on_tool_result: Callable[[ToolCall, str], None] | None
        on_event: Callable[[StreamEvent], None] | None
        choose_model: Callable[[list[ChatMessage]], str | None] | None
        limit_message_builder: Callable[[int], str] | None
        affirmatives: set[str]
        negatives: set[str]
        serialize_event: Callable[[StreamEvent], str] | None
        before_llm_call: Callable[[list[ChatMessage], list[ToolSpec]], None] | None

    Agent

    class Agent:
        def __init__(
            self,
            provider: LLMProvider | None = None,
            provider_registry: ProviderRegistry | None = None,
            tool_registry: ToolRegistry | None = None,
            skill_registry: SkillRegistry | None = None,
            config: AgentConfig | None = None,
            agent_home: AgentHome | None = None,
            permissions: AgentPermissions | None = None,
            memory_store: MemoryStore | None = None,
            graphrag: GraphRAGEngine | None = None,
        ): ...
    
        async def run_turn(
            self,
            messages: list[ChatMessage],
            context: dict | None = None,
        ) -> AgentTurn: ...
    
        async def run_turn_stream(
            self,
            messages: list[ChatMessage],
            context: dict | None = None,
        ) -> AsyncIterator[StreamEvent]: ...

    AgentTurn

    class AgentTurn:
        reply: str
        iterations: int
        skills_touched: list[str]
        messages: list[ChatMessage]
        input_tokens: int
        output_tokens: int
        tool_calls: int
        model: str | None

    Tool System (loom.tools)

    ToolHandler (ABC)

    class ToolHandler(ABC):
        @property
        @abstractmethod
        def tool(self) -> ToolSpec: ...
    
        @abstractmethod
        async def invoke(self, args: dict) -> ToolResult: ...

    ToolResult

    class ToolResult:
        def __init__(self, text: str, metadata: dict | None = None, content_parts: list[ContentPart] | None = None): ...
        def to_text(self) -> str: ...

    content_parts carries structured media (e.g. images from MCP tools) alongside the text result. When present, the agent loop constructs a multimodal ChatMessage for the tool result.

    ToolRegistry

    class ToolRegistry:
        def register(self, handler: ToolHandler) -> None: ...
        def unregister(self, name: str) -> None: ...
        async def dispatch(self, name: str, args: dict) -> ToolResult: ...
        def specs(self) -> list[ToolSpec]: ...
        def has(self, name: str) -> bool: ...
        def list_handlers(self) -> list[str]: ...

    Built-in Tools

    | Class | Tool Name | File |
    | --- | --- | --- |
    | HttpCallTool | http_call | tools/http.py |
    | AskUserTool | ask_user | tools/hitl.py |
    | TerminalTool | terminal | tools/hitl.py |
    | VaultToolHandler | vault | tools/vault.py |
    | MemoryToolHandler | memory | tools/memory.py |
    | DelegateTool | delegate | tools/delegate.py |
    | EditIdentityTool | edit_identity | tools/profile.py |

    Skill System (loom.skills)

    Skill

    class Skill(BaseModel):
        name: str
        description: str
        body: str
        source_dir: str
        trust: str = "user"  # "builtin" | "user" | "agent"
        metadata: dict = {}

    SkillRegistry

    class SkillRegistry:
        def __init__(self, skills_dir: Path): ...
        def scan(self) -> None: ...
        def descriptions(self) -> list[tuple[str, str]]: ...
        def get(self, name: str) -> Skill | None: ...
        def list(self) -> list[Skill]: ...
        def register(self, skill: Skill) -> None: ...
        def unregister(self, name: str) -> None: ...
        def reload(self) -> None: ...

    SkillManager

    class SkillManager:
        def __init__(self, registry: SkillRegistry, guard: SkillGuard): ...
        def invoke(self, args: dict) -> str: ...

    Actions: create, edit, patch, delete, write_file, remove_file

    SkillGuard

    class SkillGuard:
        def scan(self, content: str, filename: str = "") -> SkillGuardVerdict: ...

    SkillGuardVerdict

    class SkillGuardVerdict(BaseModel):
        level: str   # "safe" | "caution" | "dangerous"
        findings: list[str]

    LLM Providers (loom.llm)

    LLMProvider (ABC)

    class LLMProvider(ABC):
        async def chat(self, messages, *, tools=None, model=None) -> ChatResponse: ...
        async def chat_stream(self, messages, *, tools=None, model=None) -> AsyncIterator[StreamEvent]: ...

    OpenAICompatibleProvider

    class OpenAICompatibleProvider(LLMProvider):
        def __init__(self, base_url: str, api_key: str | None = None, default_model: str = "gpt-4o", timeout: float = 120.0): ...

    AnthropicProvider

    class AnthropicProvider(LLMProvider):
        def __init__(self, api_key: str, default_model: str = "claude-sonnet-4-20250514", timeout: float = 120.0): ...

    Requires pip install anthropic.

    ProviderRegistry

    class ProviderRegistry:
        def register(self, model_id: str, provider: LLMProvider, upstream_model: str) -> None: ...
        def resolve(self, model_id: str) -> tuple[LLMProvider, str]: ...
        def list_models(self) -> list[str]: ...
        @property
        def default_model(self) -> str | None: ...

    redact_sensitive_text(text: str) -> str

    Regex-based secret redaction with 30+ patterns.


    Stores (loom.store)

    SessionStore

    class SessionStore:
        def __init__(self, db_path: Path): ...
        def get_or_create(self, session_id: str, title: str | None = None) -> dict: ...
        def get_history(self, session_id: str) -> list[ChatMessage]: ...
        def replace_history(self, session_id: str, messages: list[ChatMessage]) -> None: ...
        def set_title(self, session_id: str, title: str) -> None: ...
        def bump_usage(self, session_id: str, input_tokens: int, output_tokens: int, tool_calls: int) -> None: ...
        def list_sessions(self) -> list[dict]: ...
        def delete_session(self, session_id: str) -> bool: ...
        def search(self, query: str, limit: int = 20) -> list[dict]: ...

    VaultProvider (Protocol)

    class VaultProvider(Protocol):
        @property
        def root(self) -> Path: ...
        async def search(self, query: str, limit: int = 10) -> list[dict]: ...
        async def search_scoped(self, query: str, path_prefix: str, limit: int = 10) -> list[dict]: ...
        async def read(self, path: str) -> str: ...
        async def write(self, path: str, content: str, metadata: dict | None = None) -> None: ...
        async def list(self, prefix: str = "") -> list[str]: ...
        async def delete(self, path: str) -> None: ...
        def read_frontmatter(self, path: str) -> dict: ...
        def update_frontmatter(self, path: str, updates: dict) -> None: ...

    FilesystemVaultProvider

    class FilesystemVaultProvider:
        def __init__(self, vault_dir: Path): ...
        # Implements VaultProvider — markdown files on disk + SQLite FTS5 index.

    VaultStore is a deprecated alias for FilesystemVaultProvider.

    SecretsStore

    class SecretsStore:
        def __init__(self, secrets_path: Path): ...
        def get(self, key: str) -> str | None: ...
        def set(self, key: str, value: str) -> None: ...
        def delete(self, key: str) -> bool: ...
        def list_keys(self) -> list[str]: ...

    Deprecated. Use SecretStore for new code.


    Credentials (loom.auth, loom.store.secrets, loom.store.keychain)

    Secret Types (loom.store.secrets)

    Eight typed dicts that cover common credential shapes:

    | TypedDict | type literal | Key fields |
    | --- | --- | --- |
    | PasswordSecret | "password" | value: str |
    | ApiKeySecret | "api_key" | value: str |
    | BasicAuthSecret | "basic_auth" | username: str, password: str |
    | BearerTokenSecret | "bearer_token" | token: str, `expires_at: str \| None` |
    | OAuth2ClientCredentialsSecret | "oauth2_client_credentials" | client_id, client_secret, token_url, scopes |
    | SshPrivateKeySecret | "ssh_private_key" | key_pem: str, `passphrase: str \| None` |
    | AwsSigV4Secret | "aws_sigv4" | access_key_id, secret_access_key, session_token, region |
    | JwtSigningKeySecret | "jwt_signing_key" | private_key_pem, algorithm, key_id, issuer, audience, subject, ttl_seconds |

    Secret = Union[PasswordSecret, ApiKeySecret, BasicAuthSecret, BearerTokenSecret, OAuth2ClientCredentialsSecret, SshPrivateKeySecret, AwsSigV4Secret, JwtSigningKeySecret]

    SecretStore

    Fernet-encrypted, scope-keyed secret store. Key auto-generated at <store_dir>/keys/secrets.key; override with LOOM_SECRET_KEY env var.

    class SecretStore:
        def __init__(self, path: Path, *, key_path: Path | None = None): ...
        async def put(self, scope: str, secret: Secret, *, metadata: dict | None = None) -> str: ...
        async def get(self, scope: str) -> Secret | None: ...
        async def get_metadata(self, scope: str) -> dict | None: ...
        async def list(self, scope_prefix: str | None = None) -> list[SecretMetadata]: ...
        async def revoke(self, scope: str) -> bool: ...     # True if existed; idempotent
        async def rotate(self, scope: str, new_secret: Secret) -> str: ...  # raises KeyError if absent

    KeychainStore

    OS keychain backend (loom[keychain]). Identical public protocol to SecretStore.

    class KeychainStore:
        def __init__(self, service: str = "loom"): ...
        async def put(self, scope: str, secret: Secret, *, metadata: dict | None = None) -> str: ...
        async def get(self, scope: str) -> Secret | None: ...
        async def get_metadata(self, scope: str) -> dict | None: ...
        async def list(self, scope_prefix: str | None = None) -> list[SecretMetadata]: ...
        async def revoke(self, scope: str) -> bool: ...
        async def rotate(self, scope: str, new_secret: Secret) -> str: ...

    Backed by macOS Keychain / Linux Secret Service / Windows Credential Manager via keyring.

    SecretMetadata

    class SecretMetadata(TypedDict):
        scope: str
        secret_type: str
        created_at: str   # ISO 8601
        version: int
        metadata: dict    # free-form; SSH stores hostname/port/username here

    Appliers (loom.auth.appliers)

    | Class | secret_type | Transport | Output |
    | --- | --- | --- | --- |
    | BasicHttpApplier | basic_auth | http | `{"Authorization": "Basic <b64>"}` |
    | BearerHttpApplier | bearer_token | http | `{"Authorization": "Bearer <token>"}` |
    | OAuth2CCHttpApplier | oauth2_client_credentials | http | `{"Authorization": "Bearer <access_token>"}` |
    | ApiKeyHeaderApplier(header_name="X-API-Key") | api_key | http | `{header_name: value}` |
    | ApiKeyStringApplier | api_key | llm_api_key | raw str |
    | SshPasswordApplier | password | ssh | SshConnectArgs |
    | SshKeyApplier | ssh_private_key | ssh | SshConnectArgs |
    | SigV4Applier | aws_sigv4 | http | full headers dict with Authorization, x-amz-date (loom[aws]) |
    | JwtBearerApplier | jwt_signing_key | http | `{"Authorization": "Bearer <jwt>"}` (loom[jwt]) |

    Applier protocol:

    class Applier(Protocol):
        secret_type: str
        async def apply(self, secret: Secret, context: dict) -> Any: ...

    CredentialResolver (loom.auth.resolver)

    class CredentialResolver:
        def __init__(
            self,
            store: SecretStore,
            appliers: dict[tuple[str, str], Applier] | None = None,
            *,
            enforcer: PolicyEnforcer | None = None,
            scope_acl: Callable[[str, str, str], bool] | None = None,
        ): ...
    
        def register(self, applier: Applier, *, transport: str) -> None: ...
    
        async def resolve_for(
            self,
            scope: str,
            transport: str,
            context: dict | None = None,
        ) -> Any: ...

    resolve_for pipeline: ACL check → enforcer.gate() → store.get() → applier.apply(). Raises ScopeAccessDenied, CredentialDenied, ScopeNotFoundError, or NoApplierError on failure.


    PolicyStore (loom.auth.policy_store)

    File-backed persistence for CredentialPolicy objects (plain JSON, mode 0600).

    class PolicyStore:
        def __init__(self, path: Path): ...
        async def put(self, policy: CredentialPolicy) -> None: ...
        async def get(self, scope: str) -> CredentialPolicy | None: ...
        async def delete(self, scope: str) -> bool: ...
        async def list(self, scope_prefix: str | None = None) -> list[CredentialPolicy]: ...
        async def decrement_uses(self, scope: str) -> int: ...  # ONE_SHOT countdown

    CredentialPolicy (loom.auth.policies)

    @dataclass(frozen=True)
    class CredentialPolicy:
        scope: str
        mode: PolicyMode
        window_start: datetime | None = None   # TIME_BOXED
        window_end: datetime | None = None     # TIME_BOXED
        uses_remaining: int | None = None      # ONE_SHOT / counted
        prompt_message: str | None = None      # NOTIFY_BEFORE custom prompt

    PolicyMode (loom.auth.policies)

    class PolicyMode(StrEnum):
        AUTONOMOUS = "autonomous"       # no gate
        NOTIFY_BEFORE = "notify_before" # human must approve each use
        NOTIFY_AFTER = "notify_after"   # fire-and-log
        TIME_BOXED = "time_boxed"       # autonomous inside [window_start, window_end)
        ONE_SHOT = "one_shot"           # single use, then auto-revoked

    PolicyEnforcer (loom.auth.enforcer)

    class PolicyEnforcer:
        def __init__(
            self,
            policy_store: PolicyStore,
            hitl: HitlBroker | None = None,
            secret_store: SecretStore | None = None,
        ): ...
    
        async def gate(self, scope: str, context: dict | None = None) -> GateDecision: ...
        # Raises CredentialDenied on denial; returns GateDecision on success.

    GateDecision (loom.auth.enforcer)

    @dataclass(frozen=True)
    class GateDecision:
        allowed: bool
        policy: CredentialPolicy | None   # None = implicit AUTONOMOUS
        prompt_resolution: str | None     # NOTIFY_BEFORE answer when approved
        reason: str | None                # denial reason when allowed=False

    SshCallTool (loom.tools.ssh)

    Requires loom[ssh].

    class SshCallTool(ToolHandler):
        def __init__(
            self,
            credential_resolver: CredentialResolver,
            known_hosts_path: str | None = None,  # None = ~/.ssh/known_hosts; False = disable (dev only)
            connect_timeout: float = 10.0,
            command_timeout: float = 60.0,
            max_output_bytes: int = 10240,
        ): ...

    Tool name: ssh_call. Parameters: host (scope key), command (str), stdin (str, optional), timeout (float, optional).

    On success, ToolResult.text is stdout (truncated at max_output_bytes). Metadata keys: exit_code, stderr, truncated_stdout, truncated_stderr, duration_ms.

    On error, ToolResult.metadata["error_class"] is one of auth | timeout | transport | unknown.


    Credential error types (loom.auth.errors, loom.auth.enforcer)

    | Exception | Raised when |
    | --- | --- |
    | AuthApplierError | Base class for all applier errors |
    | SecretExpiredError | Bearer token expires_at is in the past |
    | NoApplierError | No applier registered for (secret_type, transport) |
    | ScopeNotFoundError | Scope absent from the SecretStore |
    | ScopeAccessDenied | scope_acl hook returned False |
    | CredentialDenied | PolicyEnforcer.gate() denied access (policy rule or HITL rejection) |

    Recurring Tasks (loom.heartbeat)

    Heartbeats are scheduled background tasks composed of a driver (pure state-detection logic) and agent instructions. The scheduler ticks registered heartbeats on a cron or interval schedule, calls driver.check(state) to detect events, and invokes the agent for each event returned.

    HeartbeatDriver (ABC)

    class HeartbeatDriver(ABC):
        @abstractmethod
        async def check(
            self, state: dict[str, Any]
        ) -> tuple[list[HeartbeatEvent], dict[str, Any]]: ...

    Implement check() to inspect external state and return (events, new_state). The runtime owns state persistence — drivers never read or write state themselves. An empty events list means nothing happened this tick.

    HeartbeatEvent

    @dataclass
    class HeartbeatEvent:
        name: str
        payload: dict[str, Any] = field(default_factory=dict)
        fired_at: datetime = ...  # UTC, auto-set at construction

    HeartbeatRecord

    @dataclass
    class HeartbeatRecord:
        id: str           # directory name / primary key
        name: str
        description: str
        schedule: str     # raw schedule string
        enabled: bool
        instructions: str # HEARTBEAT.md body — injected as agent system prompt
        source_dir: Path
        driver: HeartbeatDriver

    HeartbeatRunRecord

    @dataclass
    class HeartbeatRunRecord:
        heartbeat_id: str
        instance_id: str
        state: dict[str, Any]
        last_check: datetime | None
        last_fired: datetime | None
        last_error: str | None

    HeartbeatRegistry

    class HeartbeatRegistry:
        def __init__(self, heartbeats_dir: Path, additional_dirs: list[Path] | None = None): ...
        def scan(self) -> None: ...
        def get(self, id: str) -> HeartbeatRecord | None: ...
        def list(self) -> list[HeartbeatRecord]: ...
        def register(self, record: HeartbeatRecord) -> None: ...
        def unregister(self, id: str) -> None: ...
        def reload(self) -> None: ...

    HeartbeatStore

    SQLite-backed runtime state, keyed by (heartbeat_id, instance_id).

    class HeartbeatStore:
        def __init__(self, db_path: Path): ...
        def get_run(self, heartbeat_id: str, instance_id: str = "default") -> HeartbeatRunRecord | None: ...
        def get_state(self, heartbeat_id: str, instance_id: str = "default") -> dict[str, Any]: ...
        def set_state(self, heartbeat_id: str, state: dict[str, Any], instance_id: str = "default") -> None: ...
        def touch_check(self, heartbeat_id: str, instance_id: str = "default") -> None: ...
        def touch_fired(self, heartbeat_id: str, instance_id: str = "default", error: str | None = None) -> None: ...
        def list_runs(self) -> list[HeartbeatRunRecord]: ...
        def delete(self, heartbeat_id: str, instance_id: str = "default") -> None: ...
        def delete_all(self, heartbeat_id: str) -> None: ...

    HeartbeatScheduler

    Asyncio background scheduler. Each tick checks all enabled heartbeats; for each due heartbeat it calls driver.check(state), persists state, then invokes run_fn for every event returned. Optionally records each agent run as a SessionStore session.

    class HeartbeatScheduler:
        def __init__(
            self,
            registry: HeartbeatRegistry,
            store: HeartbeatStore,
            run_fn: RunFn,
            tick_interval: float = 60.0,
            sessions: SessionStore | None = None,
        ): ...
    
        def start(self) -> asyncio.Task: ...
        def stop(self) -> None: ...
        @property
        def running(self) -> bool: ...
        async def trigger(self, heartbeat_id: str, instance_id: str = "default") -> list[AgentTurn]: ...

    RunFn = Callable[[str, list[ChatMessage]], Awaitable[AgentTurn]]

    HeartbeatManager

    CRUD operations on disk + registry sync. Used by HeartbeatToolHandler.

    class HeartbeatManager:
        def __init__(self, registry: HeartbeatRegistry, store: HeartbeatStore): ...
        def invoke(self, args: dict) -> str: ...

    Actions: create, delete, enable, disable, list.

    HeartbeatToolHandler

    Exposes HeartbeatManager as the manage_heartbeat tool so the agent can create and manage its own recurring tasks at runtime.

    class HeartbeatToolHandler(ToolHandler):
        def __init__(self, manager: HeartbeatManager): ...

    Tool name: manage_heartbeat. Parameters: action (required), name, description, schedule, instructions, driver_code.

    Schedule format

    parse_schedule(expr: str) -> Schedule accepts three forms:

    | Form | Examples |
    | --- | --- |
    | Natural language | "every 5 minutes", "every hour", "every 2 days" |
    | Cron shorthands | @daily, @hourly, @weekly, @monthly, @yearly |
    | 5-field cron | `"*/5 * * * *"`, `"0 9 * * 1-5"` |

    load_heartbeat(heartbeat_dir: Path) -> HeartbeatRecord

    Load a heartbeat from a directory containing HEARTBEAT.md and driver.py. Raises FileNotFoundError if either file is absent, ValueError if metadata is invalid or the directory name doesn't match name in frontmatter.

    make_run_fn(agent: Agent) -> RunFn

    Build a RunFn from an existing Agent, reusing its provider and tool registry but overriding the system prompt with the heartbeat's instructions.

    Heartbeat file layout

    heartbeats/
      my-monitor/
        HEARTBEAT.md   # frontmatter metadata + agent instructions (body)
        driver.py      # must define class Driver(HeartbeatDriver)

    HEARTBEAT.md:

    ---
    name: my-monitor
    description: Alert when queue depth exceeds threshold
    schedule: "every 5 minutes"
    enabled: true
    ---
    
    When you receive a queue_depth_exceeded event, log the depth to memory
    and call the notify tool with a short summary.

    driver.py:

    from loom.heartbeat import HeartbeatDriver, HeartbeatEvent
    
    class Driver(HeartbeatDriver):
        async def check(self, state: dict) -> tuple[list[HeartbeatEvent], dict]:
            depth = await fetch_queue_depth()  # your detection logic
            if depth > state.get("threshold", 100):
                return [HeartbeatEvent("queue_depth_exceeded", {"depth": depth})], state
            return [], state

    Server (loom.server)

    create_app(...)

    def create_app(
        agent: Agent,
        sessions: SessionStore,
        skills: SkillRegistry | None = None,
        tool_registry: ToolRegistry | None = None,
        extra_routes: Any = None,
    ) -> FastAPI: ...

    Endpoints:

    | Method | Path | Description |
    | --- | --- | --- |
    | GET | /health | Health check |
    | POST | /chat | Non-streaming chat |
    | POST | /chat/stream | SSE streaming chat |
    | GET | /sessions | List sessions |
    | DELETE | /sessions/{id} | Delete session |
    | GET | /skills | List skills |

    Configuration (loom.config)

    LoomConfig

    class LoomConfig(BaseModel):
        default_model: str = ""
        max_iterations: int = 32
        system_preamble: str = ""
        routing_mode: str = "fixed"
        providers: dict[str, ProviderConfig] = {}
        models: list[dict] = []

    resolve_config(cli_overrides, config) -> (base_url, api_key, model)

    Precedence: CLI > env vars > config file > defaults.

    Env vars: LOOM_LLM_BASE_URL, LOOM_LLM_API_KEY, LOOM_LLM_MODEL


    Routing (loom.routing)

    classify_message(text: str) -> MessageCategory

    Returns: CODING, REASONING, TRIVIAL, or BALANCED

    choose_model(message, registry, strengths, default_model) -> str | None

    Selects the best model from the registry based on message category and model strengths.

    ModelStrengths

    class ModelStrengths:
        speed: int = 5      # 1-10
        cost: int = 5       # 1-10 (higher = cheaper)
        reasoning: int = 5  # 1-10
        coding: int = 5     # 1-10

    Error Handling (loom.errors)

    Exception Hierarchy

    LLMError
      ├── LLMTransportError (status_code, body)  -- retryable
      └── MalformedOutputError                    -- never retryable

    ClassifiedError

    class ClassifiedError(BaseModel):
        reason: FailoverReason
        retryable: bool
        should_compress: bool
        should_rotate_credential: bool
        should_fallback: bool
        recovery: RecoveryAction

    classify_api_error(error: Exception) -> ClassifiedError

    classify_http(status: int, body: str) -> ClassifiedError


    Retry (loom.retry)

    jittered_backoff(attempt, base=2.0, max_delay=60.0, jitter_ratio=0.5) -> float

    async with_retry(coro_factory, max_attempts=3) -> Any

    Retries LLMTransportError with jittered exponential backoff.


    Agent Communication Protocol (loom.acp)

    AcpConfig

    class AcpConfig(BaseModel):
        url: str
        timeout: float = 30.0
        retries: int = 2

    AcpCallTool(ToolHandler)

    Tool handler for calling remote agents over WebSocket with Ed25519 authentication.

    DeviceKeypair

    class DeviceKeypair:
        public_key_b64: str
        def sign_challenge(self, challenge: str) -> str: ...

    load_or_create_keypair(path: Path) -> DeviceKeypair


    MCP Client (loom.mcp)

    Optional (pip install "loom[mcp]"). Connect to external MCP servers and expose their tools as Loom ToolHandler instances.

    McpServerConfig

    class McpServerConfig(BaseModel):
        name: str
        transport: Literal["stdio", "sse"] = "stdio"
        # stdio transport
        command: list[str] | None = None
        env: dict[str, str] = {}
        # sse transport
        url: str | None = None
        headers: dict[str, str] = {}

    McpClient

    Async context manager owning one MCP session.

    class McpClient:
        def __init__(self, config: McpServerConfig) -> None: ...
        async def __aenter__(self) -> McpClient: ...
        async def __aexit__(self, *exc) -> None: ...
        async def list_tools(self) -> list[McpToolHandler]: ...
        async def call_tool(self, name: str, args: dict) -> ToolResult: ...

    McpToolHandler(ToolHandler)

    Wraps one remote MCP tool. Constructed by McpClient.list_tools(); register with a ToolRegistry to make it callable by the agent.

    Usage:

    from loom.mcp import McpClient, McpServerConfig
    
    config = McpServerConfig(name="web", transport="stdio", command=["npx", "-y", "my-mcp-server"])
    async with McpClient(config) as client:
        for handler in await client.list_tools():
            tool_registry.register(handler)
        await agent.run_turn(messages)  # client must stay open here

    HITL Broker (loom.hitl)

    HitlBroker

    class HitlBroker:
        async def ask(self, session_id: str, kind: str, message: str, choices: list[str] | None = None, timeout: float = 120.0) -> str: ...
        def answer(self, session_id: str, request_id: str, answer: str) -> None: ...
        async def subscribe(self, session_id: str) -> AsyncIterator[HitlEvent]: ...

    BrokerAskUserTool(ToolHandler)

    Wraps HitlBroker.ask() as a tool handler, scoped to a session ID.

    HitlRequest

    class HitlRequest(BaseModel):
        request_id: str
        session_id: str
        kind: str
        message: str
        choices: list[str] | None

    HitlEvent

    class HitlEvent(BaseModel):
        type: str  # "request" | "resolved" | "timeout"
        request: HitlRequest | None
        answer: str | None

    Constants

  • CURRENT_SESSION_ID -- context key for session-scoped HITL
  • TIMEOUT_SENTINEL -- returned when a HITL request times out

  • Memory Store (loom.store.memory)

    MemoryEntry

    class MemoryEntry:
        key: str
        category: str
        tags: list[str]
        content: str
        created: str
        updated: str
        importance: int = 1      # clamped to [0, 3]
        pinned: bool = False
        access_count: int = 0
        last_recalled_at: str | None = None

    MemoryStore

    class MemoryStore:
        def __init__(
            self,
            memory_dir: Path,
            index_db: Path | None = None,
            *,
            embedding_provider: EmbeddingProvider | None = None,
            vault_provider: VaultProvider | None = None,
            vault_prefix: str = "memory",
            graphrag: GraphRAGEngine | None = None,
        ): ...
        async def write(self, key: str, content: str, category: str = "notes", tags: list[str] = [], *, pinned: bool = False, importance: int = 1) -> None: ...
        async def read(self, key: str) -> MemoryEntry | None: ...
        async def search(self, query: str, limit: int = 10) -> list[SearchHit]: ...
        async def recall(self, query: str, *, limit: int = 5, touch: bool = True) -> list[RecallHit]: ...
        async def delete(self, key: str) -> bool: ...
        async def list_entries(self, category: str | None = None, limit: int = 50) -> list[MemoryEntry]: ...
        def recent(self, limit: int = 5, budget: int = 1500) -> list[tuple[str, str]]: ...
        def pin(self, key: str, pinned: bool = True) -> None: ...
        def set_importance(self, key: str, level: int) -> None: ...
        def touch(self, key: str) -> None: ...

    Standalone mode (no vault_provider): local markdown files + SQLite FTS5 + salience/recency ranking. This is the default.

    Vault-backed mode (pass vault_provider): delegates file I/O and FTS5 search to the vault provider. Memories are stored as markdown files with YAML frontmatter under <vault_prefix>/<YYYY>/<MM>/<DD>/. Re-writes stay in the original date directory. Salience mutations update both frontmatter and the local SQLite index.

    When graphrag is provided, every write() also indexes the content into the GraphRAGEngine and every delete() removes it.

    EmbeddingProvider (Protocol)

    class EmbeddingProvider(Protocol):
        dim: int
        async def embed(self, texts: list[str]) -> list[list[float]]: ...

    SearchHit

    class SearchHit:
        key: str
        category: str
        snippet: str
        score: float

    RecallHit

    class RecallHit:
        key: str
        category: str
        preview: str
        score: float
        components: dict[str, float]  # e.g. {"bm25": 0.8, "salience": 0.5, ...}

    GraphRAG (loom.store.graphrag)

    Fully opt-in knowledge-graph-augmented retrieval. Pass a GraphRAGEngine to Agent(graphrag=...) or leave it None (default) for no change in behavior.

    GraphRAGEngine

    class GraphRAGEngine:
        def __init__(
            self,
            config: GraphRAGConfig,
            embedding_provider: EmbeddingProvider,
            *,
            db_dir: Path,
            llm_provider: LLMProvider | None = None,
        ): ...
    
        async def index_source(self, path: str, content: str) -> None: ...
        async def index_vault(self, vault: VaultStore) -> None: ...
        def remove_source(self, source_path: str) -> None: ...
        async def retrieve(self, query: str, *, top_k: int | None = None, max_hops: int | None = None) -> list[RetrievalResult]: ...
        async def retrieve_enriched(self, query: str, *, top_k: int | None = None, max_hops: int | None = None) -> EnrichedRetrieval: ...
        def format_context(self, results: list[RetrievalResult], budget: int | None = None) -> str: ...
        def export_graph(self) -> dict: ...
        def chunk_text(self, text: str, source_path: str) -> list[Chunk]: ...
        def close(self) -> None: ...

    Context manager supported (with GraphRAGEngine(...) as engine:).

    GraphRAGConfig

    @dataclass
    class GraphRAGConfig:
        enabled: bool = False
        embeddings: EmbeddingConfig = field(default_factory=EmbeddingConfig)
        extraction: ExtractionConfig = field(default_factory=ExtractionConfig)
        ontology: OntologyConfig = field(default_factory=OntologyConfig)
        max_hops: int = 2
        context_budget: int = 3000
        top_k: int = 10
        chunk_size: int = 1000
        chunk_overlap: int = 100

    EmbeddingConfig

    @dataclass
    class EmbeddingConfig:
        provider: str = "ollama"
        model: str = "nomic-embed-text"
        base_url: str = "http://localhost:11434"
        key_env: str = ""
        dimensions: int = 768

    ExtractionConfig

    @dataclass
    class ExtractionConfig:
        model: str | None = None
        max_gleanings: int = 1

    OntologyConfig

    @dataclass
    class OntologyConfig:
        entity_types: list[str] = field(default_factory=lambda: ["person", "project", "concept", "technology", "decision", "resource"])
        core_relations: list[str] = field(default_factory=lambda: ["uses", "depends_on", "part_of", "created_by", "related_to"])
        allow_custom_relations: bool = True
        aliases: dict[str, list[str]] = field(default_factory=dict)

    Chunk

    @dataclass
    class Chunk:
        id: str
        source_path: str
        heading: str
        content: str
        char_offset: int

    RetrievalResult

    @dataclass
    class RetrievalResult:
        chunk_id: str
        source_path: str
        heading: str
        content: str
        score: float
        source: str  # "vector" | "graph"
        related_entities: list[str] = field(default_factory=list)

    HopRecord

    @dataclass
    class HopRecord:
        from_entity: str
        to_entity: str
        relation: str
        hop_depth: int

    RetrievalTrace

    @dataclass
    class RetrievalTrace:
        seed_entities: list[str] = field(default_factory=list)
        hops: list[HopRecord] = field(default_factory=list)
        expanded_entity_ids: list[int] = field(default_factory=list)

    EnrichedRetrieval

    @dataclass
    class EnrichedRetrieval:
        results: list[RetrievalResult] = field(default_factory=list)
        trace: RetrievalTrace = field(default_factory=RetrievalTrace)
        subgraph_nodes: list[dict] = field(default_factory=list)
        subgraph_edges: list[dict] = field(default_factory=list)

    chunk_markdown

    def chunk_markdown(
        text: str, source_path: str, *, max_size: int = 1000, overlap: int = 100
    ) -> list[Chunk]: ...

    Splits markdown text on headings, merges small sections, splits large ones with overlap. Deterministic chunk IDs via SHA-256.


    Vector Store (loom.store.vector)

    SQLite-backed vector store for embedding storage and cosine similarity search.

    VectorStore

    class VectorStore:
        def __init__(self, db_path: Path, dim: int = 768): ...
        def upsert(self, id: str, embedding: list[float], *, source: str = "", metadata: dict | None = None) -> None: ...
        def remove(self, id: str) -> None: ...
        def remove_for_source(self, source: str) -> int: ...
        def search(self, query_embedding: list[float], *, top_k: int = 20, source_filter: str | None = None) -> list[VectorHit]: ...
        def get(self, id: str) -> VectorHit | None: ...
        def get_embedding(self, id: str) -> list[float] | None: ...
        def count(self) -> int: ...
        def sources(self) -> list[str]: ...
        def close(self) -> None: ...

    VectorHit

    @dataclass
    class VectorHit:
        id: str
        source: str
        score: float
        metadata: dict = field(default_factory=dict)

    Entity Graph (loom.store.graph)

    SQLite-backed entity-relationship graph with multi-hop traversal.

    EntityGraph

    class EntityGraph:
        def __init__(self, db_path: Path): ...
        def resolve_entity(self, name: str, type: str, aliases: dict | None = None) -> int: ...
        def get_entity(self, entity_id: int) -> Entity | None: ...
        def find_entity(self, name: str, type: str) -> Entity | None: ...
        def add_triple(self, head_id: int, relation: str, tail_id: int, chunk_id: str, description: str = "", strength: float = 5.0) -> None: ...
        def add_mention(self, entity_id: int, chunk_id: str) -> None: ...
        def entities_for_chunk(self, chunk_id: str) -> list[Entity]: ...
        def chunks_for_entity(self, entity_id: int) -> list[str]: ...
        def neighbors(self, entity_id: int, max_hops: int = 2) -> list[Entity]: ...
        def remove_for_chunks(self, chunk_ids: list[str]) -> None: ...
        def count_entities(self) -> int: ...
        def count_triples(self) -> int: ...
        def list_entities(self, entity_type: str | None = None, search: str | None = None, limit: int = 50, offset: int = 0) -> list[Entity]: ...
        def get_entity_triples(self, entity_id: int) -> list[Triple]: ...
        def subgraph(self, seed_id: int, max_hops: int = 2) -> dict: ...
        def connected_components(self) -> list[list[int]]: ...
        def entity_degree(self, entity_id: int) -> int: ...
        def entity_counts_by_type(self) -> dict[str, int]: ...
        def list_all_entities(self) -> list[Entity]: ...
        def list_all_triples(self) -> list[Triple]: ...
        def set_entity_description(self, entity_id: int, description: str) -> None: ...
        def close(self) -> None: ...

    Entity

    @dataclass
    class Entity:
        id: int
        name: str
        type: str
        canonical: str
        description: str = ""

    Triple

    @dataclass
    class Triple:
        id: int
        head_id: int
        relation: str
        tail_id: int
        chunk_id: str
        description: str = ""
        strength: float = 5.0

    Embedding Providers (loom.store.embeddings)

    OllamaEmbeddingProvider

    class OllamaEmbeddingProvider:
        def __init__(self, model: str = "nomic-embed-text", base_url: str = "http://localhost:11434", dim: int = 768, timeout: float = 60.0): ...
        async def embed(self, texts: list[str]) -> list[list[float]]: ...

    OpenAIEmbeddingProvider

    class OpenAIEmbeddingProvider:
        def __init__(self, model: str = "text-embedding-3-small", base_url: str = "https://api.openai.com/v1", key_env: str = "OPENAI_API_KEY", dim: int = 1536, timeout: float = 60.0): ...
        async def embed(self, texts: list[str]) -> list[list[float]]: ...

    API key resolved from the environment variable named by key_env. If key_env is empty, requests are made without authentication.


    Home & Identity (loom.home, loom.permissions, loom.prompt)

    AgentHome

    class AgentHome:
        def __init__(self, root: Path, name: str = "default"): ...
        @property
        def soul_path(self) -> Path: ...
        @property
        def identity_path(self) -> Path: ...
        @property
        def user_path(self) -> Path: ...
        @property
        def skills_dir(self) -> Path: ...
        @property
        def memory_dir(self) -> Path: ...
        @property
        def vault_dir(self) -> Path: ...
        @property
        def sessions_db(self) -> Path: ...
        def ensure_dirs(self) -> None: ...

    AgentPermissions

    class AgentPermissions(BaseModel):
        user_writable: bool = False
        soul_writable: bool = False
        identity_writable: bool = False
        skills_creatable: bool = False
        terminal_allowed: bool = False
        delegate_allowed: bool = False

    PromptSection

    class PromptSection(BaseModel):
        title: str
        body: str
        priority: int = 0

    PromptBuilder

    class PromptBuilder:
        def add(self, section: PromptSection) -> None: ...
        def build(self) -> str: ...

    Web Search (loom.search)

    SearchResult

    @dataclass
    class SearchResult:
        title: str
        url: str
        snippet: str
        source: str
        score: float | None = None
        raw: dict[str, Any] | None = None  # provider-specific payload

    SearchProviderError

    class SearchProviderError(Exception):
        provider: str
        status_code: int | None

    SearchProvider (Protocol)

    class SearchProvider(Protocol):
        @property
        def name(self) -> str: ...
    
        async def search(self, query: str, max_results: int = 10) -> list[SearchResult]: ...

    Search Providers

    | Provider | Class | Requires API key | Install |
    | --- | --- | --- | --- |
    | DuckDuckGo (default) | DuckDuckGoSearchProvider | No | pip install "loom[search]" |
    | Brave | BraveSearchProvider | Yes (api_key) | — |
    | Tavily | TavilySearchProvider | Yes (api_key) | — |
    | Google Custom Search | GoogleSearchProvider | Yes (api_key + cx) | — |

    CompositeSearchProvider

    class CompositeSearchProvider:
        def __init__(
            self,
            providers: list[SearchProvider],
            *,
            strategy: SearchStrategy = SearchStrategy.CONCURRENT,
        ): ...
    
        @property
        def name(self) -> str: ...  # "composite"
    
        @property
        def strategy(self) -> SearchStrategy: ...
    
        async def search(self, query: str, max_results: int = 10) -> list[SearchResult]: ...

    SearchStrategy

    class SearchStrategy(StrEnum):
        CONCURRENT = "concurrent"  # fire all providers, merge, deduplicate
        FALLBACK = "fallback"      # try in order, stop when enough results

    WebSearchTool

    class WebSearchTool(ToolHandler):
        def __init__(self, provider: SearchProvider) -> None: ...
    
        @classmethod
        def from_config(
            cls,
            providers: list[SearchProvider] | None = None,
            *,
            strategy: SearchStrategy = SearchStrategy.CONCURRENT,
        ) -> WebSearchTool: ...

    Tool name: web_search. Parameters: query (str, required), max_results (int, optional, default 10). Returns JSON array of {title, url, snippet, source, score?}.


    Web Scrape (loom.scrape)

    ScrapeResult

    @dataclass
    class ScrapeResult:
        url: str
        content: str
        content_type: str       # "text" | "markdown" | "html"
        status_code: int | None = None
        cookies: dict[str, str] | None = None
        metadata: dict[str, Any] = field(default_factory=dict)

    ScrapeProviderError

    class ScrapeProviderError(Exception):
        provider: str
        status_code: int | None

    ScrapeProvider (Protocol)

    class ScrapeProvider(Protocol):
        @property
        def name(self) -> str: ...
    
        async def scrape(
            self,
            url: str,
            output_format: str = "text",
            css_selector: str | None = None,
            xpath: str | None = None,
        ) -> ScrapeResult: ...

    ScraplingProvider

    class ScraplingProvider:
        def __init__(
            self,
            *,
            mode: str = "auto",              # "auto" | "fetcher" | "dynamic" | "stealthy"
            cookie_store: CookieStore | None = None,
            headless: bool = True,
            timeout: int = 30,
            max_content_bytes: int = 102400,
        ): ...
    
        @property
        def name(self) -> str: ...  # "scrapling"
    
        async def scrape(
            self,
            url: str,
            output_format: str = "text",
            css_selector: str | None = None,
            xpath: str | None = None,
        ) -> ScrapeResult: ...

    In auto mode, cascades through fetcher → dynamic → stealthy on block detection. Detects auth failures and retries with stored cookies when a CookieStore is configured. Requires pip install "loom[scrape]".

    WebScrapeTool

    class WebScrapeTool(ToolHandler):
        def __init__(self, provider: ScrapeProvider) -> None: ...
    
        @classmethod
        def from_config(
            cls,
            *,
            mode: str = "auto",
            cookie_store: CookieStore | None = None,
            headless: bool = True,
            timeout: int = 30,
            max_content_bytes: int = 102400,
        ) -> WebScrapeTool: ...

    Tool name: web_scrape. Parameters: url (str, required), output_format ("text" | "markdown" | "html", optional), css_selector (str, optional), xpath (str, optional).


    CookieStore (Protocol)

    class CookieStore(Protocol):
        async def get_cookies(self, domain: str) -> dict[str, str] | None: ...
        async def save_cookies(self, domain: str, cookies: dict[str, str]) -> None: ...
        async def list_domains(self) -> list[str]: ...

    FilesystemCookieStore

    class FilesystemCookieStore:
        def __init__(self, cookie_dir: Path) -> None: ...
        async def get_cookies(self, domain: str) -> dict[str, str] | None: ...
        async def save_cookies(self, domain: str, cookies: dict[str, str]) -> None: ...
        async def list_domains(self) -> list[str]: ...

    Persists one Netscape cookies.txt file per domain in the configured directory. Used by ScraplingProvider for cookie-based auth retry.