ptrlib.connection.tube

Abstract base class for bidirectional, buffered byte-stream communication.

The Tube class provides a unified interface for interacting with various types of I/O streams (e.g., sockets, subprocesses, pipes) in a thread-safe and buffered manner. It supports high-level methods for sending and receiving data, line-based and regex-based reading, interactive sessions, and timeout management. Subclasses must implement low-level transport-specific hooks.

Key features:
  • Buffered, thread-safe I/O with support for custom delimiters and regex patterns.

  • Flexible send/receive methods, including sendline, recvall, recvline, and recvuntil.

  • Interactive mode for user-driven sessions (with TTY/raw support).

  • Configurable debugging and logging.

  • Abstract methods for transport-specific implementations.

Exceptions

TubeTimeout

Timeout with captured partial data.

Classes

Tube

Abstract base for bidirectional, buffered byte-stream communication.

Module Contents

class ptrlib.connection.tube.Tube(debug: bool | DebugModeT = False, pcap: str | None = None, quiet: bool = False)[source]

Abstract base for bidirectional, buffered byte-stream communication.

Subclasses must implement the low-level hooks:
  • _recv_impl

  • _send_impl

  • _close_impl

  • _close_recv_impl

  • _close_send_impl

  • _settimeout_impl

  • _gettimeout_impl

High-level methods (recv*, send*) operate against an internal buffer and respect the instance-wide timeout via the timeout(…) context manager.
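The hook contract above can be illustrated with a small stand-alone sketch. It does not import ptrlib: HookedBase, Loopback, and Incomplete are hypothetical names, and the hook signatures are assumptions, since only the hook names are listed here. It shows that a subclass becomes instantiable only once every hook is implemented.

```python
from abc import ABC, abstractmethod


class HookedBase(ABC):
    """Hypothetical base class declaring the hook names listed above."""

    @abstractmethod
    def _recv_impl(self, blocksize: int) -> bytes: ...

    @abstractmethod
    def _send_impl(self, data: bytes) -> int: ...

    @abstractmethod
    def _close_impl(self) -> None: ...

    @abstractmethod
    def _close_recv_impl(self) -> None: ...

    @abstractmethod
    def _close_send_impl(self) -> None: ...

    @abstractmethod
    def _settimeout_impl(self, timeout: float) -> None: ...

    @abstractmethod
    def _gettimeout_impl(self) -> float: ...


class Loopback(HookedBase):
    """Toy transport: bytes written become readable again."""

    def __init__(self) -> None:
        self._wire = bytearray()
        self._timeout = -1.0

    def _recv_impl(self, blocksize: int) -> bytes:
        chunk = bytes(self._wire[:blocksize])
        del self._wire[:blocksize]
        return chunk

    def _send_impl(self, data: bytes) -> int:
        self._wire += data
        return len(data)

    def _close_impl(self) -> None:
        self._wire.clear()

    def _close_recv_impl(self) -> None:
        pass

    def _close_send_impl(self) -> None:
        pass

    def _settimeout_impl(self, timeout: float) -> None:
        self._timeout = timeout

    def _gettimeout_impl(self) -> float:
        return self._timeout


class Incomplete(HookedBase):
    """Implements only one hook, so it stays abstract."""

    def _recv_impl(self, blocksize: int) -> bytes:
        return b""


loop = Loopback()
loop._send_impl(b"ping")
echoed = loop._recv_impl(4096)

try:
    Incomplete()
    instantiable = True
except TypeError:
    # ABCs refuse to instantiate while abstract methods remain.
    instantiable = False
```
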

property debug: str[source]

Debug mode for I/O tracing.

When enabled, incoming and outgoing data is printed.

Values:
  • 'none' : Disable debugging.

  • 'plain' : Print incoming bytes as UTF-8.

  • 'hex' : Print incoming bytes as a hexdump-like view.

  • True : Alias for 'hex'.

  • False : Alias for 'none'.

Examples

sock = Socket("...", debug=True)
sock.debug = False
sock.debug = 'plain'
sock.debug = 'hex'
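The value mapping listed for debug can be sketched as a small normalizer. normalize_debug is a hypothetical helper, not part of ptrlib, and rejecting unknown values with ValueError is an assumption made for this sketch.

```python
def normalize_debug(value):
    """Map a debug setting to 'none', 'plain', or 'hex' (hypothetical helper)."""
    if value is True:
        return "hex"
    if value is False:
        return "none"
    if value in ("none", "plain", "hex"):
        return value
    # Assumption: anything else is treated as an error in this sketch.
    raise ValueError(f"unsupported debug mode: {value!r}")


modes = [normalize_debug(v) for v in (True, False, "plain", "hex")]
```
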
property newline: bytes[source]

The primary newline sequence, i.e. the first entry of newlines. It may be assigned a single bytes/str value or a list of them.

Examples

p = Process(["wine", "a.exe"])
p.newline = [b"\n", b"\r\n"]
sock = Socket("localhost", 80)
sock.newline = "\r\n"
property newlines: list[bytes][source]

List of byte sequences considered as newline terminators.

Examples

p = Process(["wine", "a.exe"])
p.newline = [b"\n", b"\r\n"]
p.newline   # b"\n"
p.newlines  # [b"\n", b"\r\n"]
property prompt: str[source]

Prompt string displayed in the interactive mode.

property pcap: ptrlib.filestruct.pcap.PcapFile[source]

PCAP file for the connection.

recv(blocksize: int = 4096, timeout: int | float = -1) bytes[source]

Receive up to blocksize bytes.

Parameters:
  • blocksize – Maximum size for each low-level read.

  • timeout – Timeout for each low-level read operation.

Behavior:
  • If the internal buffer holds data, return up to blocksize bytes from it.

  • If the buffer is empty, perform a single low-level read via _recv_impl(blocksize) under the timeout context.

Returns:

Data received from the stream.

Return type:

bytes

Raises:
  • EOFError – If the connection is closed before any data is received.

  • ValueError – If blocksize is negative.

  • TubeTimeout – If the operation timed out.

  • OSError – If a system error occurred.
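The buffer-first behavior of recv can be sketched without ptrlib. BufferedReader is a hypothetical class; its unget and low_level_reads members exist only to make the behavior observable, and the error messages are illustrative.

```python
class BufferedReader:
    """Hypothetical reader: serve the buffer first, else one low-level read."""

    def __init__(self, chunks):
        self._chunks = list(chunks)  # data the fake endpoint will deliver
        self._buffer = b""
        self.low_level_reads = 0

    def _recv_impl(self, blocksize):
        self.low_level_reads += 1
        if not self._chunks:
            return b""  # an empty read stands for EOF
        chunk = self._chunks.pop(0)
        head, rest = chunk[:blocksize], chunk[blocksize:]
        if rest:
            self._chunks.insert(0, rest)
        return head

    def unget(self, data):
        self._buffer = data + self._buffer

    def recv(self, blocksize=4096):
        if blocksize < 0:
            raise ValueError("blocksize must be non-negative")
        if self._buffer:
            # Buffered data is returned without touching the endpoint.
            data, self._buffer = self._buffer[:blocksize], self._buffer[blocksize:]
            return data
        data = self._recv_impl(blocksize)
        if not data:
            raise EOFError("connection closed before any data was received")
        return data


reader = BufferedReader([b"abc"])
reader.unget(b"XY")
first = reader.recv(1)             # comes from the buffer
reads_after_first = reader.low_level_reads
second = reader.recv()             # rest of the buffer
third = reader.recv()              # buffer empty, so one low-level read
```
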

recvall(size: int = -1, blocksize: int = 4096, timeout: int | float = -1) bytes[source]

Receive all requested data.

Parameters:
  • size – Number of bytes to read. If size >= 0, read exactly size bytes (raise EOFError on early EOF). If size == -1, read until EOF and return everything, including buffered bytes.

  • blocksize – Maximum size for each low-level read.

  • timeout – Timeout for each low-level read operation.

Returns:

Data received from the stream.

Return type:

bytes

Raises:
  • EOFError – When size >= 0 but the connection ends before enough data is received.

  • ValueError – If size < -1 or blocksize is negative.

  • TubeTimeout – If size >= 0 and the operation timed out.

  • OSError – If size >= 0 and a system error occurred.
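The two modes of recvall (exact size versus read-to-EOF) can be sketched over an in-memory stream. recvall_sketch is a hypothetical free function that uses io.BytesIO in place of a tube; it is not ptrlib's implementation.

```python
import io


def recvall_sketch(stream, size=-1, blocksize=4096):
    """Read exactly `size` bytes, or everything until EOF when size == -1."""
    if size < -1 or blocksize < 0:
        raise ValueError("size must be >= -1 and blocksize non-negative")
    chunks = []
    received = 0
    while size == -1 or received < size:
        want = blocksize if size == -1 else min(blocksize, size - received)
        chunk = stream.read(want)
        if not chunk:
            if size == -1:
                break  # EOF is the normal end in read-to-EOF mode
            raise EOFError("stream ended before enough data was received")
        chunks.append(chunk)
        received += len(chunk)
    return b"".join(chunks)


exact = recvall_sketch(io.BytesIO(b"abcdef"), 4, blocksize=3)
everything = recvall_sketch(io.BytesIO(b"abcdef"), blocksize=4)
```
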

recvline(blocksize: int = 4096, timeout: int | float = -1, drop: bool = True, consume: bool = True) bytes[source]

Read until any configured newline delimiter is encountered.

Parameters:
  • blocksize – Maximum size for each low-level read attempt.

  • timeout – Timeout for each low-level read operation.

  • drop – If True, exclude the newline delimiter from the returned bytes.

  • consume – If True, remove the delimiter from the internal buffer.

Returns:

Data up to (and optionally including) the newline delimiter.

Return type:

bytes

Raises:
  • EOFError – If the connection is closed before a newline is received.

  • ValueError – If blocksize is negative.

  • TubeTimeout – If the operation timed out.

  • OSError – If a system error occurred.
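How drop interacts with several configured newline sequences can be sketched as a pure function over a buffer. split_line is a hypothetical helper, and choosing the delimiter with the earliest position is an assumption about how "any configured newline" is resolved.

```python
def split_line(buffer, newlines=(b"\n",), drop=True):
    """Return (line, rest) at the earliest newline, or None if none is present."""
    best_idx, best_nl = -1, b""
    for nl in newlines:
        idx = buffer.find(nl)
        if idx != -1 and (best_idx == -1 or idx < best_idx):
            best_idx, best_nl = idx, nl
    if best_idx == -1:
        return None  # a real reader would fetch more data here
    end = best_idx + len(best_nl)
    line = buffer[:best_idx] if drop else buffer[:end]
    return line, buffer[end:]


data = b"GET /\r\nHost: x\n"
dropped = split_line(data, [b"\n", b"\r\n"])
kept = split_line(data, [b"\n", b"\r\n"], drop=False)
```
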

recvregex(regex: RegexDelimiterT, blocksize: int = 4096, timeout: int | float = -1, consume: bool = True) re.Match[source]

Block until any of the regex patterns matches and return the match object.

The search is performed against the internal buffer. If no match is found, more data is read from the underlying endpoint in chunks of blocksize until a match is found or EOF occurs.

Parameters:
  • regex – A single pattern or a list of patterns (bytes/str/compiled).

  • blocksize – Number of bytes to request per low-level read.

  • timeout – Timeout for each low-level read operation.

  • consume – If True, bytes up to the end of the match are removed from the internal buffer.

Returns:

The first match found (ties broken by the earliest match end).

Return type:

re.Match

Raises:
  • EOFError – If the connection is closed before a match is found.

  • ValueError – If blocksize is negative.

  • TubeTimeout – If the operation timed out.

  • OSError – If a system error occurred.
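Selecting one match among several patterns can be sketched with the standard re module. first_match is a hypothetical helper; ordering candidates by match start and then by match end is one reading of the tie-breaking rule stated above, and encoding str patterns as UTF-8 is an assumption.

```python
import re


def first_match(buffer, patterns):
    """Return the earliest match of any pattern in buffer, or None."""
    best = None
    for pattern in patterns:
        if isinstance(pattern, str):
            pattern = pattern.encode()  # assumption: str patterns are UTF-8
        if not isinstance(pattern, re.Pattern):
            pattern = re.compile(pattern)
        m = pattern.search(buffer)
        if m is None:
            continue
        # Earliest start wins; an equal start is broken by the earliest end.
        if best is None or (m.start(), m.end()) < (best.start(), best.end()):
            best = m
    return best


buf = b"id=42 name=bob\n"
match = first_match(buf, [rb"name=(\w+)", r"id=(\d+)", re.compile(rb"zzz")])
```
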

recvuntil(delim: DelimiterT | None = None, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1, drop: bool = False, consume: bool = True) bytes[source]

Receive until a delimiter or regex match is found.

Exactly one of delim or regex must be provided.

Parameters:
  • delim – Delimiter(s) to wait for. A single bytes/str or a list of such values. If a list is given, the method stops at the earliest occurrence of any delimiter.

  • blocksize – Number of bytes to request per low-level read.

  • regex – A single regex pattern or a list of patterns. Patterns may be bytes, str, or compiled re.Pattern. The method stops at the earliest match among all patterns.

  • timeout – Timeout for each low-level read operation.

  • drop – If True, exclude the matched delimiter (or regex match) from the returned bytes.

  • consume – If True, remove the delimiter from the internal buffer.

Returns:

Data up to (and optionally including) the match. When regex is used, this method still returns bytes, not a match object.

Return type:

bytes

Raises:
  • EOFError – If the connection is closed before a delimiter or pattern is found.

  • ValueError – If both or neither of delim and regex are provided, or if blocksize is negative.

  • TubeTimeout – If the operation timed out.

  • OSError – If a system error occurred.
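The read loop behind a delimiter search can be sketched over an in-memory stream. recvuntil_sketch is a hypothetical function that returns the leftover bytes explicitly instead of keeping them in an internal buffer. The small blocksize in the usage makes the delimiter straddle two reads, which is why the search runs over the accumulated buffer rather than over each chunk.

```python
import io


def recvuntil_sketch(stream, delim, blocksize=4096, drop=False):
    """Return (data, leftover) once delim appears in the accumulated bytes."""
    buffer = b""
    while True:
        idx = buffer.find(delim)
        if idx != -1:
            end = idx + len(delim)
            data = buffer[:idx] if drop else buffer[:end]
            return data, buffer[end:]
        chunk = stream.read(blocksize)
        if not chunk:
            raise EOFError("stream closed before the delimiter was found")
        buffer += chunk


# Reads arrive as b"Name:" then b" alic"; the delimiter b": " spans both.
with_delim = recvuntil_sketch(io.BytesIO(b"Name: alice\n"), b": ", blocksize=5)
without_delim = recvuntil_sketch(io.BytesIO(b"Name: alice\n"), b": ", blocksize=5, drop=True)
```
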

after(delim: DelimiterT | None = None, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1) Tube[source]

Wait for a delimiter (or regex) and then return self.

Useful for chained calls like:
tube.after(b'Name: ').sendline(name)
leak = tube.after(regex=r'Hello, .{32}').recvline()
Parameters:
  • delim – Delimiter(s) to wait for. A single bytes/str or a list of such values. If a list is given, the method stops at the earliest occurrence of any delimiter.

  • blocksize – Number of bytes to request per low-level read.

  • regex – A single regex pattern or a list of patterns. Patterns may be bytes, str, or compiled re.Pattern. The method stops at the earliest match among all patterns.

  • timeout – Timeout for each low-level read operation.

Returns:

The current tube instance.

Return type:

Tube

Raises:
  • EOFError – If the connection is closed before a delimiter or pattern is found.

  • ValueError – If both or neither of delim and regex are provided, or if blocksize is negative.

  • TubeTimeout – If the operation timed out.

  • OSError – If a system error occurred.

send(data: str | bytes) int[source]

Send a single chunk of data.

This is a thin wrapper that issues one low-level write. It may send fewer bytes than provided, depending on the underlying endpoint.

Parameters:

data – Data to send.

Returns:

Number of bytes actually written.

Return type:

int

Raises:
sendall(data: str | bytes) int[source]

Send all bytes, blocking until everything is written (or an error occurs).

Parameters:

data – Data to send.

Returns:

Total number of bytes written (== len(data) on success).

Return type:

int

Raises:
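As an illustration of the difference between send (one write, possibly partial) and sendall (repeat until done), here is a stand-alone sketch. TrickleWriter and sendall_sketch are hypothetical; encoding str input as UTF-8 is an assumption, and the loop assumes the endpoint always accepts at least one byte.

```python
class TrickleWriter:
    """Hypothetical endpoint that accepts at most `limit` bytes per write."""

    def __init__(self, limit):
        self.limit = limit
        self.received = bytearray()
        self.writes = 0

    def send(self, data):
        accepted = data[: self.limit]
        self.received += accepted
        self.writes += 1
        return len(accepted)


def sendall_sketch(endpoint, data):
    """Keep issuing single writes until every byte has been accepted."""
    if isinstance(data, str):
        data = data.encode()  # assumption: text is sent as UTF-8
    total = 0
    while total < len(data):
        total += endpoint.send(data[total:])
    return total


writer = TrickleWriter(limit=4)
written = sendall_sketch(writer, b"0123456789")
```
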
sendline(data: AtomicSendT | list[AtomicSendT]) int[source]

Send data followed by the current newline delimiter.

The first configured newline sequence (self.newlines[0]) is used. If none is configured, b'\n' is assumed.

Parameters:

data – String, bytes, int, float, or a list of such values. Integer and float values will be converted to strings. If a list is provided, each element will be sent followed by the newline.

Returns:

Total number of bytes written (payload + newline).

Return type:

int

Raises:
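As an illustration of the data conversion described for sendline, the sketch below builds the bytes that would go on the wire. encode_lines is a hypothetical helper; UTF-8 encoding of text is an assumption.

```python
def encode_lines(data, newline=b"\n"):
    """Build the payload for one value or a list of values, newline-terminated."""
    items = data if isinstance(data, list) else [data]
    out = bytearray()
    for item in items:
        if isinstance(item, (int, float)):
            item = str(item)      # numbers are sent in their text form
        if isinstance(item, str):
            item = item.encode()  # assumption: text is sent as UTF-8
        out += item + newline     # every element gets its own newline
    return bytes(out)


payload = encode_lines([1, "two", b"3", 4.5])
crlf = encode_lines(7, newline=b"\r\n")
```

The returned length corresponds to the documented return value, payload plus newline bytes.
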
sendkey()[source]
peek(size: int = -1, blocksize: int = 4096, timeout: int | float = -1) bytes[source]

Return exactly size bytes from the internal buffer without consuming them.

Parameters:

size – The number of bytes to return. If size < 0, return the entire buffer.

Returns:

bytes

unget(data: str | bytes) None[source]

Push data back to the front of the internal buffer.

The next receive operation will return these bytes first.

Parameters:

data – Bytes to be re-inserted at the start of the buffer.
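The relationship between peek, unget, and a consuming read can be sketched with a plain byte buffer. ByteBuffer and its take method are hypothetical; the sketch covers only the buffer side and does not read from an endpoint when the buffer is short.

```python
class ByteBuffer:
    """Hypothetical front-of-queue byte buffer."""

    def __init__(self, data=b""):
        self._data = bytes(data)

    def peek(self, size=-1):
        # Look at the front of the buffer without removing anything.
        return self._data if size < 0 else self._data[:size]

    def unget(self, data):
        if isinstance(data, str):
            data = data.encode()  # assumption: text is stored as UTF-8
        self._data = data + self._data

    def take(self, size):
        # Consuming read: the returned bytes leave the buffer.
        head, self._data = self._data[:size], self._data[size:]
        return head


buf = ByteBuffer(b"world")
buf.unget(b"hello ")
peeked = buf.peek(5)
whole = buf.peek()
taken = buf.take(6)
remaining = buf.peek()
```
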

defer_after()[source]

Defer after() waits to enable simple request pipelining.

Within this context, after(…) does not block; the wait is queued. Before any receive that consumes data (recv*, peek), all queued waits are flushed in order, then the receive proceeds. On exit, the outermost block also flushes remaining waits (nested blocks supported).

Only after() (and thus sendafter/sendlineafter) is deferred; recvuntil and other recv methods always flush first.

Example:

with tube.defer_after():
    tube.after(b"> ").sendline(b"1")
    tube.after(b"Message: ").send(b"bye")
    data = tube.recvall(8)
    tube.sendlineafter(b"> ", b"2")

# is equivalent to

tube.sendline(b"1")
tube.send(b"bye")
tube.recvuntil(b"> ")
tube.recvuntil(b"Message: ")
data = tube.recvall(8)
tube.sendline(b"2")
tube.recvuntil(b"> ")
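The queue-and-flush ordering can be reproduced with a recorder that performs no I/O. PipelinedRecorder is a hypothetical class written for this sketch; it only logs the order in which waits and sends would be issued, assuming waits are flushed before a consuming receive and when the outermost block exits.

```python
from contextlib import contextmanager


class PipelinedRecorder:
    """Hypothetical recorder of the order in which operations are issued."""

    def __init__(self):
        self.log = []        # operations in the order they would hit the wire
        self._pending = []   # waits queued while deferral is active
        self._depth = 0      # nesting level of defer_after blocks

    @contextmanager
    def defer_after(self):
        self._depth += 1
        try:
            yield self
        finally:
            self._depth -= 1
            if self._depth == 0:
                self._flush()  # only the outermost block flushes on exit

    def _flush(self):
        for delim in self._pending:
            self.log.append(("recvuntil", delim))
        self._pending.clear()

    def after(self, delim):
        if self._depth > 0:
            self._pending.append(delim)  # queue instead of blocking
        else:
            self.log.append(("recvuntil", delim))
        return self

    def send(self, data):
        self.log.append(("send", data))
        return len(data)

    def recvall(self, size):
        self._flush()  # queued waits run before any consuming receive
        self.log.append(("recvall", size))


t = PipelinedRecorder()
with t.defer_after():
    t.after(b"> ").send(b"1\n")
    t.after(b"Message: ").send(b"bye")
    t.recvall(8)
    t.after(b"> ").send(b"2\n")
order = [op for op, _ in t.log]
```
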

close()[source]

Close the tube and release any resources.

close_recv()[source]

Close the receive end of the tube.

close_send()[source]

Close the send end of the tube.

interactive(prompt: str | None = None, use_tty: bool = False, is_raw: bool = False, encoding: str = 'utf-8', blocksize: int = 4096, *, readline: Callable[[], str] | None = None, oninterrupt: Callable[[], bool] | None = None, onexit: Callable[[], None] | None = None, ansi_on_windows: bool = True)[source]

Run an interactive session with the remote endpoint.

Two-thread model:
  • RX thread: continuously receives bytes and prints them to stdout.

  • TX thread: prompts and reads user input, then sends it to the peer.

Output formatting:
  • If is_raw is True, received bytes are written to stdout.buffer as-is.

  • If is_raw is False, bytes are decoded with encoding and undecodable bytes are rendered as \xNN using errors='backslashreplace'.

Input modes:
  • Line mode (default): each line is sent with the tube’s newline sequence (via sendline). The line source is readline() if provided; otherwise a built-in line reader is used.

  • Key passthrough (use_tty=True): character-at-a-time mode. On POSIX, the local terminal is switched to raw mode so that arrow keys and ESC sequences are passed unchanged. On Windows, raw keystrokes are read via msvcrt; if ansi_on_windows is True, common special keys (arrows) are translated to ANSI escape sequences (e.g. Up => \x1b[A).

Session control:
  • oninterrupt is called when the user hits Ctrl-C in line mode. If it returns True, the session continues; otherwise the session ends.

  • In key passthrough mode, Ctrl-C is sent to the peer as \x03. Use escape_key (default: Ctrl-]) to locally end the session.

  • onexit is called once both threads finish and just before returning.

This method blocks until the session ends and swallows network I/O errors inside worker threads (they terminate the session instead of raising here).

Parameters:
  • prompt – Prompt string shown in line mode.

  • use_tty – Enable character-at-a-time mode for TUI/curses programs.

  • is_raw – If True, print incoming bytes as-is without decoding. Always treated as True if use_tty is set to True.

  • encoding – Text encoding used when is_raw is False (default: UTF-8).

  • readline – Optional callable that returns a single input line (without EOL).

  • oninterrupt – Callback invoked on KeyboardInterrupt in line mode. Return True to continue, False/None to end the session.

  • onexit – Callback invoked when the interactive session ends.

  • ansi_on_windows – Windows only—translate arrow keys to ANSI if True.

Raises:
  • ValueError – If use_tty is requested but stdin is not a TTY.

  • RuntimeError – If platform lacks required TTY support in passthrough mode.
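The output formatting rule for is_raw can be illustrated with the standard bytes.decode error handler named above. render is a hypothetical helper that returns the value instead of writing it to stdout.

```python
def render(data, encoding="utf-8", is_raw=False):
    """Return what would be written for a received chunk."""
    if is_raw:
        return data  # raw mode: bytes pass through untouched
    # Undecodable bytes become visible \xNN escapes instead of raising.
    return data.decode(encoding, errors="backslashreplace")


text = render(b"caf\xc3\xa9 \xff\n")
raw = render(b"\x1b[A", is_raw=True)
```
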

sh(*args, **kwargs)[source]
timeout(timeout: int | float)[source]

Temporarily set an I/O timeout for the enclosed operations.

The previous timeout value is restored after the context exits.

Parameters:

timeout – The timeout value in seconds. If timeout is negative, the timeout is temporarily disabled.

Examples

with tube.timeout(5):
    line = tube.recvline()
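The save-and-restore behavior of the timeout context can be sketched with contextlib. TimedEndpoint is a hypothetical class, and representing a disabled timeout as None is an assumption of this sketch.

```python
from contextlib import contextmanager


class TimedEndpoint:
    """Hypothetical endpoint with a settable timeout."""

    def __init__(self):
        self._timeout = None  # assumption: None means "no timeout"

    def gettimeout(self):
        return self._timeout

    def settimeout(self, value):
        self._timeout = value

    @contextmanager
    def timeout(self, timeout):
        previous = self.gettimeout()
        self.settimeout(None if timeout < 0 else timeout)
        try:
            yield
        finally:
            # Restore the old value even if the enclosed code raises.
            self.settimeout(previous)


ep = TimedEndpoint()
ep.settimeout(30)
with ep.timeout(5):
    inside = ep.gettimeout()
after_block = ep.gettimeout()

try:
    with ep.timeout(-1):
        disabled = ep.gettimeout()
        raise RuntimeError("simulated failure inside the block")
except RuntimeError:
    pass
after_error = ep.gettimeout()
```
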
recvlineafter(delim: DelimiterT, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1, drop: bool = True, consume: bool = True) bytes[source]

Wait for a delimiter (or regex), then read a line.

Note

This method is deprecated. Use after(delim, blocksize, regex, timeout).recvline(…) instead.

sendlineafter(delim: DelimiterT, data: str | bytes, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1) int[source]

Wait for a delimiter (or regex), then send a line.

Note

This method is deprecated. Use after(delim, blocksize, regex, timeout).sendline(…) instead.

sendafter(delim: DelimiterT, data: str | bytes, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1) int[source]

Wait for a delimiter (or regex), then send data.

Note

This method is deprecated. Use after(delim, blocksize, regex, timeout).send(…) instead.

shutdown(target: Literal['send', 'recv'])[source]

Shut down one direction of the connection.

Parameters:

target (str) – Direction to shut down: 'send' or 'recv'.

Examples

The following code shuts down the sending side, so the remote sees end of input:

tube.shutdown("send")
data = tube.recv()    # OK
tube.send(b"data")    # NG

The following code shuts down the receiving side, so output from the remote can no longer be read:

tube.shutdown("recv")
tube.send(b"data")    # OK
data = tube.recv()    # NG
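Half-closing the send side can be demonstrated with a plain socket pair from the standard library. This sketch uses socket.shutdown directly rather than Tube.shutdown; treating the two as analogous is an assumption made for illustration.

```python
import socket

a, b = socket.socketpair()
try:
    a.shutdown(socket.SHUT_WR)     # a may no longer send
    peer_sees = b.recv(16)         # the peer reads EOF as an empty result
    b.sendall(b"pong")
    still_readable = a.recv(16)    # receiving on a still works
    try:
        a.send(b"late")
        send_failed = False
    except OSError:
        send_failed = True         # writing after the half-close is an error
finally:
    a.close()
    b.close()
```
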

exception ptrlib.connection.tube.TubeTimeout(message: str, *, buffered: bytes = b'')[source]

Bases: TimeoutError

Timeout with captured partial data.

The partial data read up to the timeout is available as .buffered.

Note

This exception subclasses TimeoutError, so existing except TimeoutError handlers will still work. Prefer catching TubeTimeout when you need the buffered data.

buffered[source]
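The note about TimeoutError handlers can be illustrated with a local stand-in. DemoTubeTimeout is a hypothetical class that mirrors only the documented constructor signature; wait_for_prompt and its message are made up for the sketch.

```python
class DemoTubeTimeout(TimeoutError):
    """Hypothetical stand-in carrying the partial data read before the timeout."""

    def __init__(self, message, *, buffered=b""):
        super().__init__(message)
        self.buffered = buffered


def wait_for_prompt():
    # Pretend five bytes arrived before the deadline expired.
    raise DemoTubeTimeout("timed out waiting for prompt", buffered=b"Passw")


try:
    wait_for_prompt()
except TimeoutError as exc:
    # A generic handler still catches it; the partial data is an extra attribute.
    partial = getattr(exc, "buffered", b"")
    caught_type = type(exc).__name__
```
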