tlslib.stdlib
Shims the standard library OpenSSL module into the amended PEP 543 API.
1"""Shims the standard library OpenSSL module into the amended PEP 543 API.""" 2 3from __future__ import annotations 4 5import os 6import socket 7import ssl 8import tempfile 9import typing 10import weakref 11from collections.abc import Buffer, Sequence 12from contextlib import contextmanager 13from pathlib import Path 14 15import truststore 16 17from .tlslib import ( 18 DEFAULT_CIPHER_LIST, 19 Certificate, 20 CipherSuite, 21 ConfigurationError, 22 NextProtocol, 23 PrivateKey, 24 RaggedEOF, 25 SigningChain, 26 TLSClientConfiguration, 27 TLSError, 28 TLSImplementation, 29 TLSServerConfiguration, 30 TLSVersion, 31 TrustStore, 32 WantReadError, 33 WantWriteError, 34) 35 36_SSLContext = ssl.SSLContext | truststore.SSLContext 37 38_TLSMinVersionOpts = { 39 TLSVersion.MINIMUM_SUPPORTED: ssl.TLSVersion.MINIMUM_SUPPORTED, 40 TLSVersion.TLSv1_2: ssl.TLSVersion.TLSv1_2, 41 TLSVersion.TLSv1_3: ssl.TLSVersion.TLSv1_3, 42} 43 44_TLSMaxVersionOpts = { 45 TLSVersion.TLSv1_2: ssl.TLSVersion.TLSv1_2, 46 TLSVersion.TLSv1_3: ssl.TLSVersion.TLSv1_3, 47 TLSVersion.MAXIMUM_SUPPORTED: ssl.TLSVersion.MAXIMUM_SUPPORTED, 48} 49 50# We need to populate a dictionary of ciphers that OpenSSL supports, in the 51# form of {16-bit number: OpenSSL suite name}. 52ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 53ctx.set_ciphers("ALL:COMPLEMENTOFALL") 54_cipher_map = {c["id"] & 0xFFFF: c["name"] for c in ctx.get_ciphers()} 55del ctx 56 57 58@contextmanager 59def _error_converter( 60 ignore_filter: tuple[type[Exception]] | tuple[()] = (), 61) -> typing.Generator[None, None, None]: 62 """ 63 Catches errors from the ssl module and wraps them up in TLSError 64 exceptions. Ignores certain kinds of exceptions as requested. 
65 """ 66 try: 67 yield 68 except ignore_filter: 69 pass 70 except ssl.SSLWantReadError: 71 raise WantReadError("Must read data") from None 72 except ssl.SSLWantWriteError: 73 raise WantWriteError("Must write data") from None 74 except ssl.SSLEOFError: 75 raise RaggedEOF("Ragged EOF") from None 76 except ssl.SSLError as e: 77 raise TLSError(e) from None 78 79 80def _remove_path(ts_cert_priv: TrustStore | Certificate | PrivateKey) -> None: 81 ts_cert_priv._path = None 82 83 84def _is_system_trust_store(trust_store: TrustStore | None) -> bool: 85 return trust_store is None or ( 86 trust_store._path is None and trust_store._buffer is None and trust_store._id is None 87 ) 88 89 90def _get_path_from_trust_store( 91 context: _SSLContext, trust_store: TrustStore | None 92) -> os.PathLike | None: 93 assert trust_store is not None 94 if trust_store._path is not None: 95 return trust_store._path 96 elif trust_store._buffer is not None: 97 tmp_path = tempfile.NamedTemporaryFile(mode="w+b", delete=False, delete_on_close=False) 98 tmp_path.write(trust_store._buffer) 99 tmp_path.close() 100 # Store this path to prevent creation of multiple files for each trust store 101 trust_store._path = Path(tmp_path.name) 102 weakref.finalize(context, os.remove, tmp_path.name) 103 # Remove the path in case the trust store outlives the context 104 weakref.finalize(context, _remove_path, trust_store) 105 return trust_store._path 106 elif trust_store._id is not None: 107 raise ConfigurationError("This TLS implementation does not support id-based trust stores.") 108 else: 109 return None 110 111 112def _create_client_context_with_trust_store(trust_store: TrustStore | None) -> _SSLContext: 113 some_context: _SSLContext 114 115 if _is_system_trust_store(trust_store): 116 some_context = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 117 else: 118 some_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 119 trust_store_path = _get_path_from_trust_store(some_context, trust_store) 120 
some_context.load_verify_locations(trust_store_path) 121 122 # TLS Compression is a security risk and is removed in TLS v1.3 123 some_context.options |= ssl.OP_NO_COMPRESSION 124 125 some_context.verify_flags = ( 126 ssl.VerifyFlags.VERIFY_X509_STRICT | ssl.VerifyFlags.VERIFY_X509_PARTIAL_CHAIN 127 ) 128 129 return some_context 130 131 132def _create_server_context_with_trust_store( 133 trust_store: TrustStore | None, 134) -> ssl.SSLContext: 135 some_context: ssl.SSLContext 136 137 # truststore does not support server side 138 some_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 139 140 if trust_store is not None: 141 some_context.verify_mode = ssl.CERT_REQUIRED 142 trust_store_path = _get_path_from_trust_store(some_context, trust_store) 143 144 if trust_store_path is not None: 145 some_context.load_verify_locations(trust_store_path) 146 else: 147 some_context.load_default_certs(ssl.Purpose.CLIENT_AUTH) 148 149 # TLS Compression is a security risk and is removed in TLS v1.3 150 some_context.options |= ssl.OP_NO_COMPRESSION 151 152 return some_context 153 154 155def _sni_callback_builder( 156 _name_to_chain_map: weakref.WeakValueDictionary[str, SigningChain], 157 original_config: TLSServerConfiguration, 158) -> typing.Callable[[ssl.SSLSocket, str, ssl.SSLContext], ssl.AlertDescription | None]: 159 def pep543_callback( 160 ssl_socket: ssl.SSLSocket, 161 server_name: str, 162 stdlib_context: ssl.SSLContext, 163 ) -> ssl.AlertDescription | None: 164 try: 165 sign_chain = _name_to_chain_map[server_name] 166 except KeyError: 167 return ssl.ALERT_DESCRIPTION_INTERNAL_ERROR 168 169 new_config: TLSServerConfiguration = TLSServerConfiguration( 170 certificate_chain=(sign_chain,), 171 ciphers=original_config.ciphers, 172 inner_protocols=original_config.inner_protocols, 173 lowest_supported_version=original_config.lowest_supported_version, 174 highest_supported_version=original_config.highest_supported_version, 175 trust_store=original_config.trust_store, 176 ) 177 
ssl_socket.context = _init_context_server(new_config) 178 179 # Returning None, perversely, is how one signals success from this 180 # function. Will wonders never cease? 181 return None 182 183 return pep543_callback 184 185 186def _configure_server_context_for_certs( 187 context: ssl.SSLContext, 188 cert_chain: Sequence[SigningChain] | None = None, 189 sni_config: TLSServerConfiguration | None = None, 190) -> ssl.SSLContext: 191 if cert_chain is not None: 192 if len(cert_chain) == 1: 193 # Only one SigningChain, no need to configure SNI 194 return _configure_context_for_single_signing_chain(context, cert_chain[0]) 195 196 elif len(cert_chain) > 1: 197 # We have multiple SigningChains, need to configure SNI 198 assert sni_config is not None 199 return _configure_context_for_sni(context, cert_chain, sni_config) 200 201 return context 202 203 204def _get_path_from_cert_or_priv( 205 context: _SSLContext, cert_or_priv: Certificate | PrivateKey 206) -> os.PathLike: 207 if cert_or_priv._path is not None: 208 return cert_or_priv._path 209 elif cert_or_priv._buffer is not None: 210 tmp_path = tempfile.NamedTemporaryFile(mode="w+b", delete=False, delete_on_close=False) 211 tmp_path.write(cert_or_priv._buffer) 212 tmp_path.close() 213 weakref.finalize(context, os.remove, tmp_path.name) 214 # Store the path for future usage, preventing creation of multiple files 215 cert_or_priv._path = Path(tmp_path.name) 216 # Remove the path in case the cert or priv outlives the context 217 weakref.finalize(context, _remove_path, cert_or_priv) 218 return cert_or_priv._path 219 elif cert_or_priv._id is not None: 220 raise ConfigurationError( 221 "This TLS implementation does not support id-based certificates \ 222 or private keys." 
223 ) 224 else: 225 raise ConfigurationError("Certificate or PrivateKey cannot be empty.") 226 227 228def _get_bytes_from_cert(cert: Certificate) -> bytes: 229 if cert._buffer is not None: 230 return cert._buffer 231 elif cert._path is not None: 232 # Do not save cert in memory 233 return Path(cert._path).read_bytes() 234 elif cert._id is not None: 235 raise ConfigurationError("This TLS implementation does not support id-based certificates.") 236 else: 237 raise ConfigurationError("Certificate cannot be empty.") 238 239 240def _configure_context_for_single_signing_chain( 241 context: _SSLContext, 242 cert_chain: SigningChain | None = None, 243) -> _SSLContext: 244 """Given a PEP 543 cert chain, configure the SSLContext to send that cert 245 chain in the handshake. 246 247 Returns the context. 248 """ 249 250 if cert_chain is not None: 251 cert = cert_chain.leaf[0] 252 253 if len(cert_chain.chain) == 0: 254 cert_path = _get_path_from_cert_or_priv(context, cert) 255 256 else: 257 with tempfile.NamedTemporaryFile(mode="wb", delete=False) as io: 258 # Write first cert 259 io.write(_get_bytes_from_cert(cert)) 260 261 for cert in cert_chain.chain: 262 io.write(b"\n") 263 io.write(_get_bytes_from_cert(cert)) 264 265 weakref.finalize(context, os.remove, io.name) 266 cert_path = Path(io.name) 267 268 key_path = None 269 if cert_chain.leaf[1] is not None: 270 privkey = cert_chain.leaf[1] 271 272 key_path = _get_path_from_cert_or_priv(context, privkey) 273 274 assert cert_path is not None 275 with _error_converter(): 276 context.load_cert_chain(cert_path, key_path, None) 277 278 return context 279 280 281def _configure_context_for_sni( 282 context: ssl.SSLContext, 283 cert_chain: Sequence[SigningChain], 284 sni_config: TLSServerConfiguration, 285) -> ssl.SSLContext: 286 # This is a mapping of concrete server names to the corresponding SigningChain 287 _name_to_chain_map: weakref.WeakValueDictionary[str, SigningChain] = ( 288 weakref.WeakValueDictionary() 289 ) 290 291 for 
sign_chain in cert_chain: 292 # Parse leaf certificates to find server names 293 cert = sign_chain.leaf[0] 294 cert_path = _get_path_from_cert_or_priv(context, cert) 295 dec_cert = ssl._ssl._test_decode_cert(cert_path) # type: ignore[attr-defined] 296 297 try: 298 alt_names = dec_cert["subjectAltName"] 299 except KeyError: 300 continue 301 302 server_name = None 303 for name in alt_names: 304 assert len(name) == 2 305 if name[0] == "DNS": 306 server_name = name[1] 307 break 308 309 if server_name is not None: 310 _name_to_chain_map[server_name] = sign_chain 311 312 context.sni_callback = _sni_callback_builder(_name_to_chain_map, sni_config) # type: ignore[assignment] 313 314 return context 315 316 317def _configure_context_for_ciphers( 318 context: _SSLContext, ciphers: Sequence[CipherSuite | int] | None = None 319) -> _SSLContext: 320 """Given a PEP 543 cipher suite list, configure the SSLContext to use those 321 cipher suites. 322 323 Returns the context. 324 """ 325 if ciphers is None: 326 # OpenSSL does not necessarily have system recommended settings 327 # The default cipher list is used here instead 328 ciphers = DEFAULT_CIPHER_LIST 329 330 ossl_names = [_cipher_map[cipher] for cipher in ciphers if cipher in _cipher_map] 331 if not ossl_names: 332 msg = "None of the provided ciphers are supported by the OpenSSL TLS implementation!" 333 raise TLSError(msg) 334 with _error_converter(): 335 context.set_ciphers(":".join(ossl_names)) 336 return context 337 338 339def _configure_context_for_negotiation( 340 context: _SSLContext, 341 inner_protocols: Sequence[NextProtocol | bytes] | None = None, 342) -> _SSLContext: 343 """Given a PEP 543 list of protocols to negotiate, configures the SSLContext 344 to negotiate those protocols. 345 """ 346 if inner_protocols: 347 protocols = [] 348 for np in inner_protocols: 349 proto_string = np if isinstance(np, bytes) else np.value 350 # The protocol string needs to be of type str for the standard 351 # library. 
352 protocols.append(proto_string.decode("ascii")) 353 354 context.set_alpn_protocols(protocols) 355 356 return context 357 358 359def _init_context_common( 360 some_context: _SSLContext, 361 config: TLSClientConfiguration | TLSServerConfiguration, 362) -> _SSLContext: 363 some_context = _configure_context_for_ciphers( 364 some_context, 365 config.ciphers, 366 ) 367 some_context = _configure_context_for_negotiation( 368 some_context, 369 config.inner_protocols, 370 ) 371 372 # In lieu of system recommended settings, we default to TLS v1.3 373 lowest_supported_version = config.lowest_supported_version 374 if lowest_supported_version is None: 375 lowest_supported_version = TLSVersion.TLSv1_3 376 377 highest_supported_version = config.highest_supported_version 378 if highest_supported_version is None: 379 highest_supported_version = TLSVersion.MAXIMUM_SUPPORTED 380 381 try: 382 some_context.minimum_version = _TLSMinVersionOpts[lowest_supported_version] 383 some_context.maximum_version = _TLSMaxVersionOpts[highest_supported_version] 384 except KeyError: 385 raise TLSError("Bad maximum/minimum options") 386 387 return some_context 388 389 390def _init_context_client(config: TLSClientConfiguration) -> _SSLContext: 391 """Initialize an SSL context object with a given client configuration.""" 392 some_context = _create_client_context_with_trust_store(config.trust_store) 393 394 some_context = _configure_context_for_single_signing_chain( 395 some_context, config.certificate_chain 396 ) 397 398 return _init_context_common(some_context, config) 399 400 401def _init_context_server(config: TLSServerConfiguration) -> _SSLContext: 402 """Initialize an SSL context object with a given server configuration.""" 403 some_context = _create_server_context_with_trust_store(config.trust_store) 404 405 some_context = _configure_server_context_for_certs( 406 some_context, config.certificate_chain, config 407 ) 408 409 return _init_context_common(some_context, config) 410 411 412class 
OpenSSLTLSSocket: 413 """A TLSSocket implementation based on OpenSSL.""" 414 415 __slots__ = ( 416 "_parent_context", 417 "_socket", 418 "_ssl_context", 419 ) 420 421 _parent_context: OpenSSLClientContext | OpenSSLServerContext 422 _socket: ssl.SSLSocket 423 _ssl_context: _SSLContext 424 425 def __init__(self, *args: tuple, **kwargs: tuple) -> None: 426 """OpenTLSSockets should not be constructed by the user. 427 Instead, the ClientContext.connect() and 428 ServerContext.connect() use the _create() method.""" 429 msg = ( 430 f"{self.__class__.__name__} does not have a public constructor. " 431 "Instances are returned by ClientContext.connect() or ServerContext.connect()." 432 ) 433 raise TypeError( 434 msg, 435 ) 436 437 @classmethod 438 def _create( 439 cls, 440 address: tuple[str | None, int], 441 parent_context: OpenSSLClientContext | OpenSSLServerContext, 442 server_side: bool, 443 ssl_context: _SSLContext, 444 ) -> OpenSSLTLSSocket: 445 self = cls.__new__(cls) 446 self._parent_context = parent_context 447 self._ssl_context = ssl_context 448 449 if server_side is True: 450 sock = socket.create_server(address) 451 with _error_converter(): 452 self._socket = ssl_context.wrap_socket( 453 sock, server_side=server_side, server_hostname=None 454 ) 455 else: 456 hostname, _ = address 457 sock = socket.create_connection(address) 458 with _error_converter(): 459 self._socket = ssl_context.wrap_socket( 460 sock, server_side=server_side, server_hostname=hostname 461 ) 462 463 self._socket.setblocking(False) 464 465 return self 466 467 def recv(self, bufsize: int) -> bytes: 468 """Receive data from the socket. The return value is a bytes object 469 representing the data received. Should not work before the handshake 470 is completed.""" 471 with _error_converter(): 472 try: 473 return self._socket.recv(bufsize) 474 except ssl.SSLZeroReturnError: 475 return b"" 476 477 def send(self, bytes: bytes) -> int: 478 """Send data to the socket. 
The socket must be connected to a remote socket.""" 479 with _error_converter(): 480 return self._socket.send(bytes) 481 482 def close(self, force: bool = False) -> None: 483 """Unwraps the TLS connection, shuts down both halves of the connection and 484 mark the socket closed. If force is True, will only shutdown own half and 485 not wait for the other side. If force is False, this will raise WantReadError 486 until the other side sends a close_notify alert.""" 487 488 try: 489 with _error_converter(): 490 sock = self._socket.unwrap() 491 except (ValueError, BrokenPipeError, OSError): 492 # If these exceptions are raised, we close the socket without re-trying to unwrap it. 493 # - ValueError: The socket was actually not wrapped 494 # - BrokenPipeError: There is some issue with the socket 495 # - OSError: The other side already shut down 496 sock = self._socket 497 except WantReadError: 498 if force: 499 sock = self._socket 500 else: 501 raise 502 503 # NOTE: OSError indicates that the other side has already hung up. 504 with _error_converter(ignore_filter=(OSError,)): 505 sock.shutdown(socket.SHUT_RDWR) 506 return sock.close() 507 508 def listen(self, backlog: int) -> None: 509 """Enable a server to accept connections. If backlog is specified, it 510 specifies the number of unaccepted connections that the system will allow 511 before refusing new connections.""" 512 with _error_converter(): 513 return self._socket.listen(backlog) 514 515 def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]: 516 """Accept a connection. The socket must be bound to an address and listening 517 for connections. 
The return value is a pair (conn, address) where conn is a 518 new TLSSocket object usable to send and receive data on the connection, and 519 address is the address bound to the socket on the other end of the connection.""" 520 521 with _error_converter(): 522 (sock, address) = self._socket.accept() 523 tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket) 524 tls_socket._parent_context = self._parent_context 525 tls_socket._ssl_context = self._ssl_context 526 tls_socket._socket = sock 527 with _error_converter(): 528 tls_socket._socket.setblocking(False) 529 return (tls_socket, address) 530 531 def getsockname(self) -> socket._RetAddress: 532 """Return the local address to which the socket is connected.""" 533 with _error_converter(): 534 return self._socket.getsockname() 535 536 def getpeercert(self) -> bytes | None: 537 """ 538 Return the raw DER bytes of the certificate provided by the peer 539 during the handshake, if applicable. 540 """ 541 # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format 542 # Obtaining the certificate as a dict is very specific to the ssl module and may be 543 # difficult to implement for other TLS implementations, so this is not supported 544 545 with _error_converter(): 546 cert = self._socket.getpeercert(True) 547 548 return cert 549 550 def getpeername(self) -> socket._RetAddress: 551 """Return the remote address to which the socket is connected.""" 552 553 with _error_converter(): 554 return self._socket.getpeername() 555 556 def fileno(self) -> int: 557 """Return the socket's file descriptor (a small integer), or -1 on failure.""" 558 559 with _error_converter(): 560 return self._socket.fileno() 561 562 @property 563 def context(self) -> OpenSSLClientContext | OpenSSLServerContext: 564 """The ``Context`` object this socket is tied to.""" 565 566 return self._parent_context 567 568 def cipher(self) -> CipherSuite | int | None: 569 """ 570 Returns the CipherSuite entry for the cipher that 
has been negotiated on the connection. 571 572 If no connection has been negotiated, returns ``None``. If the cipher negotiated is not 573 defined in CipherSuite, returns the 16-bit integer representing that cipher directly. 574 """ 575 576 # This is the OpenSSL cipher name. We want the ID, which we can get by 577 # looking for this entry in the context's list of supported ciphers. 578 ret = self._socket.cipher() 579 580 if ret is None: 581 return None 582 else: 583 ossl_cipher, _, _ = ret 584 585 for cipher in self._ssl_context.get_ciphers(): 586 if cipher["name"] == ossl_cipher: 587 break 588 # Since the cipher was negotiated using the OpenSSL context, 589 # it must exist in the list of the OpenSSL supported ciphers. 590 assert cipher["name"] == ossl_cipher 591 592 cipher_id = cipher["id"] & 0xFFFF 593 try: 594 return CipherSuite(cipher_id) 595 except ValueError: 596 return cipher_id 597 598 def negotiated_protocol(self) -> NextProtocol | bytes | None: 599 """ 600 Returns the protocol that was selected during the TLS handshake. 601 602 This selection may have been made using ALPN or some future 603 negotiation mechanism. 604 605 If the negotiated protocol is one of the protocols defined in the 606 ``NextProtocol`` enum, the value from that enum will be returned. 607 Otherwise, the raw bytestring of the negotiated protocol will be 608 returned. 609 610 If ``Context.set_inner_protocols()`` was not called, if the other 611 party does not support protocol negotiation, if this socket does 612 not support any of the peer's proposed protocols, or if the 613 handshake has not happened yet, ``None`` is returned. 614 """ 615 616 proto = self._socket.selected_alpn_protocol() 617 618 # The standard library returns this as a str, we want bytes. 
619 if proto is None: 620 return None 621 622 protoBytes = proto.encode("ascii") 623 624 try: 625 return NextProtocol(protoBytes) 626 except ValueError: 627 return protoBytes 628 629 @property 630 def negotiated_tls_version(self) -> TLSVersion | None: 631 """The version of TLS that has been negotiated on this connection.""" 632 633 ossl_version = self._socket.version() 634 if ossl_version is None: 635 return None 636 else: 637 return TLSVersion(ossl_version) 638 639 640class OpenSSLTLSBuffer: 641 """A TLSBuffer implementation based on OpenSSL""" 642 643 __slots__ = ( 644 "_ciphertext_buffer", 645 "_in_bio", 646 "_object", 647 "_out_bio", 648 "_parent_context", 649 "_ssl_context", 650 ) 651 652 _ciphertext_buffer: bytearray 653 _in_bio: ssl.MemoryBIO 654 _object: ssl.SSLObject 655 _out_bio: ssl.MemoryBIO 656 _parent_context: OpenSSLClientContext | OpenSSLServerContext 657 _ssl_context: _SSLContext 658 659 def __init__(self, *args: tuple, **kwargs: tuple) -> None: 660 """OpenTLSBuffers should not be constructed by the user. 661 Instead, the ClientContext.create_buffer() and 662 ServerContext.create_buffer() use the _create() method.""" 663 msg = ( 664 f"{self.__class__.__name__} does not have a public constructor. " 665 "Instances are returned by ClientContext.create_buffer() \ 666 or ServerContext.create_buffer()." 667 ) 668 raise TypeError( 669 msg, 670 ) 671 672 @classmethod 673 def _create( 674 cls, 675 server_hostname: str | None, 676 parent_context: OpenSSLClientContext | OpenSSLServerContext, 677 server_side: bool, 678 ssl_context: _SSLContext, 679 ) -> OpenSSLTLSBuffer: 680 self = cls.__new__(cls) 681 self._parent_context = parent_context 682 self._ssl_context = ssl_context 683 684 # We need this extra buffer to implement the peek/consume API, which 685 # the MemoryBIO object does not allow. 686 self._ciphertext_buffer = bytearray() 687 688 # Set up the SSLObject we're going to back this with. 
689 self._in_bio = ssl.MemoryBIO() 690 self._out_bio = ssl.MemoryBIO() 691 692 if server_side is True: 693 with _error_converter(): 694 self._object = ssl_context.wrap_bio( 695 self._in_bio, self._out_bio, server_side=True, server_hostname=None 696 ) 697 else: 698 with _error_converter(): 699 self._object = ssl_context.wrap_bio( 700 self._in_bio, self._out_bio, server_side=False, server_hostname=server_hostname 701 ) 702 703 return self 704 705 def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int: 706 """ 707 Read up to ``amt`` bytes of data from the input buffer and return 708 the result as a ``bytes`` instance. If an optional buffer is 709 provided, the result is written into the buffer and the number of 710 bytes is returned instead. 711 712 Once EOF is reached, all further calls to this method return the 713 empty byte string ``b''``. 714 715 May read "short": that is, fewer bytes may be returned than were 716 requested. 717 718 Raise ``WantReadError`` or ``WantWriteError`` if there is 719 insufficient data in either the input or output buffer and the 720 operation would have caused data to be written or read. 721 722 May raise ``RaggedEOF`` if the connection has been closed without a 723 graceful TLS shutdown. Whether this is an exception that should be 724 ignored or not is up to the specific application. 725 726 As at any time a re-negotiation is possible, a call to ``read()`` 727 can also cause write operations. 728 """ 729 730 with _error_converter(): 731 try: 732 # MyPy insists that buffer must be a bytearray 733 return self._object.read(amt, buffer) # type: ignore[arg-type] 734 except ssl.SSLZeroReturnError: 735 return b"" 736 737 def write(self, buf: Buffer) -> int: 738 """ 739 Write ``buf`` in encrypted form to the output buffer and return the 740 number of bytes written. The ``buf`` argument must be an object 741 supporting the buffer interface. 
742 743 Raise ``WantReadError`` or ``WantWriteError`` if there is 744 insufficient data in either the input or output buffer and the 745 operation would have caused data to be written or read. In either 746 case, users should endeavour to resolve that situation and then 747 re-call this method. When re-calling this method users *should* 748 re-use the exact same ``buf`` object, as some TLS implementations 749 require that the exact same buffer be used. 750 751 This operation may write "short": that is, fewer bytes may be 752 written than were in the buffer. 753 754 As at any time a re-negotiation is possible, a call to ``write()`` 755 can also cause read operations. 756 """ 757 758 with _error_converter(): 759 return self._object.write(buf) 760 761 # Get rid and do handshake ourselves? 762 def do_handshake(self) -> None: 763 """ 764 Performs the TLS handshake. Also performs certificate validation 765 and hostname verification. 766 """ 767 768 with _error_converter(): 769 self._object.do_handshake() 770 771 def shutdown(self) -> None: 772 """ 773 Performs a clean TLS shut down. This should generally be used 774 whenever possible to signal to the remote peer that the content is 775 finished. 776 """ 777 778 with _error_converter(): 779 self._object.unwrap() 780 781 def process_incoming(self, data_from_network: bytes) -> None: 782 """ 783 Receives some TLS data from the network and stores it in an 784 internal buffer. 785 786 If the internal buffer is overfull, this method will raise 787 ``WantReadError`` and store no data. At this point, the user must 788 call ``read`` to remove some data from the internal buffer 789 before repeating this call. 790 """ 791 792 with _error_converter(): 793 written_len = self._in_bio.write(data_from_network) 794 795 assert written_len == len(data_from_network) 796 797 def incoming_bytes_buffered(self) -> int: 798 """ 799 Returns how many bytes are in the incoming buffer waiting to be processed. 
800 """ 801 802 return self._in_bio.pending 803 804 def process_outgoing(self, amount_bytes_for_network: int) -> bytes: 805 """ 806 Returns the next ``amt`` bytes of data that should be written to 807 the network from the outgoing data buffer, removing it from the 808 internal buffer. 809 """ 810 811 return self._out_bio.read(amount_bytes_for_network) 812 813 def outgoing_bytes_buffered(self) -> int: 814 """ 815 Returns how many bytes are in the outgoing buffer waiting to be sent. 816 """ 817 818 return self._out_bio.pending 819 820 @property 821 def context(self) -> OpenSSLClientContext | OpenSSLServerContext: 822 """The ``Context`` object this socket is tied to.""" 823 824 return self._parent_context 825 826 def cipher(self) -> CipherSuite | int | None: 827 """ 828 Returns the CipherSuite entry for the cipher that has been negotiated on the connection. 829 830 If no connection has been negotiated, returns ``None``. If the cipher negotiated is not 831 defined in CipherSuite, returns the 16-bit integer representing that cipher directly. 832 """ 833 834 ret = self._object.cipher() 835 836 if ret is None: 837 return None 838 else: 839 ossl_cipher, _, _ = ret 840 841 for cipher in self._ssl_context.get_ciphers(): 842 if cipher["name"] == ossl_cipher: 843 break 844 # Since the cipher was negotiated using the OpenSSL context, 845 # it must exist in the list of the OpenSSL supported ciphers. 846 assert cipher["name"] == ossl_cipher 847 848 cipher_id = cipher["id"] & 0xFFFF 849 try: 850 return CipherSuite(cipher_id) 851 except ValueError: 852 return cipher_id 853 854 def negotiated_protocol(self) -> NextProtocol | bytes | None: 855 """ 856 Returns the protocol that was selected during the TLS handshake. 857 858 This selection may have been made using ALPN or some future 859 negotiation mechanism. 860 861 If the negotiated protocol is one of the protocols defined in the 862 ``NextProtocol`` enum, the value from that enum will be returned. 
863 Otherwise, the raw bytestring of the negotiated protocol will be 864 returned. 865 866 If ``Context.set_inner_protocols()`` was not called, if the other 867 party does not support protocol negotiation, if this socket does 868 not support any of the peer's proposed protocols, or if the 869 handshake has not happened yet, ``None`` is returned. 870 """ 871 872 proto = self._object.selected_alpn_protocol() 873 874 # The standard library returns this as a str, we want bytes. 875 if proto is None: 876 return None 877 878 protoBytes = proto.encode("ascii") 879 880 try: 881 return NextProtocol(protoBytes) 882 except ValueError: 883 return protoBytes 884 885 @property 886 def negotiated_tls_version(self) -> TLSVersion | None: 887 """The version of TLS that has been negotiated on this connection.""" 888 889 ossl_version = self._object.version() 890 if ossl_version is None: 891 return None 892 else: 893 return TLSVersion(ossl_version) 894 895 def getpeercert(self) -> bytes | None: 896 """ 897 Return the raw DER bytes of the certificate provided by the peer 898 during the handshake, if applicable. 899 """ 900 # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format 901 # Obtaining the certificate as a dict is very specific to the ssl module and may be 902 # difficult to implement for other implementation, so this is not supported 903 with _error_converter(): 904 cert = self._object.getpeercert(True) 905 906 return cert 907 908 909class OpenSSLClientContext: 910 """This class controls and creates a socket that is wrapped using the 911 standard library bindings to OpenSSL to perform TLS connections on the 912 client side of a network connection. 
913 """ 914 915 def __init__(self, configuration: TLSClientConfiguration) -> None: 916 """Create a new context object from a given TLS configuration.""" 917 918 self._configuration = configuration 919 920 @property 921 def configuration(self) -> TLSClientConfiguration: 922 """Returns the TLS configuration that was used to create the context.""" 923 924 return self._configuration 925 926 def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket: 927 """Create a socket-like object that can be used to do TLS.""" 928 ossl_context = _init_context_client(self._configuration) 929 930 return OpenSSLTLSSocket._create( 931 parent_context=self, 932 server_side=False, 933 ssl_context=ossl_context, 934 address=address, 935 ) 936 937 def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer: 938 """Creates a TLSBuffer that acts as an in-memory channel, 939 and contains information about the TLS exchange 940 (cipher, negotiated_protocol, negotiated_tls_version, etc.).""" 941 942 ossl_context = _init_context_client(self._configuration) 943 944 return OpenSSLTLSBuffer._create( 945 server_hostname=server_hostname, 946 parent_context=self, 947 server_side=False, 948 ssl_context=ossl_context, 949 ) 950 951 952class OpenSSLServerContext: 953 """This class controls and creates and creates a socket that is wrapped using the 954 standard library bindings to OpenSSL to perform TLS connections on the 955 server side of a network connection. 
956 """ 957 958 def __init__(self, configuration: TLSServerConfiguration) -> None: 959 """Create a new context object from a given TLS configuration.""" 960 961 self._configuration = configuration 962 963 @property 964 def configuration(self) -> TLSServerConfiguration: 965 """Returns the TLS configuration that was used to create the context.""" 966 967 return self._configuration 968 969 def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket: 970 """Create a socket-like object that can be used to do TLS.""" 971 ossl_context = _init_context_server(self._configuration) 972 973 return OpenSSLTLSSocket._create( 974 parent_context=self, 975 server_side=True, 976 ssl_context=ossl_context, 977 address=address, 978 ) 979 980 def create_buffer(self) -> OpenSSLTLSBuffer: 981 """Creates a TLSBuffer that acts as an in-memory channel, 982 and contains information about the TLS exchange 983 (cipher, negotiated_protocol, negotiated_tls_version, etc.).""" 984 985 ossl_context = _init_context_server(self._configuration) 986 987 return OpenSSLTLSBuffer._create( 988 server_hostname=None, 989 parent_context=self, 990 server_side=True, 991 ssl_context=ossl_context, 992 ) 993 994 995def _check_cert_or_priv(cert_or_priv: Certificate | PrivateKey) -> None: 996 if cert_or_priv._path is not None or cert_or_priv._buffer is not None: 997 return None 998 elif cert_or_priv._id is not None: 999 raise ConfigurationError( 1000 "This TLS implementation does not support id-based certificates \ 1001 or private keys." 
1002 ) 1003 else: 1004 raise ConfigurationError("Certificate or PrivateKey cannot be empty.") 1005 1006 1007def _check_trust_store(trust_store: TrustStore | None) -> None: 1008 if trust_store is not None and trust_store._id is not None: 1009 raise ConfigurationError("This TLS implementation does not support id-based trust stores.") 1010 1011 1012def _check_sign_chain(sign_chain: SigningChain) -> None: 1013 leaf = sign_chain.leaf 1014 _check_cert_or_priv(leaf[0]) 1015 priv_key = leaf[1] 1016 if priv_key is not None: 1017 _check_cert_or_priv(priv_key) 1018 for cert in sign_chain.chain: 1019 _check_cert_or_priv(cert) 1020 1021 1022def validate_config(tls_config: TLSClientConfiguration | TLSServerConfiguration) -> None: 1023 """Validates whether the OpenSSL TLS implementation supports this TLS configuration.""" 1024 _check_trust_store(tls_config.trust_store) 1025 1026 if isinstance(tls_config, TLSClientConfiguration): 1027 sign_chain = tls_config.certificate_chain 1028 if sign_chain is not None: 1029 _check_sign_chain(sign_chain) 1030 1031 else: 1032 assert isinstance(tls_config, TLSServerConfiguration) 1033 cert_chain = tls_config.certificate_chain 1034 if cert_chain is not None: 1035 for sign_chain in cert_chain: 1036 _check_sign_chain(sign_chain) 1037 1038 1039#: The stdlib ``TLSImplementation`` object. 1040STDLIB_IMPLEMENTATION = TLSImplementation( 1041 client_context=OpenSSLClientContext, 1042 server_context=OpenSSLServerContext, 1043 validate_config=validate_config, 1044)
class OpenSSLTLSSocket:
    """A TLSSocket implementation based on OpenSSL.

    Wraps an ``ssl.SSLSocket``. Errors raised by the ``ssl`` module are
    translated into tlslib exceptions by ``_error_converter()``.
    """

    __slots__ = (
        "_parent_context",
        "_socket",
        "_ssl_context",
    )

    _parent_context: OpenSSLClientContext | OpenSSLServerContext
    _socket: ssl.SSLSocket
    _ssl_context: _SSLContext

    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
        """OpenSSLTLSSocket objects should not be constructed by the user.
        Instead, ClientContext.connect() and ServerContext.connect()
        use the _create() method.

        Always raises ``TypeError``."""
        msg = (
            f"{self.__class__.__name__} does not have a public constructor. "
            "Instances are returned by ClientContext.connect() or ServerContext.connect()."
        )
        raise TypeError(msg)

    @classmethod
    def _create(
        cls,
        address: tuple[str | None, int],
        parent_context: OpenSSLClientContext | OpenSSLServerContext,
        server_side: bool,
        ssl_context: _SSLContext,
    ) -> OpenSSLTLSSocket:
        """Build a socket bypassing ``__init__``.

        On the server side a listening socket bound to ``address`` is created;
        on the client side a connection to ``address`` is opened and the host
        part of ``address`` is used as the TLS server hostname. The wrapped
        socket is switched to non-blocking mode before it is returned.
        """
        self = cls.__new__(cls)
        self._parent_context = parent_context
        self._ssl_context = ssl_context

        if server_side is True:
            hostname = None
            sock = socket.create_server(address)
        else:
            hostname, _ = address
            sock = socket.create_connection(address)

        try:
            with _error_converter():
                self._socket = ssl_context.wrap_socket(
                    sock, server_side=server_side, server_hostname=hostname
                )
        except BaseException:
            # wrap_socket() can fail before it takes ownership of the raw
            # socket; close it here so the descriptor is not leaked. Closing
            # an already detached socket is a harmless no-op.
            sock.close()
            raise

        self._socket.setblocking(False)

        return self

    def recv(self, bufsize: int) -> bytes:
        """Receive data from the socket. The return value is a bytes object
        representing the data received. Should not work before the handshake
        is completed.

        Returns ``b""`` when ``ssl.SSLZeroReturnError`` is raised by the
        underlying socket."""
        with _error_converter():
            try:
                return self._socket.recv(bufsize)
            except ssl.SSLZeroReturnError:
                return b""

    def send(self, bytes: bytes) -> int:
        """Send data to the socket. The socket must be connected to a remote socket.

        Returns the value reported by the underlying ``send()`` call."""
        with _error_converter():
            return self._socket.send(bytes)

    def close(self, force: bool = False) -> None:
        """Unwraps the TLS connection, shuts down both halves of the connection and
        marks the socket closed. If force is True, will only shutdown own half and
        not wait for the other side. If force is False, this will raise WantReadError
        until the other side sends a close_notify alert."""

        try:
            with _error_converter():
                sock = self._socket.unwrap()
        except (ValueError, OSError):
            # If these exceptions are raised, we close the socket without re-trying to unwrap it.
            # - ValueError: The socket was actually not wrapped
            # - OSError: The other side already shut down, or there is some other
            #   issue with the socket (BrokenPipeError is an OSError subclass)
            sock = self._socket
        except WantReadError:
            if force:
                sock = self._socket
            else:
                raise

        try:
            # NOTE: OSError indicates that the other side has already hung up.
            with _error_converter(ignore_filter=(OSError,)):
                sock.shutdown(socket.SHUT_RDWR)
        finally:
            # Close unconditionally: a suppressed OSError from shutdown() must
            # not prevent the file descriptor from being released.
            sock.close()

    def listen(self, backlog: int) -> None:
        """Enable a server to accept connections. If backlog is specified, it
        specifies the number of unaccepted connections that the system will allow
        before refusing new connections."""
        with _error_converter():
            return self._socket.listen(backlog)

    def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
        """Accept a connection. The socket must be bound to an address and listening
        for connections. The return value is a pair (conn, address) where conn is a
        new TLSSocket object usable to send and receive data on the connection, and
        address is the address bound to the socket on the other end of the connection.

        The returned socket shares this socket's parent context and SSL context
        and is set to non-blocking mode."""

        with _error_converter():
            (sock, address) = self._socket.accept()
        tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
        tls_socket._parent_context = self._parent_context
        tls_socket._ssl_context = self._ssl_context
        tls_socket._socket = sock
        with _error_converter():
            tls_socket._socket.setblocking(False)
        return (tls_socket, address)

    def getsockname(self) -> socket._RetAddress:
        """Return the local address to which the socket is connected."""
        with _error_converter():
            return self._socket.getsockname()

    def getpeercert(self) -> bytes | None:
        """
        Return the raw DER bytes of the certificate provided by the peer
        during the handshake, if applicable.
        """
        # Only the binary form is exposed: the dict form is very specific to
        # the ssl module and may be difficult to implement for other TLS
        # implementations, so this is not supported.
        with _error_converter():
            cert = self._socket.getpeercert(True)

        return cert

    def getpeername(self) -> socket._RetAddress:
        """Return the remote address to which the socket is connected."""

        with _error_converter():
            return self._socket.getpeername()

    def fileno(self) -> int:
        """Return the socket's file descriptor (a small integer), or -1 on failure."""

        with _error_converter():
            return self._socket.fileno()

    @property
    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
        """The ``Context`` object this socket is tied to."""

        return self._parent_context

    def cipher(self) -> CipherSuite | int | None:
        """
        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

        Raises ``TLSError`` if the negotiated cipher name is not present in the
        context's cipher list.
        """

        # This is the OpenSSL cipher name. We want the ID, which we can get by
        # looking for this entry in the context's list of supported ciphers.
        ret = self._socket.cipher()
        if ret is None:
            return None
        ossl_cipher = ret[0]

        for entry in self._ssl_context.get_ciphers():
            if entry["name"] == ossl_cipher:
                cipher_id = entry["id"] & 0xFFFF
                break
        else:
            # Raise explicitly instead of asserting: with assertions disabled
            # the last list entry would silently be reported as the cipher.
            raise TLSError(f"Negotiated cipher {ossl_cipher!r} is not in the context's cipher list")

        try:
            return CipherSuite(cipher_id)
        except ValueError:
            return cipher_id

    def negotiated_protocol(self) -> NextProtocol | bytes | None:
        """
        Returns the protocol that was selected during the TLS handshake.

        This selection may have been made using ALPN or some future
        negotiation mechanism.

        If the negotiated protocol is one of the protocols defined in the
        ``NextProtocol`` enum, the value from that enum will be returned.
        Otherwise, the raw bytestring of the negotiated protocol will be
        returned.

        If ``Context.set_inner_protocols()`` was not called, if the other
        party does not support protocol negotiation, if this socket does
        not support any of the peer's proposed protocols, or if the
        handshake has not happened yet, ``None`` is returned.
        """

        proto = self._socket.selected_alpn_protocol()

        # The standard library returns this as a str, we want bytes.
        if proto is None:
            return None

        proto_bytes = proto.encode("ascii")

        try:
            return NextProtocol(proto_bytes)
        except ValueError:
            return proto_bytes

    @property
    def negotiated_tls_version(self) -> TLSVersion | None:
        """The version of TLS that has been negotiated on this connection."""

        ossl_version = self._socket.version()
        if ossl_version is None:
            return None
        return TLSVersion(ossl_version)
A TLSSocket implementation based on OpenSSL.
426 def __init__(self, *args: tuple, **kwargs: tuple) -> None: 427 """OpenTLSSockets should not be constructed by the user. 428 Instead, the ClientContext.connect() and 429 ServerContext.connect() use the _create() method.""" 430 msg = ( 431 f"{self.__class__.__name__} does not have a public constructor. " 432 "Instances are returned by ClientContext.connect() or ServerContext.connect()." 433 ) 434 raise TypeError( 435 msg, 436 )
OpenSSLTLSSocket objects should not be constructed by the user. Instead, ClientContext.connect() and ServerContext.connect() use the _create() method.
def recv(self, bufsize: int) -> bytes:
    """Receive up to ``bufsize`` bytes from the socket and return them.

    Should not work before the handshake is completed. An
    ``ssl.SSLZeroReturnError`` from the underlying socket is reported as the
    empty byte string; other ``ssl`` errors are translated by
    ``_error_converter()``.
    """
    with _error_converter():
        try:
            received = self._socket.recv(bufsize)
        except ssl.SSLZeroReturnError:
            received = b""
        return received
Receive data from the socket. The return value is a bytes object representing the data received. Should not work before the handshake is completed.
def send(self, bytes: bytes) -> int:
    """Send ``bytes`` on the socket, which must be connected to a remote socket.

    Returns whatever the underlying ``send()`` call reports. Errors from the
    ``ssl`` module are translated by ``_error_converter()``.
    """
    with _error_converter():
        sent_count = self._socket.send(bytes)
        return sent_count
Send data to the socket. The socket must be connected to a remote socket.
def close(self, force: bool = False) -> None:
    """Unwraps the TLS connection, shuts down both halves of the connection and
    marks the socket closed. If force is True, will only shutdown own half and
    not wait for the other side. If force is False, this will raise WantReadError
    until the other side sends a close_notify alert."""

    try:
        with _error_converter():
            sock = self._socket.unwrap()
    except (ValueError, OSError):
        # If these exceptions are raised, we close the socket without re-trying to unwrap it.
        # - ValueError: The socket was actually not wrapped
        # - OSError: The other side already shut down, or there is some other
        #   issue with the socket (BrokenPipeError is an OSError subclass)
        sock = self._socket
    except WantReadError:
        if force:
            sock = self._socket
        else:
            raise

    try:
        # NOTE: OSError indicates that the other side has already hung up.
        with _error_converter(ignore_filter=(OSError,)):
            sock.shutdown(socket.SHUT_RDWR)
    finally:
        # Close unconditionally. Previously close() lived inside the ``with``
        # block, so a suppressed OSError from shutdown() skipped it and the
        # file descriptor stayed open.
        sock.close()
Unwraps the TLS connection, shuts down both halves of the connection and marks the socket closed. If force is True, will only shutdown own half and not wait for the other side. If force is False, this will raise WantReadError until the other side sends a close_notify alert.
def listen(self, backlog: int) -> None:
    """Enable a server to accept connections.

    ``backlog`` specifies the number of unaccepted connections that the
    system will allow before refusing new connections.
    """
    with _error_converter():
        outcome = self._socket.listen(backlog)
        return outcome
Enable a server to accept connections. If backlog is specified, it specifies the number of unaccepted connections that the system will allow before refusing new connections.
def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
    """Accept a connection on a bound, listening socket.

    Returns a pair ``(conn, address)``: ``conn`` is a new TLSSocket object
    usable to send and receive data on the connection, and ``address`` is the
    address bound to the socket on the other end of the connection. ``conn``
    shares this socket's parent context and SSL context and is put into
    non-blocking mode.
    """
    with _error_converter():
        accepted, peer_address = self._socket.accept()

    # Bypass __init__, which deliberately refuses to construct instances.
    conn = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
    conn._socket = accepted
    conn._ssl_context = self._ssl_context
    conn._parent_context = self._parent_context

    with _error_converter():
        conn._socket.setblocking(False)
    return conn, peer_address
Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new TLSSocket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection.
def getsockname(self) -> socket._RetAddress:
    """Return the local address to which the socket is connected."""
    with _error_converter():
        local_address = self._socket.getsockname()
        return local_address
Return the local address to which the socket is connected.
def getpeercert(self) -> bytes | None:
    """
    Return the raw DER bytes of the certificate provided by the peer
    during the handshake, if applicable.
    """
    # Only the binary form is requested. The dict form is very specific to the
    # ssl module and may be difficult to implement for other TLS
    # implementations, so it is not supported.
    with _error_converter():
        der_bytes = self._socket.getpeercert(True)
    return der_bytes
Return the raw DER bytes of the certificate provided by the peer during the handshake, if applicable.
def getpeername(self) -> socket._RetAddress:
    """Return the remote address to which the socket is connected."""
    with _error_converter():
        remote_address = self._socket.getpeername()
        return remote_address
Return the remote address to which the socket is connected.
def fileno(self) -> int:
    """Return the socket's file descriptor (a small integer), or -1 on failure."""
    with _error_converter():
        descriptor = self._socket.fileno()
        return descriptor
Return the socket's file descriptor (a small integer), or -1 on failure.
563 @property 564 def context(self) -> OpenSSLClientContext | OpenSSLServerContext: 565 """The ``Context`` object this socket is tied to.""" 566 567 return self._parent_context
The Context object this socket is tied to.
569 def cipher(self) -> CipherSuite | int | None: 570 """ 571 Returns the CipherSuite entry for the cipher that has been negotiated on the connection. 572 573 If no connection has been negotiated, returns ``None``. If the cipher negotiated is not 574 defined in CipherSuite, returns the 16-bit integer representing that cipher directly. 575 """ 576 577 # This is the OpenSSL cipher name. We want the ID, which we can get by 578 # looking for this entry in the context's list of supported ciphers. 579 ret = self._socket.cipher() 580 581 if ret is None: 582 return None 583 else: 584 ossl_cipher, _, _ = ret 585 586 for cipher in self._ssl_context.get_ciphers(): 587 if cipher["name"] == ossl_cipher: 588 break 589 # Since the cipher was negotiated using the OpenSSL context, 590 # it must exist in the list of the OpenSSL supported ciphers. 591 assert cipher["name"] == ossl_cipher 592 593 cipher_id = cipher["id"] & 0xFFFF 594 try: 595 return CipherSuite(cipher_id) 596 except ValueError: 597 return cipher_id
Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
If no connection has been negotiated, returns None. If the cipher negotiated is not defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
599 def negotiated_protocol(self) -> NextProtocol | bytes | None: 600 """ 601 Returns the protocol that was selected during the TLS handshake. 602 603 This selection may have been made using ALPN or some future 604 negotiation mechanism. 605 606 If the negotiated protocol is one of the protocols defined in the 607 ``NextProtocol`` enum, the value from that enum will be returned. 608 Otherwise, the raw bytestring of the negotiated protocol will be 609 returned. 610 611 If ``Context.set_inner_protocols()`` was not called, if the other 612 party does not support protocol negotiation, if this socket does 613 not support any of the peer's proposed protocols, or if the 614 handshake has not happened yet, ``None`` is returned. 615 """ 616 617 proto = self._socket.selected_alpn_protocol() 618 619 # The standard library returns this as a str, we want bytes. 620 if proto is None: 621 return None 622 623 protoBytes = proto.encode("ascii") 624 625 try: 626 return NextProtocol(protoBytes) 627 except ValueError: 628 return protoBytes
Returns the protocol that was selected during the TLS handshake.
This selection may have been made using ALPN or some future negotiation mechanism.
If the negotiated protocol is one of the protocols defined in the NextProtocol enum, the value from that enum will be returned. Otherwise, the raw bytestring of the negotiated protocol will be returned.
If Context.set_inner_protocols() was not called, if the other party does not support protocol negotiation, if this socket does not support any of the peer's proposed protocols, or if the handshake has not happened yet, None is returned.
630 @property 631 def negotiated_tls_version(self) -> TLSVersion | None: 632 """The version of TLS that has been negotiated on this connection.""" 633 634 ossl_version = self._socket.version() 635 if ossl_version is None: 636 return None 637 else: 638 return TLSVersion(ossl_version)
The version of TLS that has been negotiated on this connection.
641class OpenSSLTLSBuffer: 642 """A TLSBuffer implementation based on OpenSSL""" 643 644 __slots__ = ( 645 "_ciphertext_buffer", 646 "_in_bio", 647 "_object", 648 "_out_bio", 649 "_parent_context", 650 "_ssl_context", 651 ) 652 653 _ciphertext_buffer: bytearray 654 _in_bio: ssl.MemoryBIO 655 _object: ssl.SSLObject 656 _out_bio: ssl.MemoryBIO 657 _parent_context: OpenSSLClientContext | OpenSSLServerContext 658 _ssl_context: _SSLContext 659 660 def __init__(self, *args: tuple, **kwargs: tuple) -> None: 661 """OpenTLSBuffers should not be constructed by the user. 662 Instead, the ClientContext.create_buffer() and 663 ServerContext.create_buffer() use the _create() method.""" 664 msg = ( 665 f"{self.__class__.__name__} does not have a public constructor. " 666 "Instances are returned by ClientContext.create_buffer() \ 667 or ServerContext.create_buffer()." 668 ) 669 raise TypeError( 670 msg, 671 ) 672 673 @classmethod 674 def _create( 675 cls, 676 server_hostname: str | None, 677 parent_context: OpenSSLClientContext | OpenSSLServerContext, 678 server_side: bool, 679 ssl_context: _SSLContext, 680 ) -> OpenSSLTLSBuffer: 681 self = cls.__new__(cls) 682 self._parent_context = parent_context 683 self._ssl_context = ssl_context 684 685 # We need this extra buffer to implement the peek/consume API, which 686 # the MemoryBIO object does not allow. 687 self._ciphertext_buffer = bytearray() 688 689 # Set up the SSLObject we're going to back this with. 
690 self._in_bio = ssl.MemoryBIO() 691 self._out_bio = ssl.MemoryBIO() 692 693 if server_side is True: 694 with _error_converter(): 695 self._object = ssl_context.wrap_bio( 696 self._in_bio, self._out_bio, server_side=True, server_hostname=None 697 ) 698 else: 699 with _error_converter(): 700 self._object = ssl_context.wrap_bio( 701 self._in_bio, self._out_bio, server_side=False, server_hostname=server_hostname 702 ) 703 704 return self 705 706 def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int: 707 """ 708 Read up to ``amt`` bytes of data from the input buffer and return 709 the result as a ``bytes`` instance. If an optional buffer is 710 provided, the result is written into the buffer and the number of 711 bytes is returned instead. 712 713 Once EOF is reached, all further calls to this method return the 714 empty byte string ``b''``. 715 716 May read "short": that is, fewer bytes may be returned than were 717 requested. 718 719 Raise ``WantReadError`` or ``WantWriteError`` if there is 720 insufficient data in either the input or output buffer and the 721 operation would have caused data to be written or read. 722 723 May raise ``RaggedEOF`` if the connection has been closed without a 724 graceful TLS shutdown. Whether this is an exception that should be 725 ignored or not is up to the specific application. 726 727 As at any time a re-negotiation is possible, a call to ``read()`` 728 can also cause write operations. 729 """ 730 731 with _error_converter(): 732 try: 733 # MyPy insists that buffer must be a bytearray 734 return self._object.read(amt, buffer) # type: ignore[arg-type] 735 except ssl.SSLZeroReturnError: 736 return b"" 737 738 def write(self, buf: Buffer) -> int: 739 """ 740 Write ``buf`` in encrypted form to the output buffer and return the 741 number of bytes written. The ``buf`` argument must be an object 742 supporting the buffer interface. 
743 744 Raise ``WantReadError`` or ``WantWriteError`` if there is 745 insufficient data in either the input or output buffer and the 746 operation would have caused data to be written or read. In either 747 case, users should endeavour to resolve that situation and then 748 re-call this method. When re-calling this method users *should* 749 re-use the exact same ``buf`` object, as some TLS implementations 750 require that the exact same buffer be used. 751 752 This operation may write "short": that is, fewer bytes may be 753 written than were in the buffer. 754 755 As at any time a re-negotiation is possible, a call to ``write()`` 756 can also cause read operations. 757 """ 758 759 with _error_converter(): 760 return self._object.write(buf) 761 762 # Get rid and do handshake ourselves? 763 def do_handshake(self) -> None: 764 """ 765 Performs the TLS handshake. Also performs certificate validation 766 and hostname verification. 767 """ 768 769 with _error_converter(): 770 self._object.do_handshake() 771 772 def shutdown(self) -> None: 773 """ 774 Performs a clean TLS shut down. This should generally be used 775 whenever possible to signal to the remote peer that the content is 776 finished. 777 """ 778 779 with _error_converter(): 780 self._object.unwrap() 781 782 def process_incoming(self, data_from_network: bytes) -> None: 783 """ 784 Receives some TLS data from the network and stores it in an 785 internal buffer. 786 787 If the internal buffer is overfull, this method will raise 788 ``WantReadError`` and store no data. At this point, the user must 789 call ``read`` to remove some data from the internal buffer 790 before repeating this call. 791 """ 792 793 with _error_converter(): 794 written_len = self._in_bio.write(data_from_network) 795 796 assert written_len == len(data_from_network) 797 798 def incoming_bytes_buffered(self) -> int: 799 """ 800 Returns how many bytes are in the incoming buffer waiting to be processed. 
801 """ 802 803 return self._in_bio.pending 804 805 def process_outgoing(self, amount_bytes_for_network: int) -> bytes: 806 """ 807 Returns the next ``amt`` bytes of data that should be written to 808 the network from the outgoing data buffer, removing it from the 809 internal buffer. 810 """ 811 812 return self._out_bio.read(amount_bytes_for_network) 813 814 def outgoing_bytes_buffered(self) -> int: 815 """ 816 Returns how many bytes are in the outgoing buffer waiting to be sent. 817 """ 818 819 return self._out_bio.pending 820 821 @property 822 def context(self) -> OpenSSLClientContext | OpenSSLServerContext: 823 """The ``Context`` object this socket is tied to.""" 824 825 return self._parent_context 826 827 def cipher(self) -> CipherSuite | int | None: 828 """ 829 Returns the CipherSuite entry for the cipher that has been negotiated on the connection. 830 831 If no connection has been negotiated, returns ``None``. If the cipher negotiated is not 832 defined in CipherSuite, returns the 16-bit integer representing that cipher directly. 833 """ 834 835 ret = self._object.cipher() 836 837 if ret is None: 838 return None 839 else: 840 ossl_cipher, _, _ = ret 841 842 for cipher in self._ssl_context.get_ciphers(): 843 if cipher["name"] == ossl_cipher: 844 break 845 # Since the cipher was negotiated using the OpenSSL context, 846 # it must exist in the list of the OpenSSL supported ciphers. 847 assert cipher["name"] == ossl_cipher 848 849 cipher_id = cipher["id"] & 0xFFFF 850 try: 851 return CipherSuite(cipher_id) 852 except ValueError: 853 return cipher_id 854 855 def negotiated_protocol(self) -> NextProtocol | bytes | None: 856 """ 857 Returns the protocol that was selected during the TLS handshake. 858 859 This selection may have been made using ALPN or some future 860 negotiation mechanism. 861 862 If the negotiated protocol is one of the protocols defined in the 863 ``NextProtocol`` enum, the value from that enum will be returned. 
864 Otherwise, the raw bytestring of the negotiated protocol will be 865 returned. 866 867 If ``Context.set_inner_protocols()`` was not called, if the other 868 party does not support protocol negotiation, if this socket does 869 not support any of the peer's proposed protocols, or if the 870 handshake has not happened yet, ``None`` is returned. 871 """ 872 873 proto = self._object.selected_alpn_protocol() 874 875 # The standard library returns this as a str, we want bytes. 876 if proto is None: 877 return None 878 879 protoBytes = proto.encode("ascii") 880 881 try: 882 return NextProtocol(protoBytes) 883 except ValueError: 884 return protoBytes 885 886 @property 887 def negotiated_tls_version(self) -> TLSVersion | None: 888 """The version of TLS that has been negotiated on this connection.""" 889 890 ossl_version = self._object.version() 891 if ossl_version is None: 892 return None 893 else: 894 return TLSVersion(ossl_version) 895 896 def getpeercert(self) -> bytes | None: 897 """ 898 Return the raw DER bytes of the certificate provided by the peer 899 during the handshake, if applicable. 900 """ 901 # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format 902 # Obtaining the certificate as a dict is very specific to the ssl module and may be 903 # difficult to implement for other implementation, so this is not supported 904 with _error_converter(): 905 cert = self._object.getpeercert(True) 906 907 return cert
A TLSBuffer implementation based on OpenSSL
660 def __init__(self, *args: tuple, **kwargs: tuple) -> None: 661 """OpenTLSBuffers should not be constructed by the user. 662 Instead, the ClientContext.create_buffer() and 663 ServerContext.create_buffer() use the _create() method.""" 664 msg = ( 665 f"{self.__class__.__name__} does not have a public constructor. " 666 "Instances are returned by ClientContext.create_buffer() \ 667 or ServerContext.create_buffer()." 668 ) 669 raise TypeError( 670 msg, 671 )
OpenSSLTLSBuffer objects should not be constructed by the user. Instead, ClientContext.create_buffer() and ServerContext.create_buffer() use the _create() method.
def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
    """
    Read up to ``amt`` bytes of data from the input buffer.

    Without ``buffer`` the data is returned as a ``bytes`` instance. When
    ``buffer`` is provided, the data is written into it and the number of
    bytes is returned instead.

    Once EOF is reached, all further calls to this method return the
    empty byte string ``b''``. The read may be "short": fewer bytes may be
    returned than were requested.

    Raise ``WantReadError`` or ``WantWriteError`` if there is
    insufficient data in either the input or output buffer and the
    operation would have caused data to be written or read.

    May raise ``RaggedEOF`` if the connection has been closed without a
    graceful TLS shutdown. Whether this is an exception that should be
    ignored or not is up to the specific application.

    As at any time a re-negotiation is possible, a call to ``read()``
    can also cause write operations.
    """
    with _error_converter():
        try:
            # MyPy insists that buffer must be a bytearray
            outcome = self._object.read(amt, buffer)  # type: ignore[arg-type]
        except ssl.SSLZeroReturnError:
            outcome = b""
        return outcome
Read up to amt bytes of data from the input buffer and return the result as a bytes instance. If an optional buffer is provided, the result is written into the buffer and the number of bytes is returned instead.
Once EOF is reached, all further calls to this method return the empty byte string b''.
May read "short": that is, fewer bytes may be returned than were requested.
Raise WantReadError or WantWriteError if there is insufficient data in either the input or output buffer and the operation would have caused data to be written or read.
May raise RaggedEOF if the connection has been closed without a graceful TLS shutdown. Whether this is an exception that should be ignored or not is up to the specific application.
As at any time a re-negotiation is possible, a call to read() can also cause write operations.
def write(self, buf: Buffer) -> int:
    """
    Write ``buf`` in encrypted form to the output buffer and return the
    number of bytes written. ``buf`` must be an object supporting the
    buffer interface.

    Raise ``WantReadError`` or ``WantWriteError`` if there is
    insufficient data in either the input or output buffer and the
    operation would have caused data to be written or read. In either
    case, users should endeavour to resolve that situation and then
    re-call this method. When re-calling this method users *should*
    re-use the exact same ``buf`` object, as some TLS implementations
    require that the exact same buffer be used.

    The write may be "short": fewer bytes may be written than were in
    the buffer.

    As at any time a re-negotiation is possible, a call to ``write()``
    can also cause read operations.
    """
    with _error_converter():
        written = self._object.write(buf)
        return written
Write buf in encrypted form to the output buffer and return the number of bytes written. The buf argument must be an object supporting the buffer interface.
Raise WantReadError or WantWriteError if there is insufficient data in either the input or output buffer and the operation would have caused data to be written or read. In either case, users should endeavour to resolve that situation and then re-call this method. When re-calling this method users should re-use the exact same buf object, as some TLS implementations require that the exact same buffer be used.
This operation may write "short": that is, fewer bytes may be written than were in the buffer.
As at any time a re-negotiation is possible, a call to write() can also cause read operations.
def do_handshake(self) -> None:
    """
    Performs the TLS handshake. Also performs certificate validation
    and hostname verification.

    Errors from the ``ssl`` module are translated by ``_error_converter()``.
    """
    tls_object = self._object
    with _error_converter():
        tls_object.do_handshake()
Performs the TLS handshake. Also performs certificate validation and hostname verification.
def shutdown(self) -> None:
    """
    Performs a clean TLS shut down by unwrapping the underlying SSL object.

    This should generally be used whenever possible to signal to the remote
    peer that the content is finished.
    """
    tls_object = self._object
    with _error_converter():
        tls_object.unwrap()
Performs a clean TLS shut down. This should generally be used whenever possible to signal to the remote peer that the content is finished.
def process_incoming(self, data_from_network: bytes) -> None:
    """
    Accept TLS data received from the network and keep it in an internal
    buffer.

    If the internal buffer is overfull, ``WantReadError`` is raised and no
    data is stored. The caller must then call ``read`` to drain some data
    from the internal buffer before repeating this call.
    """

    with _error_converter():
        stored = self._in_bio.write(data_from_network)

    # Sanity check: the whole chunk is expected to have been buffered.
    expected = len(data_from_network)
    assert stored == expected
Receives some TLS data from the network and stores it in an internal buffer.
If the internal buffer is overfull, this method will raise
WantReadError
and store no data. At this point, the user must
call read
to remove some data from the internal buffer
before repeating this call.
def incoming_bytes_buffered(self) -> int:
    """
    Report the number of bytes sitting in the incoming buffer that have
    not been processed yet.
    """

    pending_in = self._in_bio.pending
    return pending_in
Returns how many bytes are in the incoming buffer waiting to be processed.
def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
    """
    Take the next ``amount_bytes_for_network`` bytes out of the outgoing
    data buffer and return them. The returned data is what should be
    written to the network, and it is removed from the internal buffer.
    """

    outgoing_chunk = self._out_bio.read(amount_bytes_for_network)
    return outgoing_chunk
Returns the next amount_bytes_for_network
bytes of data that should be written to
the network from the outgoing data buffer, removing it from the
internal buffer.
def outgoing_bytes_buffered(self) -> int:
    """
    Report the number of bytes sitting in the outgoing buffer that have
    not been sent yet.
    """

    pending_out = self._out_bio.pending
    return pending_out
Returns how many bytes are in the outgoing buffer waiting to be sent.
@property
def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
    """The ``Context`` object that this socket belongs to."""

    owning_context = self._parent_context
    return owning_context
The Context
object this socket is tied to.
def cipher(self) -> CipherSuite | int | None:
    """
    Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

    If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
    defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

    Raises ``TLSError`` if the negotiated cipher cannot be found among the
    ciphers reported by the OpenSSL context.
    """

    negotiated = self._object.cipher()
    if negotiated is None:
        return None

    # The ssl object reports a 3-tuple; only the OpenSSL suite name is needed.
    ossl_cipher, _, _ = negotiated

    # Find the context's entry for that name so its numeric id can be read.
    matching_entry = next(
        (
            entry
            for entry in self._ssl_context.get_ciphers()
            if entry["name"] == ossl_cipher
        ),
        None,
    )

    # Since the cipher was negotiated using the OpenSSL context, it should
    # exist in the list of the OpenSSL supported ciphers. Raise explicitly
    # instead of asserting: assertions are stripped under ``python -O``, and
    # a silent miss would report an unrelated cipher as the negotiated one.
    if matching_entry is None:
        raise TLSError(
            f"Negotiated cipher {ossl_cipher!r} is not among the ciphers "
            "supported by the OpenSSL context"
        )

    # Only the low 16 bits of OpenSSL's id are kept as the cipher suite number.
    cipher_id = matching_entry["id"] & 0xFFFF
    try:
        return CipherSuite(cipher_id)
    except ValueError:
        # Not a member of the CipherSuite enum: hand back the raw number.
        return cipher_id
Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
If no connection has been negotiated, returns None
. If the cipher negotiated is not
defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
def negotiated_protocol(self) -> NextProtocol | bytes | None:
    """
    Returns the protocol that was selected during the TLS handshake.

    This selection may have been made using ALPN or some future
    negotiation mechanism.

    If the negotiated protocol is one of the protocols defined in the
    ``NextProtocol`` enum, the value from that enum will be returned.
    Otherwise, the raw bytestring of the negotiated protocol will be
    returned.

    If ``Context.set_inner_protocols()`` was not called, if the other
    party does not support protocol negotiation, if this socket does
    not support any of the peer's proposed protocols, or if the
    handshake has not happened yet, ``None`` is returned.
    """

    selected = self._object.selected_alpn_protocol()
    if selected is None:
        return None

    # The standard library returns this as a str, we want bytes.
    # UTF-8 is a superset of ASCII, so ASCII identifiers encode to exactly
    # the same bytes as before, while an identifier containing non-ASCII
    # characters no longer escapes as an unrelated UnicodeEncodeError.
    # NOTE(review): assumes the ssl module decoded the wire bytes as UTF-8,
    # so that this round-trips to the original identifier - confirm.
    proto_bytes = selected.encode("utf-8")

    try:
        return NextProtocol(proto_bytes)
    except ValueError:
        # Not a member of the NextProtocol enum: return the raw bytes.
        return proto_bytes
Returns the protocol that was selected during the TLS handshake.
This selection may have been made using ALPN or some future negotiation mechanism.
If the negotiated protocol is one of the protocols defined in the
NextProtocol
enum, the value from that enum will be returned.
Otherwise, the raw bytestring of the negotiated protocol will be
returned.
If Context.set_inner_protocols()
was not called, if the other
party does not support protocol negotiation, if this socket does
not support any of the peer's proposed protocols, or if the
handshake has not happened yet, None
is returned.
@property
def negotiated_tls_version(self) -> TLSVersion | None:
    """
    The TLS version negotiated on this connection, or ``None`` when the
    underlying ssl object reports no version.
    """

    version_name = self._object.version()
    return None if version_name is None else TLSVersion(version_name)
The version of TLS that has been negotiated on this connection.
def getpeercert(self) -> bytes | None:
    """
    Return the certificate the peer presented during the handshake as raw
    DER bytes, if applicable.
    """
    # The certificate is requested in binary form so that an
    # OpenSSLCertificate can be built from it. The dict form is specific to
    # the ssl module and may be hard for other implementations to provide,
    # so it is not supported here.
    with _error_converter():
        return self._object.getpeercert(True)
Return the raw DER bytes of the certificate provided by the peer during the handshake, if applicable.
class OpenSSLClientContext:
    """Client-side TLS context backed by the standard library's OpenSSL
    bindings. It controls and creates the wrapped sockets (or in-memory
    buffers) used to perform TLS on the client side of a network connection.
    """

    def __init__(self, configuration: TLSClientConfiguration) -> None:
        """Build a context object around the given TLS configuration."""

        self._configuration = configuration

    @property
    def configuration(self) -> TLSClientConfiguration:
        """The TLS configuration this context was created from."""

        stored_configuration = self._configuration
        return stored_configuration

    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
        """Create a socket-like object that can be used to do TLS."""
        # A fresh OpenSSL context is derived from the configuration per call.
        ssl_ctx = _init_context_client(self._configuration)

        tls_socket = OpenSSLTLSSocket._create(
            parent_context=self,
            server_side=False,
            ssl_context=ssl_ctx,
            address=address,
        )
        return tls_socket

    def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
        """Create a TLSBuffer: an in-memory channel that also carries
        information about the TLS exchange (cipher, negotiated_protocol,
        negotiated_tls_version, etc.)."""

        # A fresh OpenSSL context is derived from the configuration per call.
        ssl_ctx = _init_context_client(self._configuration)

        tls_buffer = OpenSSLTLSBuffer._create(
            server_hostname=server_hostname,
            parent_context=self,
            server_side=False,
            ssl_context=ssl_ctx,
        )
        return tls_buffer
This class controls and creates a socket that is wrapped using the standard library bindings to OpenSSL to perform TLS connections on the client side of a network connection.
def __init__(self, configuration: TLSClientConfiguration) -> None:
    """Create a new context object from a given TLS configuration.

    The configuration is stored on the instance as ``_configuration``.
    """

    self._configuration = configuration
Create a new context object from a given TLS configuration.
@property
def configuration(self) -> TLSClientConfiguration:
    """The TLS configuration this context was created from."""

    stored_configuration = self._configuration
    return stored_configuration
Returns the TLS configuration that was used to create the context.
def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
    """Create a socket-like object that can be used to do TLS."""
    # A fresh client-side OpenSSL context is derived from the configuration per call.
    ssl_ctx = _init_context_client(self._configuration)

    tls_socket = OpenSSLTLSSocket._create(
        parent_context=self,
        server_side=False,
        ssl_context=ssl_ctx,
        address=address,
    )
    return tls_socket
Create a socket-like object that can be used to do TLS.
def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
    """Create a TLSBuffer: an in-memory channel that also carries
    information about the TLS exchange (cipher, negotiated_protocol,
    negotiated_tls_version, etc.)."""

    # A fresh client-side OpenSSL context is derived from the configuration per call.
    ssl_ctx = _init_context_client(self._configuration)

    tls_buffer = OpenSSLTLSBuffer._create(
        server_hostname=server_hostname,
        parent_context=self,
        server_side=False,
        ssl_context=ssl_ctx,
    )
    return tls_buffer
Creates a TLSBuffer that acts as an in-memory channel, and contains information about the TLS exchange (cipher, negotiated_protocol, negotiated_tls_version, etc.).
class OpenSSLServerContext:
    """Server-side TLS context backed by the standard library's OpenSSL
    bindings. It controls and creates the wrapped sockets (or in-memory
    buffers) used to perform TLS on the server side of a network connection.
    """

    def __init__(self, configuration: TLSServerConfiguration) -> None:
        """Build a context object around the given TLS configuration."""

        self._configuration = configuration

    @property
    def configuration(self) -> TLSServerConfiguration:
        """The TLS configuration this context was created from."""

        stored_configuration = self._configuration
        return stored_configuration

    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
        """Create a socket-like object that can be used to do TLS."""
        # A fresh OpenSSL context is derived from the configuration per call.
        ssl_ctx = _init_context_server(self._configuration)

        tls_socket = OpenSSLTLSSocket._create(
            parent_context=self,
            server_side=True,
            ssl_context=ssl_ctx,
            address=address,
        )
        return tls_socket

    def create_buffer(self) -> OpenSSLTLSBuffer:
        """Create a TLSBuffer: an in-memory channel that also carries
        information about the TLS exchange (cipher, negotiated_protocol,
        negotiated_tls_version, etc.)."""

        # A fresh OpenSSL context is derived from the configuration per call.
        ssl_ctx = _init_context_server(self._configuration)

        # The server side passes no hostname.
        tls_buffer = OpenSSLTLSBuffer._create(
            server_hostname=None,
            parent_context=self,
            server_side=True,
            ssl_context=ssl_ctx,
        )
        return tls_buffer
This class controls and creates a socket that is wrapped using the standard library bindings to OpenSSL to perform TLS connections on the server side of a network connection.
def __init__(self, configuration: TLSServerConfiguration) -> None:
    """Create a new context object from a given TLS configuration.

    The configuration is stored on the instance as ``_configuration``.
    """

    self._configuration = configuration
Create a new context object from a given TLS configuration.
@property
def configuration(self) -> TLSServerConfiguration:
    """The TLS configuration this context was created from."""

    stored_configuration = self._configuration
    return stored_configuration
Returns the TLS configuration that was used to create the context.
def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
    """Create a socket-like object that can be used to do TLS."""
    # A fresh server-side OpenSSL context is derived from the configuration per call.
    ssl_ctx = _init_context_server(self._configuration)

    tls_socket = OpenSSLTLSSocket._create(
        parent_context=self,
        server_side=True,
        ssl_context=ssl_ctx,
        address=address,
    )
    return tls_socket
Create a socket-like object that can be used to do TLS.
def create_buffer(self) -> OpenSSLTLSBuffer:
    """Create a TLSBuffer: an in-memory channel that also carries
    information about the TLS exchange (cipher, negotiated_protocol,
    negotiated_tls_version, etc.)."""

    # A fresh server-side OpenSSL context is derived from the configuration per call.
    ssl_ctx = _init_context_server(self._configuration)

    # The server side passes no hostname.
    tls_buffer = OpenSSLTLSBuffer._create(
        server_hostname=None,
        parent_context=self,
        server_side=True,
        ssl_context=ssl_ctx,
    )
    return tls_buffer
Creates a TLSBuffer that acts as an in-memory channel, and contains information about the TLS exchange (cipher, negotiated_protocol, negotiated_tls_version, etc.).
def validate_config(tls_config: TLSClientConfiguration | TLSServerConfiguration) -> None:
    """Check that the OpenSSL TLS implementation supports this TLS configuration.

    The trust store is checked first, then every signing chain carried by
    the configuration. The checks themselves are done by the
    ``_check_trust_store`` and ``_check_sign_chain`` helpers.
    """
    _check_trust_store(tls_config.trust_store)

    # Normalise both configuration flavours into an iterable of signing chains.
    if isinstance(tls_config, TLSClientConfiguration):
        # A client configuration holds a single signing chain, or None.
        client_chain = tls_config.certificate_chain
        chains_to_check = () if client_chain is None else (client_chain,)
    else:
        assert isinstance(tls_config, TLSServerConfiguration)
        # A server configuration holds a collection of signing chains, or None.
        server_chains = tls_config.certificate_chain
        chains_to_check = () if server_chains is None else server_chains

    for chain in chains_to_check:
        _check_sign_chain(chain)
Validates whether the OpenSSL TLS implementation supports this TLS configuration.