Skip to content

Audit

audit

Audit logging utilities for Linux MCP Server.

This module provides helper functions for consistent audit logging across the entire MCP server. All functions add structured context to log records that can be output in both human-readable and JSON formats.

AuditContext

AuditContext(
    **extra_fields: Any,
) -> t.Generator[logging.LoggerAdapter, None, None]

Context manager for adding extra fields to all log records.

Usage

with AuditContext(tool="list_services", host="server1.com") as logger: logger.info("Starting operation")

Parameters:

Name Type Description Default
**extra_fields Any

Additional fields to add to log records.

{}

Yields:

Type Description
LoggerAdapter

logging.LoggerAdapter: Logger adapter with extra fields attached.

Source code in src/linux_mcp_server/audit.py
@contextmanager
def AuditContext(**extra_fields: t.Any) -> t.Generator[logging.LoggerAdapter, None, None]:
    """
    Context manager for adding extra fields to all log records.

    Usage:
        with AuditContext(tool="list_services", host="server1.com") as logger:
            logger.info("Starting operation")

    Args:
        **extra_fields: Additional fields to add to log records.

    Yields:
        logging.LoggerAdapter: Logger adapter with extra fields attached.
    """
    logger = logging.getLogger()

    # Create adapter with extra fields
    class ContextAdapter(logging.LoggerAdapter):
        def process(self, msg, kwargs):
            # Add extra fields to the record
            if "extra" not in kwargs:
                kwargs["extra"] = {}

            if isinstance(self.extra, t.Iterable):
                kwargs["extra"].update(self.extra)

            return msg, kwargs

    adapter = ContextAdapter(logger, extra_fields)
    yield adapter

log_ssh_command

log_ssh_command(
    command: str,
    host: str,
    exit_code: int,
    duration: float | None = None,
)

Log SSH command execution.

Verbosity is tiered based on log level: - INFO: Command and exit code - DEBUG: Also includes execution duration

Parameters:

Name Type Description Default
command str

Command that was executed

required
host str

Remote host

required
exit_code int

Command exit code

required
duration float | None

Optional execution duration in seconds (shown at DEBUG level)

None
Source code in src/linux_mcp_server/audit.py
def log_ssh_command(
    command: str,
    host: str,
    exit_code: int,
    duration: float | None = None,
):
    """
    Log SSH command execution.

    Verbosity is tiered based on log level:
    - INFO: Command and exit code
    - DEBUG: Also includes execution duration

    Args:
        command: Command that was executed
        host: Remote host
        exit_code: Command exit code
        duration: Optional execution duration in seconds (shown at DEBUG level)
    """
    logger = logging.getLogger(__name__)

    extra = {
        "command": command,
        "host": host,
        "exit_code": exit_code,
    }

    message = f"{Event.REMOTE_EXEC}: {command} | host={host} | exit_code={exit_code}"

    # At DEBUG level, include duration
    if duration is not None and logger.isEnabledFor(logging.DEBUG):
        extra["duration"] = f"{duration:.3f}s"
        message += f" | duration={duration:.3f}s"

    logger.info(message, extra=extra)

log_ssh_connect

log_ssh_connect(
    host: str,
    status: str,
    username: str = "",
    reused: bool = False,
    key_path: str | None = None,
    error: str | None = None,
)

Log SSH connection event.

Verbosity is tiered based on log level: - INFO: Basic connection success/failure - DEBUG: Detailed information including key path, reuse status

Parameters:

Name Type Description Default
host str

Remote host

required
username str

SSH username

''
status str

Connection status ("success" or "failed")

required
reused bool

Whether connection was reused (shown at DEBUG level)

False
key_path str | None

Path to SSH key used (shown at DEBUG level)

None
error str | None

Optional error message

None
Source code in src/linux_mcp_server/audit.py
def log_ssh_connect(
    host: str,
    status: str,
    username: str = "",
    reused: bool = False,
    key_path: str | None = None,
    error: str | None = None,
):
    """
    Log SSH connection event.

    Verbosity is tiered based on log level:
    - INFO: Basic connection success/failure
    - DEBUG: Detailed information including key path, reuse status

    Args:
        host: Remote host
        username: SSH username
        status: Connection status ("success" or "failed")
        reused: Whether connection was reused (shown at DEBUG level)
        key_path: Path to SSH key used (shown at DEBUG level)
        error: Optional error message
    """
    logger = logging.getLogger(__name__)

    if status == Status.success:
        extra = {
            "host": host,
            "username": username,
            "status": status,
        }

        # At INFO level, just log basic success
        message = f"{Event.SSH_CONNECT}: {host}@{username}"

        # At DEBUG level, add more details
        if logger.isEnabledFor(logging.DEBUG):
            if reused is not None:
                extra["reused"] = str(reused)
            if key_path:
                extra["key"] = key_path

        logger.info(message, extra=extra)

    else:
        # Connection failed
        extra = {
            "host": host,
            "username": username,
            "status": "failed",
        }

        if error:
            extra["reason"] = error

        message = f"{Event.SSH_AUTH_FAILED}: {host}@{username}"
        if error:
            message += f" | reason: {error}"

        logger.warning(message, extra=extra)

log_tool_call

log_tool_call(func: Callable) -> Function

Decorator to log tool calls

Works with sync or async functions.

Source code in src/linux_mcp_server/audit.py
def log_tool_call(func: t.Callable) -> Function:
    """Decorator to log tool calls

    Works with sync or async functions.
    """
    logger = logging.getLogger("linux-mcp-server")
    tool_name = func.__name__

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = _log_event_start(logger, tool_name, kwargs)
        error = None
        result = None

        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            error = exc
            _log_event_complete(logger, tool_name, start_time, error)
            raise

        _log_event_complete(logger, tool_name, start_time, error)

        return result

    @functools.wraps(func)
    async def awrapper(*args, **kwargs):
        start_time = _log_event_start(logger, tool_name, kwargs)
        error = None
        result = None

        try:
            result = await func(*args, **kwargs)
        except Exception as exc:
            error = exc
            _log_event_complete(logger, tool_name, start_time, error)
            raise

        _log_event_complete(logger, tool_name, start_time, error)

        return result

    if inspect.iscoroutinefunction(func):
        return awrapper

    return wrapper

sanitize_parameters

sanitize_parameters(
    params: dict[str, Any],
) -> dict[str, t.Any]

Sanitize parameters by redacting sensitive fields.

Parameters:

Name Type Description Default
params dict[str, Any]

Dictionary of parameters to sanitize

required

Returns:

Type Description
dict[str, Any]

Dictionary with sensitive fields redacted

Source code in src/linux_mcp_server/audit.py
def sanitize_parameters(params: dict[str, t.Any]) -> dict[str, t.Any]:
    """
    Sanitize parameters by redacting sensitive fields.

    Args:
        params: Dictionary of parameters to sanitize

    Returns:
        Dictionary with sensitive fields redacted
    """
    if not params:
        return params

    sanitized = {}
    for key, value in params.items():
        # Check if key is sensitive
        key_lower = key.lower().replace("_", "").replace("-", "")
        is_sensitive = any(sensitive in key_lower for sensitive in [s.replace("_", "") for s in SENSITIVE_FIELDS])

        if is_sensitive:
            sanitized[key] = "***REDACTED***"
        elif isinstance(value, dict):
            # Recursively sanitize nested dicts
            sanitized[key] = sanitize_parameters(value)
        else:
            sanitized[key] = value

    return sanitized