Skip to content

Server

server

Core MCP server for Linux diagnostics using FastMCP.

get_cpu_information async

get_cpu_information(host: Host = None) -> CpuInfo

Get CPU information.

Retrieves CPU model, core counts (logical and physical), frequency, and current load averages (1, 5, and 15 minute).

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get CPU information",
    description="Get CPU information.",
    tags={"cpu", "hardware", "performance", "system"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_cpu_information(
    host: Host = None,
) -> CpuInfo:
    """Get CPU information.

    Retrieves CPU model, core counts (logical and physical), frequency,
    and current load averages (1, 5, and 15 minute).
    """
    group = get_command_group("cpu_info")
    results = {}

    # Execute all commands in the group
    for name, cmd in group.commands.items():
        returncode, stdout, _ = await cmd.run(host=host)
        if is_successful_output(returncode, stdout):
            results[name] = stdout

    return parse_cpu_info(results)

get_disk_usage async

get_disk_usage(host: Host = None) -> DiskUsage

Get disk usage information.

Retrieves filesystem usage for all mounted volumes including size, used/available space, utilization percentage, and mount points.

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get disk usage",
    description="Get detailed disk space information including size, mount points, and utilization.",
    tags={"disk", "filesystem", "storage", "system"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_disk_usage(
    host: Host = None,
) -> DiskUsage:
    """Get disk usage information.

    Retrieves filesystem usage for all mounted volumes including size,
    used/available space, utilization percentage, and mount points.
    """
    cmd = get_command("disk_usage")

    try:
        returncode, stdout, stderr = await cmd.run(host=host)
    except Exception as e:
        raise ToolError(f"Error gathering disk usage information: {str(e)}") from e

    if not is_successful_output(returncode, stdout):
        raise ToolError(f"Unable to retrieve disk usage information: {stderr}")

    try:
        data = json.loads(stdout)
        return DiskUsage.model_validate(data)
    except (json.JSONDecodeError, ValueError) as e:
        raise ToolError(f"Error parsing disk usage information: {str(e)}") from e

get_hardware_information async

get_hardware_information(
    host: Host = None,
) -> dict[str, str | list[str]]

Get hardware information.

Retrieves detailed hardware inventory including CPU specifications, PCI devices, USB devices, and DMI/SMBIOS data (system manufacturer, model, BIOS version, etc.). Some information may require root privileges.

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get hardware information",
    description="Get hardware information such as CPU details, PCI devices, USB devices, and hardware information from DMI.",
    tags={"hardware", "system"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_hardware_information(
    host: Host = None,
) -> dict[str, str | list[str]]:
    """Get hardware information.

    Retrieves detailed hardware inventory including CPU specifications,
    PCI devices, USB devices, and DMI/SMBIOS data (system manufacturer,
    model, BIOS version, etc.). Some information may require root privileges.
    """
    group = get_command_group("hardware_info")
    results: dict[str, str | list[str]] = {}

    # Execute all commands in the group
    for name, cmd in group.commands.items():
        try:
            returncode, stdout, stderr = await cmd.run(host=host)
            if is_successful_output(returncode, stdout):
                results[name] = stdout if name == "lscpu" else stdout.splitlines()
            else:
                results[name] = f"Error retrieving {name}: {stderr}"
        except FileNotFoundError:
            results[name] = f"{name} command not available"
        except Exception as e:
            raise ToolError(f"Error gathering hardware information: {str(e)}") from e

    return results

get_journal_logs async

get_journal_logs(
    unit: Annotated[
        str,
        Field(
            description="Filter by systemd unit name or pattern",
            examples=[
                service,
                nginx,
                httpd,
                "systemd-*",
                "audit*",
            ],
        ),
    ] = "",
    priority: Annotated[
        str,
        Field(
            description="Filter by syslog priority level (0-7), name, or range",
            examples=[
                err,
                warning,
                info,
                debug,
                3,
                "err..warning",
            ],
        ),
    ] = "",
    since: Annotated[
        str,
        Field(
            description="Filter entries since specified time (absolute or relative)",
            examples=[
                today,
                yesterday,
                "-1h",
                "-30m",
                "-7d",
                "2025-01-15 10:00:00",
            ],
        ),
    ] = "",
    transport: Annotated[
        Transport | None,
        "Filter by journal transport (e.g., 'audit' for audit logs, 'kernel' for kernel messages, 'syslog' for syslog messages)",
    ] = None,
    lines: Annotated[
        int,
        Field(
            description="Number of log lines to retrieve. Default: 100",
            ge=1,
            le=10000,
        ),
    ] = 100,
    host: Host = None,
) -> LogEntries

Get systemd journal logs.

Retrieves entries from the systemd journal with optional filtering by unit, priority level, time range, and transport. Returns timestamped log messages.

To get audit logs, use transport='audit'.

Source code in src/linux_mcp_server/tools/logs.py
@mcp.tool(
    title="Get journal logs",
    description="Get systemd journal logs.",
    tags={"journal", "logs", "systemd", "troubleshooting"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_journal_logs(
    unit: t.Annotated[
        str,
        Field(
            description="Filter by systemd unit name or pattern",
            examples=["sshd.service", "nginx", "httpd", "systemd-*", "audit*"],
        ),
    ] = "",
    priority: t.Annotated[
        str,
        Field(
            description="Filter by syslog priority level (0-7), name, or range",
            examples=["err", "warning", "info", "debug", "3", "err..warning"],
        ),
    ] = "",
    since: t.Annotated[
        str,
        Field(
            description="Filter entries since specified time (absolute or relative)",
            examples=["today", "yesterday", "-1h", "-30m", "-7d", "2025-01-15 10:00:00"],
        ),
    ] = "",
    transport: t.Annotated[
        Transport | None,
        "Filter by journal transport (e.g., 'audit' for audit logs, 'kernel' for kernel messages, 'syslog' for syslog messages)",
    ] = None,
    lines: t.Annotated[int, Field(description="Number of log lines to retrieve. Default: 100", ge=1, le=10_000)] = 100,
    host: Host = None,
) -> LogEntries:
    """Get systemd journal logs.

    Retrieves entries from the systemd journal with optional filtering by unit,
    priority level, time range, and transport. Returns timestamped log messages.

    To get audit logs, use transport='audit'.
    """
    returncode, stdout, stderr = await _get_journal_logs(
        lines=lines, host=host, unit=unit, priority=priority, since=since, transport=transport
    )

    if returncode != 0:
        raise ToolError(f"Error reading journal logs: {stderr}")

    if is_empty_output(stdout):
        raise ToolError("No journal entries found matching the criteria.")

    entries = [line for line in stdout.strip().splitlines() if line]

    return LogEntries(
        entries=entries,
        unit=unit,
    )

get_listening_ports async

get_listening_ports(host: Host = None) -> str

Get listening ports.

Retrieves all ports with services actively listening for connections, including protocol (TCP/UDP), bind address, port number, and process name.

Source code in src/linux_mcp_server/tools/network.py
@mcp.tool(
    title="Get listening ports",
    description="Get details on listening port, protocols, and services.",
    tags={"connectivity", "network", "ports"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_listening_ports(
    host: Host = None,
) -> str:
    """Get listening ports.

    Retrieves all ports with services actively listening for connections,
    including protocol (TCP/UDP), bind address, port number, and process name.
    """
    cmd = get_command("listening_ports")

    returncode, stdout, stderr = await cmd.run(host=host)

    if is_successful_output(returncode, stdout):
        ports = parse_ss_listening(stdout)
        return format_listening_ports(ports)
    return f"Error getting listening ports: return code {returncode}, stderr: {stderr}"

get_memory_information async

get_memory_information(host: Host = None) -> SystemMemory

Get memory information.

Retrieves physical RAM and swap usage including total, used, free, shared, buffers, cached, and available memory.

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get memory information",
    description="Get detailed memory including physical and swap.",
    tags={"hardware", "memory", "performance", "system"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_memory_information(
    host: Host = None,
) -> SystemMemory:
    """Get memory information.

    Retrieves physical RAM and swap usage including total, used, free,
    shared, buffers, cached, and available memory.
    """
    # Execute free command
    free_cmd = get_command("memory_info", "free")

    try:
        returncode, stdout, stderr = await free_cmd.run(host=host)
    except Exception as e:
        raise ToolError(f"Error gathering memory information: {str(e)}") from e

    if not is_successful_output(returncode, stdout):
        raise ToolError(f"Unable to retrieve memory information: {stderr}")
    return parse_free_output(stdout)

get_network_connections async

get_network_connections(host: Host = None) -> str

Get active network connections.

Retrieves all established and pending network connections including protocol, state, local/remote addresses and ports, and associated process information.

Source code in src/linux_mcp_server/tools/network.py
@mcp.tool(
    title="Get network connections",
    description="Get detailed information about active network connections.",
    tags={"connections", "connectivity", "network"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_network_connections(
    host: Host = None,
) -> str:
    """Get active network connections.

    Retrieves all established and pending network connections including protocol,
    state, local/remote addresses and ports, and associated process information.
    """
    cmd = get_command("network_connections")

    returncode, stdout, stderr = await cmd.run(host=host)

    if is_successful_output(returncode, stdout):
        connections = parse_ss_connections(stdout)
        return format_network_connections(connections)
    return f"Error getting network connections: return code {returncode}, stderr: {stderr}"

get_network_interfaces async

get_network_interfaces(host: Host = None) -> str

Get network interface information.

Retrieves all network interfaces with their operational state, IP addresses, and traffic statistics (bytes/packets sent/received, errors, dropped packets).

Source code in src/linux_mcp_server/tools/network.py
@mcp.tool(
    title="Get network interfaces",
    description="Get detailed information about network interfaces including address and traffic statistics.",
    tags={"connectivity", "interfaces", "network"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_network_interfaces(
    host: Host = None,
) -> str:
    """Get network interface information.

    Retrieves all network interfaces with their operational state, IP addresses,
    and traffic statistics (bytes/packets sent/received, errors, dropped packets).
    """
    interfaces = {}
    stats = {}

    # Get brief interface info
    brief_cmd = get_command("network_interfaces", "brief")
    returncode, stdout, _ = await brief_cmd.run(host=host)

    if is_successful_output(returncode, stdout):
        interfaces = parse_ip_brief(stdout)

    # Get network statistics from /proc/net/dev
    stats_cmd = get_command("network_interfaces", "stats")
    returncode, stdout, _ = await stats_cmd.run(host=host)

    if is_successful_output(returncode, stdout):
        stats = parse_proc_net_dev(stdout)

    return format_network_interfaces(interfaces, stats)

get_process_info async

get_process_info(
    pid: Annotated[
        int,
        Field(
            description="Process ID",
            ge=1,
            examples=[1, 1234, 65535],
        ),
    ],
    host: Host = None,
) -> str

Get detailed information about a specific process.

Retrieves comprehensive process details including CPU/memory usage, process state, virtual/resident memory size, controlling terminal, and additional metadata from /proc//status when available.

Source code in src/linux_mcp_server/tools/processes.py
@mcp.tool(
    title="Process details",
    description="Get information about a specific process.",
    tags={"performance", "processes"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_process_info(
    pid: t.Annotated[
        int,
        Field(description="Process ID", ge=1, examples=[1, 1234, 65535]),
    ],
    host: Host = None,
) -> str:
    """Get detailed information about a specific process.

    Retrieves comprehensive process details including CPU/memory usage, process
    state, virtual/resident memory size, controlling terminal, and additional
    metadata from /proc/<pid>/status when available.
    """
    # Get process details with ps
    ps_cmd = get_command("process_info", "ps_detail")
    returncode, stdout, _ = await ps_cmd.run(host=host, pid=pid)

    if returncode != 0:
        return f"Process with PID {pid} does not exist."

    if not stdout:
        return f"Process with PID {pid} does not exist."

    # Try to get more details from /proc
    proc_status = None
    status_cmd = get_command("process_info", "proc_status")
    status_code, status_stdout, _ = await status_cmd.run(host=host, pid=pid)

    if is_successful_output(status_code, status_stdout):
        proc_status = parse_proc_status(status_stdout)

    return format_process_detail(stdout, proc_status, pid)

get_service_logs async

get_service_logs(
    service_name: Annotated[
        str,
        Field(
            description="Name of the systemd service",
            examples=[
                sshd,
                NetworkManager,
                auditd,
                rsyslog,
                crond,
                firewalld,
            ],
        ),
    ],
    lines: Annotated[
        int,
        Field(
            description="Number of log lines to retrieve.",
            ge=1,
            le=10000,
        ),
    ] = 50,
    host: Host = None,
) -> str

Get recent logs for a specific systemd service.

Retrieves journal entries for the specified service unit, including timestamps, priority levels, and log messages.

Source code in src/linux_mcp_server/tools/services.py
@mcp.tool(
    title="Get service logs",
    description="Get recent logs for a specific systemd service.",
    tags={"logs", "services", "systemd", "troubleshooting"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_service_logs(
    service_name: t.Annotated[
        str,
        Field(
            description="Name of the systemd service",
            examples=["sshd", "NetworkManager", "auditd", "rsyslog", "crond", "firewalld"],
        ),
    ],
    lines: t.Annotated[int, Field(description="Number of log lines to retrieve.", ge=1, le=10_000)] = 50,
    host: Host = None,
) -> str:
    """Get recent logs for a specific systemd service.

    Retrieves journal entries for the specified service unit, including
    timestamps, priority levels, and log messages.
    """
    # Ensure service name has .service suffix if not present
    if not service_name.endswith(".service") and "." not in service_name:
        service_name = f"{service_name}.service"

    cmd = get_command("service_logs")
    returncode, stdout, stderr = await cmd.run(host=host, service_name=service_name, lines=lines)

    if returncode != 0:
        if "not found" in stderr.lower() or "no entries" in stderr.lower():
            return f"No logs found for service '{service_name}'. The service may not exist or has no log entries."
        return f"Error getting service logs: {stderr}"

    if is_empty_output(stdout):
        return f"No log entries found for service '{service_name}'."

    return format_service_logs(stdout, service_name, lines)

get_service_status async

get_service_status(
    service_name: Annotated[
        str,
        Field(
            description="Name of the systemd service",
            examples=[
                sshd,
                NetworkManager,
                auditd,
                rsyslog,
                crond,
                firewalld,
            ],
        ),
    ],
    host: Host = None,
) -> str

Get status of a specific systemd service.

Retrieves detailed service information including active/enabled state, main PID, memory usage, CPU time, and recent log entries from the journal.

Source code in src/linux_mcp_server/tools/services.py
@mcp.tool(
    title="Get service status",
    description="Get detailed status of a specific systemd service.",
    tags={"services", "systemd"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_service_status(
    service_name: t.Annotated[
        str,
        Field(
            description="Name of the systemd service",
            examples=["sshd", "NetworkManager", "auditd", "rsyslog", "crond", "firewalld"],
        ),
    ],
    host: Host = None,
) -> str:
    """Get status of a specific systemd service.

    Retrieves detailed service information including active/enabled state,
    main PID, memory usage, CPU time, and recent log entries from the journal.
    """
    # Ensure service name has .service suffix if not present
    if not service_name.endswith(".service") and "." not in service_name:
        service_name = f"{service_name}.service"

    cmd = get_command("service_status")
    _, stdout, stderr = await cmd.run(host=host, service_name=service_name)

    # Note: systemctl status returns non-zero for inactive services, but that's expected
    if not stdout and stderr:
        # Service not found
        if "not found" in stderr.lower() or "could not be found" in stderr.lower():
            return f"Service '{service_name}' not found on this system."
        return f"Error getting service status: {stderr}"

    return format_service_status(stdout, service_name)

get_system_information async

get_system_information(host: Host = None) -> SystemInfo

Get basic system information.

Retrieves hostname, OS name/version, kernel version, architecture, system uptime, and last boot time.

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get system information",
    description="Get basic system information such as operating system, distribution, kernel version, uptime, and last boot time.",
    tags={"hardware", "system"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_system_information(
    host: Host = None,
) -> SystemInfo:
    """Get basic system information.

    Retrieves hostname, OS name/version, kernel version, architecture,
    system uptime, and last boot time.
    """
    group = get_command_group("system_info")
    results = {}

    # Execute all commands in the group
    for name, cmd in group.commands.items():
        returncode, stdout, _ = await cmd.run(host=host)
        if is_successful_output(returncode, stdout):
            results[name] = stdout

    info = parse_system_info(results)
    return info

list_block_devices async

list_block_devices(host: Host = None) -> BlockDevices

List block devices.

Retrieves all block devices (disks, partitions, LVM volumes) with their name, size, type, mount point, and filesystem information.

Source code in src/linux_mcp_server/tools/storage.py
@mcp.tool(
    title="List block devices",
    description="List block devices on the system",
    tags={"devices", "storage"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_block_devices(
    host: Host = None,
) -> BlockDevices:
    """List block devices.

    Retrieves all block devices (disks, partitions, LVM volumes) with their
    name, size, type, mount point, and filesystem information.
    """
    cmd = get_command("list_block_devices")
    returncode, stdout, stderr = await cmd.run(host=host)

    if not is_successful_output(returncode, stdout):
        raise ToolError(f"Unable to list block devices. lsblk command may not be available. {returncode}: {stderr}")

    return BlockDevices.model_validate_json(stdout)

list_directories async

list_directories(
    path: Annotated[
        Path,
        BeforeValidator(validate_path),
        Field(
            description="Absolute path to the directory to analyze",
            examples=[
                "/var/log",
                "/etc",
                "/home",
                "/opt",
                "/tmp",
            ],
        ),
    ],
    order_by: Annotated[
        OrderBy,
        "Sort order - 'size', 'name', or 'modified' (default: 'name')",
    ] = OrderBy.NAME,
    sort: Annotated[
        SortBy,
        "Sort direction - 'ascending' or 'descending' (default: 'ascending')",
    ] = SortBy.ASCENDING,
    top_n: Annotated[
        int | None,
        Field(
            description="Optional limit on number of directories to return (1-1000)",
            gt=0,
            le=1000,
        ),
    ] = None,
    host: Host = None,
) -> StorageNodes

List directories under a specified path.

Retrieves subdirectories with their size (when ordered by size) or modification time, supporting flexible sorting and result limiting.

Source code in src/linux_mcp_server/tools/storage.py
@mcp.tool(
    title="List directories",
    description="List directories under a specified path with various sorting options.",
    tags={"directories", "filesystem", "storage"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_directories(
    path: t.Annotated[
        Path,
        BeforeValidator(validate_path),
        Field(
            description="Absolute path to the directory to analyze",
            examples=["/var/log", "/etc", "/home", "/opt", "/tmp"],
        ),
    ],
    order_by: t.Annotated[OrderBy, "Sort order - 'size', 'name', or 'modified' (default: 'name')"] = OrderBy.NAME,
    sort: t.Annotated[SortBy, "Sort direction - 'ascending' or 'descending' (default: 'ascending')"] = SortBy.ASCENDING,
    top_n: t.Annotated[
        int | None,
        Field(
            description="Optional limit on number of directories to return (1-1000)",
            gt=0,
            le=1_000,
        ),
    ] = None,
    host: Host = None,
) -> StorageNodes:
    """List directories under a specified path.

    Retrieves subdirectories with their size (when ordered by size) or
    modification time, supporting flexible sorting and result limiting.
    """
    return await _list_resources(
        path=path,
        command=get_command(f"list_directories_{order_by}"),
        order_by=order_by,
        sort=sort,
        top_n=top_n,
        host=host,
        parser=parse_directory_listing,
    )

list_files async

list_files(
    path: Annotated[
        Path,
        BeforeValidator(validate_path),
        Field(
            description="Absolute path to the directory to analyze",
            examples=[
                "/var/log",
                "/etc",
                "/home",
                "/opt",
                "/tmp",
            ],
        ),
    ],
    order_by: Annotated[
        OrderBy,
        "Sort order - 'size', 'name', or 'modified' (default: 'name')",
    ] = OrderBy.NAME,
    sort: Annotated[
        SortBy,
        "Sort direction - 'ascending' or 'descending' (default: 'ascending')",
    ] = SortBy.ASCENDING,
    top_n: Annotated[
        int | None,
        Field(
            description="Optional limit on number of files to return (1-1000, only used with size ordering)",
            gt=0,
            le=1000,
        ),
    ] = None,
    host: Host = None,
) -> StorageNodes

List files under a specified path.

Retrieves files with their size or modification time, supporting flexible sorting and result limiting. Useful for finding large or recently modified files.

Source code in src/linux_mcp_server/tools/storage.py
@mcp.tool(
    title="List files",
    description="List files under a specified path with various sorting options.",
    tags={"files", "filesystem", "storage"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_files(
    path: t.Annotated[
        Path,
        BeforeValidator(validate_path),
        Field(
            description="Absolute path to the directory to analyze",
            examples=["/var/log", "/etc", "/home", "/opt", "/tmp"],
        ),
    ],
    order_by: t.Annotated[OrderBy, "Sort order - 'size', 'name', or 'modified' (default: 'name')"] = OrderBy.NAME,
    sort: t.Annotated[SortBy, "Sort direction - 'ascending' or 'descending' (default: 'ascending')"] = SortBy.ASCENDING,
    top_n: t.Annotated[
        int | None,
        Field(
            description="Optional limit on number of files to return (1-1000, only used with size ordering)",
            gt=0,
            le=1_000,
        ),
    ] = None,
    host: Host = None,
) -> StorageNodes:
    """List files under a specified path.

    Retrieves files with their size or modification time, supporting flexible
    sorting and result limiting. Useful for finding large or recently modified files.
    """
    return await _list_resources(
        path=path,
        command=get_command(f"list_files_{order_by}"),
        order_by=order_by,
        sort=sort,
        top_n=top_n,
        host=host,
        parser=parse_file_listing,
    )

list_processes async

list_processes(host: Host = None) -> str

List all running processes.

Retrieves a snapshot of all running processes with details including PID, user, CPU/memory usage, process state, start time, and command line.

Source code in src/linux_mcp_server/tools/processes.py
@mcp.tool(
    title="List processes",
    description="List running processes",
    tags={"performance", "processes"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_processes(
    host: Host = None,
) -> str:
    """List all running processes.

    Retrieves a snapshot of all running processes with details including PID,
    user, CPU/memory usage, process state, start time, and command line.
    """
    cmd = get_command("list_processes")
    returncode, stdout, _ = await cmd.run(host=host)

    if is_successful_output(returncode, stdout):
        processes = parse_ps_output(stdout)
        return format_process_list(processes)
    return "Error executing ps command"

list_services async

list_services(host: Host = None) -> str

List all systemd services.

Retrieves all systemd service units with their load state, active state, sub-state, and description. Also includes a count of currently running services.

Source code in src/linux_mcp_server/tools/services.py
@mcp.tool(
    title="List services",
    description="List all systemd services.",
    tags={"services", "systemd"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_services(
    host: Host = None,
) -> str:
    """List all systemd services.

    Retrieves all systemd service units with their load state, active state,
    sub-state, and description. Also includes a count of currently running services.
    """
    cmd = get_command("list_services")
    returncode, stdout, stderr = await cmd.run(host=host)

    if returncode != 0:
        return f"Error listing services: {stderr}"

    # Get running services count
    running_cmd = get_command("running_services")
    returncode_summary, stdout_summary, _ = await running_cmd.run(host=host)

    running_count = None
    if returncode_summary == 0:
        running_count = parse_service_count(stdout_summary)

    return format_services_list(stdout, running_count)

read_file async

read_file(
    path: Annotated[
        Path,
        BeforeValidator(validate_path),
        Field(
            description="Absolute path to the file to read",
            examples=[
                "/etc/hosts",
                "/etc/resolv.conf",
                "/etc/os-release",
                "/proc/cpuinfo",
            ],
        ),
    ],
    host: Host = None,
) -> str

Read the contents of a file.

Retrieves the full contents of a text file. The path must be absolute and the file must exist. Binary files may not display correctly.

Source code in src/linux_mcp_server/tools/storage.py
@mcp.tool(
    title="Read file",
    description="Read the contents of a file using cat",
    tags={"files", "filesystem", "storage"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def read_file(
    path: t.Annotated[
        Path,
        BeforeValidator(validate_path),
        Field(
            description="Absolute path to the file to read",
            examples=["/etc/hosts", "/etc/resolv.conf", "/etc/os-release", "/proc/cpuinfo"],
        ),
    ],
    host: Host = None,
) -> str:
    """Read the contents of a file.

    Retrieves the full contents of a text file. The path must be absolute
    and the file must exist. Binary files may not display correctly.
    """
    if not host:
        # For local execution, check early if file exists
        if not os.path.isfile(path):
            raise ToolError(f"Path is not a file: {path}")

    cmd = get_command("read_file")

    returncode, stdout, stderr = await cmd.run_bytes(host=host, path=path)

    if returncode != 0:
        raise ToolError(f"Error running command: command failed with return code {returncode}: {stderr}")

    return stdout.decode("utf-8")

read_log_file async

read_log_file(
    log_path: Annotated[
        Path,
        BeforeValidator(validate_path),
        Field(
            description="Absolute path to the log file (must be in allowed list)",
            examples=[
                "/var/log/messages",
                "/var/log/secure",
                "/var/log/audit/audit.log",
                "/var/log/dnf.log",
            ],
        ),
    ],
    lines: Annotated[
        int,
        Field(
            description="Number of lines to retrieve from the end.",
            ge=1,
            le=10000,
        ),
    ] = 100,
    host: Host = None,
) -> LogEntries

Read a specific log file.

Retrieves the last N lines from a log file. The file path must be in the allowed list configured via LINUX_MCP_ALLOWED_LOG_PATHS environment variable.

Source code in src/linux_mcp_server/tools/logs.py
@mcp.tool(
    title="Read log file",
    description="Read a specific log file.",
    tags={"files", "logs", "troubleshooting"},
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def read_log_file(
    log_path: t.Annotated[
        Path,
        BeforeValidator(validate_path),
        Field(
            description="Absolute path to the log file (must be in allowed list)",
            examples=["/var/log/messages", "/var/log/secure", "/var/log/audit/audit.log", "/var/log/dnf.log"],
        ),
    ],
    lines: t.Annotated[int, Field(description="Number of lines to retrieve from the end.", ge=1, le=10_000)] = 100,
    host: Host = None,
) -> LogEntries:
    """Read a specific log file.

    Retrieves the last N lines from a log file. The file path must be in the
    allowed list configured via LINUX_MCP_ALLOWED_LOG_PATHS environment variable.
    """
    # Get allowed log paths from environment variable
    allowed_paths_env = CONFIG.allowed_log_paths

    if not allowed_paths_env:
        raise ToolError(
            "No log files are allowed. Set LINUX_MCP_ALLOWED_LOG_PATHS environment variable "
            "with comma-separated list of allowed log file paths."
        )

    allowed_paths = [Path(p.strip()) for p in allowed_paths_env.split(",") if p.strip()]

    if not host:
        # For local execution, resolve and check against allowlist
        requested_path = log_path.resolve()

        is_allowed = False
        for allowed_path in allowed_paths:
            allowed_resolved = Path(allowed_path).resolve()
            if requested_path == allowed_resolved:
                is_allowed = True
                break

        if not is_allowed:
            raise ToolError(f"Access to log file '{log_path}' is not allowed.")

        if not requested_path.exists():
            raise ToolError(f"Log file not found: {log_path}")

        if not requested_path.is_file():
            raise ToolError(f"Path is not a file: {log_path}")

        log_path_str = str(requested_path)
    else:
        # For remote execution, check against allowlist without resolving
        if log_path not in allowed_paths:
            raise ToolError(f"Access to log file '{log_path}' is not allowed.")

        log_path_str = str(log_path)

    cmd = get_command("read_log_file")
    returncode, stdout, stderr = await cmd.run(host=host, lines=lines, log_path=log_path_str)

    if returncode != 0:
        if "Permission denied" in stderr:
            raise ToolError(f"Permission denied reading log file: {log_path}")

        raise ToolError(f"Error reading log file: {stderr}")

    if is_empty_output(stdout):
        raise ToolError(f"Log file is empty: {log_path}")

    entries = [line for line in stdout.strip().splitlines() if line]

    return LogEntries(entries=entries, path=log_path)