ptrlib.connection.tube¶
Abstract base class for bidirectional, buffered byte-stream communication.
The Tube class provides a unified interface for interacting with various types of I/O streams (e.g., sockets, subprocesses, pipes) in a thread-safe and buffered manner. It supports high-level methods for sending and receiving data, line-based and regex-based reading, interactive sessions, and timeout management. Subclasses must implement low-level transport-specific hooks.
Key features: - Buffered, thread-safe I/O with support for custom delimiters and regex patterns. - Flexible send/receive methods, including sendline, recvall, recvline, and recvuntil. - Interactive mode for user-driven sessions (with TTY/raw support). - Configurable debugging and logging. - Abstract methods for transport-specific implementations.
Exceptions¶
Timeout with captured partial data. |
Classes¶
Abstract base for bidirectional, buffered byte-stream communication. |
Module Contents¶
- class ptrlib.connection.tube.Tube(debug: bool | DebugModeT = False, pcap: str | None = None, quiet: bool = False)[source]¶
Abstract base for bidirectional, buffered byte-stream communication.
Subclasses must implement the low-level hooks: - _recv_impl - _send_impl - _close_impl - _close_recv_impl - _close_send_impl - _settimeout_impl - _gettimeout_impl
High-level methods (recv*, send*) operate against an internal buffer and respect the instance-wide timeout via the timeout(…) context manager.
- property debug: str[source]¶
Debug mode for I/O tracing.
When enabled, incoming and outgoing data is printed.
Values: - ‘none’ : Disable debugging. - ‘plain’ : Print incoming bytes as UTF-8. - ‘hex’ : Print incoming bytes as a hexdump-like view. - True -> ‘hex’ - False -> ‘none’
Examples
sock = Socket("...", debug=True) sock.debug = False sock.debug = 'plain' sock.debug = 'hex'
- property newline: bytes[source]¶
A byte sequence considered as newline terminators.
Examples
p = Process(["wine", "a.exe"]) p.newline = [b"\n", b"\r\n"] sock = Socket("localhost", 80) sock.newline = "\r\n"
- property newlines: list[bytes][source]¶
List of byte sequences considered as newline terminators.
Examples
p = Process(["wine", "a.exe"]) p.newline = [b"\n", b"\r\n"] p.newline # b"\n" p.newlines # [b"\n", b"\r\n"]
- recv(blocksize: int = 4096, timeout: int | float = -1) bytes[source]¶
Receive up to
blocksizebytes.- Parameters:
blocksize – Maximum size for each low-level read.
timeout – Timeout for each low-level read operation.
- Behavior:
If the internal buffer holds data, return up to
blocksizebytes from it.If the buffer is empty, perform a single low-level read via
_recv_impl(blocksize)under the timeout context.
- Returns:
Data received from the stream.
- Return type:
- Raises:
EOFError – If the connection is closed before any data is received.
ValueError – If
blocksizeis negative.TubeTimeout – If the operation timed out.
OSError – If a system error occurred.
- recvall(size: int = -1, blocksize: int = 4096, timeout: int | float = -1) bytes[source]¶
Receive all requested data.
- Parameters:
size – Number of bytes to read. If
size >= 0, read exactlysizebytes (raiseEOFErroron early EOF). Ifsize == -1, read until EOF and return everything, including buffered bytes.blocksize – Maximum size for each low-level read.
timeout – Timeout for each low-level read operation.
- Returns:
Data received from the stream.
- Return type:
- Raises:
EOFError – When
size >= 0but the connection ends before enough data is received.ValueError – If
size < -1orblocksizeis negative.TubeTimeout – If
size >= 0and the operation timed out.OSError – If
size >= 0and a system error occurred.
- recvline(blocksize: int = 4096, timeout: int | float = -1, drop: bool = True, consume: bool = True) bytes[source]¶
Read until any configured newline delimiter is encountered.
- Parameters:
blocksize – Maximum size for each low-level read attempt.
timeout – Timeout for each low-level read operation.
drop – If True, exclude the newline delimiter from the returned bytes.
consume – If True, remove the delimiter from the internal buffer.
- Returns:
Data up to (and optionally including) the newline delimiter.
- Return type:
- Raises:
EOFError – If the connection is closed before a newline is received.
ValueError – If
blocksizeis negative.TubeTimeout – If the operation timed out.
OSError – If a system error occurred.
- recvregex(regex: RegexDelimiterT, blocksize: int = 4096, timeout: int | float = -1, consume: bool = True) re.Match[source]¶
Block until any of the regex patterns matches and return the match object.
The search is performed against the internal buffer. If no match is found, more data is read from the underlying endpoint in chunks of
blocksizeuntil a match is found or EOF occurs.- Parameters:
regex – A single pattern or a list of patterns (bytes/str/compiled).
blocksize – Number of bytes to request per low-level read.
timeout – Timeout for each low-level read operation.
consume – If True, bytes up to the end of the match are removed from the internal buffer.
- Returns:
The first match found (ties broken by the earliest match end).
- Return type:
- Raises:
EOFError – If the connection is closed before a match is found.
ValueError – If
blocksizeis negative.TubeTimeout – If the operation timed out.
OSError – If a system error occurred.
- recvuntil(delim: DelimiterT | None = None, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1, drop: bool = False, consume: bool = True) bytes[source]¶
Receive until a delimiter or regex match is found.
Exactly one of
delimorregexmust be provided.- Parameters:
delim – Delimiter(s) to wait for. A single
bytes/stror a list of such values. If a list is given, the method stops at the earliest occurrence of any delimiter.blocksize – Number of bytes to request per low-level read.
regex – A single regex pattern or a list of patterns. Patterns may be
bytes,str, or compiledre.Pattern. The method stops at the earliest match among all patterns.timeout – Timeout for each low-level read operation.
drop – If True, exclude the newline delimiter from the returned bytes.
consume – If True, remove the delimiter from the internal buffer.
- Returns:
- Data up to (and optionally including) the match. When
regexis used, this method still returns bytes, not a match object.
- Data up to (and optionally including) the match. When
- Return type:
- Raises:
EOFError – If the connection is closed before a delimiter or pattern is found.
ValueError – If both or neither of
delimandregexare provided, or ifblocksizeis negative.TubeTimeout – If the operation timed out.
OSError – If a system error occurred.
- after(delim: DelimiterT | None = None, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1) Tube[source]¶
Wait for a delimiter (or regex) and then return self.
- Useful for chained calls like:
tube.after(b'Name: ').sendline(name) leak = tube.after(regex=r'Hello, .{32}').recvline()
- Parameters:
delim – Delimiter(s) to wait for. A single
bytes/stror a list of such values. If a list is given, the method stops at the earliest occurrence of any delimiter.blocksize – Number of bytes to request per low-level read.
regex – A single regex pattern or a list of patterns. Patterns may be
bytes,str, or compiledre.Pattern. The method stops at the earliest match among all patterns.timeout – Timeout for each low-level read operation.
- Returns:
The current tube instance.
- Return type:
- Raises:
EOFError – If the connection is closed before a delimiter or pattern is found.
ValueError – If both or neither of
delimandregexare provided, or ifblocksizeis negative.TubeTimeout – If the operation timed out.
OSError – If a system error occurred.
- send(data: str | bytes) int[source]¶
Send a single chunk of data
This is a thin wrapper that issues one low-level write. It may send fewer bytes than provided, depending on the underlying endpoint.
- Parameters:
data – Data to send.
- Returns:
Number of bytes actually written.
- Return type:
- Raises:
BrokenPipeError – If the send-side has been closed by the peer.
OSError – If a system error occurred.
- sendall(data: str | bytes) int[source]¶
Send all bytes, blocking until everything is written (or an error occurs).
- Parameters:
data – Data to send.
- Returns:
Total number of bytes written (== len(data) on success).
- Return type:
- Raises:
BrokenPipeError – If the send-side has been closed by the peer.
OSError – If a system error occurred.
- sendline(data: AtomicSendT | list[AtomicSendT]) int[source]¶
Send data followed by the current newline delimiter.
The first configured newline sequence (
self.newline[0]) is used. If none is configured,b'\n'is assumed.- Parameters:
data – String, bytes, int, float, or a list of such values. Integer and float values will be converted to strings. If a list is provided, each element will be sent followed by the newline.
- Returns:
Total number of bytes written (payload + newline).
- Return type:
- Raises:
BrokenPipeError – If the send-side has been closed by the peer.
OSError – If a system error occurred.
- peek(size: int = -1, blocksize: int = 4096, timeout: int | float = -1) bytes[source]¶
Return exactly
sizebytes from the internal buffer without consuming them.- Parameters:
size – The number of bytes to return. If
size < 0, return the entire buffer.- Returns:
bytes
- unget(data: str | bytes) None[source]¶
Push
databack to the front of the internal buffer.The next receive operation will return these bytes first.
- Parameters:
data – Bytes to be re-inserted at the start of the buffer.
- defer_after()[source]¶
Defer after() waits to enable simple request pipelining.
Within this context, after(…) does not block; the wait is queued. Before any receive that consumes data (recv*, peek), all queued waits are flushed in order, then the receive proceeds. On exit, the outermost block also flushes remaining waits (nested blocks supported).
Only after() (and thus sendafter/sendlineafter) is deferred; recvuntil and other recv methods always flush first.
Example: .. code-block:: python
- with tube.defer_after():
tube.after(b”> “).sendline(b”1”) tube.after(b”Message: “).send(b”bye”) data = tube.recvall(8) tube.sendlineafter(b”> “, b”2”)
# is equivalent to
tube.sendline(b”1”) tube.send(b”bye”) tube.recvuntil(b”> “) tube.recvuntil(b”Message: “) data = tube.recvall(8) tube.send(b”2”) tube.recvuntil(b”> “)
- interactive(prompt: str | None = None, use_tty: bool = False, is_raw: bool = False, encoding: str = 'utf-8', blocksize: int = 4096, *, readline: Callable[[], str] | None = None, oninterrupt: Callable[[], bool] | None = None, onexit: Callable[[], None] | None = None, ansi_on_windows: bool = True)[source]¶
Run an interactive session with the remote endpoint.
- Two-thread model:
RX thread: continuously receives bytes and prints them to stdout.
TX thread: prompts and reads user input, then sends it to the peer.
- Output formatting:
If
is_rawis True, received bytes are written tostdout.bufferas-is.If
is_rawis False, bytes are decoded withencodingand undecodable bytes are rendered as\xNNusingerrors='backslashreplace'.
- Input modes:
Line mode (default): each line is sent with the tube’s newline sequence (via
sendline). The line source isreadline()if provided; otherwise a built-in line reader is used.Key passthrough (
use_tty=True): character-at-a-time mode. On POSIX, the local terminal is switched to raw mode so that arrow keys and ESC sequences are passed unchanged. On Windows, raw keystrokes are read viamsvcrt; ifansi_on_windowsis True, common special keys (arrows) are translated to ANSI escape sequences (e.g. Up =>\x1b[A).
- Session control:
oninterruptis called when the user hits Ctrl-C in line mode. If it returns True, the session continues; otherwise the session ends.In key passthrough mode, Ctrl-C is sent to the peer as
\x03. Useescape_key(default: Ctrl-]) to locally end the session.onexitis called once both threads finish and just before returning.
This method blocks until the session ends and swallows network I/O errors inside worker threads (they terminate the session instead of raising here).
- Parameters:
prompt – Prompt string shown in line mode.
use_tty – Enable character-at-a-time mode for TUI/curses programs.
is_raw – If True, print incoming bytes as-is without decoding. Always treated as True if
use_ttyis set to True.encoding – Text encoding used when
is_rawis False (default: UTF-8).readline – Optional callable that returns a single input line (without EOL).
oninterrupt – Callback invoked on KeyboardInterrupt in line mode. Return True to continue, False/None to end the session.
onexit – Callback invoked when the interactive session ends.
ansi_on_windows – Windows only—translate arrow keys to ANSI if True.
- Raises:
ValueError – If
use_ttyis requested but stdin is not a TTY.RuntimeError – If platform lacks required TTY support in passthrough mode.
- timeout(timeout: int | float)[source]¶
Temporarily set an I/O timeout for the enclosed operations.
The previous timeout value is restored after the context exits.
- Parameters:
timeout – The timeout value in seconds. If
timeoutis negative, the timeout is temporarily disabled.
Examples
with tube.timeout(5): line = tube.recvline()
- recvlineafter(delim: DelimiterT, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1, drop: bool = True, consume: bool = True) bytes[source]¶
Wait for a delimiter (or regex), then read a line.
Note
This method is deprecated. Use after(delim, blocksize, regex, timeout).recvline(…) instead.
- sendlineafter(delim: DelimiterT, data: str | bytes, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1) int[source]¶
Wait for a delimiter (or regex), then send a line.
Note
This method is deprecated. Use after(delim, blocksize, regex, timeout).sendline(…) instead.
- sendafter(delim: DelimiterT, data: str | bytes, blocksize: int = 4096, regex: RegexDelimiterT | None = None, timeout: int | float = -1) int[source]¶
Wait for a delimiter (or regex), then send data.
Note
This method is deprecated. Use after(delim, blocksize, regex, timeout).send(…) instead.
- shutdown(target: Literal['send', 'recv'])[source]¶
Shut down a specific connection.
- Parameters:
target (str) – Connection to close (send or recv)
Examples
The following code shuts down input of remote. .. code-block:: python
tube.shutdown(“send”) data = tube.recv() # OK tube.send(b”data”) # NG
The following code shuts down output of remote. .. code-block:: python
tube.shutdown(“recv”) tube.send(b”data”) # OK data = tube.recv() # NG
- exception ptrlib.connection.tube.TubeTimeout(message: str, *, buffered: bytes = b'')[source]¶
Bases:
TimeoutErrorTimeout with captured partial data.
The partial data read up to the timeout is available as
.buffered.Note
This exception subclasses
TimeoutError, so existingexcept TimeoutErrorhandlers will still work. Prefer catchingTubeTimeoutwhen you need the buffered data.