tlslib.stdlib

Shims the standard library OpenSSL module into the amended PEP 543 API.

   1"""Shims the standard library OpenSSL module into the amended PEP 543 API."""
   2
   3from __future__ import annotations
   4
   5import os
   6import socket
   7import ssl
   8import tempfile
   9import typing
  10import weakref
  11from collections.abc import Buffer, Sequence
  12from contextlib import contextmanager
  13from pathlib import Path
  14
  15import truststore
  16
  17from .tlslib import (
  18    DEFAULT_CIPHER_LIST,
  19    Certificate,
  20    CipherSuite,
  21    ConfigurationError,
  22    NextProtocol,
  23    PrivateKey,
  24    RaggedEOF,
  25    SigningChain,
  26    TLSClientConfiguration,
  27    TLSError,
  28    TLSImplementation,
  29    TLSServerConfiguration,
  30    TLSVersion,
  31    TrustStore,
  32    WantReadError,
  33    WantWriteError,
  34)
  35
  36_SSLContext = ssl.SSLContext | truststore.SSLContext
  37
  38_TLSMinVersionOpts = {
  39    TLSVersion.MINIMUM_SUPPORTED: ssl.TLSVersion.MINIMUM_SUPPORTED,
  40    TLSVersion.TLSv1_2: ssl.TLSVersion.TLSv1_2,
  41    TLSVersion.TLSv1_3: ssl.TLSVersion.TLSv1_3,
  42}
  43
  44_TLSMaxVersionOpts = {
  45    TLSVersion.TLSv1_2: ssl.TLSVersion.TLSv1_2,
  46    TLSVersion.TLSv1_3: ssl.TLSVersion.TLSv1_3,
  47    TLSVersion.MAXIMUM_SUPPORTED: ssl.TLSVersion.MAXIMUM_SUPPORTED,
  48}
  49
  50# We need to populate a dictionary of ciphers that OpenSSL supports, in the
  51# form of {16-bit number: OpenSSL suite name}.
  52ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  53ctx.set_ciphers("ALL:COMPLEMENTOFALL")
  54_cipher_map = {c["id"] & 0xFFFF: c["name"] for c in ctx.get_ciphers()}
  55del ctx
  56
  57
  58@contextmanager
  59def _error_converter(
  60    ignore_filter: tuple[type[Exception]] | tuple[()] = (),
  61) -> typing.Generator[None, None, None]:
  62    """
  63    Catches errors from the ssl module and wraps them up in TLSError
  64    exceptions. Ignores certain kinds of exceptions as requested.
  65    """
  66    try:
  67        yield
  68    except ignore_filter:
  69        pass
  70    except ssl.SSLWantReadError:
  71        raise WantReadError("Must read data") from None
  72    except ssl.SSLWantWriteError:
  73        raise WantWriteError("Must write data") from None
  74    except ssl.SSLEOFError:
  75        raise RaggedEOF("Ragged EOF") from None
  76    except ssl.SSLError as e:
  77        raise TLSError(e) from None
  78
  79
  80def _remove_path(ts_cert_priv: TrustStore | Certificate | PrivateKey) -> None:
  81    ts_cert_priv._path = None
  82
  83
  84def _is_system_trust_store(trust_store: TrustStore | None) -> bool:
  85    return trust_store is None or (
  86        trust_store._path is None and trust_store._buffer is None and trust_store._id is None
  87    )
  88
  89
  90def _get_path_from_trust_store(
  91    context: _SSLContext, trust_store: TrustStore | None
  92) -> os.PathLike | None:
  93    assert trust_store is not None
  94    if trust_store._path is not None:
  95        return trust_store._path
  96    elif trust_store._buffer is not None:
  97        tmp_path = tempfile.NamedTemporaryFile(mode="w+b", delete=False, delete_on_close=False)
  98        tmp_path.write(trust_store._buffer)
  99        tmp_path.close()
 100        # Store this path to prevent creation of multiple files for each trust store
 101        trust_store._path = Path(tmp_path.name)
 102        weakref.finalize(context, os.remove, tmp_path.name)
 103        # Remove the path in case the trust store outlives the context
 104        weakref.finalize(context, _remove_path, trust_store)
 105        return trust_store._path
 106    elif trust_store._id is not None:
 107        raise ConfigurationError("This TLS implementation does not support id-based trust stores.")
 108    else:
 109        return None
 110
 111
 112def _create_client_context_with_trust_store(trust_store: TrustStore | None) -> _SSLContext:
 113    some_context: _SSLContext
 114
 115    if _is_system_trust_store(trust_store):
 116        some_context = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
 117    else:
 118        some_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
 119        trust_store_path = _get_path_from_trust_store(some_context, trust_store)
 120        some_context.load_verify_locations(trust_store_path)
 121
 122    # TLS Compression is a security risk and is removed in TLS v1.3
 123    some_context.options |= ssl.OP_NO_COMPRESSION
 124
 125    some_context.verify_flags = (
 126        ssl.VerifyFlags.VERIFY_X509_STRICT | ssl.VerifyFlags.VERIFY_X509_PARTIAL_CHAIN
 127    )
 128
 129    return some_context
 130
 131
 132def _create_server_context_with_trust_store(
 133    trust_store: TrustStore | None,
 134) -> ssl.SSLContext:
 135    some_context: ssl.SSLContext
 136
 137    # truststore does not support server side
 138    some_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
 139
 140    if trust_store is not None:
 141        some_context.verify_mode = ssl.CERT_REQUIRED
 142        trust_store_path = _get_path_from_trust_store(some_context, trust_store)
 143
 144        if trust_store_path is not None:
 145            some_context.load_verify_locations(trust_store_path)
 146        else:
 147            some_context.load_default_certs(ssl.Purpose.CLIENT_AUTH)
 148
 149    # TLS Compression is a security risk and is removed in TLS v1.3
 150    some_context.options |= ssl.OP_NO_COMPRESSION
 151
 152    return some_context
 153
 154
 155def _sni_callback_builder(
 156    _name_to_chain_map: weakref.WeakValueDictionary[str, SigningChain],
 157    original_config: TLSServerConfiguration,
 158) -> typing.Callable[[ssl.SSLSocket, str, ssl.SSLContext], ssl.AlertDescription | None]:
 159    def pep543_callback(
 160        ssl_socket: ssl.SSLSocket,
 161        server_name: str,
 162        stdlib_context: ssl.SSLContext,
 163    ) -> ssl.AlertDescription | None:
 164        try:
 165            sign_chain = _name_to_chain_map[server_name]
 166        except KeyError:
 167            return ssl.ALERT_DESCRIPTION_INTERNAL_ERROR
 168
 169        new_config: TLSServerConfiguration = TLSServerConfiguration(
 170            certificate_chain=(sign_chain,),
 171            ciphers=original_config.ciphers,
 172            inner_protocols=original_config.inner_protocols,
 173            lowest_supported_version=original_config.lowest_supported_version,
 174            highest_supported_version=original_config.highest_supported_version,
 175            trust_store=original_config.trust_store,
 176        )
 177        ssl_socket.context = _init_context_server(new_config)
 178
 179        # Returning None, perversely, is how one signals success from this
 180        # function. Will wonders never cease?
 181        return None
 182
 183    return pep543_callback
 184
 185
 186def _configure_server_context_for_certs(
 187    context: ssl.SSLContext,
 188    cert_chain: Sequence[SigningChain] | None = None,
 189    sni_config: TLSServerConfiguration | None = None,
 190) -> ssl.SSLContext:
 191    if cert_chain is not None:
 192        if len(cert_chain) == 1:
 193            # Only one SigningChain, no need to configure SNI
 194            return _configure_context_for_single_signing_chain(context, cert_chain[0])
 195
 196        elif len(cert_chain) > 1:
 197            # We have multiple SigningChains, need to configure SNI
 198            assert sni_config is not None
 199            return _configure_context_for_sni(context, cert_chain, sni_config)
 200
 201    return context
 202
 203
 204def _get_path_from_cert_or_priv(
 205    context: _SSLContext, cert_or_priv: Certificate | PrivateKey
 206) -> os.PathLike:
 207    if cert_or_priv._path is not None:
 208        return cert_or_priv._path
 209    elif cert_or_priv._buffer is not None:
 210        tmp_path = tempfile.NamedTemporaryFile(mode="w+b", delete=False, delete_on_close=False)
 211        tmp_path.write(cert_or_priv._buffer)
 212        tmp_path.close()
 213        weakref.finalize(context, os.remove, tmp_path.name)
 214        # Store the path for future usage, preventing creation of multiple files
 215        cert_or_priv._path = Path(tmp_path.name)
 216        # Remove the path in case the cert or priv outlives the context
 217        weakref.finalize(context, _remove_path, cert_or_priv)
 218        return cert_or_priv._path
 219    elif cert_or_priv._id is not None:
 220        raise ConfigurationError(
 221            "This TLS implementation does not support id-based certificates \
 222                                  or private keys."
 223        )
 224    else:
 225        raise ConfigurationError("Certificate or PrivateKey cannot be empty.")
 226
 227
 228def _get_bytes_from_cert(cert: Certificate) -> bytes:
 229    if cert._buffer is not None:
 230        return cert._buffer
 231    elif cert._path is not None:
 232        # Do not save cert in memory
 233        return Path(cert._path).read_bytes()
 234    elif cert._id is not None:
 235        raise ConfigurationError("This TLS implementation does not support id-based certificates.")
 236    else:
 237        raise ConfigurationError("Certificate cannot be empty.")
 238
 239
 240def _configure_context_for_single_signing_chain(
 241    context: _SSLContext,
 242    cert_chain: SigningChain | None = None,
 243) -> _SSLContext:
 244    """Given a PEP 543 cert chain, configure the SSLContext to send that cert
 245    chain in the handshake.
 246
 247    Returns the context.
 248    """
 249
 250    if cert_chain is not None:
 251        cert = cert_chain.leaf[0]
 252
 253        if len(cert_chain.chain) == 0:
 254            cert_path = _get_path_from_cert_or_priv(context, cert)
 255
 256        else:
 257            with tempfile.NamedTemporaryFile(mode="wb", delete=False) as io:
 258                # Write first cert
 259                io.write(_get_bytes_from_cert(cert))
 260
 261                for cert in cert_chain.chain:
 262                    io.write(b"\n")
 263                    io.write(_get_bytes_from_cert(cert))
 264
 265            weakref.finalize(context, os.remove, io.name)
 266            cert_path = Path(io.name)
 267
 268        key_path = None
 269        if cert_chain.leaf[1] is not None:
 270            privkey = cert_chain.leaf[1]
 271
 272            key_path = _get_path_from_cert_or_priv(context, privkey)
 273
 274        assert cert_path is not None
 275        with _error_converter():
 276            context.load_cert_chain(cert_path, key_path, None)
 277
 278    return context
 279
 280
 281def _configure_context_for_sni(
 282    context: ssl.SSLContext,
 283    cert_chain: Sequence[SigningChain],
 284    sni_config: TLSServerConfiguration,
 285) -> ssl.SSLContext:
 286    # This is a mapping of concrete server names to the corresponding SigningChain
 287    _name_to_chain_map: weakref.WeakValueDictionary[str, SigningChain] = (
 288        weakref.WeakValueDictionary()
 289    )
 290
 291    for sign_chain in cert_chain:
 292        # Parse leaf certificates to find server names
 293        cert = sign_chain.leaf[0]
 294        cert_path = _get_path_from_cert_or_priv(context, cert)
 295        dec_cert = ssl._ssl._test_decode_cert(cert_path)  # type: ignore[attr-defined]
 296
 297        try:
 298            alt_names = dec_cert["subjectAltName"]
 299        except KeyError:
 300            continue
 301
 302        server_name = None
 303        for name in alt_names:
 304            assert len(name) == 2
 305            if name[0] == "DNS":
 306                server_name = name[1]
 307                break
 308
 309        if server_name is not None:
 310            _name_to_chain_map[server_name] = sign_chain
 311
 312    context.sni_callback = _sni_callback_builder(_name_to_chain_map, sni_config)  # type: ignore[assignment]
 313
 314    return context
 315
 316
 317def _configure_context_for_ciphers(
 318    context: _SSLContext, ciphers: Sequence[CipherSuite | int] | None = None
 319) -> _SSLContext:
 320    """Given a PEP 543 cipher suite list, configure the SSLContext to use those
 321    cipher suites.
 322
 323    Returns the context.
 324    """
 325    if ciphers is None:
 326        # OpenSSL does not necessarily have system recommended settings
 327        # The default cipher list is used here instead
 328        ciphers = DEFAULT_CIPHER_LIST
 329
 330    ossl_names = [_cipher_map[cipher] for cipher in ciphers if cipher in _cipher_map]
 331    if not ossl_names:
 332        msg = "None of the provided ciphers are supported by the OpenSSL TLS implementation!"
 333        raise TLSError(msg)
 334    with _error_converter():
 335        context.set_ciphers(":".join(ossl_names))
 336    return context
 337
 338
 339def _configure_context_for_negotiation(
 340    context: _SSLContext,
 341    inner_protocols: Sequence[NextProtocol | bytes] | None = None,
 342) -> _SSLContext:
 343    """Given a PEP 543 list of protocols to negotiate, configures the SSLContext
 344    to negotiate those protocols.
 345    """
 346    if inner_protocols:
 347        protocols = []
 348        for np in inner_protocols:
 349            proto_string = np if isinstance(np, bytes) else np.value
 350            # The protocol string needs to be of type str for the standard
 351            # library.
 352            protocols.append(proto_string.decode("ascii"))
 353
 354        context.set_alpn_protocols(protocols)
 355
 356    return context
 357
 358
 359def _init_context_common(
 360    some_context: _SSLContext,
 361    config: TLSClientConfiguration | TLSServerConfiguration,
 362) -> _SSLContext:
 363    some_context = _configure_context_for_ciphers(
 364        some_context,
 365        config.ciphers,
 366    )
 367    some_context = _configure_context_for_negotiation(
 368        some_context,
 369        config.inner_protocols,
 370    )
 371
 372    # In lieu of system recommended settings, we default to TLS v1.3
 373    lowest_supported_version = config.lowest_supported_version
 374    if lowest_supported_version is None:
 375        lowest_supported_version = TLSVersion.TLSv1_3
 376
 377    highest_supported_version = config.highest_supported_version
 378    if highest_supported_version is None:
 379        highest_supported_version = TLSVersion.MAXIMUM_SUPPORTED
 380
 381    try:
 382        some_context.minimum_version = _TLSMinVersionOpts[lowest_supported_version]
 383        some_context.maximum_version = _TLSMaxVersionOpts[highest_supported_version]
 384    except KeyError:
 385        raise TLSError("Bad maximum/minimum options")
 386
 387    return some_context
 388
 389
 390def _init_context_client(config: TLSClientConfiguration) -> _SSLContext:
 391    """Initialize an SSL context object with a given client configuration."""
 392    some_context = _create_client_context_with_trust_store(config.trust_store)
 393
 394    some_context = _configure_context_for_single_signing_chain(
 395        some_context, config.certificate_chain
 396    )
 397
 398    return _init_context_common(some_context, config)
 399
 400
 401def _init_context_server(config: TLSServerConfiguration) -> _SSLContext:
 402    """Initialize an SSL context object with a given server configuration."""
 403    some_context = _create_server_context_with_trust_store(config.trust_store)
 404
 405    some_context = _configure_server_context_for_certs(
 406        some_context, config.certificate_chain, config
 407    )
 408
 409    return _init_context_common(some_context, config)
 410
 411
 412class OpenSSLTLSSocket:
 413    """A TLSSocket implementation based on OpenSSL."""
 414
 415    __slots__ = (
 416        "_parent_context",
 417        "_socket",
 418        "_ssl_context",
 419    )
 420
 421    _parent_context: OpenSSLClientContext | OpenSSLServerContext
 422    _socket: ssl.SSLSocket
 423    _ssl_context: _SSLContext
 424
 425    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
 426        """OpenTLSSockets should not be constructed by the user.
 427        Instead, the ClientContext.connect() and
 428        ServerContext.connect() use the _create() method."""
 429        msg = (
 430            f"{self.__class__.__name__} does not have a public constructor. "
 431            "Instances are returned by ClientContext.connect() or ServerContext.connect()."
 432        )
 433        raise TypeError(
 434            msg,
 435        )
 436
 437    @classmethod
 438    def _create(
 439        cls,
 440        address: tuple[str | None, int],
 441        parent_context: OpenSSLClientContext | OpenSSLServerContext,
 442        server_side: bool,
 443        ssl_context: _SSLContext,
 444    ) -> OpenSSLTLSSocket:
 445        self = cls.__new__(cls)
 446        self._parent_context = parent_context
 447        self._ssl_context = ssl_context
 448
 449        if server_side is True:
 450            sock = socket.create_server(address)
 451            with _error_converter():
 452                self._socket = ssl_context.wrap_socket(
 453                    sock, server_side=server_side, server_hostname=None
 454                )
 455        else:
 456            hostname, _ = address
 457            sock = socket.create_connection(address)
 458            with _error_converter():
 459                self._socket = ssl_context.wrap_socket(
 460                    sock, server_side=server_side, server_hostname=hostname
 461                )
 462
 463        self._socket.setblocking(False)
 464
 465        return self
 466
 467    def recv(self, bufsize: int) -> bytes:
 468        """Receive data from the socket. The return value is a bytes object
 469        representing the data received. Should not work before the handshake
 470        is completed."""
 471        with _error_converter():
 472            try:
 473                return self._socket.recv(bufsize)
 474            except ssl.SSLZeroReturnError:
 475                return b""
 476
 477    def send(self, bytes: bytes) -> int:
 478        """Send data to the socket. The socket must be connected to a remote socket."""
 479        with _error_converter():
 480            return self._socket.send(bytes)
 481
 482    def close(self, force: bool = False) -> None:
 483        """Unwraps the TLS connection, shuts down both halves of the connection and
 484        mark the socket closed. If force is True, will only shutdown own half and
 485        not wait for the other side. If force is False, this will raise WantReadError
 486        until the other side sends a close_notify alert."""
 487
 488        try:
 489            with _error_converter():
 490                sock = self._socket.unwrap()
 491        except (ValueError, BrokenPipeError, OSError):
 492            # If these exceptions are raised, we close the socket without re-trying to unwrap it.
 493            # - ValueError: The socket was actually not wrapped
 494            # - BrokenPipeError: There is some issue with the socket
 495            # - OSError: The other side already shut down
 496            sock = self._socket
 497        except WantReadError:
 498            if force:
 499                sock = self._socket
 500            else:
 501                raise
 502
 503        # NOTE: OSError indicates that the other side has already hung up.
 504        with _error_converter(ignore_filter=(OSError,)):
 505            sock.shutdown(socket.SHUT_RDWR)
 506        return sock.close()
 507
 508    def listen(self, backlog: int) -> None:
 509        """Enable a server to accept connections. If backlog is specified, it
 510        specifies the number of unaccepted connections that the system will allow
 511        before refusing new connections."""
 512        with _error_converter():
 513            return self._socket.listen(backlog)
 514
 515    def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
 516        """Accept a connection. The socket must be bound to an address and listening
 517        for connections. The return value is a pair (conn, address) where conn is a
 518        new TLSSocket object usable to send and receive data on the connection, and
 519        address is the address bound to the socket on the other end of the connection."""
 520
 521        with _error_converter():
 522            (sock, address) = self._socket.accept()
 523        tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
 524        tls_socket._parent_context = self._parent_context
 525        tls_socket._ssl_context = self._ssl_context
 526        tls_socket._socket = sock
 527        with _error_converter():
 528            tls_socket._socket.setblocking(False)
 529        return (tls_socket, address)
 530
 531    def getsockname(self) -> socket._RetAddress:
 532        """Return the local address to which the socket is connected."""
 533        with _error_converter():
 534            return self._socket.getsockname()
 535
 536    def getpeercert(self) -> bytes | None:
 537        """
 538        Return the raw DER bytes of the certificate provided by the peer
 539        during the handshake, if applicable.
 540        """
 541        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
 542        # Obtaining the certificate as a dict is very specific to the ssl module and may be
 543        # difficult to implement for other TLS implementations, so this is not supported
 544
 545        with _error_converter():
 546            cert = self._socket.getpeercert(True)
 547
 548        return cert
 549
 550    def getpeername(self) -> socket._RetAddress:
 551        """Return the remote address to which the socket is connected."""
 552
 553        with _error_converter():
 554            return self._socket.getpeername()
 555
 556    def fileno(self) -> int:
 557        """Return the socket's file descriptor (a small integer), or -1 on failure."""
 558
 559        with _error_converter():
 560            return self._socket.fileno()
 561
 562    @property
 563    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
 564        """The ``Context`` object this socket is tied to."""
 565
 566        return self._parent_context
 567
 568    def cipher(self) -> CipherSuite | int | None:
 569        """
 570        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
 571
 572        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
 573        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
 574        """
 575
 576        # This is the OpenSSL cipher name. We want the ID, which we can get by
 577        # looking for this entry in the context's list of supported ciphers.
 578        ret = self._socket.cipher()
 579
 580        if ret is None:
 581            return None
 582        else:
 583            ossl_cipher, _, _ = ret
 584
 585        for cipher in self._ssl_context.get_ciphers():
 586            if cipher["name"] == ossl_cipher:
 587                break
 588        # Since the cipher was negotiated using the OpenSSL context,
 589        # it must exist in the list of the OpenSSL supported ciphers.
 590        assert cipher["name"] == ossl_cipher
 591
 592        cipher_id = cipher["id"] & 0xFFFF
 593        try:
 594            return CipherSuite(cipher_id)
 595        except ValueError:
 596            return cipher_id
 597
 598    def negotiated_protocol(self) -> NextProtocol | bytes | None:
 599        """
 600        Returns the protocol that was selected during the TLS handshake.
 601
 602        This selection may have been made using ALPN or some future
 603        negotiation mechanism.
 604
 605        If the negotiated protocol is one of the protocols defined in the
 606        ``NextProtocol`` enum, the value from that enum will be returned.
 607        Otherwise, the raw bytestring of the negotiated protocol will be
 608        returned.
 609
 610        If ``Context.set_inner_protocols()`` was not called, if the other
 611        party does not support protocol negotiation, if this socket does
 612        not support any of the peer's proposed protocols, or if the
 613        handshake has not happened yet, ``None`` is returned.
 614        """
 615
 616        proto = self._socket.selected_alpn_protocol()
 617
 618        # The standard library returns this as a str, we want bytes.
 619        if proto is None:
 620            return None
 621
 622        protoBytes = proto.encode("ascii")
 623
 624        try:
 625            return NextProtocol(protoBytes)
 626        except ValueError:
 627            return protoBytes
 628
 629    @property
 630    def negotiated_tls_version(self) -> TLSVersion | None:
 631        """The version of TLS that has been negotiated on this connection."""
 632
 633        ossl_version = self._socket.version()
 634        if ossl_version is None:
 635            return None
 636        else:
 637            return TLSVersion(ossl_version)
 638
 639
 640class OpenSSLTLSBuffer:
 641    """A TLSBuffer implementation based on OpenSSL"""
 642
 643    __slots__ = (
 644        "_ciphertext_buffer",
 645        "_in_bio",
 646        "_object",
 647        "_out_bio",
 648        "_parent_context",
 649        "_ssl_context",
 650    )
 651
 652    _ciphertext_buffer: bytearray
 653    _in_bio: ssl.MemoryBIO
 654    _object: ssl.SSLObject
 655    _out_bio: ssl.MemoryBIO
 656    _parent_context: OpenSSLClientContext | OpenSSLServerContext
 657    _ssl_context: _SSLContext
 658
 659    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
 660        """OpenTLSBuffers should not be constructed by the user.
 661        Instead, the ClientContext.create_buffer() and
 662        ServerContext.create_buffer() use the _create() method."""
 663        msg = (
 664            f"{self.__class__.__name__} does not have a public constructor. "
 665            "Instances are returned by ClientContext.create_buffer() \
 666                or ServerContext.create_buffer()."
 667        )
 668        raise TypeError(
 669            msg,
 670        )
 671
 672    @classmethod
 673    def _create(
 674        cls,
 675        server_hostname: str | None,
 676        parent_context: OpenSSLClientContext | OpenSSLServerContext,
 677        server_side: bool,
 678        ssl_context: _SSLContext,
 679    ) -> OpenSSLTLSBuffer:
 680        self = cls.__new__(cls)
 681        self._parent_context = parent_context
 682        self._ssl_context = ssl_context
 683
 684        # We need this extra buffer to implement the peek/consume API, which
 685        # the MemoryBIO object does not allow.
 686        self._ciphertext_buffer = bytearray()
 687
 688        # Set up the SSLObject we're going to back this with.
 689        self._in_bio = ssl.MemoryBIO()
 690        self._out_bio = ssl.MemoryBIO()
 691
 692        if server_side is True:
 693            with _error_converter():
 694                self._object = ssl_context.wrap_bio(
 695                    self._in_bio, self._out_bio, server_side=True, server_hostname=None
 696                )
 697        else:
 698            with _error_converter():
 699                self._object = ssl_context.wrap_bio(
 700                    self._in_bio, self._out_bio, server_side=False, server_hostname=server_hostname
 701                )
 702
 703        return self
 704
 705    def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
 706        """
 707        Read up to ``amt`` bytes of data from the input buffer and return
 708        the result as a ``bytes`` instance. If an optional buffer is
 709        provided, the result is written into the buffer and the number of
 710        bytes is returned instead.
 711
 712        Once EOF is reached, all further calls to this method return the
 713        empty byte string ``b''``.
 714
 715        May read "short": that is, fewer bytes may be returned than were
 716        requested.
 717
 718        Raise ``WantReadError`` or ``WantWriteError`` if there is
 719        insufficient data in either the input or output buffer and the
 720        operation would have caused data to be written or read.
 721
 722        May raise ``RaggedEOF`` if the connection has been closed without a
 723        graceful TLS shutdown. Whether this is an exception that should be
 724        ignored or not is up to the specific application.
 725
 726        As at any time a re-negotiation is possible, a call to ``read()``
 727        can also cause write operations.
 728        """
 729
 730        with _error_converter():
 731            try:
 732                # MyPy insists that buffer must be a bytearray
 733                return self._object.read(amt, buffer)  # type: ignore[arg-type]
 734            except ssl.SSLZeroReturnError:
 735                return b""
 736
 737    def write(self, buf: Buffer) -> int:
 738        """
 739        Write ``buf`` in encrypted form to the output buffer and return the
 740        number of bytes written. The ``buf`` argument must be an object
 741        supporting the buffer interface.
 742
 743        Raise ``WantReadError`` or ``WantWriteError`` if there is
 744        insufficient data in either the input or output buffer and the
 745        operation would have caused data to be written or read. In either
 746        case, users should endeavour to resolve that situation and then
 747        re-call this method. When re-calling this method users *should*
 748        re-use the exact same ``buf`` object, as some TLS implementations
 749        require that the exact same buffer be used.
 750
 751        This operation may write "short": that is, fewer bytes may be
 752        written than were in the buffer.
 753
 754        As at any time a re-negotiation is possible, a call to ``write()``
 755        can also cause read operations.
 756        """
 757
 758        with _error_converter():
 759            return self._object.write(buf)
 760
 761    # Get rid and do handshake ourselves?
 762    def do_handshake(self) -> None:
 763        """
 764        Performs the TLS handshake. Also performs certificate validation
 765        and hostname verification.
 766        """
 767
 768        with _error_converter():
 769            self._object.do_handshake()
 770
 771    def shutdown(self) -> None:
 772        """
 773        Performs a clean TLS shut down. This should generally be used
 774        whenever possible to signal to the remote peer that the content is
 775        finished.
 776        """
 777
 778        with _error_converter():
 779            self._object.unwrap()
 780
 781    def process_incoming(self, data_from_network: bytes) -> None:
 782        """
 783        Receives some TLS data from the network and stores it in an
 784        internal buffer.
 785
 786        If the internal buffer is overfull, this method will raise
 787        ``WantReadError`` and store no data. At this point, the user must
 788        call ``read`` to remove some data from the internal buffer
 789        before repeating this call.
 790        """
 791
 792        with _error_converter():
 793            written_len = self._in_bio.write(data_from_network)
 794
 795        assert written_len == len(data_from_network)
 796
 797    def incoming_bytes_buffered(self) -> int:
 798        """
 799        Returns how many bytes are in the incoming buffer waiting to be processed.
 800        """
 801
 802        return self._in_bio.pending
 803
 804    def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
 805        """
 806        Returns the next ``amt`` bytes of data that should be written to
 807        the network from the outgoing data buffer, removing it from the
 808        internal buffer.
 809        """
 810
 811        return self._out_bio.read(amount_bytes_for_network)
 812
 813    def outgoing_bytes_buffered(self) -> int:
 814        """
 815        Returns how many bytes are in the outgoing buffer waiting to be sent.
 816        """
 817
 818        return self._out_bio.pending
 819
 820    @property
 821    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
 822        """The ``Context`` object this socket is tied to."""
 823
 824        return self._parent_context
 825
 826    def cipher(self) -> CipherSuite | int | None:
 827        """
 828        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
 829
 830        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
 831        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
 832        """
 833
 834        ret = self._object.cipher()
 835
 836        if ret is None:
 837            return None
 838        else:
 839            ossl_cipher, _, _ = ret
 840
 841        for cipher in self._ssl_context.get_ciphers():
 842            if cipher["name"] == ossl_cipher:
 843                break
 844        # Since the cipher was negotiated using the OpenSSL context,
 845        # it must exist in the list of the OpenSSL supported ciphers.
 846        assert cipher["name"] == ossl_cipher
 847
 848        cipher_id = cipher["id"] & 0xFFFF
 849        try:
 850            return CipherSuite(cipher_id)
 851        except ValueError:
 852            return cipher_id
 853
 854    def negotiated_protocol(self) -> NextProtocol | bytes | None:
 855        """
 856        Returns the protocol that was selected during the TLS handshake.
 857
 858        This selection may have been made using ALPN or some future
 859        negotiation mechanism.
 860
 861        If the negotiated protocol is one of the protocols defined in the
 862        ``NextProtocol`` enum, the value from that enum will be returned.
 863        Otherwise, the raw bytestring of the negotiated protocol will be
 864        returned.
 865
 866        If ``Context.set_inner_protocols()`` was not called, if the other
 867        party does not support protocol negotiation, if this socket does
 868        not support any of the peer's proposed protocols, or if the
 869        handshake has not happened yet, ``None`` is returned.
 870        """
 871
 872        proto = self._object.selected_alpn_protocol()
 873
 874        # The standard library returns this as a str, we want bytes.
 875        if proto is None:
 876            return None
 877
 878        protoBytes = proto.encode("ascii")
 879
 880        try:
 881            return NextProtocol(protoBytes)
 882        except ValueError:
 883            return protoBytes
 884
 885    @property
 886    def negotiated_tls_version(self) -> TLSVersion | None:
 887        """The version of TLS that has been negotiated on this connection."""
 888
 889        ossl_version = self._object.version()
 890        if ossl_version is None:
 891            return None
 892        else:
 893            return TLSVersion(ossl_version)
 894
 895    def getpeercert(self) -> bytes | None:
 896        """
 897        Return the raw DER bytes of the certificate provided by the peer
 898        during the handshake, if applicable.
 899        """
 900        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
 901        # Obtaining the certificate as a dict is very specific to the ssl module and may be
 902        # difficult to implement for other implementation, so this is not supported
 903        with _error_converter():
 904            cert = self._object.getpeercert(True)
 905
 906        return cert
 907
 908
 909class OpenSSLClientContext:
 910    """This class controls and creates a socket that is wrapped using the
 911    standard library bindings to OpenSSL to perform TLS connections on the
 912    client side of a network connection.
 913    """
 914
 915    def __init__(self, configuration: TLSClientConfiguration) -> None:
 916        """Create a new context object from a given TLS configuration."""
 917
 918        self._configuration = configuration
 919
 920    @property
 921    def configuration(self) -> TLSClientConfiguration:
 922        """Returns the TLS configuration that was used to create the context."""
 923
 924        return self._configuration
 925
 926    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
 927        """Create a socket-like object that can be used to do TLS."""
 928        ossl_context = _init_context_client(self._configuration)
 929
 930        return OpenSSLTLSSocket._create(
 931            parent_context=self,
 932            server_side=False,
 933            ssl_context=ossl_context,
 934            address=address,
 935        )
 936
 937    def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
 938        """Creates a TLSBuffer that acts as an in-memory channel,
 939        and contains information about the TLS exchange
 940        (cipher, negotiated_protocol, negotiated_tls_version, etc.)."""
 941
 942        ossl_context = _init_context_client(self._configuration)
 943
 944        return OpenSSLTLSBuffer._create(
 945            server_hostname=server_hostname,
 946            parent_context=self,
 947            server_side=False,
 948            ssl_context=ossl_context,
 949        )
 950
 951
 952class OpenSSLServerContext:
 953    """This class controls and creates and creates a socket that is wrapped using the
 954    standard library bindings to OpenSSL to perform TLS connections on the
 955    server side of a network connection.
 956    """
 957
 958    def __init__(self, configuration: TLSServerConfiguration) -> None:
 959        """Create a new context object from a given TLS configuration."""
 960
 961        self._configuration = configuration
 962
 963    @property
 964    def configuration(self) -> TLSServerConfiguration:
 965        """Returns the TLS configuration that was used to create the context."""
 966
 967        return self._configuration
 968
 969    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
 970        """Create a socket-like object that can be used to do TLS."""
 971        ossl_context = _init_context_server(self._configuration)
 972
 973        return OpenSSLTLSSocket._create(
 974            parent_context=self,
 975            server_side=True,
 976            ssl_context=ossl_context,
 977            address=address,
 978        )
 979
 980    def create_buffer(self) -> OpenSSLTLSBuffer:
 981        """Creates a TLSBuffer that acts as an in-memory channel,
 982        and contains information about the TLS exchange
 983        (cipher, negotiated_protocol, negotiated_tls_version, etc.)."""
 984
 985        ossl_context = _init_context_server(self._configuration)
 986
 987        return OpenSSLTLSBuffer._create(
 988            server_hostname=None,
 989            parent_context=self,
 990            server_side=True,
 991            ssl_context=ossl_context,
 992        )
 993
 994
 995def _check_cert_or_priv(cert_or_priv: Certificate | PrivateKey) -> None:
 996    if cert_or_priv._path is not None or cert_or_priv._buffer is not None:
 997        return None
 998    elif cert_or_priv._id is not None:
 999        raise ConfigurationError(
1000            "This TLS implementation does not support id-based certificates \
1001                                  or private keys."
1002        )
1003    else:
1004        raise ConfigurationError("Certificate or PrivateKey cannot be empty.")
1005
1006
def _check_trust_store(trust_store: TrustStore | None) -> None:
    """Reject trust stores that are identified by an id.

    ``None`` and trust stores without an id pass; an id-based trust store
    raises ``ConfigurationError``.
    """
    if trust_store is None:
        return
    if trust_store._id is not None:
        raise ConfigurationError("This TLS implementation does not support id-based trust stores.")
1010
1011
def _check_sign_chain(sign_chain: SigningChain) -> None:
    """Run ``_check_cert_or_priv`` over every element of a signing chain.

    The leaf certificate is checked first, then the leaf's private key if
    one is present, then each certificate in the rest of the chain.
    """
    leaf_entry = sign_chain.leaf
    leaf_cert = leaf_entry[0]
    _check_cert_or_priv(leaf_cert)

    # The second element of the leaf entry is the private key, which may be None.
    leaf_key = leaf_entry[1]
    if leaf_key is not None:
        _check_cert_or_priv(leaf_key)

    for chain_cert in sign_chain.chain:
        _check_cert_or_priv(chain_cert)
1020
1021
def validate_config(tls_config: TLSClientConfiguration | TLSServerConfiguration) -> None:
    """Check that the OpenSSL TLS implementation supports ``tls_config``.

    The trust store is checked first, then every signing chain the
    configuration carries. The helper checks raise ``ConfigurationError``
    for anything this implementation cannot use.
    """
    _check_trust_store(tls_config.trust_store)

    if isinstance(tls_config, TLSClientConfiguration):
        # A client configuration carries a single, optional signing chain.
        client_chain = tls_config.certificate_chain
        if client_chain is not None:
            _check_sign_chain(client_chain)
        return

    assert isinstance(tls_config, TLSServerConfiguration)
    # A server configuration carries an optional collection of signing chains.
    server_chains = tls_config.certificate_chain
    if server_chains is None:
        return
    for chain in server_chains:
        _check_sign_chain(chain)
1037
1038
#: The stdlib ``TLSImplementation`` object.
#:
#: It bundles the OpenSSL-backed client context class, server context class
#: and configuration validator defined in this module.
STDLIB_IMPLEMENTATION = TLSImplementation(
    client_context=OpenSSLClientContext,
    server_context=OpenSSLServerContext,
    validate_config=validate_config,
)
ctx
class OpenSSLTLSSocket:
class OpenSSLTLSSocket:
    """A TLSSocket implementation based on OpenSSL."""

    __slots__ = (
        "_parent_context",
        "_socket",
        "_ssl_context",
    )

    _parent_context: OpenSSLClientContext | OpenSSLServerContext
    _socket: ssl.SSLSocket
    _ssl_context: _SSLContext

    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
        """OpenSSLTLSSockets should not be constructed by the user.
        Instead, the ClientContext.connect() and
        ServerContext.connect() use the _create() method."""
        msg = (
            f"{self.__class__.__name__} does not have a public constructor. "
            "Instances are returned by ClientContext.connect() or ServerContext.connect()."
        )
        raise TypeError(
            msg,
        )

    @classmethod
    def _create(
        cls,
        address: tuple[str | None, int],
        parent_context: OpenSSLClientContext | OpenSSLServerContext,
        server_side: bool,
        ssl_context: _SSLContext,
    ) -> OpenSSLTLSSocket:
        """Build a socket for ``address`` and wrap it with ``ssl_context``.

        On the server side a listening socket is created with
        ``socket.create_server``; on the client side a connection is opened
        with ``socket.create_connection`` and the host part of ``address``
        is used as the server hostname. The wrapped socket is switched to
        non-blocking mode before it is returned.
        """
        self = cls.__new__(cls)
        self._parent_context = parent_context
        self._ssl_context = ssl_context

        if server_side is True:
            sock = socket.create_server(address)
            hostname = None
        else:
            hostname, _ = address
            sock = socket.create_connection(address)

        try:
            with _error_converter():
                self._socket = ssl_context.wrap_socket(
                    sock, server_side=server_side, server_hostname=hostname
                )
        except BaseException:
            # Do not leak the plain socket when wrapping fails.
            sock.close()
            raise

        self._socket.setblocking(False)

        return self

    def recv(self, bufsize: int) -> bytes:
        """Receive data from the socket. The return value is a bytes object
        representing the data received. Should not work before the handshake
        is completed.

        An ``ssl.SSLZeroReturnError`` from the underlying socket is reported
        as an empty bytes object."""
        with _error_converter():
            try:
                return self._socket.recv(bufsize)
            except ssl.SSLZeroReturnError:
                return b""

    def send(self, bytes: bytes) -> int:
        """Send data to the socket. The socket must be connected to a remote socket.

        Returns the value returned by the underlying socket's ``send``."""
        with _error_converter():
            return self._socket.send(bytes)

    def close(self, force: bool = False) -> None:
        """Unwraps the TLS connection, shuts down both halves of the connection and
        marks the socket closed. If force is True, a WantReadError raised while
        unwrapping is not propagated and the socket is closed anyway. If force is
        False, that WantReadError is re-raised; it is raised until the other side
        sends a close_notify alert."""

        try:
            with _error_converter():
                sock = self._socket.unwrap()
        except (ValueError, BrokenPipeError, OSError):
            # If these exceptions are raised, we close the socket without re-trying to unwrap it.
            # - ValueError: The socket was actually not wrapped
            # - BrokenPipeError: There is some issue with the socket
            # - OSError: The other side already shut down
            sock = self._socket
        except WantReadError:
            if force:
                sock = self._socket
            else:
                raise

        # NOTE: OSError indicates that the other side has already hung up.
        with _error_converter(ignore_filter=(OSError,)):
            sock.shutdown(socket.SHUT_RDWR)
        return sock.close()

    def listen(self, backlog: int) -> None:
        """Enable a server to accept connections. If backlog is specified, it
        specifies the number of unaccepted connections that the system will allow
        before refusing new connections."""
        with _error_converter():
            return self._socket.listen(backlog)

    def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
        """Accept a connection. The socket must be bound to an address and listening
        for connections. The return value is a pair (conn, address) where conn is a
        new TLSSocket object usable to send and receive data on the connection, and
        address is the address bound to the socket on the other end of the connection.

        The new object shares this socket's parent context and OpenSSL context,
        and its underlying socket is switched to non-blocking mode."""

        with _error_converter():
            (sock, address) = self._socket.accept()
        tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
        tls_socket._parent_context = self._parent_context
        tls_socket._ssl_context = self._ssl_context
        tls_socket._socket = sock
        with _error_converter():
            tls_socket._socket.setblocking(False)
        return (tls_socket, address)

    def getsockname(self) -> socket._RetAddress:
        """Return the local address to which the socket is connected."""
        with _error_converter():
            return self._socket.getsockname()

    def getpeercert(self) -> bytes | None:
        """
        Return the raw DER bytes of the certificate provided by the peer
        during the handshake, if applicable.
        """
        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
        # Obtaining the certificate as a dict is very specific to the ssl module and may be
        # difficult to implement for other TLS implementations, so this is not supported

        with _error_converter():
            cert = self._socket.getpeercert(True)

        return cert

    def getpeername(self) -> socket._RetAddress:
        """Return the remote address to which the socket is connected."""

        with _error_converter():
            return self._socket.getpeername()

    def fileno(self) -> int:
        """Return the socket's file descriptor (a small integer), or -1 on failure."""

        with _error_converter():
            return self._socket.fileno()

    @property
    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
        """The ``Context`` object this socket is tied to."""

        return self._parent_context

    def cipher(self) -> CipherSuite | int | None:
        """
        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

        Raises ``TLSError`` if the negotiated cipher name is not present in the list of
        ciphers reported by the OpenSSL context.
        """

        negotiated = self._socket.cipher()
        if negotiated is None:
            return None

        # The first element is the OpenSSL cipher name. We want the ID, which we can
        # get by looking for this entry in the context's list of ciphers.
        ossl_cipher = negotiated[0]
        cipher_id = next(
            (
                entry["id"] & 0xFFFF
                for entry in self._ssl_context.get_ciphers()
                if entry["name"] == ossl_cipher
            ),
            None,
        )

        if cipher_id is None:
            # Raise explicitly rather than ``assert``: an assert disappears under
            # ``-O`` and would leave an unrelated (or unbound) entry in use.
            msg = f"Negotiated cipher {ossl_cipher!r} is not among the ciphers of the OpenSSL context."
            raise TLSError(msg)

        try:
            return CipherSuite(cipher_id)
        except ValueError:
            return cipher_id

    def negotiated_protocol(self) -> NextProtocol | bytes | None:
        """
        Returns the protocol that was selected during the TLS handshake.

        This selection may have been made using ALPN or some future
        negotiation mechanism.

        If the negotiated protocol is one of the protocols defined in the
        ``NextProtocol`` enum, the value from that enum will be returned.
        Otherwise, the raw bytestring of the negotiated protocol will be
        returned.

        If ``Context.set_inner_protocols()`` was not called, if the other
        party does not support protocol negotiation, if this socket does
        not support any of the peer's proposed protocols, or if the
        handshake has not happened yet, ``None`` is returned.
        """

        proto = self._socket.selected_alpn_protocol()

        # The standard library returns this as a str, we want bytes.
        if proto is None:
            return None

        proto_bytes = proto.encode("ascii")

        try:
            return NextProtocol(proto_bytes)
        except ValueError:
            return proto_bytes

    @property
    def negotiated_tls_version(self) -> TLSVersion | None:
        """The version of TLS that has been negotiated on this connection."""

        ossl_version = self._socket.version()
        if ossl_version is None:
            return None
        else:
            return TLSVersion(ossl_version)

A TLSSocket implementation based on OpenSSL.

OpenSSLTLSSocket(*args: tuple, **kwargs: tuple)
426    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
427        """OpenTLSSockets should not be constructed by the user.
428        Instead, the ClientContext.connect() and
429        ServerContext.connect() use the _create() method."""
430        msg = (
431            f"{self.__class__.__name__} does not have a public constructor. "
432            "Instances are returned by ClientContext.connect() or ServerContext.connect()."
433        )
434        raise TypeError(
435            msg,
436        )

OpenTLSSockets should not be constructed by the user. Instead, the ClientContext.connect() and ServerContext.connect() use the _create() method.

def recv(self, bufsize: int) -> bytes:
468    def recv(self, bufsize: int) -> bytes:
469        """Receive data from the socket. The return value is a bytes object
470        representing the data received. Should not work before the handshake
471        is completed."""
472        with _error_converter():
473            try:
474                return self._socket.recv(bufsize)
475            except ssl.SSLZeroReturnError:
476                return b""

Receive data from the socket. The return value is a bytes object representing the data received. Should not work before the handshake is completed.

def send(self, bytes: bytes) -> int:
478    def send(self, bytes: bytes) -> int:
479        """Send data to the socket. The socket must be connected to a remote socket."""
480        with _error_converter():
481            return self._socket.send(bytes)

Send data to the socket. The socket must be connected to a remote socket.

def close(self, force: bool = False) -> None:
483    def close(self, force: bool = False) -> None:
484        """Unwraps the TLS connection, shuts down both halves of the connection and
485        mark the socket closed. If force is True, will only shutdown own half and
486        not wait for the other side. If force is False, this will raise WantReadError
487        until the other side sends a close_notify alert."""
488
489        try:
490            with _error_converter():
491                sock = self._socket.unwrap()
492        except (ValueError, BrokenPipeError, OSError):
493            # If these exceptions are raised, we close the socket without re-trying to unwrap it.
494            # - ValueError: The socket was actually not wrapped
495            # - BrokenPipeError: There is some issue with the socket
496            # - OSError: The other side already shut down
497            sock = self._socket
498        except WantReadError:
499            if force:
500                sock = self._socket
501            else:
502                raise
503
504        # NOTE: OSError indicates that the other side has already hung up.
505        with _error_converter(ignore_filter=(OSError,)):
506            sock.shutdown(socket.SHUT_RDWR)
507        return sock.close()

Unwraps the TLS connection, shuts down both halves of the connection and marks the socket closed. If force is True, it will only shut down its own half and not wait for the other side. If force is False, this will raise WantReadError until the other side sends a close_notify alert.

def listen(self, backlog: int) -> None:
509    def listen(self, backlog: int) -> None:
510        """Enable a server to accept connections. If backlog is specified, it
511        specifies the number of unaccepted connections that the system will allow
512        before refusing new connections."""
513        with _error_converter():
514            return self._socket.listen(backlog)

Enable a server to accept connections. If backlog is specified, it specifies the number of unaccepted connections that the system will allow before refusing new connections.

def accept(self) -> 'tuple[OpenSSLTLSSocket, socket._RetAddress]':
516    def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
517        """Accept a connection. The socket must be bound to an address and listening
518        for connections. The return value is a pair (conn, address) where conn is a
519        new TLSSocket object usable to send and receive data on the connection, and
520        address is the address bound to the socket on the other end of the connection."""
521
522        with _error_converter():
523            (sock, address) = self._socket.accept()
524        tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
525        tls_socket._parent_context = self._parent_context
526        tls_socket._ssl_context = self._ssl_context
527        tls_socket._socket = sock
528        with _error_converter():
529            tls_socket._socket.setblocking(False)
530        return (tls_socket, address)

Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new TLSSocket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection.

def getsockname(self) -> 'socket._RetAddress':
532    def getsockname(self) -> socket._RetAddress:
533        """Return the local address to which the socket is connected."""
534        with _error_converter():
535            return self._socket.getsockname()

Return the local address to which the socket is connected.

def getpeercert(self) -> bytes | None:
537    def getpeercert(self) -> bytes | None:
538        """
539        Return the raw DER bytes of the certificate provided by the peer
540        during the handshake, if applicable.
541        """
542        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
543        # Obtaining the certificate as a dict is very specific to the ssl module and may be
544        # difficult to implement for other TLS implementations, so this is not supported
545
546        with _error_converter():
547            cert = self._socket.getpeercert(True)
548
549        return cert

Return the raw DER bytes of the certificate provided by the peer during the handshake, if applicable.

def getpeername(self) -> 'socket._RetAddress':
551    def getpeername(self) -> socket._RetAddress:
552        """Return the remote address to which the socket is connected."""
553
554        with _error_converter():
555            return self._socket.getpeername()

Return the remote address to which the socket is connected.

def fileno(self) -> int:
557    def fileno(self) -> int:
558        """Return the socket's file descriptor (a small integer), or -1 on failure."""
559
560        with _error_converter():
561            return self._socket.fileno()

Return the socket's file descriptor (a small integer), or -1 on failure.

563    @property
564    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
565        """The ``Context`` object this socket is tied to."""
566
567        return self._parent_context

The Context object this socket is tied to.

def cipher(self) -> tlslib.tlslib.CipherSuite | int | None:
569    def cipher(self) -> CipherSuite | int | None:
570        """
571        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
572
573        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
574        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
575        """
576
577        # This is the OpenSSL cipher name. We want the ID, which we can get by
578        # looking for this entry in the context's list of supported ciphers.
579        ret = self._socket.cipher()
580
581        if ret is None:
582            return None
583        else:
584            ossl_cipher, _, _ = ret
585
586        for cipher in self._ssl_context.get_ciphers():
587            if cipher["name"] == ossl_cipher:
588                break
589        # Since the cipher was negotiated using the OpenSSL context,
590        # it must exist in the list of the OpenSSL supported ciphers.
591        assert cipher["name"] == ossl_cipher
592
593        cipher_id = cipher["id"] & 0xFFFF
594        try:
595            return CipherSuite(cipher_id)
596        except ValueError:
597            return cipher_id

Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

If no connection has been negotiated, returns None. If the cipher negotiated is not defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

def negotiated_protocol(self) -> tlslib.tlslib.NextProtocol | bytes | None:
599    def negotiated_protocol(self) -> NextProtocol | bytes | None:
600        """
601        Returns the protocol that was selected during the TLS handshake.
602
603        This selection may have been made using ALPN or some future
604        negotiation mechanism.
605
606        If the negotiated protocol is one of the protocols defined in the
607        ``NextProtocol`` enum, the value from that enum will be returned.
608        Otherwise, the raw bytestring of the negotiated protocol will be
609        returned.
610
611        If ``Context.set_inner_protocols()`` was not called, if the other
612        party does not support protocol negotiation, if this socket does
613        not support any of the peer's proposed protocols, or if the
614        handshake has not happened yet, ``None`` is returned.
615        """
616
617        proto = self._socket.selected_alpn_protocol()
618
619        # The standard library returns this as a str, we want bytes.
620        if proto is None:
621            return None
622
623        protoBytes = proto.encode("ascii")
624
625        try:
626            return NextProtocol(protoBytes)
627        except ValueError:
628            return protoBytes

Returns the protocol that was selected during the TLS handshake.

This selection may have been made using ALPN or some future negotiation mechanism.

If the negotiated protocol is one of the protocols defined in the NextProtocol enum, the value from that enum will be returned. Otherwise, the raw bytestring of the negotiated protocol will be returned.

If Context.set_inner_protocols() was not called, if the other party does not support protocol negotiation, if this socket does not support any of the peer's proposed protocols, or if the handshake has not happened yet, None is returned.

negotiated_tls_version: tlslib.tlslib.TLSVersion | None
630    @property
631    def negotiated_tls_version(self) -> TLSVersion | None:
632        """The version of TLS that has been negotiated on this connection."""
633
634        ossl_version = self._socket.version()
635        if ossl_version is None:
636            return None
637        else:
638            return TLSVersion(ossl_version)

The version of TLS that has been negotiated on this connection.

class OpenSSLTLSBuffer:
class OpenSSLTLSBuffer:
    """A TLSBuffer implementation based on OpenSSL.

    TLS is run entirely in memory through an ``ssl.SSLObject`` attached to two
    ``ssl.MemoryBIO`` objects: data from the network is fed in with
    ``process_incoming()`` and data for the network is drained with
    ``process_outgoing()``.
    """

    __slots__ = (
        "_ciphertext_buffer",
        "_in_bio",
        "_object",
        "_out_bio",
        "_parent_context",
        "_ssl_context",
    )

    _ciphertext_buffer: bytearray
    _in_bio: ssl.MemoryBIO
    _object: ssl.SSLObject
    _out_bio: ssl.MemoryBIO
    _parent_context: OpenSSLClientContext | OpenSSLServerContext
    _ssl_context: _SSLContext

    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
        """Always raise ``TypeError``: this class has no public constructor.

        Instances are built by the ``_create()`` class method, which bypasses
        ``__init__`` entirely.
        """
        # Adjacent literals are used instead of a backslash continuation so
        # that source indentation does not leak into the message text.
        msg = (
            f"{self.__class__.__name__} does not have a public constructor. "
            "Instances are returned by ClientContext.create_buffer() "
            "or ServerContext.create_buffer()."
        )
        raise TypeError(msg)

    @classmethod
    def _create(
        cls,
        server_hostname: str | None,
        parent_context: OpenSSLClientContext | OpenSSLServerContext,
        server_side: bool,
        ssl_context: _SSLContext,
    ) -> OpenSSLTLSBuffer:
        """Build a buffer backed by ``ssl_context`` without calling ``__init__``.

        ``server_hostname`` is only passed to ``wrap_bio`` on the client side;
        a server-side buffer always passes ``None``.
        """
        self = cls.__new__(cls)
        self._parent_context = parent_context
        self._ssl_context = ssl_context

        # We need this extra buffer to implement the peek/consume API, which
        # the MemoryBIO object does not allow.
        self._ciphertext_buffer = bytearray()

        # Set up the SSLObject we're going to back this with.
        self._in_bio = ssl.MemoryBIO()
        self._out_bio = ssl.MemoryBIO()

        # Only the literal ``True`` selects server mode, as before.
        is_server = server_side is True
        with _error_converter():
            self._object = ssl_context.wrap_bio(
                self._in_bio,
                self._out_bio,
                server_side=is_server,
                server_hostname=None if is_server else server_hostname,
            )

        return self

    def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
        """
        Read up to ``amt`` bytes of data from the input buffer and return
        the result as a ``bytes`` instance. If an optional buffer is
        provided, the result is written into the buffer and the number of
        bytes is returned instead.

        Once the peer has shut TLS down cleanly, this returns ``b""`` (or
        ``0`` when ``buffer`` was supplied).

        May read "short": that is, fewer bytes may be returned than were
        requested.

        Raise ``WantReadError`` or ``WantWriteError`` if there is
        insufficient data in either the input or output buffer and the
        operation would have caused data to be written or read.

        May raise ``RaggedEOF`` if the connection has been closed without a
        graceful TLS shutdown. Whether this is an exception that should be
        ignored or not is up to the specific application.

        As at any time a re-negotiation is possible, a call to ``read()``
        can also cause write operations.
        """

        with _error_converter():
            try:
                # MyPy insists that buffer must be a bytearray
                return self._object.read(amt, buffer)  # type: ignore[arg-type]
            except ssl.SSLZeroReturnError:
                # Keep the EOF value consistent with the success path: a byte
                # count when filling a caller buffer, bytes otherwise.
                return 0 if buffer is not None else b""

    def write(self, buf: Buffer) -> int:
        """
        Write ``buf`` in encrypted form to the output buffer and return the
        number of bytes written. The ``buf`` argument must be an object
        supporting the buffer interface.

        Raise ``WantReadError`` or ``WantWriteError`` if there is
        insufficient data in either the input or output buffer and the
        operation would have caused data to be written or read. In either
        case, users should endeavour to resolve that situation and then
        re-call this method. When re-calling this method users *should*
        re-use the exact same ``buf`` object, as some TLS implementations
        require that the exact same buffer be used.

        This operation may write "short": that is, fewer bytes may be
        written than were in the buffer.

        As at any time a re-negotiation is possible, a call to ``write()``
        can also cause read operations.
        """

        with _error_converter():
            return self._object.write(buf)

    # Get rid and do handshake ourselves?
    def do_handshake(self) -> None:
        """
        Performs the TLS handshake. Also performs certificate validation
        and hostname verification.
        """

        with _error_converter():
            self._object.do_handshake()

    def shutdown(self) -> None:
        """
        Performs a clean TLS shut down. This should generally be used
        whenever possible to signal to the remote peer that the content is
        finished.
        """

        with _error_converter():
            self._object.unwrap()

    def process_incoming(self, data_from_network: bytes) -> None:
        """
        Receives some TLS data from the network and stores it in the
        incoming ``MemoryBIO``.

        NOTE(review): earlier documentation promised ``WantReadError`` for an
        overfull buffer; no size limit is enforced here - confirm the
        intended behaviour.
        """

        with _error_converter():
            written_len = self._in_bio.write(data_from_network)

        # Internal sanity check: the BIO is expected to accept everything.
        assert written_len == len(data_from_network)

    def incoming_bytes_buffered(self) -> int:
        """
        Returns how many bytes are in the incoming buffer waiting to be processed.
        """

        return self._in_bio.pending

    def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
        """
        Returns up to ``amount_bytes_for_network`` bytes of data that should
        be written to the network from the outgoing data buffer, removing
        them from the internal buffer.
        """

        return self._out_bio.read(amount_bytes_for_network)

    def outgoing_bytes_buffered(self) -> int:
        """
        Returns how many bytes are in the outgoing buffer waiting to be sent.
        """

        return self._out_bio.pending

    @property
    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
        """The ``Context`` object this buffer is tied to."""

        return self._parent_context

    def cipher(self) -> CipherSuite | int | None:
        """
        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

        Raises ``TLSError`` if the negotiated cipher name is not listed by the
        OpenSSL context this buffer was created from.
        """

        ret = self._object.cipher()
        if ret is None:
            return None
        ossl_cipher, _, _ = ret

        # Look the name up in the context's cipher list and keep the low
        # 16 bits of its id.
        cipher_id = next(
            (
                entry["id"] & 0xFFFF
                for entry in self._ssl_context.get_ciphers()
                if entry["name"] == ossl_cipher
            ),
            None,
        )
        if cipher_id is None:
            # The cipher was negotiated through this context, so a miss is
            # unexpected; fail loudly instead of reporting another cipher.
            msg = f"Negotiated cipher {ossl_cipher!r} is not listed by the OpenSSL context."
            raise TLSError(msg)

        try:
            return CipherSuite(cipher_id)
        except ValueError:
            return cipher_id

    def negotiated_protocol(self) -> NextProtocol | bytes | None:
        """
        Returns the protocol that was selected during the TLS handshake.

        This selection may have been made using ALPN or some future
        negotiation mechanism.

        If the negotiated protocol is one of the protocols defined in the
        ``NextProtocol`` enum, the value from that enum will be returned.
        Otherwise, the raw bytestring of the negotiated protocol will be
        returned.

        If ``Context.set_inner_protocols()`` was not called, if the other
        party does not support protocol negotiation, if this buffer does
        not support any of the peer's proposed protocols, or if the
        handshake has not happened yet, ``None`` is returned.
        """

        proto = self._object.selected_alpn_protocol()

        # The standard library returns this as a str, we want bytes.
        if proto is None:
            return None

        proto_bytes = proto.encode("ascii")

        try:
            return NextProtocol(proto_bytes)
        except ValueError:
            return proto_bytes

    @property
    def negotiated_tls_version(self) -> TLSVersion | None:
        """The version of TLS that has been negotiated on this connection."""

        ossl_version = self._object.version()
        if ossl_version is None:
            return None
        return TLSVersion(ossl_version)

    def getpeercert(self) -> bytes | None:
        """
        Return the raw DER bytes of the certificate provided by the peer
        during the handshake, if applicable.
        """
        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
        # Obtaining the certificate as a dict is very specific to the ssl module and may be
        # difficult to implement for other implementation, so this is not supported
        with _error_converter():
            cert = self._object.getpeercert(True)

        return cert

A TLSBuffer implementation based on OpenSSL

OpenSSLTLSBuffer(*args: tuple, **kwargs: tuple)
660    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
661        """OpenTLSBuffers should not be constructed by the user.
662        Instead, the ClientContext.create_buffer() and
663        ServerContext.create_buffer() use the _create() method."""
664        msg = (
665            f"{self.__class__.__name__} does not have a public constructor. "
666            "Instances are returned by ClientContext.create_buffer() \
667                or ServerContext.create_buffer()."
668        )
669        raise TypeError(
670            msg,
671        )

OpenSSLTLSBuffer objects should not be constructed by the user. Instead, ClientContext.create_buffer() and ServerContext.create_buffer() use the _create() method.

def read( self, amt: int, buffer: collections.abc.Buffer | None = None) -> bytes | int:
706    def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
707        """
708        Read up to ``amt`` bytes of data from the input buffer and return
709        the result as a ``bytes`` instance. If an optional buffer is
710        provided, the result is written into the buffer and the number of
711        bytes is returned instead.
712
713        Once EOF is reached, all further calls to this method return the
714        empty byte string ``b''``.
715
716        May read "short": that is, fewer bytes may be returned than were
717        requested.
718
719        Raise ``WantReadError`` or ``WantWriteError`` if there is
720        insufficient data in either the input or output buffer and the
721        operation would have caused data to be written or read.
722
723        May raise ``RaggedEOF`` if the connection has been closed without a
724        graceful TLS shutdown. Whether this is an exception that should be
725        ignored or not is up to the specific application.
726
727        As at any time a re-negotiation is possible, a call to ``read()``
728        can also cause write operations.
729        """
730
731        with _error_converter():
732            try:
733                # MyPy insists that buffer must be a bytearray
734                return self._object.read(amt, buffer)  # type: ignore[arg-type]
735            except ssl.SSLZeroReturnError:
736                return b""

Read up to amt bytes of data from the input buffer and return the result as a bytes instance. If an optional buffer is provided, the result is written into the buffer and the number of bytes is returned instead.

Once EOF is reached, all further calls to this method return the empty byte string b''.

May read "short": that is, fewer bytes may be returned than were requested.

Raise WantReadError or WantWriteError if there is insufficient data in either the input or output buffer and the operation would have caused data to be written or read.

May raise RaggedEOF if the connection has been closed without a graceful TLS shutdown. Whether this is an exception that should be ignored or not is up to the specific application.

As at any time a re-negotiation is possible, a call to read() can also cause write operations.

def write(self, buf: collections.abc.Buffer) -> int:
738    def write(self, buf: Buffer) -> int:
739        """
740        Write ``buf`` in encrypted form to the output buffer and return the
741        number of bytes written. The ``buf`` argument must be an object
742        supporting the buffer interface.
743
744        Raise ``WantReadError`` or ``WantWriteError`` if there is
745        insufficient data in either the input or output buffer and the
746        operation would have caused data to be written or read. In either
747        case, users should endeavour to resolve that situation and then
748        re-call this method. When re-calling this method users *should*
749        re-use the exact same ``buf`` object, as some TLS implementations
750        require that the exact same buffer be used.
751
752        This operation may write "short": that is, fewer bytes may be
753        written than were in the buffer.
754
755        As at any time a re-negotiation is possible, a call to ``write()``
756        can also cause read operations.
757        """
758
759        with _error_converter():
760            return self._object.write(buf)

Write buf in encrypted form to the output buffer and return the number of bytes written. The buf argument must be an object supporting the buffer interface.

Raise WantReadError or WantWriteError if there is insufficient data in either the input or output buffer and the operation would have caused data to be written or read. In either case, users should endeavour to resolve that situation and then re-call this method. When re-calling this method users should re-use the exact same buf object, as some TLS implementations require that the exact same buffer be used.

This operation may write "short": that is, fewer bytes may be written than were in the buffer.

As at any time a re-negotiation is possible, a call to write() can also cause read operations.

def do_handshake(self) -> None:
763    def do_handshake(self) -> None:
764        """
765        Performs the TLS handshake. Also performs certificate validation
766        and hostname verification.
767        """
768
769        with _error_converter():
770            self._object.do_handshake()

Performs the TLS handshake. Also performs certificate validation and hostname verification.

def shutdown(self) -> None:
772    def shutdown(self) -> None:
773        """
774        Performs a clean TLS shut down. This should generally be used
775        whenever possible to signal to the remote peer that the content is
776        finished.
777        """
778
779        with _error_converter():
780            self._object.unwrap()

Performs a clean TLS shut down. This should generally be used whenever possible to signal to the remote peer that the content is finished.

def process_incoming(self, data_from_network: bytes) -> None:
782    def process_incoming(self, data_from_network: bytes) -> None:
783        """
784        Receives some TLS data from the network and stores it in an
785        internal buffer.
786
787        If the internal buffer is overfull, this method will raise
788        ``WantReadError`` and store no data. At this point, the user must
789        call ``read`` to remove some data from the internal buffer
790        before repeating this call.
791        """
792
793        with _error_converter():
794            written_len = self._in_bio.write(data_from_network)
795
796        assert written_len == len(data_from_network)

Receives some TLS data from the network and stores it in an internal buffer.

If the internal buffer is overfull, this method will raise WantReadError and store no data. At this point, the user must call read to remove some data from the internal buffer before repeating this call.

def incoming_bytes_buffered(self) -> int:
798    def incoming_bytes_buffered(self) -> int:
799        """
800        Returns how many bytes are in the incoming buffer waiting to be processed.
801        """
802
803        return self._in_bio.pending

Returns how many bytes are in the incoming buffer waiting to be processed.

def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
805    def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
806        """
807        Returns the next ``amt`` bytes of data that should be written to
808        the network from the outgoing data buffer, removing it from the
809        internal buffer.
810        """
811
812        return self._out_bio.read(amount_bytes_for_network)

Returns the next amount_bytes_for_network bytes of data that should be written to the network from the outgoing data buffer, removing it from the internal buffer.

def outgoing_bytes_buffered(self) -> int:
814    def outgoing_bytes_buffered(self) -> int:
815        """
816        Returns how many bytes are in the outgoing buffer waiting to be sent.
817        """
818
819        return self._out_bio.pending

Returns how many bytes are in the outgoing buffer waiting to be sent.

821    @property
822    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
823        """The ``Context`` object this socket is tied to."""
824
825        return self._parent_context

The Context object this buffer is tied to.

def cipher(self) -> tlslib.tlslib.CipherSuite | int | None:
827    def cipher(self) -> CipherSuite | int | None:
828        """
829        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
830
831        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
832        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
833        """
834
835        ret = self._object.cipher()
836
837        if ret is None:
838            return None
839        else:
840            ossl_cipher, _, _ = ret
841
842        for cipher in self._ssl_context.get_ciphers():
843            if cipher["name"] == ossl_cipher:
844                break
845        # Since the cipher was negotiated using the OpenSSL context,
846        # it must exist in the list of the OpenSSL supported ciphers.
847        assert cipher["name"] == ossl_cipher
848
849        cipher_id = cipher["id"] & 0xFFFF
850        try:
851            return CipherSuite(cipher_id)
852        except ValueError:
853            return cipher_id

Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

If no connection has been negotiated, returns None. If the cipher negotiated is not defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

def negotiated_protocol(self) -> tlslib.tlslib.NextProtocol | bytes | None:
855    def negotiated_protocol(self) -> NextProtocol | bytes | None:
856        """
857        Returns the protocol that was selected during the TLS handshake.
858
859        This selection may have been made using ALPN or some future
860        negotiation mechanism.
861
862        If the negotiated protocol is one of the protocols defined in the
863        ``NextProtocol`` enum, the value from that enum will be returned.
864        Otherwise, the raw bytestring of the negotiated protocol will be
865        returned.
866
867        If ``Context.set_inner_protocols()`` was not called, if the other
868        party does not support protocol negotiation, if this socket does
869        not support any of the peer's proposed protocols, or if the
870        handshake has not happened yet, ``None`` is returned.
871        """
872
873        proto = self._object.selected_alpn_protocol()
874
875        # The standard library returns this as a str, we want bytes.
876        if proto is None:
877            return None
878
879        protoBytes = proto.encode("ascii")
880
881        try:
882            return NextProtocol(protoBytes)
883        except ValueError:
884            return protoBytes

Returns the protocol that was selected during the TLS handshake.

This selection may have been made using ALPN or some future negotiation mechanism.

If the negotiated protocol is one of the protocols defined in the NextProtocol enum, the value from that enum will be returned. Otherwise, the raw bytestring of the negotiated protocol will be returned.

If Context.set_inner_protocols() was not called, if the other party does not support protocol negotiation, if this socket does not support any of the peer's proposed protocols, or if the handshake has not happened yet, None is returned.

negotiated_tls_version: tlslib.tlslib.TLSVersion | None
886    @property
887    def negotiated_tls_version(self) -> TLSVersion | None:
888        """The version of TLS that has been negotiated on this connection."""
889
890        ossl_version = self._object.version()
891        if ossl_version is None:
892            return None
893        else:
894            return TLSVersion(ossl_version)

The version of TLS that has been negotiated on this connection.

def getpeercert(self) -> bytes | None:
896    def getpeercert(self) -> bytes | None:
897        """
898        Return the raw DER bytes of the certificate provided by the peer
899        during the handshake, if applicable.
900        """
901        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
902        # Obtaining the certificate as a dict is very specific to the ssl module and may be
903        # difficult to implement for other implementation, so this is not supported
904        with _error_converter():
905            cert = self._object.getpeercert(True)
906
907        return cert

Return the raw DER bytes of the certificate provided by the peer during the handshake, if applicable.

class OpenSSLClientContext:
class OpenSSLClientContext:
    """Client-side TLS context backed by the standard library OpenSSL bindings.

    It holds a client configuration and uses it to create TLS sockets and
    in-memory TLS buffers for the client side of a network connection.
    """

    def __init__(self, configuration: TLSClientConfiguration) -> None:
        """Store the TLS configuration this context will work from."""
        self._configuration = configuration

    @property
    def configuration(self) -> TLSClientConfiguration:
        """The TLS configuration that was used to create the context."""
        return self._configuration

    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
        """Create a socket-like object for ``address`` that can be used to do TLS."""
        # A fresh stdlib context is built from the configuration on each call.
        stdlib_context = _init_context_client(self._configuration)
        return OpenSSLTLSSocket._create(
            address=address,
            ssl_context=stdlib_context,
            server_side=False,
            parent_context=self,
        )

    def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
        """Create a client-side TLSBuffer acting as an in-memory channel.

        The buffer also exposes information about the TLS exchange
        (cipher, negotiated_protocol, negotiated_tls_version, etc.).
        """
        # A fresh stdlib context is built from the configuration on each call.
        stdlib_context = _init_context_client(self._configuration)
        return OpenSSLTLSBuffer._create(
            ssl_context=stdlib_context,
            server_side=False,
            parent_context=self,
            server_hostname=server_hostname,
        )

This class controls and creates a socket that is wrapped using the standard library bindings to OpenSSL to perform TLS connections on the client side of a network connection.

OpenSSLClientContext(configuration: tlslib.tlslib.TLSClientConfiguration)
916    def __init__(self, configuration: TLSClientConfiguration) -> None:
917        """Create a new context object from a given TLS configuration."""
918
919        self._configuration = configuration

Create a new context object from a given TLS configuration.

configuration: tlslib.tlslib.TLSClientConfiguration
921    @property
922    def configuration(self) -> TLSClientConfiguration:
923        """Returns the TLS configuration that was used to create the context."""
924
925        return self._configuration

Returns the TLS configuration that was used to create the context.

def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
927    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
928        """Create a socket-like object that can be used to do TLS."""
929        ossl_context = _init_context_client(self._configuration)
930
931        return OpenSSLTLSSocket._create(
932            parent_context=self,
933            server_side=False,
934            ssl_context=ossl_context,
935            address=address,
936        )

Create a socket-like object that can be used to do TLS.

def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
938    def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
939        """Creates a TLSBuffer that acts as an in-memory channel,
940        and contains information about the TLS exchange
941        (cipher, negotiated_protocol, negotiated_tls_version, etc.)."""
942
943        ossl_context = _init_context_client(self._configuration)
944
945        return OpenSSLTLSBuffer._create(
946            server_hostname=server_hostname,
947            parent_context=self,
948            server_side=False,
949            ssl_context=ossl_context,
950        )

Creates a TLSBuffer that acts as an in-memory channel, and contains information about the TLS exchange (cipher, negotiated_protocol, negotiated_tls_version, etc.).

class OpenSSLServerContext:
class OpenSSLServerContext:
    """Server-side TLS context backed by the standard library OpenSSL bindings.

    It holds a server configuration and uses it to create TLS sockets and
    in-memory TLS buffers for the server side of a network connection.
    """

    def __init__(self, configuration: TLSServerConfiguration) -> None:
        """Store the TLS configuration this context will work from."""
        self._configuration = configuration

    @property
    def configuration(self) -> TLSServerConfiguration:
        """The TLS configuration that was used to create the context."""
        return self._configuration

    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
        """Create a socket-like object for ``address`` that can be used to do TLS."""
        # A fresh stdlib context is built from the configuration on each call.
        stdlib_context = _init_context_server(self._configuration)
        return OpenSSLTLSSocket._create(
            address=address,
            ssl_context=stdlib_context,
            server_side=True,
            parent_context=self,
        )

    def create_buffer(self) -> OpenSSLTLSBuffer:
        """Create a server-side TLSBuffer acting as an in-memory channel.

        The buffer also exposes information about the TLS exchange
        (cipher, negotiated_protocol, negotiated_tls_version, etc.).
        """
        # A fresh stdlib context is built from the configuration on each call.
        stdlib_context = _init_context_server(self._configuration)
        return OpenSSLTLSBuffer._create(
            ssl_context=stdlib_context,
            server_side=True,
            parent_context=self,
            server_hostname=None,
        )

This class controls and creates a socket that is wrapped using the standard library bindings to OpenSSL to perform TLS connections on the server side of a network connection.

OpenSSLServerContext(configuration: tlslib.tlslib.TLSServerConfiguration)
959    def __init__(self, configuration: TLSServerConfiguration) -> None:
960        """Create a new context object from a given TLS configuration."""
961
962        self._configuration = configuration

Create a new context object from a given TLS configuration.

configuration: tlslib.tlslib.TLSServerConfiguration
964    @property
965    def configuration(self) -> TLSServerConfiguration:
966        """Returns the TLS configuration that was used to create the context."""
967
968        return self._configuration

Returns the TLS configuration that was used to create the context.

def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
970    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
971        """Create a socket-like object that can be used to do TLS."""
972        ossl_context = _init_context_server(self._configuration)
973
974        return OpenSSLTLSSocket._create(
975            parent_context=self,
976            server_side=True,
977            ssl_context=ossl_context,
978            address=address,
979        )

Create a socket-like object that can be used to do TLS.

def create_buffer(self) -> OpenSSLTLSBuffer:
981    def create_buffer(self) -> OpenSSLTLSBuffer:
982        """Creates a TLSBuffer that acts as an in-memory channel,
983        and contains information about the TLS exchange
984        (cipher, negotiated_protocol, negotiated_tls_version, etc.)."""
985
986        ossl_context = _init_context_server(self._configuration)
987
988        return OpenSSLTLSBuffer._create(
989            server_hostname=None,
990            parent_context=self,
991            server_side=True,
992            ssl_context=ossl_context,
993        )

Creates a TLSBuffer that acts as an in-memory channel, and contains information about the TLS exchange (cipher, negotiated_protocol, negotiated_tls_version, etc.).

def validate_config( tls_config: tlslib.tlslib.TLSClientConfiguration | tlslib.tlslib.TLSServerConfiguration) -> None:
def validate_config(tls_config: TLSClientConfiguration | TLSServerConfiguration) -> None:
    """Check that the OpenSSL TLS implementation supports ``tls_config``.

    The trust store is checked first. A client configuration then has its
    ``certificate_chain`` checked as a single signing chain, while a server
    configuration has every entry of its ``certificate_chain`` checked. A
    ``certificate_chain`` of ``None`` is skipped in both cases.
    """
    _check_trust_store(tls_config.trust_store)

    if isinstance(tls_config, TLSClientConfiguration):
        client_chain = tls_config.certificate_chain
        if client_chain is not None:
            _check_sign_chain(client_chain)
        return

    assert isinstance(tls_config, TLSServerConfiguration)
    server_chains = tls_config.certificate_chain
    if server_chains is None:
        return
    for chain in server_chains:
        _check_sign_chain(chain)

Validates whether the OpenSSL TLS implementation supports this TLS configuration.

STDLIB_IMPLEMENTATION = <tlslib.tlslib.TLSImplementation object>