Skip to content

Server

server

Core MCP server for Linux diagnostics using FastMCP.

get_audit_logs async

get_audit_logs(
    lines: Annotated[
        int,
        Field(
            description="Number of log lines to retrieve.",
            ge=1,
            le=100000,
        ),
    ] = 100,
    host: Host = None,
) -> str

Get Linux audit logs.

Retrieves entries from /var/log/audit/audit.log containing security-relevant events such as authentication, authorization, and system call auditing. Requires root privileges to read.

Source code in src/linux_mcp_server/tools/logs.py
@mcp.tool(
    title="Get audit logs",
    description="Read the system audit logs. This requires root privileges.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_audit_logs(
    lines: t.Annotated[int, Field(description="Number of log lines to retrieve.", ge=1, le=10_0000)] = 100,
    host: Host = None,
) -> str:
    """Get Linux audit logs.

    Retrieves entries from /var/log/audit/audit.log containing security-relevant
    events such as authentication, authorization, and system call auditing.
    Requires root privileges to read.
    """
    audit_log_path = "/var/log/audit/audit.log"

    try:
        # For local execution, check if file exists
        if not host and not os.path.exists(audit_log_path):
            return f"Audit log file not found at {audit_log_path}. Audit logging may not be enabled."

        cmd = get_command("audit_logs")
        returncode, stdout, stderr = await cmd.run(host=host, lines=lines)

        if returncode != 0:
            if "Permission denied" in stderr:
                return f"Permission denied reading audit logs. This tool requires elevated privileges (root) to read {audit_log_path}."
            return f"Error reading audit logs: {stderr}"

        if is_empty_output(stdout):
            return "No audit log entries found."

        return format_audit_logs(stdout, lines)
    except FileNotFoundError:
        return "Error: tail command not found."
    except Exception as e:
        return f"Error reading audit logs: {str(e)}"

get_cpu_information async

get_cpu_information(host: Host = None) -> str

Get CPU information.

Retrieves CPU model, core counts (logical and physical), frequency, and current load averages (1, 5, and 15 minute).

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get CPU information",
    description="Get CPU information.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_cpu_information(
    host: Host = None,
) -> str:
    """Get CPU information.

    Retrieves CPU model, core counts (logical and physical), frequency,
    and current load averages (1, 5, and 15 minute).
    """
    try:
        group = get_command_group("cpu_info")
        results = {}

        # Execute all commands in the group
        for name, cmd in group.commands.items():
            returncode, stdout, _ = await cmd.run(host=host)
            if is_successful_output(returncode, stdout):
                results[name] = stdout

        info = parse_cpu_info(results)
        return format_cpu_info(info)
    except Exception as e:
        return f"Error gathering CPU information: {str(e)}"

get_disk_usage async

get_disk_usage(host: Host = None) -> str

Get disk usage information.

Retrieves filesystem usage for all mounted volumes including size, used/available space, utilization percentage, and mount points.

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get disk usage",
    description="Get detailed disk space information including size, mount points, and utilization.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_disk_usage(
    host: Host = None,
) -> str:
    """Get disk usage information.

    Retrieves filesystem usage for all mounted volumes including size,
    used/available space, utilization percentage, and mount points.
    """
    try:
        cmd = get_command("disk_usage")

        returncode, stdout, _ = await cmd.run(host=host)

        if is_successful_output(returncode, stdout):
            return format_disk_usage(stdout)

        return "Error: Unable to retrieve disk usage information"
    except Exception as e:
        return f"Error gathering disk usage information: {str(e)}"

get_hardware_information async

get_hardware_information(host: Host = None) -> str

Get hardware information.

Retrieves detailed hardware inventory including CPU specifications, PCI devices, USB devices, and DMI/SMBIOS data (system manufacturer, model, BIOS version, etc.). Some information may require root privileges.

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get hardware information",
    description="Get hardware information such as CPU details, PCI devices, USB devices, and hardware information from DMI.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_hardware_information(
    host: Host = None,
) -> str:
    """Get hardware information.

    Retrieves detailed hardware inventory including CPU specifications,
    PCI devices, USB devices, and DMI/SMBIOS data (system manufacturer,
    model, BIOS version, etc.). Some information may require root privileges.
    """
    try:
        group = get_command_group("hardware_info")
        results = {}

        # Execute all commands in the group
        for name, cmd in group.commands.items():
            try:
                returncode, stdout, stderr = await cmd.run(host=host)
                if returncode == 0:
                    results[name] = stdout
            except FileNotFoundError:
                results[name] = f"{name} command not available"

        return format_hardware_info(results)
    except Exception as e:
        return f"Error getting hardware information: {str(e)}"

get_journal_logs async

get_journal_logs(
    unit: Annotated[
        str | None,
        "Filter by systemd unit name or pattern (e.g., 'nginx.service', 'ssh*')",
    ] = None,
    priority: Annotated[
        str | None,
        "Filter by priority. Possible values: priority level (0-7), syslog level name ('emerg' to 'debug'), or range (e.g., 'err..info')",
    ] = None,
    since: Annotated[
        str | None,
        "Filter entries since specified time. Date/time filter (format: 'YYYY-MM-DD HH:MM:SS', 'today', 'yesterday', 'now', or relative like '-1h')",
    ] = None,
    lines: Annotated[
        int,
        Field(
            description="Number of log lines to retrieve. Default: 100",
            ge=1,
            le=10000,
        ),
    ] = 100,
    host: Host = None,
) -> str

Get systemd journal logs.

Retrieves entries from the systemd journal with optional filtering by unit, priority level, and time range. Returns timestamped log messages.

Source code in src/linux_mcp_server/tools/logs.py
@mcp.tool(
    title="Get journal logs",
    description="Get systemd journal logs.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_journal_logs(
    unit: t.Annotated[str | None, "Filter by systemd unit name or pattern (e.g., 'nginx.service', 'ssh*')"] = None,
    priority: t.Annotated[
        str | None,
        "Filter by priority. Possible values: priority level (0-7), syslog level name ('emerg' to 'debug'), or range (e.g., 'err..info')",
    ] = None,
    since: t.Annotated[
        str | None,
        "Filter entries since specified time. Date/time filter (format: 'YYYY-MM-DD HH:MM:SS', 'today', 'yesterday', 'now', or relative like '-1h')",
    ] = None,
    lines: t.Annotated[int, Field(description="Number of log lines to retrieve. Default: 100", ge=1, le=10_000)] = 100,
    host: Host = None,
) -> str:
    """Get systemd journal logs.

    Retrieves entries from the systemd journal with optional filtering by unit,
    priority level, and time range. Returns timestamped log messages.
    """
    try:
        # Get command from registry
        cmd = get_command("journal_logs")
        returncode, stdout, stderr = await cmd.run(host=host, lines=lines, unit=unit, priority=priority, since=since)

        if returncode != 0:
            return f"Error reading journal logs: {stderr}"

        if is_empty_output(stdout):
            return "No journal entries found matching the criteria."

        return format_journal_logs(stdout, lines, unit, priority, since)
    except FileNotFoundError:
        return "Error: journalctl command not found. This tool requires systemd."
    except Exception as e:
        return f"Error reading journal logs: {str(e)}"

get_listening_ports async

get_listening_ports(host: Host = None) -> str

Get listening ports.

Retrieves all ports with services actively listening for connections, including protocol (TCP/UDP), bind address, port number, and process name.

Source code in src/linux_mcp_server/tools/network.py
@mcp.tool(
    title="Get listening ports",
    description="Get details on listening port, protocols, and services.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_listening_ports(
    host: Host = None,
) -> str:
    """Get listening ports.

    Retrieves all ports with services actively listening for connections,
    including protocol (TCP/UDP), bind address, port number, and process name.
    """
    try:
        cmd = get_command("listening_ports")

        returncode, stdout, stderr = await cmd.run(host=host)

        if is_successful_output(returncode, stdout):
            ports = parse_ss_listening(stdout)
            return format_listening_ports(ports)
        return f"Error getting listening ports: return code {returncode}, stderr: {stderr}"
    except Exception as e:
        return f"Error getting listening ports: {str(e)}"

get_memory_information async

get_memory_information(host: Host = None) -> str

Get memory information.

Retrieves physical RAM and swap usage including total, used, free, shared, buffers, cached, and available memory.

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get memory information",
    description="Get detailed memory including physical and swap.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_memory_information(
    host: Host = None,
) -> str:
    """Get memory information.

    Retrieves physical RAM and swap usage including total, used, free,
    shared, buffers, cached, and available memory.
    """
    try:
        # Execute free command
        free_cmd = get_command("memory_info", "free")
        returncode, stdout, _ = await free_cmd.run(host=host)

        if is_successful_output(returncode, stdout):
            memory = parse_free_output(stdout)
            return format_memory_info(memory)

        return "Error: Unable to retrieve memory information"
    except Exception as e:
        return f"Error gathering memory information: {str(e)}"

get_network_connections async

get_network_connections(host: Host = None) -> str

Get active network connections.

Retrieves all established and pending network connections including protocol, state, local/remote addresses and ports, and associated process information.

Source code in src/linux_mcp_server/tools/network.py
@mcp.tool(
    title="Get network connections",
    description="Get detailed information about active network connections.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_network_connections(
    host: Host = None,
) -> str:
    """Get active network connections.

    Retrieves all established and pending network connections including protocol,
    state, local/remote addresses and ports, and associated process information.
    """
    try:
        cmd = get_command("network_connections")

        returncode, stdout, stderr = await cmd.run(host=host)

        if is_successful_output(returncode, stdout):
            connections = parse_ss_connections(stdout)
            return format_network_connections(connections)
        return f"Error getting network connections: return code {returncode}, stderr: {stderr}"
    except Exception as e:
        return f"Error getting network connections: {str(e)}"

get_network_interfaces async

get_network_interfaces(host: Host = None) -> str

Get network interface information.

Retrieves all network interfaces with their operational state, IP addresses, and traffic statistics (bytes/packets sent/received, errors, dropped packets).

Source code in src/linux_mcp_server/tools/network.py
@mcp.tool(
    title="Get network interfaces",
    description="Get detailed information about network interfaces including address and traffic statistics.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_network_interfaces(
    host: Host = None,
) -> str:
    """Get network interface information.

    Retrieves all network interfaces with their operational state, IP addresses,
    and traffic statistics (bytes/packets sent/received, errors, dropped packets).
    """
    try:
        interfaces = {}
        stats = {}

        # Get brief interface info
        brief_cmd = get_command("network_interfaces", "brief")
        returncode, stdout, _ = await brief_cmd.run(host=host)

        if is_successful_output(returncode, stdout):
            interfaces = parse_ip_brief(stdout)

        # Get network statistics from /proc/net/dev
        stats_cmd = get_command("network_interfaces", "stats")
        returncode, stdout, _ = await stats_cmd.run(host=host)

        if is_successful_output(returncode, stdout):
            stats = parse_proc_net_dev(stdout)

        return format_network_interfaces(interfaces, stats)
    except Exception as e:
        return f"Error getting network interface information: {str(e)}"

get_process_info async

get_process_info(
    pid: Annotated[
        int, Field(description="Process ID", ge=1)
    ],
    host: Host = None,
) -> str

Get detailed information about a specific process.

Retrieves comprehensive process details including CPU/memory usage, process state, virtual/resident memory size, controlling terminal, and additional metadata from /proc//status when available.

Source code in src/linux_mcp_server/tools/processes.py
@mcp.tool(
    title="Process details",
    description="Get information about a specific process.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_process_info(
    pid: t.Annotated[int, Field(description="Process ID", ge=1)],
    host: Host = None,
) -> str:
    """Get detailed information about a specific process.

    Retrieves comprehensive process details including CPU/memory usage, process
    state, virtual/resident memory size, controlling terminal, and additional
    metadata from /proc/<pid>/status when available.
    """
    try:
        # Get process details with ps
        ps_cmd = get_command("process_info", "ps_detail")
        returncode, stdout, _ = await ps_cmd.run(host=host, pid=pid)

        if returncode != 0:
            return f"Process with PID {pid} does not exist."

        if not stdout:
            return f"Process with PID {pid} does not exist."

        # Try to get more details from /proc
        proc_status = None
        status_cmd = get_command("process_info", "proc_status")
        status_code, status_stdout, _ = await status_cmd.run(host=host, pid=pid)

        if is_successful_output(status_code, status_stdout):
            proc_status = parse_proc_status(status_stdout)

        return format_process_detail(stdout, proc_status, pid)
    except Exception as e:
        return f"Error getting process information: {str(e)}"

get_service_logs async

get_service_logs(
    service_name: Annotated[str, "Name of the service"],
    lines: Annotated[
        int,
        Field(
            description="Number of log lines to retrieve.",
            ge=1,
            le=10000,
        ),
    ] = 50,
    host: Host = None,
) -> str

Get recent logs for a specific systemd service.

Retrieves journal entries for the specified service unit, including timestamps, priority levels, and log messages.

Source code in src/linux_mcp_server/tools/services.py
@mcp.tool(
    title="Get service logs",
    description="Get recent logs for a specific systemd service.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_service_logs(
    service_name: t.Annotated[str, "Name of the service"],
    lines: t.Annotated[int, Field(description="Number of log lines to retrieve.", ge=1, le=10_000)] = 50,
    host: Host = None,
) -> str:
    """Get recent logs for a specific systemd service.

    Retrieves journal entries for the specified service unit, including
    timestamps, priority levels, and log messages.
    """
    try:
        # Ensure service name has .service suffix if not present
        if not service_name.endswith(".service") and "." not in service_name:
            service_name = f"{service_name}.service"

        cmd = get_command("service_logs")
        returncode, stdout, stderr = await cmd.run(host=host, service_name=service_name, lines=lines)

        if returncode != 0:
            if "not found" in stderr.lower() or "no entries" in stderr.lower():
                return f"No logs found for service '{service_name}'. The service may not exist or has no log entries."
            return f"Error getting service logs: {stderr}"

        if is_empty_output(stdout):
            return f"No log entries found for service '{service_name}'."

        return format_service_logs(stdout, service_name, lines)
    except FileNotFoundError:
        return "Error: journalctl command not found. This tool requires systemd."
    except Exception as e:
        return f"Error getting service logs: {str(e)}"

get_service_status async

get_service_status(
    service_name: Annotated[str, "Name of the service"],
    host: Host = None,
) -> str

Get status of a specific systemd service.

Retrieves detailed service information including active/enabled state, main PID, memory usage, CPU time, and recent log entries from the journal.

Source code in src/linux_mcp_server/tools/services.py
@mcp.tool(
    title="Get service status",
    description="Get detailed status of a specific systemd service.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_service_status(
    service_name: t.Annotated[str, "Name of the service"],
    host: Host = None,
) -> str:
    """Get status of a specific systemd service.

    Retrieves detailed service information including active/enabled state,
    main PID, memory usage, CPU time, and recent log entries from the journal.
    """
    try:
        # Ensure service name has .service suffix if not present
        if not service_name.endswith(".service") and "." not in service_name:
            service_name = f"{service_name}.service"

        cmd = get_command("service_status")
        _, stdout, stderr = await cmd.run(host=host, service_name=service_name)

        # Note: systemctl status returns non-zero for inactive services, but that's expected
        if not stdout and stderr:
            # Service not found
            if "not found" in stderr.lower() or "could not be found" in stderr.lower():
                return f"Service '{service_name}' not found on this system."
            return f"Error getting service status: {stderr}"

        return format_service_status(stdout, service_name)
    except FileNotFoundError:
        return "Error: systemctl command not found. This tool requires systemd."
    except Exception as e:
        return f"Error getting service status: {str(e)}"

get_system_information async

get_system_information(host: Host = None) -> str

Get basic system information.

Retrieves hostname, OS name/version, kernel version, architecture, system uptime, and last boot time.

Source code in src/linux_mcp_server/tools/system_info.py
@mcp.tool(
    title="Get system information",
    description="Get basic system information such as operating system, distribution, kernel version, uptime, and last boot time.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_system_information(
    host: Host = None,
) -> str:
    """Get basic system information.

    Retrieves hostname, OS name/version, kernel version, architecture,
    system uptime, and last boot time.
    """
    try:
        group = get_command_group("system_info")
        results = {}

        # Execute all commands in the group
        for name, cmd in group.commands.items():
            returncode, stdout, _ = await cmd.run(host=host)
            if is_successful_output(returncode, stdout):
                results[name] = stdout

        info = parse_system_info(results)
        return format_system_info(info)
    except Exception as e:
        return f"Error gathering system information: {str(e)}"

list_block_devices async

list_block_devices(host: Host = None) -> str

List block devices.

Retrieves all block devices (disks, partitions, LVM volumes) with their name, size, type, mount point, and filesystem information.

Source code in src/linux_mcp_server/tools/storage.py
@mcp.tool(
    title="List block devices",
    description="List block devices on the system",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_block_devices(
    host: Host = None,
) -> str:
    """List block devices.

    Retrieves all block devices (disks, partitions, LVM volumes) with their
    name, size, type, mount point, and filesystem information.
    """
    try:
        cmd = get_command("list_block_devices")
        returncode, stdout, _ = await cmd.run(host=host)

        if is_successful_output(returncode, stdout):
            return format_block_devices(stdout)

        # Fallback message if lsblk fails
        return "Error: Unable to list block devices. lsblk command may not be available."
    except FileNotFoundError:
        return "Error: lsblk command not found."
    except Exception as e:
        return f"Error listing block devices: {str(e)}"

list_directories async

list_directories(
    path: Annotated[str, "The directory path to analyze"],
    order_by: Annotated[
        OrderBy,
        "Sort order - 'size', 'name', or 'modified' (default: 'name')",
    ] = OrderBy.NAME,
    sort: Annotated[
        SortBy,
        "Sort direction - 'ascending' or 'descending' (default: 'ascending')",
    ] = SortBy.ASCENDING,
    top_n: Annotated[
        int | None,
        Field(
            description="Optional limit on number of directories to return (1-1000, only used with size ordering)",
            gt=0,
            le=1000,
        ),
    ] = None,
    host: Host = None,
) -> str

List directories under a specified path.

Retrieves subdirectories with their size (when ordered by size) or modification time, supporting flexible sorting and result limiting.

Source code in src/linux_mcp_server/tools/storage.py
@mcp.tool(
    title="List directories",
    description="List directories under a specified path with various sorting options.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_directories(
    path: t.Annotated[str, "The directory path to analyze"],
    order_by: t.Annotated[OrderBy, "Sort order - 'size', 'name', or 'modified' (default: 'name')"] = OrderBy.NAME,
    sort: t.Annotated[SortBy, "Sort direction - 'ascending' or 'descending' (default: 'ascending')"] = SortBy.ASCENDING,
    top_n: t.Annotated[
        int | None,
        Field(
            description="Optional limit on number of directories to return (1-1000, only used with size ordering)",
            gt=0,
            le=1_000,
        ),
    ] = None,
    host: Host = None,
) -> str:
    """List directories under a specified path.

    Retrieves subdirectories with their size (when ordered by size) or
    modification time, supporting flexible sorting and result limiting.
    """
    path = _validate_path(path)

    # Get the appropriate command for the order_by field
    cmd_name = DIRECTORY_COMMANDS[order_by]
    cmd = get_command(cmd_name)

    try:
        returncode, stdout, stderr = await cmd.run(host=host, path=path)

        if returncode != 0:
            raise ToolError(f"Error running command: command failed with return code {returncode}: {stderr}")

        # Parse the output
        entries = parse_directory_listing(stdout, order_by)

        # Apply top_n limit if specified
        if top_n:
            # Sort first if we need to limit
            if order_by == OrderBy.SIZE:
                entries = sorted(entries, key=lambda e: e.size, reverse=sort == SortBy.DESCENDING)
            elif order_by == OrderBy.MODIFIED:
                entries = sorted(entries, key=lambda e: e.modified, reverse=sort == SortBy.DESCENDING)
            else:
                entries = sorted(entries, key=lambda e: e.name.lower(), reverse=sort == SortBy.DESCENDING)
            entries = entries[:top_n]

        # Format the output
        return format_directory_listing(entries, path, order_by, reverse=sort == SortBy.DESCENDING)

    except Exception as e:
        raise ToolError(f"Error listing directories: {str(e)}") from e

list_files async

list_files(
    path: Annotated[str, "The path to analyze"],
    order_by: Annotated[
        OrderBy,
        "Sort order - 'size', 'name', or 'modified' (default: 'name')",
    ] = OrderBy.NAME,
    sort: Annotated[
        SortBy,
        "Sort direction - 'ascending' or 'descending' (default: 'ascending')",
    ] = SortBy.ASCENDING,
    top_n: Annotated[
        int | None,
        Field(
            description="Optional limit on number of files to return (1-1000, only used with size ordering)",
            gt=0,
            le=1000,
        ),
    ] = None,
    host: Host = None,
) -> str

List files under a specified path.

Retrieves files with their size or modification time, supporting flexible sorting and result limiting. Useful for finding large or recently modified files.

Source code in src/linux_mcp_server/tools/storage.py
@mcp.tool(
    title="List files",
    description="List files under a specified path with various sorting options.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_files(
    path: t.Annotated[str, "The path to analyze"],
    order_by: t.Annotated[OrderBy, "Sort order - 'size', 'name', or 'modified' (default: 'name')"] = OrderBy.NAME,
    sort: t.Annotated[SortBy, "Sort direction - 'ascending' or 'descending' (default: 'ascending')"] = SortBy.ASCENDING,
    top_n: t.Annotated[
        int | None,
        Field(
            description="Optional limit on number of files to return (1-1000, only used with size ordering)",
            gt=0,
            le=1_000,
        ),
    ] = None,
    host: Host = None,
) -> str:
    """List files under a specified path.

    Retrieves files with their size or modification time, supporting flexible
    sorting and result limiting. Useful for finding large or recently modified files.
    """
    # For local execution, validate path
    if not host:
        path = _validate_path(path)

    # Get the appropriate command for the order_by field
    cmd_name = FILE_COMMANDS[order_by]
    cmd = get_command(cmd_name)

    try:
        returncode, stdout, stderr = await cmd.run(host=host, path=path)

        if returncode != 0:
            raise ToolError(f"Error running command: command failed with return code {returncode}: {stderr}")

        # Parse the output
        entries = parse_file_listing(stdout, order_by)

        # Apply top_n limit if specified
        if top_n:
            # Sort first if we need to limit
            if order_by == OrderBy.SIZE:
                entries = sorted(entries, key=lambda e: e.size, reverse=sort == SortBy.DESCENDING)
            elif order_by == OrderBy.MODIFIED:
                entries = sorted(entries, key=lambda e: e.modified, reverse=sort == SortBy.DESCENDING)
            else:
                entries = sorted(entries, key=lambda e: e.name.lower(), reverse=sort == SortBy.DESCENDING)
            entries = entries[:top_n]

        # Format the output
        return format_file_listing(entries, path, order_by, reverse=sort == SortBy.DESCENDING)

    except Exception as e:
        raise ToolError(f"Error listing files: {str(e)}") from e

list_processes async

list_processes(host: Host = None) -> str

List all running processes.

Retrieves a snapshot of all running processes with details including PID, user, CPU/memory usage, process state, start time, and command line.

Source code in src/linux_mcp_server/tools/processes.py
@mcp.tool(
    title="List processes",
    description="List running processes",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_processes(
    host: Host = None,
) -> str:
    """List all running processes.

    Retrieves a snapshot of all running processes with details including PID,
    user, CPU/memory usage, process state, start time, and command line.
    """
    try:
        cmd = get_command("list_processes")
        returncode, stdout, _ = await cmd.run(host=host)

        if is_successful_output(returncode, stdout):
            processes = parse_ps_output(stdout)
            return format_process_list(processes)
        return "Error executing ps command"
    except Exception as e:
        return f"Error listing processes: {str(e)}"

list_services async

list_services(host: Host = None) -> str

List all systemd services.

Retrieves all systemd service units with their load state, active state, sub-state, and description. Also includes a count of currently running services.

Source code in src/linux_mcp_server/tools/services.py
@mcp.tool(
    title="List services",
    description="List all systemd services.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def list_services(
    host: Host = None,
) -> str:
    """List all systemd services.

    Retrieves all systemd service units with their load state, active state,
    sub-state, and description. Also includes a count of currently running services.
    """
    try:
        cmd = get_command("list_services")
        returncode, stdout, stderr = await cmd.run(host=host)

        if returncode != 0:
            return f"Error listing services: {stderr}"

        # Get running services count
        running_cmd = get_command("running_services")
        returncode_summary, stdout_summary, _ = await running_cmd.run(host=host)

        running_count = None
        if returncode_summary == 0:
            running_count = parse_service_count(stdout_summary)

        return format_services_list(stdout, running_count)
    except FileNotFoundError:
        return "Error: systemctl command not found. This tool requires systemd."
    except Exception as e:
        return f"Error listing services: {str(e)}"

read_file async

read_file(
    path: Annotated[str, "The file path to read"],
    host: Host = None,
) -> str

Read the contents of a file.

Retrieves the full contents of a text file. The path must be absolute and the file must exist. Binary files may not display correctly.

Source code in src/linux_mcp_server/tools/storage.py
@mcp.tool(
    title="Read file",
    description="Read the contents of a file using cat",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def read_file(
    path: t.Annotated[str, "The file path to read"],
    host: Host = None,
) -> str:
    """Read the contents of a file.

    Retrieves the full contents of a text file. The path must be absolute
    and the file must exist. Binary files may not display correctly.
    """

    # Validate path
    path = _validate_path(path)

    if not host:
        # For local execution, check early if file exists
        if not os.path.isfile(path):
            raise ToolError(f"Path is not a file: {path}")

    cmd = get_command("read_file")

    try:
        returncode, stdout, stderr = await cmd.run_bytes(host=host, path=path)

        if returncode != 0:
            raise ToolError(f"Error running command: command failed with return code {returncode}: {stderr}")

        return stdout.decode("utf-8")
    except Exception as e:
        raise ToolError(f"Error reading file: {str(e)}") from e

read_log_file async

read_log_file(
    log_path: Annotated[str, "Path to the log file"],
    lines: Annotated[
        int,
        Field(
            description="Number of lines to retrieve from the end.",
            ge=1,
            le=10000,
        ),
    ] = 100,
    host: Host = None,
) -> str

Read a specific log file.

Retrieves the last N lines from a log file. The file path must be in the allowed list configured via LINUX_MCP_ALLOWED_LOG_PATHS environment variable.

Source code in src/linux_mcp_server/tools/logs.py
@mcp.tool(
    title="Read log file",
    description="Read a specific log file.",
    annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def read_log_file(  # noqa: C901
    log_path: t.Annotated[str, "Path to the log file"],
    lines: t.Annotated[int, Field(description="Number of lines to retrieve from the end.", ge=1, le=10_000)] = 100,
    host: Host = None,
) -> str:
    """Read a specific log file.

    Retrieves the last N lines from a log file. The file path must be in the
    allowed list configured via LINUX_MCP_ALLOWED_LOG_PATHS environment variable.
    """
    try:
        # Get allowed log paths from environment variable
        allowed_paths_env = CONFIG.allowed_log_paths

        if not allowed_paths_env:
            return (
                "No log files are allowed. Set LINUX_MCP_ALLOWED_LOG_PATHS environment variable "
                "with comma-separated list of allowed log file paths."
            )

        allowed_paths = [p.strip() for p in allowed_paths_env.split(",") if p.strip()]

        # For local execution, validate path
        if not host:
            try:
                requested_path = Path(log_path).resolve()
            except Exception:
                return f"Invalid log file path: {log_path}"

            # Check if the requested path is in the allowed list
            is_allowed = False
            for allowed_path in allowed_paths:
                try:
                    allowed_resolved = Path(allowed_path).resolve()
                    if requested_path == allowed_resolved:
                        is_allowed = True
                        break
                except Exception:
                    continue

            if not is_allowed:
                return (
                    f"Access to log file '{log_path}' is not allowed.\n"
                    f"Allowed log files: {', '.join(allowed_paths)}"
                )  # nofmt

            # Check if file exists
            if not requested_path.exists():
                return f"Log file not found: {log_path}"

            if not requested_path.is_file():
                return f"Path is not a file: {log_path}"

            log_path_str = str(requested_path)
        else:
            # For remote execution, just check against whitelist without resolving
            if log_path not in allowed_paths:
                return (
                    f"Access to log file '{log_path}' is not allowed.\n"
                    f"Allowed log files: {', '.join(allowed_paths)}"
                )  # nofmt
            log_path_str = log_path

        cmd = get_command("read_log_file")
        returncode, stdout, stderr = await cmd.run(host=host, lines=lines, log_path=log_path_str)

        if returncode != 0:
            if "Permission denied" in stderr:
                return f"Permission denied reading log file: {log_path}"
            return f"Error reading log file: {stderr}"

        if is_empty_output(stdout):
            return f"Log file is empty: {log_path}"

        return format_log_file(stdout, log_path, lines)
    except FileNotFoundError:
        return "Error: tail command not found."
    except Exception as e:
        return f"Error reading log file: {str(e)}"