tlslib.stdlib

Shims the standard library OpenSSL module into the amended PEP 543 API.

   1"""Shims the standard library OpenSSL module into the amended PEP 543 API."""
   2
   3from __future__ import annotations
   4
   5import os
   6import socket
   7import ssl
   8import tempfile
   9import typing
  10import weakref
  11from collections.abc import Buffer, Sequence
  12from contextlib import contextmanager
  13from pathlib import Path
  14
  15import truststore
  16
  17from .tlslib import (
  18    DEFAULT_CIPHER_LIST,
  19    Certificate,
  20    CipherSuite,
  21    ConfigurationError,
  22    NextProtocol,
  23    PrivateKey,
  24    RaggedEOF,
  25    SigningChain,
  26    TLSClientConfiguration,
  27    TLSError,
  28    TLSImplementation,
  29    TLSServerConfiguration,
  30    TLSVersion,
  31    TrustStore,
  32    WantReadError,
  33    WantWriteError,
  34)
  35
# Either kind of context this module may hand back: the standard library one,
# or the ``truststore`` one used for clients that verify against the system
# trust store.
_SSLContext = ssl.SSLContext | truststore.SSLContext

# Lower-bound lookup: TLSVersion value -> ``ssl.TLSVersion`` member.
# MAXIMUM_SUPPORTED is not a key here, so requesting it as a minimum fails the
# lookup (reported as TLSError by _init_context_common).
_TLSMinVersionOpts = {
    TLSVersion.MINIMUM_SUPPORTED: ssl.TLSVersion.MINIMUM_SUPPORTED,
    TLSVersion.TLSv1_2: ssl.TLSVersion.TLSv1_2,
    TLSVersion.TLSv1_3: ssl.TLSVersion.TLSv1_3,
}

# Upper-bound lookup: TLSVersion value -> ``ssl.TLSVersion`` member.
# MINIMUM_SUPPORTED is not a key here, so requesting it as a maximum fails the
# lookup in the same way.
_TLSMaxVersionOpts = {
    TLSVersion.TLSv1_2: ssl.TLSVersion.TLSv1_2,
    TLSVersion.TLSv1_3: ssl.TLSVersion.TLSv1_3,
    TLSVersion.MAXIMUM_SUPPORTED: ssl.TLSVersion.MAXIMUM_SUPPORTED,
}

# We need to populate a dictionary of ciphers that OpenSSL supports, in the
# form of {16-bit number: OpenSSL suite name}.
# A throwaway client context is asked for every cipher it knows about; only
# the low 16 bits of each OpenSSL cipher id are kept as the key.
# NOTE(review): presumably those 16 bits are the IANA cipher suite code point
# that CipherSuite values use - confirm against tlslib.CipherSuite.
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.set_ciphers("ALL:COMPLEMENTOFALL")
_cipher_map = {c["id"] & 0xFFFF: c["name"] for c in ctx.get_ciphers()}
# The helper context was only needed to build the table; do not leave it
# behind as a module attribute.
del ctx
  56
  57
  58@contextmanager
  59def _error_converter(
  60    ignore_filter: tuple[type[Exception]] | tuple[()] = (),
  61) -> typing.Generator[None, None, None]:
  62    """
  63    Catches errors from the ssl module and wraps them up in TLSError
  64    exceptions. Ignores certain kinds of exceptions as requested.
  65    """
  66    try:
  67        yield
  68    except ignore_filter:
  69        pass
  70    except ssl.SSLWantReadError:
  71        raise WantReadError("Must read data") from None
  72    except ssl.SSLWantWriteError:
  73        raise WantWriteError("Must write data") from None
  74    except ssl.SSLEOFError:
  75        raise RaggedEOF("Ragged EOF") from None
  76    except ssl.SSLError as e:
  77        raise TLSError(e) from None
  78
  79
  80def _remove_path(ts_cert_priv: TrustStore | Certificate | PrivateKey) -> None:
  81    ts_cert_priv._path = None
  82
  83
  84def _is_system_trust_store(trust_store: TrustStore | None) -> bool:
  85    return trust_store is None or (
  86        trust_store._path is None and trust_store._buffer is None and trust_store._id is None
  87    )
  88
  89
  90def _get_path_from_trust_store(
  91    context: _SSLContext, trust_store: TrustStore | None
  92) -> os.PathLike | None:
  93    assert trust_store is not None
  94    if trust_store._path is not None:
  95        return trust_store._path
  96    elif trust_store._buffer is not None:
  97        tmp_path = tempfile.NamedTemporaryFile(mode="w+b", delete=False, delete_on_close=False)
  98        tmp_path.write(trust_store._buffer)
  99        tmp_path.close()
 100        # Store this path to prevent creation of multiple files for each trust store
 101        trust_store._path = Path(tmp_path.name)
 102        weakref.finalize(context, os.remove, tmp_path.name)
 103        # Remove the path in case the trust store outlives the context
 104        weakref.finalize(context, _remove_path, trust_store)
 105        return trust_store._path
 106    elif trust_store._id is not None:
 107        raise ConfigurationError("This TLS implementation does not support id-based trust stores.")
 108    else:
 109        return None
 110
 111
 112def _create_client_context_with_trust_store(trust_store: TrustStore | None) -> _SSLContext:
 113    some_context: _SSLContext
 114
 115    if _is_system_trust_store(trust_store):
 116        some_context = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
 117    else:
 118        some_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
 119        trust_store_path = _get_path_from_trust_store(some_context, trust_store)
 120        some_context.load_verify_locations(trust_store_path)
 121
 122    # TLS Compression is a security risk and is removed in TLS v1.3
 123    some_context.options |= ssl.OP_NO_COMPRESSION
 124
 125    some_context.verify_flags = (
 126        ssl.VerifyFlags.VERIFY_X509_STRICT | ssl.VerifyFlags.VERIFY_X509_PARTIAL_CHAIN
 127    )
 128
 129    return some_context
 130
 131
 132def _create_server_context_with_trust_store(
 133    trust_store: TrustStore | None,
 134) -> ssl.SSLContext:
 135    some_context: ssl.SSLContext
 136
 137    # truststore does not support server side
 138    some_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
 139
 140    if trust_store is not None:
 141        some_context.verify_mode = ssl.CERT_REQUIRED
 142        trust_store_path = _get_path_from_trust_store(some_context, trust_store)
 143
 144        if trust_store_path is not None:
 145            some_context.load_verify_locations(trust_store_path)
 146        else:
 147            some_context.load_default_certs(ssl.Purpose.CLIENT_AUTH)
 148
 149    # TLS Compression is a security risk and is removed in TLS v1.3
 150    some_context.options |= ssl.OP_NO_COMPRESSION
 151
 152    return some_context
 153
 154
 155def _sni_callback_builder(
 156    _name_to_chain_map: weakref.WeakValueDictionary[str, SigningChain],
 157    original_config: TLSServerConfiguration,
 158) -> typing.Callable[[ssl.SSLSocket, str, ssl.SSLContext], ssl.AlertDescription | None]:
 159    def pep543_callback(
 160        ssl_socket: ssl.SSLSocket,
 161        server_name: str,
 162        stdlib_context: ssl.SSLContext,
 163    ) -> ssl.AlertDescription | None:
 164        try:
 165            sign_chain = _name_to_chain_map[server_name]
 166        except KeyError:
 167            return ssl.ALERT_DESCRIPTION_INTERNAL_ERROR
 168
 169        new_config: TLSServerConfiguration = TLSServerConfiguration(
 170            certificate_chain=(sign_chain,),
 171            ciphers=original_config.ciphers,
 172            inner_protocols=original_config.inner_protocols,
 173            lowest_supported_version=original_config.lowest_supported_version,
 174            highest_supported_version=original_config.highest_supported_version,
 175            trust_store=original_config.trust_store,
 176        )
 177        ssl_socket.context = _init_context_server(new_config)
 178
 179        # Returning None, perversely, is how one signals success from this
 180        # function. Will wonders never cease?
 181        return None
 182
 183    return pep543_callback
 184
 185
 186def _configure_server_context_for_certs(
 187    context: ssl.SSLContext,
 188    cert_chain: Sequence[SigningChain] | None = None,
 189    sni_config: TLSServerConfiguration | None = None,
 190) -> ssl.SSLContext:
 191    if cert_chain is not None:
 192        if len(cert_chain) == 1:
 193            # Only one SigningChain, no need to configure SNI
 194            return _configure_context_for_single_signing_chain(context, cert_chain[0])
 195
 196        elif len(cert_chain) > 1:
 197            # We have multiple SigningChains, need to configure SNI
 198            assert sni_config is not None
 199            return _configure_context_for_sni(context, cert_chain, sni_config)
 200
 201    return context
 202
 203
 204def _get_path_from_cert_or_priv(
 205    context: _SSLContext, cert_or_priv: Certificate | PrivateKey
 206) -> os.PathLike:
 207    if cert_or_priv._path is not None:
 208        return cert_or_priv._path
 209    elif cert_or_priv._buffer is not None:
 210        tmp_path = tempfile.NamedTemporaryFile(mode="w+b", delete=False, delete_on_close=False)
 211        tmp_path.write(cert_or_priv._buffer)
 212        tmp_path.close()
 213        weakref.finalize(context, os.remove, tmp_path.name)
 214        # Store the path for future usage, preventing creation of multiple files
 215        cert_or_priv._path = Path(tmp_path.name)
 216        # Remove the path in case the cert or priv outlives the context
 217        weakref.finalize(context, _remove_path, cert_or_priv)
 218        return cert_or_priv._path
 219    elif cert_or_priv._id is not None:
 220        raise ConfigurationError(
 221            "This TLS implementation does not support id-based certificates \
 222                                  or private keys."
 223        )
 224    else:
 225        raise ConfigurationError("Certificate or PrivateKey cannot be empty.")
 226
 227
 228def _get_bytes_from_cert(cert: Certificate) -> bytes:
 229    if cert._buffer is not None:
 230        return cert._buffer
 231    elif cert._path is not None:
 232        # Do not save cert in memory
 233        return Path(cert._path).read_bytes()
 234    elif cert._id is not None:
 235        raise ConfigurationError("This TLS implementation does not support id-based certificates.")
 236    else:
 237        raise ConfigurationError("Certificate cannot be empty.")
 238
 239
 240def _configure_context_for_single_signing_chain(
 241    context: _SSLContext,
 242    cert_chain: SigningChain | None = None,
 243) -> _SSLContext:
 244    """Given a PEP 543 cert chain, configure the SSLContext to send that cert
 245    chain in the handshake.
 246
 247    Returns the context.
 248    """
 249
 250    if cert_chain is not None:
 251        cert = cert_chain.leaf[0]
 252
 253        if len(cert_chain.chain) == 0:
 254            cert_path = _get_path_from_cert_or_priv(context, cert)
 255
 256        else:
 257            with tempfile.NamedTemporaryFile(mode="wb", delete=False) as io:
 258                # Write first cert
 259                io.write(_get_bytes_from_cert(cert))
 260
 261                for cert in cert_chain.chain:
 262                    io.write(b"\n")
 263                    io.write(_get_bytes_from_cert(cert))
 264
 265            weakref.finalize(context, os.remove, io.name)
 266            cert_path = Path(io.name)
 267
 268        key_path = None
 269        if cert_chain.leaf[1] is not None:
 270            privkey = cert_chain.leaf[1]
 271
 272            key_path = _get_path_from_cert_or_priv(context, privkey)
 273
 274        assert cert_path is not None
 275        with _error_converter():
 276            context.load_cert_chain(cert_path, key_path, None)
 277
 278    return context
 279
 280
 281def _configure_context_for_sni(
 282    context: ssl.SSLContext,
 283    cert_chain: Sequence[SigningChain],
 284    sni_config: TLSServerConfiguration,
 285) -> ssl.SSLContext:
 286    # This is a mapping of concrete server names to the corresponding SigningChain
 287    _name_to_chain_map: weakref.WeakValueDictionary[str, SigningChain] = (
 288        weakref.WeakValueDictionary()
 289    )
 290
 291    for sign_chain in cert_chain:
 292        # Parse leaf certificates to find server names
 293        cert = sign_chain.leaf[0]
 294        cert_path = _get_path_from_cert_or_priv(context, cert)
 295        dec_cert = ssl._ssl._test_decode_cert(cert_path)  # type: ignore[attr-defined]
 296
 297        try:
 298            alt_names = dec_cert["subjectAltName"]
 299        except KeyError:
 300            continue
 301
 302        server_name = None
 303        for name in alt_names:
 304            assert len(name) == 2
 305            if name[0] == "DNS":
 306                server_name = name[1]
 307                break
 308
 309        if server_name is not None:
 310            _name_to_chain_map[server_name] = sign_chain
 311
 312    context.sni_callback = _sni_callback_builder(_name_to_chain_map, sni_config)  # type: ignore[assignment]
 313
 314    return context
 315
 316
 317def _configure_context_for_ciphers(
 318    context: _SSLContext, ciphers: Sequence[CipherSuite | int] | None = None
 319) -> _SSLContext:
 320    """Given a PEP 543 cipher suite list, configure the SSLContext to use those
 321    cipher suites.
 322
 323    Returns the context.
 324    """
 325    if ciphers is None:
 326        # OpenSSL does not necessarily have system recommended settings
 327        # The default cipher list is used here instead
 328        ciphers = DEFAULT_CIPHER_LIST
 329
 330    ossl_names = [_cipher_map[cipher] for cipher in ciphers if cipher in _cipher_map]
 331    if not ossl_names:
 332        msg = "None of the provided ciphers are supported by the OpenSSL TLS implementation!"
 333        raise TLSError(msg)
 334    with _error_converter():
 335        context.set_ciphers(":".join(ossl_names))
 336    return context
 337
 338
 339def _configure_context_for_negotiation(
 340    context: _SSLContext,
 341    inner_protocols: Sequence[NextProtocol | bytes] | None = None,
 342) -> _SSLContext:
 343    """Given a PEP 543 list of protocols to negotiate, configures the SSLContext
 344    to negotiate those protocols.
 345    """
 346    if inner_protocols:
 347        protocols = []
 348        for np in inner_protocols:
 349            proto_string = np if isinstance(np, bytes) else np.value
 350            # The protocol string needs to be of type str for the standard
 351            # library.
 352            protocols.append(proto_string.decode("ascii"))
 353
 354        context.set_alpn_protocols(protocols)
 355
 356    return context
 357
 358
 359def _init_context_common(
 360    some_context: _SSLContext,
 361    config: TLSClientConfiguration | TLSServerConfiguration,
 362) -> _SSLContext:
 363    some_context = _configure_context_for_ciphers(
 364        some_context,
 365        config.ciphers,
 366    )
 367    some_context = _configure_context_for_negotiation(
 368        some_context,
 369        config.inner_protocols,
 370    )
 371
 372    # In lieu of system recommended settings, we default to TLS v1.3
 373    lowest_supported_version = config.lowest_supported_version
 374    if lowest_supported_version is None:
 375        lowest_supported_version = TLSVersion.TLSv1_3
 376
 377    highest_supported_version = config.highest_supported_version
 378    if highest_supported_version is None:
 379        highest_supported_version = TLSVersion.MAXIMUM_SUPPORTED
 380
 381    try:
 382        some_context.minimum_version = _TLSMinVersionOpts[lowest_supported_version]
 383        some_context.maximum_version = _TLSMaxVersionOpts[highest_supported_version]
 384    except KeyError:
 385        raise TLSError("Bad maximum/minimum options")
 386
 387    return some_context
 388
 389
 390def _init_context_client(config: TLSClientConfiguration) -> _SSLContext:
 391    """Initialize an SSL context object with a given client configuration."""
 392    some_context = _create_client_context_with_trust_store(config.trust_store)
 393
 394    some_context = _configure_context_for_single_signing_chain(
 395        some_context, config.certificate_chain
 396    )
 397
 398    return _init_context_common(some_context, config)
 399
 400
 401def _init_context_server(config: TLSServerConfiguration) -> _SSLContext:
 402    """Initialize an SSL context object with a given server configuration."""
 403    some_context = _create_server_context_with_trust_store(config.trust_store)
 404
 405    some_context = _configure_server_context_for_certs(
 406        some_context, config.certificate_chain, config
 407    )
 408
 409    return _init_context_common(some_context, config)
 410
 411
 412class OpenSSLTLSSocket:
 413    """A TLSSocket implementation based on OpenSSL."""
 414
 415    __slots__ = (
 416        "_parent_context",
 417        "_socket",
 418        "_ssl_context",
 419    )
 420
 421    _parent_context: OpenSSLClientContext | OpenSSLServerContext
 422    _socket: ssl.SSLSocket
 423    _ssl_context: _SSLContext
 424
 425    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
 426        """OpenTLSSockets should not be constructed by the user.
 427        Instead, the ClientContext.connect() and
 428        ServerContext.connect() use the _create() method."""
 429        msg = (
 430            f"{self.__class__.__name__} does not have a public constructor. "
 431            "Instances are returned by ClientContext.connect() or ServerContext.connect()."
 432        )
 433        raise TypeError(
 434            msg,
 435        )
 436
 437    @classmethod
 438    def _create(
 439        cls,
 440        address: tuple[str | None, int],
 441        parent_context: OpenSSLClientContext | OpenSSLServerContext,
 442        server_side: bool,
 443        ssl_context: _SSLContext,
 444    ) -> OpenSSLTLSSocket:
 445        self = cls.__new__(cls)
 446        self._parent_context = parent_context
 447        self._ssl_context = ssl_context
 448
 449        if server_side is True:
 450            family = socket.getaddrinfo(address[0], address[1], type=socket.SOCK_STREAM)[0][0]
 451            sock = socket.create_server(address, family=family)
 452            with _error_converter():
 453                self._socket = ssl_context.wrap_socket(
 454                    sock, server_side=server_side, server_hostname=None
 455                )
 456        else:
 457            hostname, _ = address
 458            sock = socket.create_connection(address)
 459            with _error_converter():
 460                self._socket = ssl_context.wrap_socket(
 461                    sock, server_side=server_side, server_hostname=hostname
 462                )
 463
 464        self._socket.setblocking(False)
 465
 466        return self
 467
 468    def recv(self, bufsize: int) -> bytes:
 469        """Receive data from the socket. The return value is a bytes object
 470        representing the data received. Should not work before the handshake
 471        is completed."""
 472        with _error_converter():
 473            try:
 474                return self._socket.recv(bufsize)
 475            except ssl.SSLZeroReturnError:
 476                return b""
 477
 478    def send(self, bytes: bytes) -> int:
 479        """Send data to the socket. The socket must be connected to a remote socket."""
 480        with _error_converter():
 481            return self._socket.send(bytes)
 482
 483    def close(self, force: bool = False) -> None:
 484        """Unwraps the TLS connection, shuts down both halves of the connection and
 485        mark the socket closed. If force is True, will only shutdown own half and
 486        not wait for the other side. If force is False, this will raise WantReadError
 487        until the other side sends a close_notify alert."""
 488
 489        try:
 490            with _error_converter():
 491                sock = self._socket.unwrap()
 492        except (ValueError, BrokenPipeError, OSError):
 493            # If these exceptions are raised, we close the socket without re-trying to unwrap it.
 494            # - ValueError: The socket was actually not wrapped
 495            # - BrokenPipeError: There is some issue with the socket
 496            # - OSError: The other side already shut down
 497            sock = self._socket
 498        except WantReadError:
 499            if force:
 500                sock = self._socket
 501            else:
 502                raise
 503
 504        # NOTE: OSError indicates that the other side has already hung up.
 505        with _error_converter(ignore_filter=(OSError,)):
 506            sock.shutdown(socket.SHUT_RDWR)
 507        return sock.close()
 508
 509    def listen(self, backlog: int) -> None:
 510        """Enable a server to accept connections. If backlog is specified, it
 511        specifies the number of unaccepted connections that the system will allow
 512        before refusing new connections."""
 513        with _error_converter():
 514            return self._socket.listen(backlog)
 515
 516    def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
 517        """Accept a connection. The socket must be bound to an address and listening
 518        for connections. The return value is a pair (conn, address) where conn is a
 519        new TLSSocket object usable to send and receive data on the connection, and
 520        address is the address bound to the socket on the other end of the connection."""
 521
 522        with _error_converter():
 523            (sock, address) = self._socket.accept()
 524        tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
 525        tls_socket._parent_context = self._parent_context
 526        tls_socket._ssl_context = self._ssl_context
 527        tls_socket._socket = sock
 528        with _error_converter():
 529            tls_socket._socket.setblocking(False)
 530        return (tls_socket, address)
 531
 532    def getsockname(self) -> socket._RetAddress:
 533        """Return the local address to which the socket is connected."""
 534        with _error_converter():
 535            return self._socket.getsockname()
 536
 537    def getpeercert(self) -> bytes | None:
 538        """
 539        Return the raw DER bytes of the certificate provided by the peer
 540        during the handshake, if applicable.
 541        """
 542        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
 543        # Obtaining the certificate as a dict is very specific to the ssl module and may be
 544        # difficult to implement for other TLS implementations, so this is not supported
 545
 546        with _error_converter():
 547            cert = self._socket.getpeercert(True)
 548
 549        return cert
 550
 551    def getpeername(self) -> socket._RetAddress:
 552        """Return the remote address to which the socket is connected."""
 553
 554        with _error_converter():
 555            return self._socket.getpeername()
 556
 557    def fileno(self) -> int:
 558        """Return the socket's file descriptor (a small integer), or -1 on failure."""
 559
 560        with _error_converter():
 561            return self._socket.fileno()
 562
 563    @property
 564    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
 565        """The ``Context`` object this socket is tied to."""
 566
 567        return self._parent_context
 568
 569    def cipher(self) -> CipherSuite | int | None:
 570        """
 571        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
 572
 573        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
 574        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
 575        """
 576
 577        # This is the OpenSSL cipher name. We want the ID, which we can get by
 578        # looking for this entry in the context's list of supported ciphers.
 579        ret = self._socket.cipher()
 580
 581        if ret is None:
 582            return None
 583        else:
 584            ossl_cipher, _, _ = ret
 585
 586        for cipher in self._ssl_context.get_ciphers():
 587            if cipher["name"] == ossl_cipher:
 588                break
 589        # Since the cipher was negotiated using the OpenSSL context,
 590        # it must exist in the list of the OpenSSL supported ciphers.
 591        assert cipher["name"] == ossl_cipher
 592
 593        cipher_id = cipher["id"] & 0xFFFF
 594        try:
 595            return CipherSuite(cipher_id)
 596        except ValueError:
 597            return cipher_id
 598
 599    def negotiated_protocol(self) -> NextProtocol | bytes | None:
 600        """
 601        Returns the protocol that was selected during the TLS handshake.
 602
 603        This selection may have been made using ALPN or some future
 604        negotiation mechanism.
 605
 606        If the negotiated protocol is one of the protocols defined in the
 607        ``NextProtocol`` enum, the value from that enum will be returned.
 608        Otherwise, the raw bytestring of the negotiated protocol will be
 609        returned.
 610
 611        If ``Context.set_inner_protocols()`` was not called, if the other
 612        party does not support protocol negotiation, if this socket does
 613        not support any of the peer's proposed protocols, or if the
 614        handshake has not happened yet, ``None`` is returned.
 615        """
 616
 617        proto = self._socket.selected_alpn_protocol()
 618
 619        # The standard library returns this as a str, we want bytes.
 620        if proto is None:
 621            return None
 622
 623        protoBytes = proto.encode("ascii")
 624
 625        try:
 626            return NextProtocol(protoBytes)
 627        except ValueError:
 628            return protoBytes
 629
 630    @property
 631    def negotiated_tls_version(self) -> TLSVersion | None:
 632        """The version of TLS that has been negotiated on this connection."""
 633
 634        ossl_version = self._socket.version()
 635        if ossl_version is None:
 636            return None
 637        else:
 638            return TLSVersion(ossl_version)
 639
 640
 641class OpenSSLTLSBuffer:
 642    """A TLSBuffer implementation based on OpenSSL"""
 643
 644    __slots__ = (
 645        "_ciphertext_buffer",
 646        "_in_bio",
 647        "_object",
 648        "_out_bio",
 649        "_parent_context",
 650        "_ssl_context",
 651    )
 652
 653    _ciphertext_buffer: bytearray
 654    _in_bio: ssl.MemoryBIO
 655    _object: ssl.SSLObject
 656    _out_bio: ssl.MemoryBIO
 657    _parent_context: OpenSSLClientContext | OpenSSLServerContext
 658    _ssl_context: _SSLContext
 659
 660    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
 661        """OpenTLSBuffers should not be constructed by the user.
 662        Instead, the ClientContext.create_buffer() and
 663        ServerContext.create_buffer() use the _create() method."""
 664        msg = (
 665            f"{self.__class__.__name__} does not have a public constructor. "
 666            "Instances are returned by ClientContext.create_buffer() \
 667                or ServerContext.create_buffer()."
 668        )
 669        raise TypeError(
 670            msg,
 671        )
 672
 673    @classmethod
 674    def _create(
 675        cls,
 676        server_hostname: str | None,
 677        parent_context: OpenSSLClientContext | OpenSSLServerContext,
 678        server_side: bool,
 679        ssl_context: _SSLContext,
 680    ) -> OpenSSLTLSBuffer:
 681        self = cls.__new__(cls)
 682        self._parent_context = parent_context
 683        self._ssl_context = ssl_context
 684
 685        # We need this extra buffer to implement the peek/consume API, which
 686        # the MemoryBIO object does not allow.
 687        self._ciphertext_buffer = bytearray()
 688
 689        # Set up the SSLObject we're going to back this with.
 690        self._in_bio = ssl.MemoryBIO()
 691        self._out_bio = ssl.MemoryBIO()
 692
 693        if server_side is True:
 694            with _error_converter():
 695                self._object = ssl_context.wrap_bio(
 696                    self._in_bio, self._out_bio, server_side=True, server_hostname=None
 697                )
 698        else:
 699            with _error_converter():
 700                self._object = ssl_context.wrap_bio(
 701                    self._in_bio, self._out_bio, server_side=False, server_hostname=server_hostname
 702                )
 703
 704        return self
 705
 706    def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
 707        """
 708        Read up to ``amt`` bytes of data from the input buffer and return
 709        the result as a ``bytes`` instance. If an optional buffer is
 710        provided, the result is written into the buffer and the number of
 711        bytes is returned instead.
 712
 713        Once EOF is reached, all further calls to this method return the
 714        empty byte string ``b''``.
 715
 716        May read "short": that is, fewer bytes may be returned than were
 717        requested.
 718
 719        Raise ``WantReadError`` or ``WantWriteError`` if there is
 720        insufficient data in either the input or output buffer and the
 721        operation would have caused data to be written or read.
 722
 723        May raise ``RaggedEOF`` if the connection has been closed without a
 724        graceful TLS shutdown. Whether this is an exception that should be
 725        ignored or not is up to the specific application.
 726
 727        As at any time a re-negotiation is possible, a call to ``read()``
 728        can also cause write operations.
 729        """
 730
 731        with _error_converter():
 732            try:
 733                # MyPy insists that buffer must be a bytearray
 734                return self._object.read(amt, buffer)  # type: ignore[arg-type]
 735            except ssl.SSLZeroReturnError:
 736                return b""
 737
 738    def write(self, buf: Buffer) -> int:
 739        """
 740        Write ``buf`` in encrypted form to the output buffer and return the
 741        number of bytes written. The ``buf`` argument must be an object
 742        supporting the buffer interface.
 743
 744        Raise ``WantReadError`` or ``WantWriteError`` if there is
 745        insufficient data in either the input or output buffer and the
 746        operation would have caused data to be written or read. In either
 747        case, users should endeavour to resolve that situation and then
 748        re-call this method. When re-calling this method users *should*
 749        re-use the exact same ``buf`` object, as some TLS implementations
 750        require that the exact same buffer be used.
 751
 752        This operation may write "short": that is, fewer bytes may be
 753        written than were in the buffer.
 754
 755        As at any time a re-negotiation is possible, a call to ``write()``
 756        can also cause read operations.
 757        """
 758
 759        with _error_converter():
 760            return self._object.write(buf)
 761
    # Get rid of this and do the handshake ourselves?
 763    def do_handshake(self) -> None:
 764        """
 765        Performs the TLS handshake. Also performs certificate validation
 766        and hostname verification.
 767        """
 768
 769        with _error_converter():
 770            self._object.do_handshake()
 771
 772    def shutdown(self) -> None:
 773        """
 774        Performs a clean TLS shut down. This should generally be used
 775        whenever possible to signal to the remote peer that the content is
 776        finished.
 777        """
 778
 779        with _error_converter():
 780            self._object.unwrap()
 781
 782    def process_incoming(self, data_from_network: bytes) -> None:
 783        """
 784        Receives some TLS data from the network and stores it in an
 785        internal buffer.
 786
 787        If the internal buffer is overfull, this method will raise
 788        ``WantReadError`` and store no data. At this point, the user must
 789        call ``read`` to remove some data from the internal buffer
 790        before repeating this call.
 791        """
 792
 793        with _error_converter():
 794            written_len = self._in_bio.write(data_from_network)
 795
 796        assert written_len == len(data_from_network)
 797
 798    def incoming_bytes_buffered(self) -> int:
 799        """
 800        Returns how many bytes are in the incoming buffer waiting to be processed.
 801        """
 802
 803        return self._in_bio.pending
 804
 805    def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
 806        """
 807        Returns the next ``amt`` bytes of data that should be written to
 808        the network from the outgoing data buffer, removing it from the
 809        internal buffer.
 810        """
 811
 812        return self._out_bio.read(amount_bytes_for_network)
 813
 814    def outgoing_bytes_buffered(self) -> int:
 815        """
 816        Returns how many bytes are in the outgoing buffer waiting to be sent.
 817        """
 818
 819        return self._out_bio.pending
 820
 821    @property
 822    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
 823        """The ``Context`` object this socket is tied to."""
 824
 825        return self._parent_context
 826
 827    def cipher(self) -> CipherSuite | int | None:
 828        """
 829        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
 830
 831        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
 832        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
 833        """
 834
 835        ret = self._object.cipher()
 836
 837        if ret is None:
 838            return None
 839        else:
 840            ossl_cipher, _, _ = ret
 841
 842        for cipher in self._ssl_context.get_ciphers():
 843            if cipher["name"] == ossl_cipher:
 844                break
 845        # Since the cipher was negotiated using the OpenSSL context,
 846        # it must exist in the list of the OpenSSL supported ciphers.
 847        assert cipher["name"] == ossl_cipher
 848
 849        cipher_id = cipher["id"] & 0xFFFF
 850        try:
 851            return CipherSuite(cipher_id)
 852        except ValueError:
 853            return cipher_id
 854
 855    def negotiated_protocol(self) -> NextProtocol | bytes | None:
 856        """
 857        Returns the protocol that was selected during the TLS handshake.
 858
 859        This selection may have been made using ALPN or some future
 860        negotiation mechanism.
 861
 862        If the negotiated protocol is one of the protocols defined in the
 863        ``NextProtocol`` enum, the value from that enum will be returned.
 864        Otherwise, the raw bytestring of the negotiated protocol will be
 865        returned.
 866
 867        If ``Context.set_inner_protocols()`` was not called, if the other
 868        party does not support protocol negotiation, if this socket does
 869        not support any of the peer's proposed protocols, or if the
 870        handshake has not happened yet, ``None`` is returned.
 871        """
 872
 873        proto = self._object.selected_alpn_protocol()
 874
 875        # The standard library returns this as a str, we want bytes.
 876        if proto is None:
 877            return None
 878
 879        protoBytes = proto.encode("ascii")
 880
 881        try:
 882            return NextProtocol(protoBytes)
 883        except ValueError:
 884            return protoBytes
 885
 886    @property
 887    def negotiated_tls_version(self) -> TLSVersion | None:
 888        """The version of TLS that has been negotiated on this connection."""
 889
 890        ossl_version = self._object.version()
 891        if ossl_version is None:
 892            return None
 893        else:
 894            return TLSVersion(ossl_version)
 895
 896    def getpeercert(self) -> bytes | None:
 897        """
 898        Return the raw DER bytes of the certificate provided by the peer
 899        during the handshake, if applicable.
 900        """
 901        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
 902        # Obtaining the certificate as a dict is very specific to the ssl module and may be
 903        # difficult to implement for other implementation, so this is not supported
 904        with _error_converter():
 905            cert = self._object.getpeercert(True)
 906
 907        return cert
 908
 909
 910class OpenSSLClientContext:
 911    """This class controls and creates a socket that is wrapped using the
 912    standard library bindings to OpenSSL to perform TLS connections on the
 913    client side of a network connection.
 914    """
 915
 916    def __init__(self, configuration: TLSClientConfiguration) -> None:
 917        """Create a new context object from a given TLS configuration."""
 918
 919        self._configuration = configuration
 920
 921    @property
 922    def configuration(self) -> TLSClientConfiguration:
 923        """Returns the TLS configuration that was used to create the context."""
 924
 925        return self._configuration
 926
 927    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
 928        """Create a socket-like object that can be used to do TLS."""
 929        ossl_context = _init_context_client(self._configuration)
 930
 931        return OpenSSLTLSSocket._create(
 932            parent_context=self,
 933            server_side=False,
 934            ssl_context=ossl_context,
 935            address=address,
 936        )
 937
 938    def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
 939        """Creates a TLSBuffer that acts as an in-memory channel,
 940        and contains information about the TLS exchange
 941        (cipher, negotiated_protocol, negotiated_tls_version, etc.)."""
 942
 943        ossl_context = _init_context_client(self._configuration)
 944
 945        return OpenSSLTLSBuffer._create(
 946            server_hostname=server_hostname,
 947            parent_context=self,
 948            server_side=False,
 949            ssl_context=ossl_context,
 950        )
 951
 952
 953class OpenSSLServerContext:
 954    """This class controls and creates and creates a socket that is wrapped using the
 955    standard library bindings to OpenSSL to perform TLS connections on the
 956    server side of a network connection.
 957    """
 958
 959    def __init__(self, configuration: TLSServerConfiguration) -> None:
 960        """Create a new context object from a given TLS configuration."""
 961
 962        self._configuration = configuration
 963
 964    @property
 965    def configuration(self) -> TLSServerConfiguration:
 966        """Returns the TLS configuration that was used to create the context."""
 967
 968        return self._configuration
 969
 970    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
 971        """Create a socket-like object that can be used to do TLS."""
 972        ossl_context = _init_context_server(self._configuration)
 973
 974        return OpenSSLTLSSocket._create(
 975            parent_context=self,
 976            server_side=True,
 977            ssl_context=ossl_context,
 978            address=address,
 979        )
 980
 981    def create_buffer(self) -> OpenSSLTLSBuffer:
 982        """Creates a TLSBuffer that acts as an in-memory channel,
 983        and contains information about the TLS exchange
 984        (cipher, negotiated_protocol, negotiated_tls_version, etc.)."""
 985
 986        ossl_context = _init_context_server(self._configuration)
 987
 988        return OpenSSLTLSBuffer._create(
 989            server_hostname=None,
 990            parent_context=self,
 991            server_side=True,
 992            ssl_context=ossl_context,
 993        )
 994
 995
 996def _check_cert_or_priv(cert_or_priv: Certificate | PrivateKey) -> None:
 997    if cert_or_priv._path is not None or cert_or_priv._buffer is not None:
 998        return None
 999    elif cert_or_priv._id is not None:
1000        raise ConfigurationError(
1001            "This TLS implementation does not support id-based certificates \
1002                                  or private keys."
1003        )
1004    else:
1005        raise ConfigurationError("Certificate or PrivateKey cannot be empty.")
1006
1007
def _check_trust_store(trust_store: TrustStore | None) -> None:
    """Reject trust stores that are identified by an id.

    A missing trust store (``None``) is accepted. Raises
    ``ConfigurationError`` when the trust store's ``_id`` is set.
    """
    if trust_store is None:
        return
    if trust_store._id is None:
        return
    raise ConfigurationError("This TLS implementation does not support id-based trust stores.")
1011
1012
def _check_sign_chain(sign_chain: SigningChain) -> None:
    """Check every certificate and private key in a signing chain.

    The leaf certificate is checked first, then the leaf private key if there
    is one, then each certificate in the rest of the chain.
    """
    leaf_cert, leaf_key = sign_chain.leaf[0], sign_chain.leaf[1]
    _check_cert_or_priv(leaf_cert)
    if leaf_key is not None:
        _check_cert_or_priv(leaf_key)
    for chain_cert in sign_chain.chain:
        _check_cert_or_priv(chain_cert)
1021
1022
def validate_config(tls_config: TLSClientConfiguration | TLSServerConfiguration) -> None:
    """Check that the OpenSSL TLS implementation can handle this TLS configuration.

    The trust store is checked first. A client configuration then has its
    single signing chain checked; a server configuration has each of its
    signing chains checked.
    """
    _check_trust_store(tls_config.trust_store)

    if not isinstance(tls_config, TLSClientConfiguration):
        assert isinstance(tls_config, TLSServerConfiguration)
        server_chains = tls_config.certificate_chain
        if server_chains is None:
            return
        for signing_chain in server_chains:
            _check_sign_chain(signing_chain)
        return

    client_chain = tls_config.certificate_chain
    if client_chain is not None:
        _check_sign_chain(client_chain)
1038
1039
#: The stdlib ``TLSImplementation`` object. It bundles the OpenSSL-backed
#: client and server context classes with ``validate_config``.
STDLIB_IMPLEMENTATION = TLSImplementation(
    client_context=OpenSSLClientContext,
    server_context=OpenSSLServerContext,
    validate_config=validate_config,
)
ctx
class OpenSSLTLSSocket:
class OpenSSLTLSSocket:
    """A TLSSocket implementation based on OpenSSL.

    Instances wrap an ``ssl.SSLSocket`` and are created through ``_create()``
    or ``accept()``, never through the constructor.
    """

    __slots__ = (
        "_parent_context",
        "_socket",
        "_ssl_context",
    )

    _parent_context: OpenSSLClientContext | OpenSSLServerContext
    _socket: ssl.SSLSocket
    _ssl_context: _SSLContext

    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
        """OpenSSLTLSSocket objects should not be constructed by the user.
        Instead, ClientContext.connect() and ServerContext.connect() use the
        _create() method. Calling the constructor always raises TypeError."""
        msg = (
            f"{self.__class__.__name__} does not have a public constructor. "
            "Instances are returned by ClientContext.connect() or ServerContext.connect()."
        )
        raise TypeError(
            msg,
        )

    @classmethod
    def _create(
        cls,
        address: tuple[str | None, int],
        parent_context: OpenSSLClientContext | OpenSSLServerContext,
        server_side: bool,
        ssl_context: _SSLContext,
    ) -> OpenSSLTLSSocket:
        """Build a TLS socket for ``address``.

        On the server side a listening socket is created for the address; on
        the client side a connection is opened to it and the host part of the
        address is used as the server hostname. The plain socket is then
        wrapped by ``ssl_context`` and switched to non-blocking mode.

        If wrapping fails, the plain socket is closed before the error is
        re-raised.
        """
        self = cls.__new__(cls)
        self._parent_context = parent_context
        self._ssl_context = ssl_context

        if server_side is True:
            family = socket.getaddrinfo(address[0], address[1], type=socket.SOCK_STREAM)[0][0]
            sock = socket.create_server(address, family=family)
            server_hostname = None
        else:
            server_hostname, _ = address
            sock = socket.create_connection(address)

        try:
            with _error_converter():
                self._socket = ssl_context.wrap_socket(
                    sock, server_side=server_side, server_hostname=server_hostname
                )
        except BaseException:
            # wrap_socket can fail before it takes ownership of the plain
            # socket (for example while validating its arguments). Close it
            # here so the file descriptor is not leaked.
            sock.close()
            raise

        self._socket.setblocking(False)

        return self

    def recv(self, bufsize: int) -> bytes:
        """Receive data from the socket. The return value is a bytes object
        representing the data received. Should not work before the handshake
        is completed."""
        with _error_converter():
            try:
                return self._socket.recv(bufsize)
            except ssl.SSLZeroReturnError:
                # A zero-return is reported as an empty read.
                return b""

    def send(self, bytes: bytes) -> int:
        """Send data to the socket. The socket must be connected to a remote socket."""
        with _error_converter():
            return self._socket.send(bytes)

    def close(self, force: bool = False) -> None:
        """Unwraps the TLS connection, shuts down both halves of the connection and
        marks the socket closed. If force is True, this only shuts down its own half
        and does not wait for the other side. If force is False, this raises
        WantReadError until the other side sends a close_notify alert."""

        try:
            with _error_converter():
                sock = self._socket.unwrap()
        except (ValueError, OSError):
            # If these exceptions are raised, we close the socket without re-trying to unwrap it.
            # - ValueError: The socket was actually not wrapped
            # - OSError: The other side already shut down, or there is some issue
            #   with the socket (BrokenPipeError is a subclass of OSError)
            sock = self._socket
        except WantReadError:
            if not force:
                raise
            sock = self._socket

        # NOTE: OSError indicates that the other side has already hung up.
        try:
            with _error_converter(ignore_filter=(OSError,)):
                sock.shutdown(socket.SHUT_RDWR)
        finally:
            # Release the descriptor even if shutdown raised.
            sock.close()

    def listen(self, backlog: int) -> None:
        """Enable a server to accept connections. If backlog is specified, it
        specifies the number of unaccepted connections that the system will allow
        before refusing new connections."""
        with _error_converter():
            return self._socket.listen(backlog)

    def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
        """Accept a connection. The socket must be bound to an address and listening
        for connections. The return value is a pair (conn, address) where conn is a
        new TLSSocket object usable to send and receive data on the connection, and
        address is the address bound to the socket on the other end of the connection."""

        with _error_converter():
            (sock, address) = self._socket.accept()
        # The accepted connection shares this socket's parent context and
        # OpenSSL context.
        tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
        tls_socket._parent_context = self._parent_context
        tls_socket._ssl_context = self._ssl_context
        tls_socket._socket = sock
        with _error_converter():
            tls_socket._socket.setblocking(False)
        return (tls_socket, address)

    def getsockname(self) -> socket._RetAddress:
        """Return the local address to which the socket is connected."""
        with _error_converter():
            return self._socket.getsockname()

    def getpeercert(self) -> bytes | None:
        """
        Return the raw DER bytes of the certificate provided by the peer
        during the handshake, if applicable.
        """
        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
        # Obtaining the certificate as a dict is very specific to the ssl module and may be
        # difficult to implement for other TLS implementations, so this is not supported

        with _error_converter():
            cert = self._socket.getpeercert(True)

        return cert

    def getpeername(self) -> socket._RetAddress:
        """Return the remote address to which the socket is connected."""

        with _error_converter():
            return self._socket.getpeername()

    def fileno(self) -> int:
        """Return the socket's file descriptor (a small integer), or -1 on failure."""

        with _error_converter():
            return self._socket.fileno()

    @property
    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
        """The ``Context`` object this socket is tied to."""

        return self._parent_context

    def cipher(self) -> CipherSuite | int | None:
        """
        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

        Raises ``TLSError`` if the negotiated cipher name cannot be found in the
        list of ciphers reported by the OpenSSL context.
        """

        negotiated = self._socket.cipher()
        if negotiated is None:
            return None

        # This is the OpenSSL cipher name. We want the ID, which we can get by
        # looking for this entry in the context's list of supported ciphers.
        ossl_cipher = negotiated[0]
        cipher_id = next(
            (
                entry["id"] & 0xFFFF
                for entry in self._ssl_context.get_ciphers()
                if entry["name"] == ossl_cipher
            ),
            None,
        )

        # The cipher was negotiated through this context, so it is expected to
        # be listed. This is checked explicitly instead of with ``assert``,
        # which is stripped under ``python -O`` and would let the ID of an
        # unrelated cipher be returned.
        if cipher_id is None:
            msg = f"Negotiated cipher {ossl_cipher!r} is not in the context's cipher list."
            raise TLSError(msg)

        try:
            return CipherSuite(cipher_id)
        except ValueError:
            return cipher_id

    def negotiated_protocol(self) -> NextProtocol | bytes | None:
        """
        Returns the protocol that was selected during the TLS handshake.

        This selection may have been made using ALPN or some future
        negotiation mechanism.

        If the negotiated protocol is one of the protocols defined in the
        ``NextProtocol`` enum, the value from that enum will be returned.
        Otherwise, the raw bytestring of the negotiated protocol will be
        returned.

        If ``Context.set_inner_protocols()`` was not called, if the other
        party does not support protocol negotiation, if this socket does
        not support any of the peer's proposed protocols, or if the
        handshake has not happened yet, ``None`` is returned.
        """

        proto = self._socket.selected_alpn_protocol()

        # The standard library returns this as a str, we want bytes.
        if proto is None:
            return None

        proto_bytes = proto.encode("ascii")

        try:
            return NextProtocol(proto_bytes)
        except ValueError:
            return proto_bytes

    @property
    def negotiated_tls_version(self) -> TLSVersion | None:
        """The version of TLS that has been negotiated on this connection."""

        ossl_version = self._socket.version()
        if ossl_version is None:
            return None
        else:
            return TLSVersion(ossl_version)

A TLSSocket implementation based on OpenSSL.

OpenSSLTLSSocket(*args: tuple, **kwargs: tuple)
426    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
427        """OpenTLSSockets should not be constructed by the user.
428        Instead, the ClientContext.connect() and
429        ServerContext.connect() use the _create() method."""
430        msg = (
431            f"{self.__class__.__name__} does not have a public constructor. "
432            "Instances are returned by ClientContext.connect() or ServerContext.connect()."
433        )
434        raise TypeError(
435            msg,
436        )

OpenSSLTLSSocket objects should not be constructed by the user. Instead, ClientContext.connect() and ServerContext.connect() create them through the _create() method.

def recv(self, bufsize: int) -> bytes:
469    def recv(self, bufsize: int) -> bytes:
470        """Receive data from the socket. The return value is a bytes object
471        representing the data received. Should not work before the handshake
472        is completed."""
473        with _error_converter():
474            try:
475                return self._socket.recv(bufsize)
476            except ssl.SSLZeroReturnError:
477                return b""

Receive data from the socket. The return value is a bytes object representing the data received. Should not work before the handshake is completed.

def send(self, bytes: bytes) -> int:
479    def send(self, bytes: bytes) -> int:
480        """Send data to the socket. The socket must be connected to a remote socket."""
481        with _error_converter():
482            return self._socket.send(bytes)

Send data to the socket. The socket must be connected to a remote socket.

def close(self, force: bool = False) -> None:
484    def close(self, force: bool = False) -> None:
485        """Unwraps the TLS connection, shuts down both halves of the connection and
486        mark the socket closed. If force is True, will only shutdown own half and
487        not wait for the other side. If force is False, this will raise WantReadError
488        until the other side sends a close_notify alert."""
489
490        try:
491            with _error_converter():
492                sock = self._socket.unwrap()
493        except (ValueError, BrokenPipeError, OSError):
494            # If these exceptions are raised, we close the socket without re-trying to unwrap it.
495            # - ValueError: The socket was actually not wrapped
496            # - BrokenPipeError: There is some issue with the socket
497            # - OSError: The other side already shut down
498            sock = self._socket
499        except WantReadError:
500            if force:
501                sock = self._socket
502            else:
503                raise
504
505        # NOTE: OSError indicates that the other side has already hung up.
506        with _error_converter(ignore_filter=(OSError,)):
507            sock.shutdown(socket.SHUT_RDWR)
508        return sock.close()

Unwraps the TLS connection, shuts down both halves of the connection and marks the socket closed. If force is True, this only shuts down its own half and does not wait for the other side. If force is False, this raises WantReadError until the other side sends a close_notify alert.

def listen(self, backlog: int) -> None:
510    def listen(self, backlog: int) -> None:
511        """Enable a server to accept connections. If backlog is specified, it
512        specifies the number of unaccepted connections that the system will allow
513        before refusing new connections."""
514        with _error_converter():
515            return self._socket.listen(backlog)

Enable a server to accept connections. If backlog is specified, it specifies the number of unaccepted connections that the system will allow before refusing new connections.

def accept(self) -> 'tuple[OpenSSLTLSSocket, socket._RetAddress]':
517    def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
518        """Accept a connection. The socket must be bound to an address and listening
519        for connections. The return value is a pair (conn, address) where conn is a
520        new TLSSocket object usable to send and receive data on the connection, and
521        address is the address bound to the socket on the other end of the connection."""
522
523        with _error_converter():
524            (sock, address) = self._socket.accept()
525        tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
526        tls_socket._parent_context = self._parent_context
527        tls_socket._ssl_context = self._ssl_context
528        tls_socket._socket = sock
529        with _error_converter():
530            tls_socket._socket.setblocking(False)
531        return (tls_socket, address)

Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new TLSSocket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection.

def getsockname(self) -> 'socket._RetAddress':
533    def getsockname(self) -> socket._RetAddress:
534        """Return the local address to which the socket is connected."""
535        with _error_converter():
536            return self._socket.getsockname()

Return the local address to which the socket is connected.

def getpeercert(self) -> bytes | None:
538    def getpeercert(self) -> bytes | None:
539        """
540        Return the raw DER bytes of the certificate provided by the peer
541        during the handshake, if applicable.
542        """
543        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
544        # Obtaining the certificate as a dict is very specific to the ssl module and may be
545        # difficult to implement for other TLS implementations, so this is not supported
546
547        with _error_converter():
548            cert = self._socket.getpeercert(True)
549
550        return cert

Return the raw DER bytes of the certificate provided by the peer during the handshake, if applicable.

def getpeername(self) -> 'socket._RetAddress':
552    def getpeername(self) -> socket._RetAddress:
553        """Return the remote address to which the socket is connected."""
554
555        with _error_converter():
556            return self._socket.getpeername()

Return the remote address to which the socket is connected.

def fileno(self) -> int:
558    def fileno(self) -> int:
559        """Return the socket's file descriptor (a small integer), or -1 on failure."""
560
561        with _error_converter():
562            return self._socket.fileno()

Return the socket's file descriptor (a small integer), or -1 on failure.

564    @property
565    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
566        """The ``Context`` object this socket is tied to."""
567
568        return self._parent_context

The Context object this socket is tied to.

def cipher(self) -> tlslib.tlslib.CipherSuite | int | None:
570    def cipher(self) -> CipherSuite | int | None:
571        """
572        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
573
574        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
575        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
576        """
577
578        # This is the OpenSSL cipher name. We want the ID, which we can get by
579        # looking for this entry in the context's list of supported ciphers.
580        ret = self._socket.cipher()
581
582        if ret is None:
583            return None
584        else:
585            ossl_cipher, _, _ = ret
586
587        for cipher in self._ssl_context.get_ciphers():
588            if cipher["name"] == ossl_cipher:
589                break
590        # Since the cipher was negotiated using the OpenSSL context,
591        # it must exist in the list of the OpenSSL supported ciphers.
592        assert cipher["name"] == ossl_cipher
593
594        cipher_id = cipher["id"] & 0xFFFF
595        try:
596            return CipherSuite(cipher_id)
597        except ValueError:
598            return cipher_id

Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

If no connection has been negotiated, returns None. If the cipher negotiated is not defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

def negotiated_protocol(self) -> tlslib.tlslib.NextProtocol | bytes | None:
600    def negotiated_protocol(self) -> NextProtocol | bytes | None:
601        """
602        Returns the protocol that was selected during the TLS handshake.
603
604        This selection may have been made using ALPN or some future
605        negotiation mechanism.
606
607        If the negotiated protocol is one of the protocols defined in the
608        ``NextProtocol`` enum, the value from that enum will be returned.
609        Otherwise, the raw bytestring of the negotiated protocol will be
610        returned.
611
612        If ``Context.set_inner_protocols()`` was not called, if the other
613        party does not support protocol negotiation, if this socket does
614        not support any of the peer's proposed protocols, or if the
615        handshake has not happened yet, ``None`` is returned.
616        """
617
618        proto = self._socket.selected_alpn_protocol()
619
620        # The standard library returns this as a str, we want bytes.
621        if proto is None:
622            return None
623
624        protoBytes = proto.encode("ascii")
625
626        try:
627            return NextProtocol(protoBytes)
628        except ValueError:
629            return protoBytes

Returns the protocol that was selected during the TLS handshake.

This selection may have been made using ALPN or some future negotiation mechanism.

If the negotiated protocol is one of the protocols defined in the NextProtocol enum, the value from that enum will be returned. Otherwise, the raw bytestring of the negotiated protocol will be returned.

If Context.set_inner_protocols() was not called, if the other party does not support protocol negotiation, if this socket does not support any of the peer's proposed protocols, or if the handshake has not happened yet, None is returned.

negotiated_tls_version: tlslib.tlslib.TLSVersion | None
631    @property
632    def negotiated_tls_version(self) -> TLSVersion | None:
633        """The version of TLS that has been negotiated on this connection."""
634
635        ossl_version = self._socket.version()
636        if ossl_version is None:
637            return None
638        else:
639            return TLSVersion(ossl_version)

The version of TLS that has been negotiated on this connection.

class OpenSSLTLSBuffer:
class OpenSSLTLSBuffer:
    """A TLSBuffer implementation based on OpenSSL.

    The buffer wraps an ``ssl.SSLObject`` that is attached to two
    ``ssl.MemoryBIO`` objects: data received from the network is handed in
    through ``process_incoming()`` and data destined for the network is
    drained through ``process_outgoing()``. Instances are built by the
    ``_create()`` class method; the public constructor always raises
    ``TypeError``.
    """

    __slots__ = (
        "_ciphertext_buffer",
        "_in_bio",
        "_object",
        "_out_bio",
        "_parent_context",
        "_ssl_context",
    )

    _ciphertext_buffer: bytearray
    _in_bio: ssl.MemoryBIO
    _object: ssl.SSLObject
    _out_bio: ssl.MemoryBIO
    _parent_context: OpenSSLClientContext | OpenSSLServerContext
    _ssl_context: _SSLContext

    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
        """OpenSSLTLSBuffer objects should not be constructed by the user.
        Instead, ClientContext.create_buffer() and
        ServerContext.create_buffer() build them through the _create()
        method. Calling the constructor always raises ``TypeError``."""
        # Adjacent literals are concatenated by the compiler. A backslash
        # continuation inside a single literal would embed the indentation
        # of the following source line in the message.
        msg = (
            f"{self.__class__.__name__} does not have a public constructor. "
            "Instances are returned by ClientContext.create_buffer() "
            "or ServerContext.create_buffer()."
        )
        raise TypeError(msg)

    @classmethod
    def _create(
        cls,
        server_hostname: str | None,
        parent_context: OpenSSLClientContext | OpenSSLServerContext,
        server_side: bool,
        ssl_context: _SSLContext,
    ) -> OpenSSLTLSBuffer:
        """
        Build a buffer without going through ``__init__``.

        ``parent_context`` and ``ssl_context`` are stored on the instance,
        two fresh memory BIOs are created, and ``ssl_context.wrap_bio()`` is
        called to obtain the backing ``SSLObject``. ``server_hostname`` is
        only forwarded for client-side buffers; server-side buffers always
        pass ``None``.
        """
        self = cls.__new__(cls)
        self._parent_context = parent_context
        self._ssl_context = ssl_context

        # We need this extra buffer to implement the peek/consume API, which
        # the MemoryBIO object does not allow.
        self._ciphertext_buffer = bytearray()

        # Set up the SSLObject we're going to back this with.
        self._in_bio = ssl.MemoryBIO()
        self._out_bio = ssl.MemoryBIO()

        # Only the literal ``True`` selects server mode; anything else is
        # treated as a client-side buffer.
        is_server = server_side is True
        with _error_converter():
            self._object = ssl_context.wrap_bio(
                self._in_bio,
                self._out_bio,
                server_side=is_server,
                server_hostname=None if is_server else server_hostname,
            )

        return self

    def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
        """
        Read up to ``amt`` bytes of data from the input buffer and return
        the result as a ``bytes`` instance. If an optional buffer is
        provided, the result is written into the buffer and the number of
        bytes is returned instead.

        Once EOF is reached, all further calls to this method return the
        empty byte string ``b''``, or ``0`` when a destination buffer was
        provided.

        May read "short": that is, fewer bytes may be returned than were
        requested.

        Raise ``WantReadError`` or ``WantWriteError`` if there is
        insufficient data in either the input or output buffer and the
        operation would have caused data to be written or read.

        May raise ``RaggedEOF`` if the connection has been closed without a
        graceful TLS shutdown. Whether this is an exception that should be
        ignored or not is up to the specific application.

        As at any time a re-negotiation is possible, a call to ``read()``
        can also cause write operations.
        """

        with _error_converter():
            try:
                # MyPy insists that buffer must be a bytearray
                return self._object.read(amt, buffer)  # type: ignore[arg-type]
            except ssl.SSLZeroReturnError:
                # Clean shutdown by the peer. Keep the return type consistent
                # with a successful call: a byte count when reading into a
                # caller-supplied buffer, an empty byte string otherwise.
                return b"" if buffer is None else 0

    def write(self, buf: Buffer) -> int:
        """
        Write ``buf`` in encrypted form to the output buffer and return the
        number of bytes written. The ``buf`` argument must be an object
        supporting the buffer interface.

        Raise ``WantReadError`` or ``WantWriteError`` if there is
        insufficient data in either the input or output buffer and the
        operation would have caused data to be written or read. In either
        case, users should endeavour to resolve that situation and then
        re-call this method. When re-calling this method users *should*
        re-use the exact same ``buf`` object, as some TLS implementations
        require that the exact same buffer be used.

        This operation may write "short": that is, fewer bytes may be
        written than were in the buffer.

        As at any time a re-negotiation is possible, a call to ``write()``
        can also cause read operations.
        """

        with _error_converter():
            return self._object.write(buf)

    # Get rid and do handshake ourselves?
    def do_handshake(self) -> None:
        """
        Performs the TLS handshake by delegating to the backing
        ``SSLObject``. Certificate validation and hostname verification, as
        configured on the underlying context, take place during this step.
        """

        with _error_converter():
            self._object.do_handshake()

    def shutdown(self) -> None:
        """
        Performs a clean TLS shut down. This should generally be used
        whenever possible to signal to the remote peer that the content is
        finished.
        """

        with _error_converter():
            self._object.unwrap()

    def process_incoming(self, data_from_network: bytes) -> None:
        """
        Receives some TLS data from the network and stores it in the
        incoming memory BIO.

        The buffer API allows an implementation to raise ``WantReadError``
        and store no data when its internal buffer is overfull. This
        implementation hands the data straight to the memory BIO and asserts
        that every byte was accepted.
        """

        with _error_converter():
            written_len = self._in_bio.write(data_from_network)

        # ssl.MemoryBIO.write() is documented to always return len(buf), so
        # this is an internal sanity check rather than input validation.
        assert written_len == len(data_from_network)

    def incoming_bytes_buffered(self) -> int:
        """
        Returns how many bytes are in the incoming buffer waiting to be processed.
        """

        return self._in_bio.pending

    def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
        """
        Returns up to the next ``amount_bytes_for_network`` bytes of data
        that should be written to the network from the outgoing data
        buffer, removing them from the internal buffer.
        """

        return self._out_bio.read(amount_bytes_for_network)

    def outgoing_bytes_buffered(self) -> int:
        """
        Returns how many bytes are in the outgoing buffer waiting to be sent.
        """

        return self._out_bio.pending

    @property
    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
        """The ``Context`` object this buffer is tied to."""

        return self._parent_context

    def cipher(self) -> CipherSuite | int | None:
        """
        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

        Raises ``TLSError`` if the negotiated cipher name cannot be found among the ciphers
        reported by the OpenSSL context.
        """

        ret = self._object.cipher()
        if ret is None:
            return None
        ossl_cipher, _, _ = ret

        # Since the cipher was negotiated using the OpenSSL context, it is
        # expected to appear in the context's cipher list. Look it up
        # explicitly so that a miss can never fall through to an unrelated
        # entry (or an unbound name when the list is empty).
        cipher_id = next(
            (
                entry["id"] & 0xFFFF
                for entry in self._ssl_context.get_ciphers()
                if entry["name"] == ossl_cipher
            ),
            None,
        )
        if cipher_id is None:
            msg = f"Negotiated cipher {ossl_cipher!r} is not enabled on the OpenSSL context"
            raise TLSError(msg)

        try:
            return CipherSuite(cipher_id)
        except ValueError:
            return cipher_id

    def negotiated_protocol(self) -> NextProtocol | bytes | None:
        """
        Returns the protocol that was selected during the TLS handshake.

        This selection may have been made using ALPN or some future
        negotiation mechanism.

        If the negotiated protocol is one of the protocols defined in the
        ``NextProtocol`` enum, the value from that enum will be returned.
        Otherwise, the raw bytestring of the negotiated protocol will be
        returned.

        If ``Context.set_inner_protocols()`` was not called, if the other
        party does not support protocol negotiation, if this socket does
        not support any of the peer's proposed protocols, or if the
        handshake has not happened yet, ``None`` is returned.
        """

        proto = self._object.selected_alpn_protocol()

        # The standard library returns this as a str, we want bytes.
        if proto is None:
            return None

        proto_bytes = proto.encode("ascii")

        try:
            return NextProtocol(proto_bytes)
        except ValueError:
            return proto_bytes

    @property
    def negotiated_tls_version(self) -> TLSVersion | None:
        """The version of TLS that has been negotiated on this connection."""

        ossl_version = self._object.version()
        if ossl_version is None:
            return None
        return TLSVersion(ossl_version)

    def getpeercert(self) -> bytes | None:
        """
        Return the raw DER bytes of the certificate provided by the peer
        during the handshake, if applicable.
        """
        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
        # Obtaining the certificate as a dict is very specific to the ssl module and may be
        # difficult to implement for other implementation, so this is not supported
        with _error_converter():
            cert = self._object.getpeercert(binary_form=True)

        return cert

A TLSBuffer implementation based on OpenSSL

OpenSSLTLSBuffer(*args: tuple, **kwargs: tuple)
661    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
662        """OpenTLSBuffers should not be constructed by the user.
663        Instead, the ClientContext.create_buffer() and
664        ServerContext.create_buffer() use the _create() method."""
665        msg = (
666            f"{self.__class__.__name__} does not have a public constructor. "
667            "Instances are returned by ClientContext.create_buffer() \
668                or ServerContext.create_buffer()."
669        )
670        raise TypeError(
671            msg,
672        )

OpenSSLTLSBuffer objects should not be constructed by the user. Instead, ClientContext.create_buffer() and ServerContext.create_buffer() build them through the _create() method.

def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
707    def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
708        """
709        Read up to ``amt`` bytes of data from the input buffer and return
710        the result as a ``bytes`` instance. If an optional buffer is
711        provided, the result is written into the buffer and the number of
712        bytes is returned instead.
713
714        Once EOF is reached, all further calls to this method return the
715        empty byte string ``b''``.
716
717        May read "short": that is, fewer bytes may be returned than were
718        requested.
719
720        Raise ``WantReadError`` or ``WantWriteError`` if there is
721        insufficient data in either the input or output buffer and the
722        operation would have caused data to be written or read.
723
724        May raise ``RaggedEOF`` if the connection has been closed without a
725        graceful TLS shutdown. Whether this is an exception that should be
726        ignored or not is up to the specific application.
727
728        As at any time a re-negotiation is possible, a call to ``read()``
729        can also cause write operations.
730        """
731
732        with _error_converter():
733            try:
734                # MyPy insists that buffer must be a bytearray
735                return self._object.read(amt, buffer)  # type: ignore[arg-type]
736            except ssl.SSLZeroReturnError:
737                return b""

Read up to amt bytes of data from the input buffer and return the result as a bytes instance. If an optional buffer is provided, the result is written into the buffer and the number of bytes is returned instead.

Once EOF is reached, all further calls to this method return the empty byte string b''.

May read "short": that is, fewer bytes may be returned than were requested.

Raise WantReadError or WantWriteError if there is insufficient data in either the input or output buffer and the operation would have caused data to be written or read.

May raise RaggedEOF if the connection has been closed without a graceful TLS shutdown. Whether this is an exception that should be ignored or not is up to the specific application.

As at any time a re-negotiation is possible, a call to read() can also cause write operations.

def write(self, buf: Buffer) -> int:
739    def write(self, buf: Buffer) -> int:
740        """
741        Write ``buf`` in encrypted form to the output buffer and return the
742        number of bytes written. The ``buf`` argument must be an object
743        supporting the buffer interface.
744
745        Raise ``WantReadError`` or ``WantWriteError`` if there is
746        insufficient data in either the input or output buffer and the
747        operation would have caused data to be written or read. In either
748        case, users should endeavour to resolve that situation and then
749        re-call this method. When re-calling this method users *should*
750        re-use the exact same ``buf`` object, as some TLS implementations
751        require that the exact same buffer be used.
752
753        This operation may write "short": that is, fewer bytes may be
754        written than were in the buffer.
755
756        As at any time a re-negotiation is possible, a call to ``write()``
757        can also cause read operations.
758        """
759
760        with _error_converter():
761            return self._object.write(buf)

Write buf in encrypted form to the output buffer and return the number of bytes written. The buf argument must be an object supporting the buffer interface.

Raise WantReadError or WantWriteError if there is insufficient data in either the input or output buffer and the operation would have caused data to be written or read. In either case, users should endeavour to resolve that situation and then re-call this method. When re-calling this method users should re-use the exact same buf object, as some TLS implementations require that the exact same buffer be used.

This operation may write "short": that is, fewer bytes may be written than were in the buffer.

As at any time a re-negotiation is possible, a call to write() can also cause read operations.

def do_handshake(self) -> None:
    def do_handshake(self) -> None:
        """
        Performs the TLS handshake by delegating to the backing
        ``SSLObject``. Certificate validation and hostname verification, as
        configured on the underlying context, take place during this step.

        Errors raised by the ``ssl`` module pass through
        ``_error_converter()`` before reaching the caller.
        """

        with _error_converter():
            self._object.do_handshake()

Performs the TLS handshake. Also performs certificate validation and hostname verification.

def shutdown(self) -> None:
    def shutdown(self) -> None:
        """
        Performs a clean TLS shut down by calling ``unwrap()`` on the
        backing ``SSLObject``. This should generally be used whenever
        possible to signal to the remote peer that the content is finished.

        Errors raised by the ``ssl`` module pass through
        ``_error_converter()`` before reaching the caller.
        """

        with _error_converter():
            self._object.unwrap()

Performs a clean TLS shut down. This should generally be used whenever possible to signal to the remote peer that the content is finished.

def process_incoming(self, data_from_network: bytes) -> None:
783    def process_incoming(self, data_from_network: bytes) -> None:
784        """
785        Receives some TLS data from the network and stores it in an
786        internal buffer.
787
788        If the internal buffer is overfull, this method will raise
789        ``WantReadError`` and store no data. At this point, the user must
790        call ``read`` to remove some data from the internal buffer
791        before repeating this call.
792        """
793
794        with _error_converter():
795            written_len = self._in_bio.write(data_from_network)
796
797        assert written_len == len(data_from_network)

Receives some TLS data from the network and stores it in an internal buffer.

If the internal buffer is overfull, this method will raise WantReadError and store no data. At this point, the user must call read to remove some data from the internal buffer before repeating this call.

def incoming_bytes_buffered(self) -> int:
799    def incoming_bytes_buffered(self) -> int:
800        """
801        Returns how many bytes are in the incoming buffer waiting to be processed.
802        """
803
804        return self._in_bio.pending

Returns how many bytes are in the incoming buffer waiting to be processed.

def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
806    def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
807        """
808        Returns the next ``amt`` bytes of data that should be written to
809        the network from the outgoing data buffer, removing it from the
810        internal buffer.
811        """
812
813        return self._out_bio.read(amount_bytes_for_network)

Returns up to the next amount_bytes_for_network bytes of data that should be written to the network from the outgoing data buffer, removing them from the internal buffer.

def outgoing_bytes_buffered(self) -> int:
815    def outgoing_bytes_buffered(self) -> int:
816        """
817        Returns how many bytes are in the outgoing buffer waiting to be sent.
818        """
819
820        return self._out_bio.pending

Returns how many bytes are in the outgoing buffer waiting to be sent.

822    @property
823    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
824        """The ``Context`` object this socket is tied to."""
825
826        return self._parent_context

The Context object this buffer is tied to.

def cipher(self) -> tlslib.tlslib.CipherSuite | int | None:
828    def cipher(self) -> CipherSuite | int | None:
829        """
830        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
831
832        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
833        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
834        """
835
836        ret = self._object.cipher()
837
838        if ret is None:
839            return None
840        else:
841            ossl_cipher, _, _ = ret
842
843        for cipher in self._ssl_context.get_ciphers():
844            if cipher["name"] == ossl_cipher:
845                break
846        # Since the cipher was negotiated using the OpenSSL context,
847        # it must exist in the list of the OpenSSL supported ciphers.
848        assert cipher["name"] == ossl_cipher
849
850        cipher_id = cipher["id"] & 0xFFFF
851        try:
852            return CipherSuite(cipher_id)
853        except ValueError:
854            return cipher_id

Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

If no connection has been negotiated, returns None. If the cipher negotiated is not defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

def negotiated_protocol(self) -> tlslib.tlslib.NextProtocol | bytes | None:
856    def negotiated_protocol(self) -> NextProtocol | bytes | None:
857        """
858        Returns the protocol that was selected during the TLS handshake.
859
860        This selection may have been made using ALPN or some future
861        negotiation mechanism.
862
863        If the negotiated protocol is one of the protocols defined in the
864        ``NextProtocol`` enum, the value from that enum will be returned.
865        Otherwise, the raw bytestring of the negotiated protocol will be
866        returned.
867
868        If ``Context.set_inner_protocols()`` was not called, if the other
869        party does not support protocol negotiation, if this socket does
870        not support any of the peer's proposed protocols, or if the
871        handshake has not happened yet, ``None`` is returned.
872        """
873
874        proto = self._object.selected_alpn_protocol()
875
876        # The standard library returns this as a str, we want bytes.
877        if proto is None:
878            return None
879
880        protoBytes = proto.encode("ascii")
881
882        try:
883            return NextProtocol(protoBytes)
884        except ValueError:
885            return protoBytes

Returns the protocol that was selected during the TLS handshake.

This selection may have been made using ALPN or some future negotiation mechanism.

If the negotiated protocol is one of the protocols defined in the NextProtocol enum, the value from that enum will be returned. Otherwise, the raw bytestring of the negotiated protocol will be returned.

If Context.set_inner_protocols() was not called, if the other party does not support protocol negotiation, if this socket does not support any of the peer's proposed protocols, or if the handshake has not happened yet, None is returned.

negotiated_tls_version: tlslib.tlslib.TLSVersion | None
887    @property
888    def negotiated_tls_version(self) -> TLSVersion | None:
889        """The version of TLS that has been negotiated on this connection."""
890
891        ossl_version = self._object.version()
892        if ossl_version is None:
893            return None
894        else:
895            return TLSVersion(ossl_version)

The version of TLS that has been negotiated on this connection.

def getpeercert(self) -> bytes | None:
897    def getpeercert(self) -> bytes | None:
898        """
899        Return the raw DER bytes of the certificate provided by the peer
900        during the handshake, if applicable.
901        """
902        # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format
903        # Obtaining the certificate as a dict is very specific to the ssl module and may be
904        # difficult to implement for other implementation, so this is not supported
905        with _error_converter():
906            cert = self._object.getpeercert(True)
907
908        return cert

Return the raw DER bytes of the certificate provided by the peer during the handshake, if applicable.

class OpenSSLClientContext:
class OpenSSLClientContext:
    """Client-side context backed by the standard library OpenSSL bindings.

    The context stores a TLS client configuration and uses it to create
    TLS-wrapped sockets and in-memory TLS buffers for the client side of a
    network connection.
    """

    def __init__(self, configuration: TLSClientConfiguration) -> None:
        """Create a new context object from a given TLS configuration."""

        self._configuration = configuration

    @property
    def configuration(self) -> TLSClientConfiguration:
        """Returns the TLS configuration that was used to create the context."""

        stored_configuration = self._configuration
        return stored_configuration

    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
        """Create a socket-like object that can be used to do TLS.

        A fresh OpenSSL context is initialised from the stored configuration
        on every call and handed to ``OpenSSLTLSSocket._create()`` together
        with ``address``.
        """
        client_ssl_context = _init_context_client(self._configuration)
        tls_socket = OpenSSLTLSSocket._create(
            parent_context=self,
            server_side=False,
            ssl_context=client_ssl_context,
            address=address,
        )
        return tls_socket

    def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
        """Creates a TLSBuffer that acts as an in-memory channel,
        and contains information about the TLS exchange
        (cipher, negotiated_protocol, negotiated_tls_version, etc.).

        A fresh OpenSSL context is initialised from the stored configuration
        on every call; ``server_hostname`` is forwarded to the buffer."""

        client_ssl_context = _init_context_client(self._configuration)
        tls_buffer = OpenSSLTLSBuffer._create(
            server_hostname=server_hostname,
            parent_context=self,
            server_side=False,
            ssl_context=client_ssl_context,
        )
        return tls_buffer

This class controls and creates a socket that is wrapped using the standard library bindings to OpenSSL to perform TLS connections on the client side of a network connection.

OpenSSLClientContext(configuration: tlslib.tlslib.TLSClientConfiguration)
    def __init__(self, configuration: TLSClientConfiguration) -> None:
        """Create a new context object from a given TLS configuration.

        The configuration is only stored here; the OpenSSL context is built
        from it later, when a socket or buffer is requested.
        """

        self._configuration = configuration

Create a new context object from a given TLS configuration.

configuration: tlslib.tlslib.TLSClientConfiguration
922    @property
923    def configuration(self) -> TLSClientConfiguration:
924        """Returns the TLS configuration that was used to create the context."""
925
926        return self._configuration

Returns the TLS configuration that was used to create the context.

def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
928    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
929        """Create a socket-like object that can be used to do TLS."""
930        ossl_context = _init_context_client(self._configuration)
931
932        return OpenSSLTLSSocket._create(
933            parent_context=self,
934            server_side=False,
935            ssl_context=ossl_context,
936            address=address,
937        )

Create a socket-like object that can be used to do TLS.

def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
939    def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
940        """Creates a TLSBuffer that acts as an in-memory channel,
941        and contains information about the TLS exchange
942        (cipher, negotiated_protocol, negotiated_tls_version, etc.)."""
943
944        ossl_context = _init_context_client(self._configuration)
945
946        return OpenSSLTLSBuffer._create(
947            server_hostname=server_hostname,
948            parent_context=self,
949            server_side=False,
950            ssl_context=ossl_context,
951        )

Creates a TLSBuffer that acts as an in-memory channel, and contains information about the TLS exchange (cipher, negotiated_protocol, negotiated_tls_version, etc.).

class OpenSSLServerContext:
class OpenSSLServerContext:
    """Server-side context backed by the standard library OpenSSL bindings.

    The context stores a TLS server configuration and uses it to create
    TLS-wrapped sockets and in-memory TLS buffers for the server side of a
    network connection.
    """

    def __init__(self, configuration: TLSServerConfiguration) -> None:
        """Create a new context object from a given TLS configuration."""

        self._configuration = configuration

    @property
    def configuration(self) -> TLSServerConfiguration:
        """Returns the TLS configuration that was used to create the context."""

        stored_configuration = self._configuration
        return stored_configuration

    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
        """Create a socket-like object that can be used to do TLS.

        A fresh OpenSSL context is initialised from the stored configuration
        on every call and handed to ``OpenSSLTLSSocket._create()`` together
        with ``address``.
        """
        server_ssl_context = _init_context_server(self._configuration)
        tls_socket = OpenSSLTLSSocket._create(
            parent_context=self,
            server_side=True,
            ssl_context=server_ssl_context,
            address=address,
        )
        return tls_socket

    def create_buffer(self) -> OpenSSLTLSBuffer:
        """Creates a TLSBuffer that acts as an in-memory channel,
        and contains information about the TLS exchange
        (cipher, negotiated_protocol, negotiated_tls_version, etc.).

        A fresh OpenSSL context is initialised from the stored configuration
        on every call; no server hostname is passed for server-side buffers."""

        server_ssl_context = _init_context_server(self._configuration)
        tls_buffer = OpenSSLTLSBuffer._create(
            server_hostname=None,
            parent_context=self,
            server_side=True,
            ssl_context=server_ssl_context,
        )
        return tls_buffer

This class controls and creates a socket that is wrapped using the standard library bindings to OpenSSL to perform TLS connections on the server side of a network connection.

OpenSSLServerContext(configuration: tlslib.tlslib.TLSServerConfiguration)
    def __init__(self, configuration: TLSServerConfiguration) -> None:
        """Create a new context object from a given TLS configuration.

        The configuration is only stored here; the OpenSSL context is built
        from it later, when a socket or buffer is requested.
        """

        self._configuration = configuration

Create a new context object from a given TLS configuration.

configuration: tlslib.tlslib.TLSServerConfiguration
965    @property
966    def configuration(self) -> TLSServerConfiguration:
967        """Returns the TLS configuration that was used to create the context."""
968
969        return self._configuration

Returns the TLS configuration that was used to create the context.

def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
971    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
972        """Create a socket-like object that can be used to do TLS."""
973        ossl_context = _init_context_server(self._configuration)
974
975        return OpenSSLTLSSocket._create(
976            parent_context=self,
977            server_side=True,
978            ssl_context=ossl_context,
979            address=address,
980        )

Create a socket-like object that can be used to do TLS.

def create_buffer(self) -> OpenSSLTLSBuffer:
982    def create_buffer(self) -> OpenSSLTLSBuffer:
983        """Creates a TLSBuffer that acts as an in-memory channel,
984        and contains information about the TLS exchange
985        (cipher, negotiated_protocol, negotiated_tls_version, etc.)."""
986
987        ossl_context = _init_context_server(self._configuration)
988
989        return OpenSSLTLSBuffer._create(
990            server_hostname=None,
991            parent_context=self,
992            server_side=True,
993            ssl_context=ossl_context,
994        )

Creates a TLSBuffer that acts as an in-memory channel, and contains information about the TLS exchange (cipher, negotiated_protocol, negotiated_tls_version, etc.).

def validate_config( tls_config: tlslib.tlslib.TLSClientConfiguration | tlslib.tlslib.TLSServerConfiguration) -> None:
def validate_config(tls_config: TLSClientConfiguration | TLSServerConfiguration) -> None:
    """Validates whether the OpenSSL TLS implementation supports this TLS configuration.

    The trust store is passed to ``_check_trust_store()``. A client
    configuration carries a single signing chain, which is passed to
    ``_check_sign_chain()`` when present; a server configuration carries a
    collection of signing chains, each of which is checked in turn. The
    helpers presumably raise when something is unsupported (verify against
    their definitions).
    """
    _check_trust_store(tls_config.trust_store)

    if isinstance(tls_config, TLSClientConfiguration):
        client_chain = tls_config.certificate_chain
        if client_chain is not None:
            _check_sign_chain(client_chain)
        return

    assert isinstance(tls_config, TLSServerConfiguration)
    server_chains = tls_config.certificate_chain
    if server_chains is None:
        return
    for chain in server_chains:
        _check_sign_chain(chain)

Validates whether the OpenSSL TLS implementation supports this TLS configuration.

STDLIB_IMPLEMENTATION = <tlslib.tlslib.TLSImplementation object>