Skip to content

mercury-ocip-fast

mercury-ocip-fast is a counterpart to mercury-ocip, built for high-volume production workloads. It's significantly faster through connection pooling and async concurrency, making it suitable for backend services and bulk operations.

Where mercury-ocip excels at scripting and automation, mercury-ocip-fast is designed for stability and throughput when you need to handle thousands of requests.

Installation

pip install mercury-ocip-fast

Basic usage

from mercury_ocip_fast import Client
from mercury_ocip_fast.commands.commands import UserGetRequest21sp1

async with Client(
    host="your-broadworks.server",
    username="admin",
    password="your-password"
) as client:
    response = await client.command(
        UserGetRequest21sp1(user_id="user@domain.com")
    )

    print(response.first_name)

The client handles authentication automatically. Both TLS and non-TLS connections are supported.

Bulk operations

Pass a list of commands to execute them concurrently:

from mercury_ocip_fast import Client
from mercury_ocip_fast.commands.commands import UserGetRequest21sp1

async with Client(
    host="your-broadworks.server",
    username="admin",
    password="your-password"
) as client:
    users = ["user1@domain.com", "user2@domain.com", "user3@domain.com"]

    responses = await client.command([
        UserGetRequest21sp1(user_id=user) for user in users
    ])

    for response in responses:
        print(f"{response.user_id}: {response.first_name}")

Commands are batched into groups of 15 per the OCI-P spec and sent concurrently across the connection pool. Responses are returned in the same order as the input commands.

Connection warming

Pre-create connections to avoid cold-start latency on bulk operations:

async with Client(
    host="your-broadworks.server",
    username="admin",
    password="your-password"
) as client:
    await client.warm(50)  # Create 50 connections upfront

    responses = await client.command([...])

Pool configuration

The connection pool can be configured for your specific workload:

from mercury_ocip_fast import Client
from mercury_ocip_fast.pool import PoolConfig

config = PoolConfig(
    max_connections=50,           # Max TCP connections to maintain
    max_concurrent_requests=100,  # Max in-flight requests at once
    connect_timeout=10.0,         # Timeout for establishing connection
    read_timeout=30.0,            # Timeout for reading response
    max_connection_age=300.0,     # Recycle connections after 5 minutes
    idle_timeout=60.0,            # Close idle connections after 1 minute
)

async with Client(
    host="your-broadworks.server",
    username="admin",
    password="your-password",
    config=config
) as client:
    pass

Start with conservative values and adjust based on your BroadWorks cluster capacity.

TLS and non-TLS

The default is TLS on port 2209:

async with Client(
    host="your-broadworks.server",
    username="admin",
    password="your-password"
) as client:
    pass

For non-TLS connections on port 2208:

async with Client(
    host="your-broadworks.server",
    port=2208,
    username="admin",
    password="your-password",
    tls=False
) as client:
    pass

The authentication flow adjusts automatically based on the TLS setting.

Response handling

Responses are parsed into Python objects:

from mercury_ocip_fast.commands.base_command import ErrorResponse

response = await client.command(some_command)

if isinstance(response, ErrorResponse):
    print(f"Error {response.error_code}: {response.summary}")
else:
    print(response.user_id)

For bulk operations, responses maintain the same order as the input commands:

commands = [cmd1, cmd2, cmd3]
responses = await client.command(commands)

for cmd, resp in zip(commands, responses):
    # Process each pair
    pass

Use cases

mercury-ocip is better for: - Scripts and automation - Interactive CLI tools - General purpose work

mercury-ocip-fast is better for: - Backend APIs and services - Bulk data migrations - High-volume reporting - Production workloads requiring stability and throughput

Both libraries use identical OCI-P command definitions, so code is portable between them.

Performance notes

This library can generate significant traffic quickly. BroadWorks clusters not sized for the load may experience impact. Consider:

  • Starting with lower concurrency settings
  • Monitoring cluster performance during bulk operations
  • Using connection warming selectively
  • Rate limiting if necessary

Example: Bulk user fetch

import asyncio
from mercury_ocip_fast import Client
from mercury_ocip_fast.commands import (
    GroupGetRequest,
    UserGetRequest21sp1
)

async def get_all_group_users(group_id: str):
    async with Client(
        host="broadworks.example.com",
        username="admin",
        password="secret"
    ) as client:
        group = await client.command(
            GroupGetRequest(service_provider_id="ent1", group_id=group_id)
        )

        await client.warm(min(50, len(group.user_ids) // 20))

        responses = await client.command([
            UserGetRequest21sp1(user_id=uid) for uid in group.user_ids
        ])

        return responses

users = asyncio.run(get_all_group_users("sales-team"))

API Reference

See the Commands Reference for available OCI-P commands.

Client

Async client for BroadWorks OCI-P API.

Parameters:

NameTypeDescriptionDefault
host

Hostname or IP address of the BroadWorks server.

required
port

Server port. Defaults to 2209 (TLS).

required
username

Authentication username.

required
password

Authentication password.

required
config

Connection pool configuration.

required
user_agent

User agent string for logging.

required
logger

Custom logger instance. Creates default if not provided.

required
session_id

Session identifier. Auto-generated if not provided.

required
tls

Use TLS encryption. Defaults to True.

required

Raises:

TypeDescription
MError

If authentication fails.

Source code in src/mercury_ocip_fast/client.py
@attrs.define(kw_only=True)
class Client:
    """Async client for BroadWorks OCI-P API.

    Args:
        host: Hostname or IP address of the BroadWorks server.
        port: Server port. Defaults to 2209 (TLS).
        username: Authentication username.
        password: Authentication password.
        config: Connection pool configuration.
        user_agent: User agent string for logging.
        logger: Custom logger instance. Creates default if not provided.
        session_id: Session identifier. Auto-generated if not provided.
        tls: Use TLS encryption. Defaults to True.

    Raises:
        MError: If authentication fails.
    """

    host: str
    port: int = 2209
    username: str
    password: str
    config: PoolConfig = attrs.Factory(PoolConfig)
    user_agent: str = "Broadworks SDK"
    session_id: str = attrs.Factory(lambda: str(uuid.uuid4()))
    tls: bool = True

    _authenticated: bool = attrs.field(default=False, init=False)
    _requester: AsyncTCPRequester = attrs.field(init=False)
    logger: logging.Logger = attrs.Factory(
        lambda self: self._set_up_logging(), takes_self=True
    )

    def __attrs_post_init__(self):
        self._requester = AsyncTCPRequester(
            host=self.host,
            port=self.port,
            config=self.config,
            tls=self.tls,
            session_id=self.session_id,
            logger=self.logger,
            auth_callback=self._create_auth_callback(),
        )

    def __getattr__(self, name):
        if name == "_dispatch_table":
            return FakeDispatchTable(self)
        raise AttributeError(f"'{type(self).__name__}' has no attribute '{name}'")

    async def __aenter__(self):
        return self

    async def __aexit__(self, _exc_type, _exc_val, _exc_tb):
        await self._disconnect()

    def _create_auth_callback(self):
        async def _authenticate(conn: PooledConnection) -> None:
            await self.authenticate(conn)

        return _authenticate

    @overload
    async def command(self, command: CommandInput) -> CommandResult: ...

    @overload
    async def command(self, command: list[CommandInput]) -> list[CommandResult]: ...

    async def command(
        self, command: Union[CommandInput, list[CommandInput]]
    ) -> Union[CommandResult, list[CommandResult]]:
        """Execute one or more OCI-P commands.

        Authenticates automatically if needed. Accepts either a single command
        or a list of commands for bulk execution.

        Args:
            command: A single command instance or list of command instances.

        Returns:
            Single CommandResult for single input, list of CommandResult for list input.

        Raises:
            MError: If a command fails or response cannot be parsed.
        """
        if not self._authenticated:
            await self.authenticate()

        if isinstance(command, list):
            self.logger.debug(
                f"Dispatching {len(command)} commands: "
                f"{[type(cmd).__name__ for cmd in command]}"
            )
            xml_commands = [cmd.to_xml() for cmd in command]
            responses = await self._requester.send_bulk_request(xml_commands)
            return self._receive_response(responses)
        else:
            self.logger.debug(f"Dispatching command: {type(command).__name__}")
            response = await self._requester.send_request(command.to_xml())
            return self._receive_response(response)

    async def warm(self, connection_amount: int | None = None) -> int:
        """Pre-warm the connection pool for faster bulk requests.

        Args:
            connection_amount: Number of connections to create. Defaults to pool max.

        Returns:
            Number of connections created.
        """
        return await self._requester.warm(connection_amount)

    async def authenticate(
        self, conn: Optional[PooledConnection] = None
    ) -> CommandResult:
        """Authenticate with the BroadWorks server.

        Uses TLS direct login or two-stage hashed password authentication
        depending on the tls setting. Called automatically by command() if needed.

        Returns:
            Login response from server, or None if already authenticated.

        Raises:
            MError: If authentication fails.
        """
        if conn is None and self._authenticated:
            return None

        if self.tls:
            self.logger.debug(f"Authenticating {self.username!r} via TLS (LoginRequest22V5)")
            login_request = LoginRequest22V5(
                user_id=self.username, password=self.password
            )

            login_response = self._receive_response(
                await self._requester.send_request(login_request.to_xml(), conn=conn)
            )

            if isinstance(login_response, ErrorResponse):
                self.logger.error(f"TLS authentication failed for {self.username!r}: {login_response.summary}")
                raise MError(f"Failed to authenticate: {login_response.summary}")

            self.logger.info(f"{self.username} Authenticated with server")
            self._authenticated = True
            return login_response
        else:
            # Non-TLS requires two-stage authentication with password hashing
            self.logger.debug(f"Authenticating {self.username!r} via non-TLS (two-stage)")
            auth_request = AuthenticationRequest(user_id=self.username)

            auth_response = self._receive_response(
                await self._requester.send_request(auth_request.to_xml(), conn=conn)
            )

            if isinstance(auth_response, ErrorResponse):
                self.logger.error(f"AuthenticationRequest failed for {self.username!r}: {auth_response.summary}")
                raise MError(f"Auth request failed: {auth_response.summary}")

            if not isinstance(auth_response, AuthenticationResponse):
                raise MError("Unexpected response type from AuthenticationRequest")

            self.logger.debug(f"Received nonce, signing password for {self.username!r}")
            authhash: str = hashlib.sha1(self.password.encode()).hexdigest().lower()
            signed_password: str = (
                hashlib.md5(f"{auth_response.nonce}:{authhash}".encode())
                .hexdigest()
                .lower()
            )

            # Complete login with signed password
            self.logger.debug(f"Sending LoginRequest14sp4 for {self.username!r}")
            login_request = LoginRequest14sp4(
                user_id=self.username, signed_password=signed_password
            )

            login_response = self._receive_response(
                await self._requester.send_request(login_request.to_xml(), conn=conn)
            )

            if isinstance(login_response, ErrorResponse):
                self.logger.error(f"Login failed for {self.username!r}: {login_response.summary}")
                raise MError(f"Failed to authenticate: {login_response.summary}")

            self.logger.info("Authenticated with server")
            self._authenticated = True
            return login_response

    def _receive_response(
        self, response: Union[RequestResult, list[str]]
    ) -> Union[CommandResult, list[CommandResult]]:
        """Parse requester response into command result(s).

        Handles both single responses and batch responses. Batch responses
        (from send_bulk_request) may contain multiple commands per XML document,
        which are flattened into a single list.

        Args:
            response: Single response string or list of batch response strings.

        Returns:
            Single CommandResult for single input, flattened list for batch input.

        Raises:
            MError: If response is an error or cannot be parsed.
        """
        if isinstance(response, MError):
            raise response

        if isinstance(response, list):
            results: list[CommandResult] = []
            for batch_xml in response:
                batch_results = self._parse_response(batch_xml)
                if isinstance(batch_results, list):
                    results.extend(batch_results)
                else:
                    results.append(batch_results)
            return results

        if isinstance(response, str):
            return self._parse_response(response)

        raise MError("Unexpected response type")

    def _parse_response(
        self, response: str
    ) -> Union[CommandResult, list[CommandResult]]:
        """Parse XML response string into command result object(s).

        Handles responses with single or multiple command elements.

        Args:
            response: Raw XML response string from the server.

        Returns:
            Single CommandResult or list if response contains multiple commands.

        Raises:
            MError: If response cannot be parsed or command type is unknown.
        """
        response_dict = Parser.to_dict_from_xml(response)
        command_data = response_dict.get("command")

        if command_data is None:
            return SuccessResponse()

        if isinstance(command_data, list):
            return [self._parse_single_command(cmd) for cmd in command_data]

        if isinstance(command_data, dict):
            return self._parse_single_command(command_data)

        return SuccessResponse()

    def _parse_single_command(self, command_data: dict) -> CommandResult:
        """Parse a single command dict into a CommandResult.

        Args:
            command_data: Parsed command dictionary from XML.

        Returns:
            Parsed command result object.

        Raises:
            MError: If command type cannot be determined or is unknown.
        """
        type_name: Union[str, None] = command_data.get("attributes", {}).get(
            "{http://www.w3.org/2001/XMLSchema-instance}type"
        )

        if not type_name or not isinstance(type_name, str):
            raise MError("Failed to parse response object")

        if ":" in type_name:
            type_name = type_name.split(":", 1)[1]

        self.logger.debug(f"Parsing response type: {type_name}")

        if type_name == "ErrorResponse":
            result = ErrorResponse.from_dict(command_data)
            self.logger.debug(f"ErrorResponse received: {result.summary!r}")
            return result
        elif type_name == "SuccessResponse":
            self.logger.debug("SuccessResponse received")
            return SuccessResponse.from_dict(command_data)

        response_class = getattr(commands, type_name, None)

        if not response_class:
            raise MError(f"Failed To Find Raw Response Type: {type_name}")

        self.logger.debug(f"Resolved response class: {response_class.__name__}")
        return response_class.from_dict(command_data)

    async def _disconnect(self, wait_timeout: float = 10.0) -> None:
        """Disconnect from the server and close the connection pool.

        Args:
            wait_timeout: Maximum seconds to wait for in-flight operations to complete.
        """
        self._authenticated = False
        self.session_id = ""
        await self._requester.close(wait_timeout=wait_timeout)

    async def shutdown(self, wait_timeout: float = 30.0) -> None:
        """Gracefully shutdown the client, waiting for all operations to complete.

        Args:
            wait_timeout: Maximum seconds to wait for in-flight operations to complete.
        """
        self.logger.info("Initiating graceful shutdown...")
        await self._disconnect(wait_timeout=wait_timeout)
        self.logger.info("Client shutdown complete")

    @property
    def pool_stats(self) -> dict[str, int]:
        """Get current connection pool statistics for monitoring.

        Returns:
            Dictionary containing pool metrics:
            - total_connections: Total number of connections created
            - available: Number of connections available in the pool
            - in_use: Number of connections currently in use
            - waiting: Number of tasks waiting for a connection
            - max_connections: Maximum allowed connections
            - max_concurrent: Maximum concurrent requests allowed

        Usage:
            stats = client.pool_stats
            print(f"Pool usage: {stats['in_use']}/{stats['max_connections']}")
        """
        if self._requester and self._requester._pool:
            return self._requester._pool.stats
        return {
            "total_connections": 0,
            "available": 0,
            "in_use": 0,
            "waiting": 0,
            "max_connections": self.config.max_connections,
            "max_concurrent": self.config.max_concurrent_requests,
        }

    def _set_up_logging(self) -> logging.Logger:
        """Create default logger with WARNING level console output."""
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.WARNING)

        # Only add handler if none exist to prevent handler accumulation
        if not logger.hasHandlers():
            console_handler = logging.StreamHandler(sys.stdout)
            console_handler.setLevel(logging.WARNING)
            logger.addHandler(console_handler)

        return logger

pool_stats property

Get current connection pool statistics for monitoring.

Returns:

TypeDescription
dict[str, int]

Dictionary containing pool metrics:

dict[str, int]
  • total_connections: Total number of connections created
dict[str, int]
  • available: Number of connections available in the pool
dict[str, int]
  • in_use: Number of connections currently in use
dict[str, int]
  • waiting: Number of tasks waiting for a connection
dict[str, int]
  • max_connections: Maximum allowed connections
dict[str, int]
  • max_concurrent: Maximum concurrent requests allowed
Usage

stats = client.pool_stats print(f"Pool usage: {stats['in_use']}/{stats['max_connections']}")

authenticate(conn=None) async

Authenticate with the BroadWorks server.

Uses TLS direct login or two-stage hashed password authentication depending on the tls setting. Called automatically by command() if needed.

Returns:

TypeDescription
CommandResult

Login response from server, or None if already authenticated.

Raises:

TypeDescription
MError

If authentication fails.

Source code in src/mercury_ocip_fast/client.py
async def authenticate(
    self, conn: Optional[PooledConnection] = None
) -> CommandResult:
    """Authenticate with the BroadWorks server.

    Uses TLS direct login or two-stage hashed password authentication
    depending on the tls setting. Called automatically by command() if needed.

    Returns:
        Login response from server, or None if already authenticated.

    Raises:
        MError: If authentication fails.
    """
    if conn is None and self._authenticated:
        return None

    if self.tls:
        self.logger.debug(f"Authenticating {self.username!r} via TLS (LoginRequest22V5)")
        login_request = LoginRequest22V5(
            user_id=self.username, password=self.password
        )

        login_response = self._receive_response(
            await self._requester.send_request(login_request.to_xml(), conn=conn)
        )

        if isinstance(login_response, ErrorResponse):
            self.logger.error(f"TLS authentication failed for {self.username!r}: {login_response.summary}")
            raise MError(f"Failed to authenticate: {login_response.summary}")

        self.logger.info(f"{self.username} Authenticated with server")
        self._authenticated = True
        return login_response
    else:
        # Non-TLS requires two-stage authentication with password hashing
        self.logger.debug(f"Authenticating {self.username!r} via non-TLS (two-stage)")
        auth_request = AuthenticationRequest(user_id=self.username)

        auth_response = self._receive_response(
            await self._requester.send_request(auth_request.to_xml(), conn=conn)
        )

        if isinstance(auth_response, ErrorResponse):
            self.logger.error(f"AuthenticationRequest failed for {self.username!r}: {auth_response.summary}")
            raise MError(f"Auth request failed: {auth_response.summary}")

        if not isinstance(auth_response, AuthenticationResponse):
            raise MError("Unexpected response type from AuthenticationRequest")

        self.logger.debug(f"Received nonce, signing password for {self.username!r}")
        authhash: str = hashlib.sha1(self.password.encode()).hexdigest().lower()
        signed_password: str = (
            hashlib.md5(f"{auth_response.nonce}:{authhash}".encode())
            .hexdigest()
            .lower()
        )

        # Complete login with signed password
        self.logger.debug(f"Sending LoginRequest14sp4 for {self.username!r}")
        login_request = LoginRequest14sp4(
            user_id=self.username, signed_password=signed_password
        )

        login_response = self._receive_response(
            await self._requester.send_request(login_request.to_xml(), conn=conn)
        )

        if isinstance(login_response, ErrorResponse):
            self.logger.error(f"Login failed for {self.username!r}: {login_response.summary}")
            raise MError(f"Failed to authenticate: {login_response.summary}")

        self.logger.info("Authenticated with server")
        self._authenticated = True
        return login_response

command(command) async

command(command: CommandInput) -> CommandResult
command(command: list[CommandInput]) -> list[CommandResult]

Execute one or more OCI-P commands.

Authenticates automatically if needed. Accepts either a single command or a list of commands for bulk execution.

Parameters:

NameTypeDescriptionDefault
commandUnion[CommandInput, list[CommandInput]]

A single command instance or list of command instances.

required

Returns:

TypeDescription
Union[CommandResult, list[CommandResult]]

Single CommandResult for single input, list of CommandResult for list input.

Raises:

TypeDescription
MError

If a command fails or response cannot be parsed.

Source code in src/mercury_ocip_fast/client.py
async def command(
    self, command: Union[CommandInput, list[CommandInput]]
) -> Union[CommandResult, list[CommandResult]]:
    """Execute one or more OCI-P commands.

    Authenticates automatically if needed. Accepts either a single command
    or a list of commands for bulk execution.

    Args:
        command: A single command instance or list of command instances.

    Returns:
        Single CommandResult for single input, list of CommandResult for list input.

    Raises:
        MError: If a command fails or response cannot be parsed.
    """
    if not self._authenticated:
        await self.authenticate()

    if isinstance(command, list):
        self.logger.debug(
            f"Dispatching {len(command)} commands: "
            f"{[type(cmd).__name__ for cmd in command]}"
        )
        xml_commands = [cmd.to_xml() for cmd in command]
        responses = await self._requester.send_bulk_request(xml_commands)
        return self._receive_response(responses)
    else:
        self.logger.debug(f"Dispatching command: {type(command).__name__}")
        response = await self._requester.send_request(command.to_xml())
        return self._receive_response(response)

shutdown(wait_timeout=30.0) async

Gracefully shutdown the client, waiting for all operations to complete.

Parameters:

NameTypeDescriptionDefault
wait_timeoutfloat

Maximum seconds to wait for in-flight operations to complete.

30.0
Source code in src/mercury_ocip_fast/client.py
async def shutdown(self, wait_timeout: float = 30.0) -> None:
    """Gracefully shutdown the client, waiting for all operations to complete.

    Args:
        wait_timeout: Maximum seconds to wait for in-flight operations to complete.
    """
    self.logger.info("Initiating graceful shutdown...")
    await self._disconnect(wait_timeout=wait_timeout)
    self.logger.info("Client shutdown complete")

warm(connection_amount=None) async

Pre-warm the connection pool for faster bulk requests.

Parameters:

NameTypeDescriptionDefault
connection_amountint | None

Number of connections to create. Defaults to pool max.

None

Returns:

TypeDescription
int

Number of connections created.

Source code in src/mercury_ocip_fast/client.py
async def warm(self, connection_amount: int | None = None) -> int:
    """Pre-warm the connection pool for faster bulk requests.

    Args:
        connection_amount: Number of connections to create. Defaults to pool max.

    Returns:
        Number of connections created.
    """
    return await self._requester.warm(connection_amount)