tlslib.stdlib
Shims the standard library OpenSSL module into the amended PEP 543 API.
1"""Shims the standard library OpenSSL module into the amended PEP 543 API.""" 2 3from __future__ import annotations 4 5import os 6import socket 7import ssl 8import tempfile 9import typing 10import weakref 11from collections.abc import Buffer, Sequence 12from contextlib import contextmanager 13from pathlib import Path 14 15import truststore 16 17from .tlslib import ( 18 DEFAULT_CIPHER_LIST, 19 Certificate, 20 CipherSuite, 21 ConfigurationError, 22 NextProtocol, 23 PrivateKey, 24 RaggedEOF, 25 SigningChain, 26 TLSClientConfiguration, 27 TLSError, 28 TLSImplementation, 29 TLSServerConfiguration, 30 TLSVersion, 31 TrustStore, 32 WantReadError, 33 WantWriteError, 34) 35 36_SSLContext = ssl.SSLContext | truststore.SSLContext 37 38_TLSMinVersionOpts = { 39 TLSVersion.MINIMUM_SUPPORTED: ssl.TLSVersion.MINIMUM_SUPPORTED, 40 TLSVersion.TLSv1_2: ssl.TLSVersion.TLSv1_2, 41 TLSVersion.TLSv1_3: ssl.TLSVersion.TLSv1_3, 42} 43 44_TLSMaxVersionOpts = { 45 TLSVersion.TLSv1_2: ssl.TLSVersion.TLSv1_2, 46 TLSVersion.TLSv1_3: ssl.TLSVersion.TLSv1_3, 47 TLSVersion.MAXIMUM_SUPPORTED: ssl.TLSVersion.MAXIMUM_SUPPORTED, 48} 49 50# We need to populate a dictionary of ciphers that OpenSSL supports, in the 51# form of {16-bit number: OpenSSL suite name}. 52ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 53ctx.set_ciphers("ALL:COMPLEMENTOFALL") 54_cipher_map = {c["id"] & 0xFFFF: c["name"] for c in ctx.get_ciphers()} 55del ctx 56 57 58@contextmanager 59def _error_converter( 60 ignore_filter: tuple[type[Exception]] | tuple[()] = (), 61) -> typing.Generator[None, None, None]: 62 """ 63 Catches errors from the ssl module and wraps them up in TLSError 64 exceptions. Ignores certain kinds of exceptions as requested. 
65 """ 66 try: 67 yield 68 except ignore_filter: 69 pass 70 except ssl.SSLWantReadError: 71 raise WantReadError("Must read data") from None 72 except ssl.SSLWantWriteError: 73 raise WantWriteError("Must write data") from None 74 except ssl.SSLEOFError: 75 raise RaggedEOF("Ragged EOF") from None 76 except ssl.SSLError as e: 77 raise TLSError(e) from None 78 79 80def _remove_path(ts_cert_priv: TrustStore | Certificate | PrivateKey) -> None: 81 ts_cert_priv._path = None 82 83 84def _is_system_trust_store(trust_store: TrustStore | None) -> bool: 85 return trust_store is None or ( 86 trust_store._path is None and trust_store._buffer is None and trust_store._id is None 87 ) 88 89 90def _get_path_from_trust_store( 91 context: _SSLContext, trust_store: TrustStore | None 92) -> os.PathLike | None: 93 assert trust_store is not None 94 if trust_store._path is not None: 95 return trust_store._path 96 elif trust_store._buffer is not None: 97 tmp_path = tempfile.NamedTemporaryFile(mode="w+b", delete=False, delete_on_close=False) 98 tmp_path.write(trust_store._buffer) 99 tmp_path.close() 100 # Store this path to prevent creation of multiple files for each trust store 101 trust_store._path = Path(tmp_path.name) 102 weakref.finalize(context, os.remove, tmp_path.name) 103 # Remove the path in case the trust store outlives the context 104 weakref.finalize(context, _remove_path, trust_store) 105 return trust_store._path 106 elif trust_store._id is not None: 107 raise ConfigurationError("This TLS implementation does not support id-based trust stores.") 108 else: 109 return None 110 111 112def _create_client_context_with_trust_store(trust_store: TrustStore | None) -> _SSLContext: 113 some_context: _SSLContext 114 115 if _is_system_trust_store(trust_store): 116 some_context = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 117 else: 118 some_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 119 trust_store_path = _get_path_from_trust_store(some_context, trust_store) 120 
some_context.load_verify_locations(trust_store_path) 121 122 # TLS Compression is a security risk and is removed in TLS v1.3 123 some_context.options |= ssl.OP_NO_COMPRESSION 124 125 some_context.verify_flags = ( 126 ssl.VerifyFlags.VERIFY_X509_STRICT | ssl.VerifyFlags.VERIFY_X509_PARTIAL_CHAIN 127 ) 128 129 return some_context 130 131 132def _create_server_context_with_trust_store( 133 trust_store: TrustStore | None, 134) -> ssl.SSLContext: 135 some_context: ssl.SSLContext 136 137 # truststore does not support server side 138 some_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 139 140 if trust_store is not None: 141 some_context.verify_mode = ssl.CERT_REQUIRED 142 trust_store_path = _get_path_from_trust_store(some_context, trust_store) 143 144 if trust_store_path is not None: 145 some_context.load_verify_locations(trust_store_path) 146 else: 147 some_context.load_default_certs(ssl.Purpose.CLIENT_AUTH) 148 149 # TLS Compression is a security risk and is removed in TLS v1.3 150 some_context.options |= ssl.OP_NO_COMPRESSION 151 152 return some_context 153 154 155def _sni_callback_builder( 156 _name_to_chain_map: weakref.WeakValueDictionary[str, SigningChain], 157 original_config: TLSServerConfiguration, 158) -> typing.Callable[[ssl.SSLSocket, str, ssl.SSLContext], ssl.AlertDescription | None]: 159 def pep543_callback( 160 ssl_socket: ssl.SSLSocket, 161 server_name: str, 162 stdlib_context: ssl.SSLContext, 163 ) -> ssl.AlertDescription | None: 164 try: 165 sign_chain = _name_to_chain_map[server_name] 166 except KeyError: 167 return ssl.ALERT_DESCRIPTION_INTERNAL_ERROR 168 169 new_config: TLSServerConfiguration = TLSServerConfiguration( 170 certificate_chain=(sign_chain,), 171 ciphers=original_config.ciphers, 172 inner_protocols=original_config.inner_protocols, 173 lowest_supported_version=original_config.lowest_supported_version, 174 highest_supported_version=original_config.highest_supported_version, 175 trust_store=original_config.trust_store, 176 ) 177 
ssl_socket.context = _init_context_server(new_config) 178 179 # Returning None, perversely, is how one signals success from this 180 # function. Will wonders never cease? 181 return None 182 183 return pep543_callback 184 185 186def _configure_server_context_for_certs( 187 context: ssl.SSLContext, 188 cert_chain: Sequence[SigningChain] | None = None, 189 sni_config: TLSServerConfiguration | None = None, 190) -> ssl.SSLContext: 191 if cert_chain is not None: 192 if len(cert_chain) == 1: 193 # Only one SigningChain, no need to configure SNI 194 return _configure_context_for_single_signing_chain(context, cert_chain[0]) 195 196 elif len(cert_chain) > 1: 197 # We have multiple SigningChains, need to configure SNI 198 assert sni_config is not None 199 return _configure_context_for_sni(context, cert_chain, sni_config) 200 201 return context 202 203 204def _get_path_from_cert_or_priv( 205 context: _SSLContext, cert_or_priv: Certificate | PrivateKey 206) -> os.PathLike: 207 if cert_or_priv._path is not None: 208 return cert_or_priv._path 209 elif cert_or_priv._buffer is not None: 210 tmp_path = tempfile.NamedTemporaryFile(mode="w+b", delete=False, delete_on_close=False) 211 tmp_path.write(cert_or_priv._buffer) 212 tmp_path.close() 213 weakref.finalize(context, os.remove, tmp_path.name) 214 # Store the path for future usage, preventing creation of multiple files 215 cert_or_priv._path = Path(tmp_path.name) 216 # Remove the path in case the cert or priv outlives the context 217 weakref.finalize(context, _remove_path, cert_or_priv) 218 return cert_or_priv._path 219 elif cert_or_priv._id is not None: 220 raise ConfigurationError( 221 "This TLS implementation does not support id-based certificates \ 222 or private keys." 
223 ) 224 else: 225 raise ConfigurationError("Certificate or PrivateKey cannot be empty.") 226 227 228def _get_bytes_from_cert(cert: Certificate) -> bytes: 229 if cert._buffer is not None: 230 return cert._buffer 231 elif cert._path is not None: 232 # Do not save cert in memory 233 return Path(cert._path).read_bytes() 234 elif cert._id is not None: 235 raise ConfigurationError("This TLS implementation does not support id-based certificates.") 236 else: 237 raise ConfigurationError("Certificate cannot be empty.") 238 239 240def _configure_context_for_single_signing_chain( 241 context: _SSLContext, 242 cert_chain: SigningChain | None = None, 243) -> _SSLContext: 244 """Given a PEP 543 cert chain, configure the SSLContext to send that cert 245 chain in the handshake. 246 247 Returns the context. 248 """ 249 250 if cert_chain is not None: 251 cert = cert_chain.leaf[0] 252 253 if len(cert_chain.chain) == 0: 254 cert_path = _get_path_from_cert_or_priv(context, cert) 255 256 else: 257 with tempfile.NamedTemporaryFile(mode="wb", delete=False) as io: 258 # Write first cert 259 io.write(_get_bytes_from_cert(cert)) 260 261 for cert in cert_chain.chain: 262 io.write(b"\n") 263 io.write(_get_bytes_from_cert(cert)) 264 265 weakref.finalize(context, os.remove, io.name) 266 cert_path = Path(io.name) 267 268 key_path = None 269 if cert_chain.leaf[1] is not None: 270 privkey = cert_chain.leaf[1] 271 272 key_path = _get_path_from_cert_or_priv(context, privkey) 273 274 assert cert_path is not None 275 with _error_converter(): 276 context.load_cert_chain(cert_path, key_path, None) 277 278 return context 279 280 281def _configure_context_for_sni( 282 context: ssl.SSLContext, 283 cert_chain: Sequence[SigningChain], 284 sni_config: TLSServerConfiguration, 285) -> ssl.SSLContext: 286 # This is a mapping of concrete server names to the corresponding SigningChain 287 _name_to_chain_map: weakref.WeakValueDictionary[str, SigningChain] = ( 288 weakref.WeakValueDictionary() 289 ) 290 291 for 
sign_chain in cert_chain: 292 # Parse leaf certificates to find server names 293 cert = sign_chain.leaf[0] 294 cert_path = _get_path_from_cert_or_priv(context, cert) 295 dec_cert = ssl._ssl._test_decode_cert(cert_path) # type: ignore[attr-defined] 296 297 try: 298 alt_names = dec_cert["subjectAltName"] 299 except KeyError: 300 continue 301 302 server_name = None 303 for name in alt_names: 304 assert len(name) == 2 305 if name[0] == "DNS": 306 server_name = name[1] 307 break 308 309 if server_name is not None: 310 _name_to_chain_map[server_name] = sign_chain 311 312 context.sni_callback = _sni_callback_builder(_name_to_chain_map, sni_config) # type: ignore[assignment] 313 314 return context 315 316 317def _configure_context_for_ciphers( 318 context: _SSLContext, ciphers: Sequence[CipherSuite | int] | None = None 319) -> _SSLContext: 320 """Given a PEP 543 cipher suite list, configure the SSLContext to use those 321 cipher suites. 322 323 Returns the context. 324 """ 325 if ciphers is None: 326 # OpenSSL does not necessarily have system recommended settings 327 # The default cipher list is used here instead 328 ciphers = DEFAULT_CIPHER_LIST 329 330 ossl_names = [_cipher_map[cipher] for cipher in ciphers if cipher in _cipher_map] 331 if not ossl_names: 332 msg = "None of the provided ciphers are supported by the OpenSSL TLS implementation!" 333 raise TLSError(msg) 334 with _error_converter(): 335 context.set_ciphers(":".join(ossl_names)) 336 return context 337 338 339def _configure_context_for_negotiation( 340 context: _SSLContext, 341 inner_protocols: Sequence[NextProtocol | bytes] | None = None, 342) -> _SSLContext: 343 """Given a PEP 543 list of protocols to negotiate, configures the SSLContext 344 to negotiate those protocols. 345 """ 346 if inner_protocols: 347 protocols = [] 348 for np in inner_protocols: 349 proto_string = np if isinstance(np, bytes) else np.value 350 # The protocol string needs to be of type str for the standard 351 # library. 
352 protocols.append(proto_string.decode("ascii")) 353 354 context.set_alpn_protocols(protocols) 355 356 return context 357 358 359def _init_context_common( 360 some_context: _SSLContext, 361 config: TLSClientConfiguration | TLSServerConfiguration, 362) -> _SSLContext: 363 some_context = _configure_context_for_ciphers( 364 some_context, 365 config.ciphers, 366 ) 367 some_context = _configure_context_for_negotiation( 368 some_context, 369 config.inner_protocols, 370 ) 371 372 # In lieu of system recommended settings, we default to TLS v1.3 373 lowest_supported_version = config.lowest_supported_version 374 if lowest_supported_version is None: 375 lowest_supported_version = TLSVersion.TLSv1_3 376 377 highest_supported_version = config.highest_supported_version 378 if highest_supported_version is None: 379 highest_supported_version = TLSVersion.MAXIMUM_SUPPORTED 380 381 try: 382 some_context.minimum_version = _TLSMinVersionOpts[lowest_supported_version] 383 some_context.maximum_version = _TLSMaxVersionOpts[highest_supported_version] 384 except KeyError: 385 raise TLSError("Bad maximum/minimum options") 386 387 return some_context 388 389 390def _init_context_client(config: TLSClientConfiguration) -> _SSLContext: 391 """Initialize an SSL context object with a given client configuration.""" 392 some_context = _create_client_context_with_trust_store(config.trust_store) 393 394 some_context = _configure_context_for_single_signing_chain( 395 some_context, config.certificate_chain 396 ) 397 398 return _init_context_common(some_context, config) 399 400 401def _init_context_server(config: TLSServerConfiguration) -> _SSLContext: 402 """Initialize an SSL context object with a given server configuration.""" 403 some_context = _create_server_context_with_trust_store(config.trust_store) 404 405 some_context = _configure_server_context_for_certs( 406 some_context, config.certificate_chain, config 407 ) 408 409 return _init_context_common(some_context, config) 410 411 412class 
OpenSSLTLSSocket: 413 """A TLSSocket implementation based on OpenSSL.""" 414 415 __slots__ = ( 416 "_parent_context", 417 "_socket", 418 "_ssl_context", 419 ) 420 421 _parent_context: OpenSSLClientContext | OpenSSLServerContext 422 _socket: ssl.SSLSocket 423 _ssl_context: _SSLContext 424 425 def __init__(self, *args: tuple, **kwargs: tuple) -> None: 426 """OpenTLSSockets should not be constructed by the user. 427 Instead, the ClientContext.connect() and 428 ServerContext.connect() use the _create() method.""" 429 msg = ( 430 f"{self.__class__.__name__} does not have a public constructor. " 431 "Instances are returned by ClientContext.connect() or ServerContext.connect()." 432 ) 433 raise TypeError( 434 msg, 435 ) 436 437 @classmethod 438 def _create( 439 cls, 440 address: tuple[str | None, int], 441 parent_context: OpenSSLClientContext | OpenSSLServerContext, 442 server_side: bool, 443 ssl_context: _SSLContext, 444 ) -> OpenSSLTLSSocket: 445 self = cls.__new__(cls) 446 self._parent_context = parent_context 447 self._ssl_context = ssl_context 448 449 if server_side is True: 450 family = socket.getaddrinfo(address[0], address[1], type=socket.SOCK_STREAM)[0][0] 451 sock = socket.create_server(address, family=family) 452 with _error_converter(): 453 self._socket = ssl_context.wrap_socket( 454 sock, server_side=server_side, server_hostname=None 455 ) 456 else: 457 hostname, _ = address 458 sock = socket.create_connection(address) 459 with _error_converter(): 460 self._socket = ssl_context.wrap_socket( 461 sock, server_side=server_side, server_hostname=hostname 462 ) 463 464 self._socket.setblocking(False) 465 466 return self 467 468 def recv(self, bufsize: int) -> bytes: 469 """Receive data from the socket. The return value is a bytes object 470 representing the data received. 
Should not work before the handshake 471 is completed.""" 472 with _error_converter(): 473 try: 474 return self._socket.recv(bufsize) 475 except ssl.SSLZeroReturnError: 476 return b"" 477 478 def send(self, bytes: bytes) -> int: 479 """Send data to the socket. The socket must be connected to a remote socket.""" 480 with _error_converter(): 481 return self._socket.send(bytes) 482 483 def close(self, force: bool = False) -> None: 484 """Unwraps the TLS connection, shuts down both halves of the connection and 485 mark the socket closed. If force is True, will only shutdown own half and 486 not wait for the other side. If force is False, this will raise WantReadError 487 until the other side sends a close_notify alert.""" 488 489 try: 490 with _error_converter(): 491 sock = self._socket.unwrap() 492 except (ValueError, BrokenPipeError, OSError): 493 # If these exceptions are raised, we close the socket without re-trying to unwrap it. 494 # - ValueError: The socket was actually not wrapped 495 # - BrokenPipeError: There is some issue with the socket 496 # - OSError: The other side already shut down 497 sock = self._socket 498 except WantReadError: 499 if force: 500 sock = self._socket 501 else: 502 raise 503 504 # NOTE: OSError indicates that the other side has already hung up. 505 with _error_converter(ignore_filter=(OSError,)): 506 sock.shutdown(socket.SHUT_RDWR) 507 return sock.close() 508 509 def listen(self, backlog: int) -> None: 510 """Enable a server to accept connections. If backlog is specified, it 511 specifies the number of unaccepted connections that the system will allow 512 before refusing new connections.""" 513 with _error_converter(): 514 return self._socket.listen(backlog) 515 516 def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]: 517 """Accept a connection. The socket must be bound to an address and listening 518 for connections. 
The return value is a pair (conn, address) where conn is a 519 new TLSSocket object usable to send and receive data on the connection, and 520 address is the address bound to the socket on the other end of the connection.""" 521 522 with _error_converter(): 523 (sock, address) = self._socket.accept() 524 tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket) 525 tls_socket._parent_context = self._parent_context 526 tls_socket._ssl_context = self._ssl_context 527 tls_socket._socket = sock 528 with _error_converter(): 529 tls_socket._socket.setblocking(False) 530 return (tls_socket, address) 531 532 def getsockname(self) -> socket._RetAddress: 533 """Return the local address to which the socket is connected.""" 534 with _error_converter(): 535 return self._socket.getsockname() 536 537 def getpeercert(self) -> bytes | None: 538 """ 539 Return the raw DER bytes of the certificate provided by the peer 540 during the handshake, if applicable. 541 """ 542 # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format 543 # Obtaining the certificate as a dict is very specific to the ssl module and may be 544 # difficult to implement for other TLS implementations, so this is not supported 545 546 with _error_converter(): 547 cert = self._socket.getpeercert(True) 548 549 return cert 550 551 def getpeername(self) -> socket._RetAddress: 552 """Return the remote address to which the socket is connected.""" 553 554 with _error_converter(): 555 return self._socket.getpeername() 556 557 def fileno(self) -> int: 558 """Return the socket's file descriptor (a small integer), or -1 on failure.""" 559 560 with _error_converter(): 561 return self._socket.fileno() 562 563 @property 564 def context(self) -> OpenSSLClientContext | OpenSSLServerContext: 565 """The ``Context`` object this socket is tied to.""" 566 567 return self._parent_context 568 569 def cipher(self) -> CipherSuite | int | None: 570 """ 571 Returns the CipherSuite entry for the cipher that 
has been negotiated on the connection. 572 573 If no connection has been negotiated, returns ``None``. If the cipher negotiated is not 574 defined in CipherSuite, returns the 16-bit integer representing that cipher directly. 575 """ 576 577 # This is the OpenSSL cipher name. We want the ID, which we can get by 578 # looking for this entry in the context's list of supported ciphers. 579 ret = self._socket.cipher() 580 581 if ret is None: 582 return None 583 else: 584 ossl_cipher, _, _ = ret 585 586 for cipher in self._ssl_context.get_ciphers(): 587 if cipher["name"] == ossl_cipher: 588 break 589 # Since the cipher was negotiated using the OpenSSL context, 590 # it must exist in the list of the OpenSSL supported ciphers. 591 assert cipher["name"] == ossl_cipher 592 593 cipher_id = cipher["id"] & 0xFFFF 594 try: 595 return CipherSuite(cipher_id) 596 except ValueError: 597 return cipher_id 598 599 def negotiated_protocol(self) -> NextProtocol | bytes | None: 600 """ 601 Returns the protocol that was selected during the TLS handshake. 602 603 This selection may have been made using ALPN or some future 604 negotiation mechanism. 605 606 If the negotiated protocol is one of the protocols defined in the 607 ``NextProtocol`` enum, the value from that enum will be returned. 608 Otherwise, the raw bytestring of the negotiated protocol will be 609 returned. 610 611 If ``Context.set_inner_protocols()`` was not called, if the other 612 party does not support protocol negotiation, if this socket does 613 not support any of the peer's proposed protocols, or if the 614 handshake has not happened yet, ``None`` is returned. 615 """ 616 617 proto = self._socket.selected_alpn_protocol() 618 619 # The standard library returns this as a str, we want bytes. 
620 if proto is None: 621 return None 622 623 protoBytes = proto.encode("ascii") 624 625 try: 626 return NextProtocol(protoBytes) 627 except ValueError: 628 return protoBytes 629 630 @property 631 def negotiated_tls_version(self) -> TLSVersion | None: 632 """The version of TLS that has been negotiated on this connection.""" 633 634 ossl_version = self._socket.version() 635 if ossl_version is None: 636 return None 637 else: 638 return TLSVersion(ossl_version) 639 640 641class OpenSSLTLSBuffer: 642 """A TLSBuffer implementation based on OpenSSL""" 643 644 __slots__ = ( 645 "_ciphertext_buffer", 646 "_in_bio", 647 "_object", 648 "_out_bio", 649 "_parent_context", 650 "_ssl_context", 651 ) 652 653 _ciphertext_buffer: bytearray 654 _in_bio: ssl.MemoryBIO 655 _object: ssl.SSLObject 656 _out_bio: ssl.MemoryBIO 657 _parent_context: OpenSSLClientContext | OpenSSLServerContext 658 _ssl_context: _SSLContext 659 660 def __init__(self, *args: tuple, **kwargs: tuple) -> None: 661 """OpenTLSBuffers should not be constructed by the user. 662 Instead, the ClientContext.create_buffer() and 663 ServerContext.create_buffer() use the _create() method.""" 664 msg = ( 665 f"{self.__class__.__name__} does not have a public constructor. " 666 "Instances are returned by ClientContext.create_buffer() \ 667 or ServerContext.create_buffer()." 668 ) 669 raise TypeError( 670 msg, 671 ) 672 673 @classmethod 674 def _create( 675 cls, 676 server_hostname: str | None, 677 parent_context: OpenSSLClientContext | OpenSSLServerContext, 678 server_side: bool, 679 ssl_context: _SSLContext, 680 ) -> OpenSSLTLSBuffer: 681 self = cls.__new__(cls) 682 self._parent_context = parent_context 683 self._ssl_context = ssl_context 684 685 # We need this extra buffer to implement the peek/consume API, which 686 # the MemoryBIO object does not allow. 687 self._ciphertext_buffer = bytearray() 688 689 # Set up the SSLObject we're going to back this with. 
690 self._in_bio = ssl.MemoryBIO() 691 self._out_bio = ssl.MemoryBIO() 692 693 if server_side is True: 694 with _error_converter(): 695 self._object = ssl_context.wrap_bio( 696 self._in_bio, self._out_bio, server_side=True, server_hostname=None 697 ) 698 else: 699 with _error_converter(): 700 self._object = ssl_context.wrap_bio( 701 self._in_bio, self._out_bio, server_side=False, server_hostname=server_hostname 702 ) 703 704 return self 705 706 def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int: 707 """ 708 Read up to ``amt`` bytes of data from the input buffer and return 709 the result as a ``bytes`` instance. If an optional buffer is 710 provided, the result is written into the buffer and the number of 711 bytes is returned instead. 712 713 Once EOF is reached, all further calls to this method return the 714 empty byte string ``b''``. 715 716 May read "short": that is, fewer bytes may be returned than were 717 requested. 718 719 Raise ``WantReadError`` or ``WantWriteError`` if there is 720 insufficient data in either the input or output buffer and the 721 operation would have caused data to be written or read. 722 723 May raise ``RaggedEOF`` if the connection has been closed without a 724 graceful TLS shutdown. Whether this is an exception that should be 725 ignored or not is up to the specific application. 726 727 As at any time a re-negotiation is possible, a call to ``read()`` 728 can also cause write operations. 729 """ 730 731 with _error_converter(): 732 try: 733 # MyPy insists that buffer must be a bytearray 734 return self._object.read(amt, buffer) # type: ignore[arg-type] 735 except ssl.SSLZeroReturnError: 736 return b"" 737 738 def write(self, buf: Buffer) -> int: 739 """ 740 Write ``buf`` in encrypted form to the output buffer and return the 741 number of bytes written. The ``buf`` argument must be an object 742 supporting the buffer interface. 
743 744 Raise ``WantReadError`` or ``WantWriteError`` if there is 745 insufficient data in either the input or output buffer and the 746 operation would have caused data to be written or read. In either 747 case, users should endeavour to resolve that situation and then 748 re-call this method. When re-calling this method users *should* 749 re-use the exact same ``buf`` object, as some TLS implementations 750 require that the exact same buffer be used. 751 752 This operation may write "short": that is, fewer bytes may be 753 written than were in the buffer. 754 755 As at any time a re-negotiation is possible, a call to ``write()`` 756 can also cause read operations. 757 """ 758 759 with _error_converter(): 760 return self._object.write(buf) 761 762 # Get rid and do handshake ourselves? 763 def do_handshake(self) -> None: 764 """ 765 Performs the TLS handshake. Also performs certificate validation 766 and hostname verification. 767 """ 768 769 with _error_converter(): 770 self._object.do_handshake() 771 772 def shutdown(self) -> None: 773 """ 774 Performs a clean TLS shut down. This should generally be used 775 whenever possible to signal to the remote peer that the content is 776 finished. 777 """ 778 779 with _error_converter(): 780 self._object.unwrap() 781 782 def process_incoming(self, data_from_network: bytes) -> None: 783 """ 784 Receives some TLS data from the network and stores it in an 785 internal buffer. 786 787 If the internal buffer is overfull, this method will raise 788 ``WantReadError`` and store no data. At this point, the user must 789 call ``read`` to remove some data from the internal buffer 790 before repeating this call. 791 """ 792 793 with _error_converter(): 794 written_len = self._in_bio.write(data_from_network) 795 796 assert written_len == len(data_from_network) 797 798 def incoming_bytes_buffered(self) -> int: 799 """ 800 Returns how many bytes are in the incoming buffer waiting to be processed. 
801 """ 802 803 return self._in_bio.pending 804 805 def process_outgoing(self, amount_bytes_for_network: int) -> bytes: 806 """ 807 Returns the next ``amt`` bytes of data that should be written to 808 the network from the outgoing data buffer, removing it from the 809 internal buffer. 810 """ 811 812 return self._out_bio.read(amount_bytes_for_network) 813 814 def outgoing_bytes_buffered(self) -> int: 815 """ 816 Returns how many bytes are in the outgoing buffer waiting to be sent. 817 """ 818 819 return self._out_bio.pending 820 821 @property 822 def context(self) -> OpenSSLClientContext | OpenSSLServerContext: 823 """The ``Context`` object this socket is tied to.""" 824 825 return self._parent_context 826 827 def cipher(self) -> CipherSuite | int | None: 828 """ 829 Returns the CipherSuite entry for the cipher that has been negotiated on the connection. 830 831 If no connection has been negotiated, returns ``None``. If the cipher negotiated is not 832 defined in CipherSuite, returns the 16-bit integer representing that cipher directly. 833 """ 834 835 ret = self._object.cipher() 836 837 if ret is None: 838 return None 839 else: 840 ossl_cipher, _, _ = ret 841 842 for cipher in self._ssl_context.get_ciphers(): 843 if cipher["name"] == ossl_cipher: 844 break 845 # Since the cipher was negotiated using the OpenSSL context, 846 # it must exist in the list of the OpenSSL supported ciphers. 847 assert cipher["name"] == ossl_cipher 848 849 cipher_id = cipher["id"] & 0xFFFF 850 try: 851 return CipherSuite(cipher_id) 852 except ValueError: 853 return cipher_id 854 855 def negotiated_protocol(self) -> NextProtocol | bytes | None: 856 """ 857 Returns the protocol that was selected during the TLS handshake. 858 859 This selection may have been made using ALPN or some future 860 negotiation mechanism. 861 862 If the negotiated protocol is one of the protocols defined in the 863 ``NextProtocol`` enum, the value from that enum will be returned. 
864 Otherwise, the raw bytestring of the negotiated protocol will be 865 returned. 866 867 If ``Context.set_inner_protocols()`` was not called, if the other 868 party does not support protocol negotiation, if this socket does 869 not support any of the peer's proposed protocols, or if the 870 handshake has not happened yet, ``None`` is returned. 871 """ 872 873 proto = self._object.selected_alpn_protocol() 874 875 # The standard library returns this as a str, we want bytes. 876 if proto is None: 877 return None 878 879 protoBytes = proto.encode("ascii") 880 881 try: 882 return NextProtocol(protoBytes) 883 except ValueError: 884 return protoBytes 885 886 @property 887 def negotiated_tls_version(self) -> TLSVersion | None: 888 """The version of TLS that has been negotiated on this connection.""" 889 890 ossl_version = self._object.version() 891 if ossl_version is None: 892 return None 893 else: 894 return TLSVersion(ossl_version) 895 896 def getpeercert(self) -> bytes | None: 897 """ 898 Return the raw DER bytes of the certificate provided by the peer 899 during the handshake, if applicable. 900 """ 901 # In order to return an OpenSSLCertificate, we must obtain the certificate in binary format 902 # Obtaining the certificate as a dict is very specific to the ssl module and may be 903 # difficult to implement for other implementation, so this is not supported 904 with _error_converter(): 905 cert = self._object.getpeercert(True) 906 907 return cert 908 909 910class OpenSSLClientContext: 911 """This class controls and creates a socket that is wrapped using the 912 standard library bindings to OpenSSL to perform TLS connections on the 913 client side of a network connection. 
914 """ 915 916 def __init__(self, configuration: TLSClientConfiguration) -> None: 917 """Create a new context object from a given TLS configuration.""" 918 919 self._configuration = configuration 920 921 @property 922 def configuration(self) -> TLSClientConfiguration: 923 """Returns the TLS configuration that was used to create the context.""" 924 925 return self._configuration 926 927 def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket: 928 """Create a socket-like object that can be used to do TLS.""" 929 ossl_context = _init_context_client(self._configuration) 930 931 return OpenSSLTLSSocket._create( 932 parent_context=self, 933 server_side=False, 934 ssl_context=ossl_context, 935 address=address, 936 ) 937 938 def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer: 939 """Creates a TLSBuffer that acts as an in-memory channel, 940 and contains information about the TLS exchange 941 (cipher, negotiated_protocol, negotiated_tls_version, etc.).""" 942 943 ossl_context = _init_context_client(self._configuration) 944 945 return OpenSSLTLSBuffer._create( 946 server_hostname=server_hostname, 947 parent_context=self, 948 server_side=False, 949 ssl_context=ossl_context, 950 ) 951 952 953class OpenSSLServerContext: 954 """This class controls and creates and creates a socket that is wrapped using the 955 standard library bindings to OpenSSL to perform TLS connections on the 956 server side of a network connection. 
957 """ 958 959 def __init__(self, configuration: TLSServerConfiguration) -> None: 960 """Create a new context object from a given TLS configuration.""" 961 962 self._configuration = configuration 963 964 @property 965 def configuration(self) -> TLSServerConfiguration: 966 """Returns the TLS configuration that was used to create the context.""" 967 968 return self._configuration 969 970 def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket: 971 """Create a socket-like object that can be used to do TLS.""" 972 ossl_context = _init_context_server(self._configuration) 973 974 return OpenSSLTLSSocket._create( 975 parent_context=self, 976 server_side=True, 977 ssl_context=ossl_context, 978 address=address, 979 ) 980 981 def create_buffer(self) -> OpenSSLTLSBuffer: 982 """Creates a TLSBuffer that acts as an in-memory channel, 983 and contains information about the TLS exchange 984 (cipher, negotiated_protocol, negotiated_tls_version, etc.).""" 985 986 ossl_context = _init_context_server(self._configuration) 987 988 return OpenSSLTLSBuffer._create( 989 server_hostname=None, 990 parent_context=self, 991 server_side=True, 992 ssl_context=ossl_context, 993 ) 994 995 996def _check_cert_or_priv(cert_or_priv: Certificate | PrivateKey) -> None: 997 if cert_or_priv._path is not None or cert_or_priv._buffer is not None: 998 return None 999 elif cert_or_priv._id is not None: 1000 raise ConfigurationError( 1001 "This TLS implementation does not support id-based certificates \ 1002 or private keys." 
1003 ) 1004 else: 1005 raise ConfigurationError("Certificate or PrivateKey cannot be empty.") 1006 1007 1008def _check_trust_store(trust_store: TrustStore | None) -> None: 1009 if trust_store is not None and trust_store._id is not None: 1010 raise ConfigurationError("This TLS implementation does not support id-based trust stores.") 1011 1012 1013def _check_sign_chain(sign_chain: SigningChain) -> None: 1014 leaf = sign_chain.leaf 1015 _check_cert_or_priv(leaf[0]) 1016 priv_key = leaf[1] 1017 if priv_key is not None: 1018 _check_cert_or_priv(priv_key) 1019 for cert in sign_chain.chain: 1020 _check_cert_or_priv(cert) 1021 1022 1023def validate_config(tls_config: TLSClientConfiguration | TLSServerConfiguration) -> None: 1024 """Validates whether the OpenSSL TLS implementation supports this TLS configuration.""" 1025 _check_trust_store(tls_config.trust_store) 1026 1027 if isinstance(tls_config, TLSClientConfiguration): 1028 sign_chain = tls_config.certificate_chain 1029 if sign_chain is not None: 1030 _check_sign_chain(sign_chain) 1031 1032 else: 1033 assert isinstance(tls_config, TLSServerConfiguration) 1034 cert_chain = tls_config.certificate_chain 1035 if cert_chain is not None: 1036 for sign_chain in cert_chain: 1037 _check_sign_chain(sign_chain) 1038 1039 1040#: The stdlib ``TLSImplementation`` object. 1041STDLIB_IMPLEMENTATION = TLSImplementation( 1042 client_context=OpenSSLClientContext, 1043 server_context=OpenSSLServerContext, 1044 validate_config=validate_config, 1045)
class OpenSSLTLSSocket:
    """A TLSSocket implementation based on OpenSSL."""

    __slots__ = (
        "_parent_context",
        "_socket",
        "_ssl_context",
    )

    # The client/server context object that produced this socket.
    _parent_context: OpenSSLClientContext | OpenSSLServerContext
    # The wrapped standard-library TLS socket all calls are delegated to.
    _socket: ssl.SSLSocket
    # The underlying ssl-module context used to wrap the socket.
    _ssl_context: _SSLContext

    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
        """Always raise ``TypeError``: instances are not built by users.

        ``ClientContext.connect()`` and ``ServerContext.connect()`` obtain
        instances through the private ``_create()`` class method instead.
        """
        msg = (
            f"{self.__class__.__name__} does not have a public constructor. "
            "Instances are returned by ClientContext.connect() or ServerContext.connect()."
        )
        raise TypeError(msg)

    @classmethod
    def _create(
        cls,
        address: tuple[str | None, int],
        parent_context: OpenSSLClientContext | OpenSSLServerContext,
        server_side: bool,
        ssl_context: _SSLContext,
    ) -> OpenSSLTLSSocket:
        """Build a socket bound to (server) or connected to (client) ``address``.

        The plain socket is wrapped with ``ssl_context`` and then switched to
        non-blocking mode. Errors raised by the ssl module while wrapping are
        converted by ``_error_converter``.
        """
        # Bypass __init__, which unconditionally raises TypeError.
        self = cls.__new__(cls)
        self._parent_context = parent_context
        self._ssl_context = ssl_context

        if server_side is True:
            # Use the address family of the first getaddrinfo() result.
            family = socket.getaddrinfo(address[0], address[1], type=socket.SOCK_STREAM)[0][0]
            sock = socket.create_server(address, family=family)
            with _error_converter():
                self._socket = ssl_context.wrap_socket(
                    sock, server_side=server_side, server_hostname=None
                )
        else:
            hostname, _ = address
            sock = socket.create_connection(address)
            with _error_converter():
                self._socket = ssl_context.wrap_socket(
                    sock, server_side=server_side, server_hostname=hostname
                )

        self._socket.setblocking(False)

        return self

    def recv(self, bufsize: int) -> bytes:
        """Receive up to ``bufsize`` bytes of decrypted data.

        Returns ``b""`` when the ssl module reports ``SSLZeroReturnError``
        (the TLS connection was shut down cleanly). Other ssl errors are
        converted by ``_error_converter``.
        """
        with _error_converter():
            try:
                return self._socket.recv(bufsize)
            except ssl.SSLZeroReturnError:
                return b""

    def send(self, bytes: bytes) -> int:
        """Send data to the socket and return the number of bytes sent.

        The socket must be connected to a remote socket.
        """
        with _error_converter():
            return self._socket.send(bytes)

    def close(self, force: bool = False) -> None:
        """Unwrap the TLS connection, shut the socket down and close it.

        If ``force`` is False, a ``WantReadError`` raised while unwrapping
        (the peer has not yet sent its close_notify alert) is propagated so
        the caller can retry. If ``force`` is True the socket is shut down and
        closed without waiting for the other side.
        """
        try:
            with _error_converter():
                sock = self._socket.unwrap()
        except (ValueError, OSError):
            # Close the socket without re-trying to unwrap it:
            # - ValueError: the socket was actually not wrapped
            # - OSError (includes BrokenPipeError): there is some issue with
            #   the socket, or the other side already shut down
            sock = self._socket
        except WantReadError:
            if not force:
                raise
            sock = self._socket

        try:
            # NOTE: OSError indicates that the other side has already hung up.
            with _error_converter(ignore_filter=(OSError,)):
                sock.shutdown(socket.SHUT_RDWR)
        finally:
            # Always release the descriptor. Closing inside the ``with`` block
            # would be skipped whenever shutdown() raises an ignored OSError.
            sock.close()

    def listen(self, backlog: int) -> None:
        """Enable a server to accept connections.

        ``backlog`` is the number of unaccepted connections that the system
        will allow before refusing new connections.
        """
        with _error_converter():
            return self._socket.listen(backlog)

    def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
        """Accept a connection and return ``(conn, address)``.

        ``conn`` is a new ``OpenSSLTLSSocket`` sharing this socket's contexts
        and set to non-blocking mode; ``address`` is the address of the peer.
        """
        with _error_converter():
            (sock, address) = self._socket.accept()

        # Bypass __init__, which unconditionally raises TypeError.
        tls_socket = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
        tls_socket._parent_context = self._parent_context
        tls_socket._ssl_context = self._ssl_context
        tls_socket._socket = sock
        with _error_converter():
            tls_socket._socket.setblocking(False)
        return (tls_socket, address)

    def getsockname(self) -> socket._RetAddress:
        """Return the local address to which the socket is connected."""
        with _error_converter():
            return self._socket.getsockname()

    def getpeercert(self) -> bytes | None:
        """
        Return the raw DER bytes of the certificate provided by the peer
        during the handshake, if applicable.
        """
        # Only the binary form is exposed: the dict form is very specific to
        # the ssl module and may be difficult to implement for other TLS
        # implementations, so it is not supported.
        with _error_converter():
            cert = self._socket.getpeercert(True)

        return cert

    def getpeername(self) -> socket._RetAddress:
        """Return the remote address to which the socket is connected."""
        with _error_converter():
            return self._socket.getpeername()

    def fileno(self) -> int:
        """Return the socket's file descriptor (a small integer), or -1 on failure."""
        with _error_converter():
            return self._socket.fileno()

    @property
    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
        """The ``Context`` object this socket is tied to."""
        return self._parent_context

    def cipher(self) -> CipherSuite | int | None:
        """
        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

        Raises ``TLSError`` if the negotiated cipher name cannot be found in
        the list of ciphers reported by the OpenSSL context.
        """
        ret = self._socket.cipher()
        if ret is None:
            return None

        # This is the OpenSSL cipher name. We want the ID, which we can get by
        # looking for this entry in the context's list of supported ciphers.
        ossl_cipher, _, _ = ret
        match = next(
            (entry for entry in self._ssl_context.get_ciphers() if entry["name"] == ossl_cipher),
            None,
        )
        if match is None:
            # Should not happen, since the cipher was negotiated using this
            # context. Raise explicitly rather than rely on ``assert``, which
            # is stripped under ``-O``.
            raise TLSError(f"Negotiated cipher {ossl_cipher!r} is unknown to the OpenSSL context.")

        cipher_id = match["id"] & 0xFFFF
        try:
            return CipherSuite(cipher_id)
        except ValueError:
            return cipher_id

    def negotiated_protocol(self) -> NextProtocol | bytes | None:
        """
        Returns the protocol that was selected during the TLS handshake.

        If the negotiated protocol is one of the protocols defined in the
        ``NextProtocol`` enum, the value from that enum will be returned.
        Otherwise, the raw bytestring of the negotiated protocol will be
        returned. ``None`` is returned when the ssl module reports no
        selected ALPN protocol.
        """
        proto = self._socket.selected_alpn_protocol()

        # The standard library returns this as a str, we want bytes.
        if proto is None:
            return None

        proto_bytes = proto.encode("ascii")

        try:
            return NextProtocol(proto_bytes)
        except ValueError:
            return proto_bytes

    @property
    def negotiated_tls_version(self) -> TLSVersion | None:
        """The version of TLS that has been negotiated on this connection."""
        ossl_version = self._socket.version()
        if ossl_version is None:
            return None
        return TLSVersion(ossl_version)
A TLSSocket implementation based on OpenSSL.
def __init__(self, *args: tuple, **kwargs: tuple) -> None:
    """Reject direct construction by always raising ``TypeError``.

    Instances are produced by ``ClientContext.connect()`` and
    ``ServerContext.connect()``, which go through ``_create()``.
    """
    class_name = self.__class__.__name__
    raise TypeError(
        f"{class_name} does not have a public constructor. "
        "Instances are returned by ClientContext.connect() or ServerContext.connect()."
    )
OpenSSLTLSSocket objects should not be constructed by the user. Instead, ClientContext.connect() and ServerContext.connect() create them through the _create() method.
def recv(self, bufsize: int) -> bytes:
    """Read up to ``bufsize`` bytes of data from the TLS socket.

    An ``ssl.SSLZeroReturnError`` from the underlying socket is reported as
    an empty ``bytes`` object; other ssl errors are converted by
    ``_error_converter``.
    """
    with _error_converter():
        try:
            received = self._socket.recv(bufsize)
        except ssl.SSLZeroReturnError:
            received = b""
        return received
Receive data from the socket. The return value is a bytes object representing the data received. Should not work before the handshake is completed.
def send(self, bytes: bytes) -> int:
    """Write ``bytes`` to the connected TLS socket.

    Returns whatever the underlying ``send`` call returns (the number of
    bytes sent). ssl errors are converted by ``_error_converter``.
    """
    with _error_converter():
        sent_count = self._socket.send(bytes)
        return sent_count
Send data to the socket. The socket must be connected to a remote socket.
def close(self, force: bool = False) -> None:
    """Unwrap the TLS connection, shut the socket down and close it.

    If ``force`` is False, a ``WantReadError`` raised while unwrapping (the
    peer has not yet sent its close_notify alert) is propagated so the caller
    can retry. If ``force`` is True the socket is shut down and closed
    without waiting for the other side.
    """
    try:
        with _error_converter():
            sock = self._socket.unwrap()
    except (ValueError, OSError):
        # Close the socket without re-trying to unwrap it:
        # - ValueError: the socket was actually not wrapped
        # - OSError (includes BrokenPipeError): there is some issue with the
        #   socket, or the other side already shut down
        sock = self._socket
    except WantReadError:
        if not force:
            raise
        sock = self._socket

    try:
        # NOTE: OSError indicates that the other side has already hung up.
        with _error_converter(ignore_filter=(OSError,)):
            sock.shutdown(socket.SHUT_RDWR)
    finally:
        # Always release the descriptor. Closing inside the ``with`` block
        # would be skipped whenever shutdown() raises an ignored OSError.
        sock.close()
Unwraps the TLS connection, shuts down both halves of the connection and marks the socket closed. If force is True, this only shuts down its own half and does not wait for the other side. If force is False, this will raise WantReadError until the other side sends a close_notify alert.
def listen(self, backlog: int) -> None:
    """Put the underlying socket into listening mode.

    ``backlog`` is passed straight to the socket's ``listen`` call: the
    number of unaccepted connections the system allows before refusing
    new ones.
    """
    with _error_converter():
        outcome = self._socket.listen(backlog)
        return outcome
Enable a server to accept connections. If backlog is specified, it specifies the number of unaccepted connections that the system will allow before refusing new connections.
def accept(self) -> tuple[OpenSSLTLSSocket, socket._RetAddress]:
    """Accept one pending connection on this listening socket.

    Returns ``(conn, address)``: ``conn`` is a fresh ``OpenSSLTLSSocket``
    that shares this socket's parent and ssl contexts and is set to
    non-blocking mode; ``address`` is the peer address reported by the
    underlying ``accept`` call.
    """
    with _error_converter():
        raw_conn, peer_address = self._socket.accept()

    # __init__ always raises, so allocate the instance directly.
    conn = OpenSSLTLSSocket.__new__(OpenSSLTLSSocket)
    conn._parent_context = self._parent_context
    conn._ssl_context = self._ssl_context
    conn._socket = raw_conn

    with _error_converter():
        conn._socket.setblocking(False)

    return conn, peer_address
Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new TLSSocket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection.
def getsockname(self) -> socket._RetAddress:
    """Report the socket's own (local) address."""
    with _error_converter():
        local_address = self._socket.getsockname()
        return local_address
Return the local address to which the socket is connected.
def getpeercert(self) -> bytes | None:
    """
    Fetch the peer's certificate in binary form from the underlying socket.

    Returns the value of ``getpeercert(True)`` on the wrapped socket: the raw
    DER bytes of the peer certificate, or ``None`` if there is none.
    """
    # Only the binary form is requested. The dict form is specific to the
    # ssl module and hard to reproduce in other TLS implementations, so it
    # is deliberately not supported.
    with _error_converter():
        der_bytes = self._socket.getpeercert(True)
    return der_bytes
Return the raw DER bytes of the certificate provided by the peer during the handshake, if applicable.
def getpeername(self) -> socket._RetAddress:
    """Report the address of the remote end of the connection."""
    with _error_converter():
        remote_address = self._socket.getpeername()
        return remote_address
Return the remote address to which the socket is connected.
def fileno(self) -> int:
    """Report the underlying socket's file descriptor (a small integer), or -1 on failure."""
    with _error_converter():
        descriptor = self._socket.fileno()
        return descriptor
Return the socket's file descriptor (a small integer), or -1 on failure.
@property
def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
    """The client or server ``Context`` object that created this socket."""
    parent = self._parent_context
    return parent
The Context object this socket is tied to.
def cipher(self) -> CipherSuite | int | None:
    """
    Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

    If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
    defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

    Raises ``TLSError`` if the negotiated cipher name cannot be found in the
    list of ciphers reported by the OpenSSL context.
    """
    ret = self._socket.cipher()
    if ret is None:
        return None

    # This is the OpenSSL cipher name. We want the ID, which we can get by
    # looking for this entry in the context's list of supported ciphers.
    ossl_cipher, _, _ = ret
    match = next(
        (entry for entry in self._ssl_context.get_ciphers() if entry["name"] == ossl_cipher),
        None,
    )
    if match is None:
        # Should not happen, since the cipher was negotiated using this
        # context. Raise explicitly rather than rely on ``assert``, which is
        # stripped under ``-O``.
        raise TLSError(f"Negotiated cipher {ossl_cipher!r} is unknown to the OpenSSL context.")

    cipher_id = match["id"] & 0xFFFF
    try:
        return CipherSuite(cipher_id)
    except ValueError:
        return cipher_id
Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
If no connection has been negotiated, returns None. If the cipher negotiated is not
defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
def negotiated_protocol(self) -> NextProtocol | bytes | None:
    """
    Report the application protocol chosen during the TLS handshake.

    The value comes from the socket's selected ALPN protocol. ``None`` is
    returned when no protocol was selected. Otherwise the name is encoded to
    ASCII bytes and returned as a ``NextProtocol`` member when it matches
    one, or as the raw bytestring when it does not.
    """
    selected = self._socket.selected_alpn_protocol()
    if selected is None:
        return None

    # The standard library hands back a str; the API works in bytes.
    raw_name = selected.encode("ascii")
    try:
        known = NextProtocol(raw_name)
    except ValueError:
        return raw_name
    return known
Returns the protocol that was selected during the TLS handshake.
This selection may have been made using ALPN or some future negotiation mechanism.
If the negotiated protocol is one of the protocols defined in the
NextProtocol enum, the value from that enum will be returned.
Otherwise, the raw bytestring of the negotiated protocol will be
returned.
If Context.set_inner_protocols() was not called, if the other
party does not support protocol negotiation, if this socket does
not support any of the peer's proposed protocols, or if the
handshake has not happened yet, None is returned.
@property
def negotiated_tls_version(self) -> TLSVersion | None:
    """The TLS version negotiated on this connection, or ``None`` if the socket reports none."""
    version_name = self._socket.version()
    return None if version_name is None else TLSVersion(version_name)
The version of TLS that has been negotiated on this connection.
class OpenSSLTLSBuffer:
    """A TLSBuffer implementation based on OpenSSL."""

    __slots__ = (
        "_ciphertext_buffer",
        "_in_bio",
        "_object",
        "_out_bio",
        "_parent_context",
        "_ssl_context",
    )

    # Extra buffer reserved for a peek/consume API (MemoryBIO has none).
    _ciphertext_buffer: bytearray
    # Holds TLS data received from the network, waiting to be processed.
    _in_bio: ssl.MemoryBIO
    # The SSLObject performing the TLS work over the two memory BIOs.
    _object: ssl.SSLObject
    # Holds TLS data produced locally, waiting to be sent to the network.
    _out_bio: ssl.MemoryBIO
    # The client/server context object that produced this buffer.
    _parent_context: OpenSSLClientContext | OpenSSLServerContext
    # The underlying ssl-module context used to create the SSLObject.
    _ssl_context: _SSLContext

    def __init__(self, *args: tuple, **kwargs: tuple) -> None:
        """Always raise ``TypeError``: instances are not built by users.

        ``ClientContext.create_buffer()`` and ``ServerContext.create_buffer()``
        obtain instances through the private ``_create()`` class method.
        """
        # Use implicit concatenation: a backslash continuation inside a string
        # literal would embed the next line's indentation in the message.
        msg = (
            f"{self.__class__.__name__} does not have a public constructor. "
            "Instances are returned by ClientContext.create_buffer() "
            "or ServerContext.create_buffer()."
        )
        raise TypeError(msg)

    @classmethod
    def _create(
        cls,
        server_hostname: str | None,
        parent_context: OpenSSLClientContext | OpenSSLServerContext,
        server_side: bool,
        ssl_context: _SSLContext,
    ) -> OpenSSLTLSBuffer:
        """Build a buffer backed by an ``SSLObject`` over two memory BIOs.

        ``server_hostname`` is only forwarded on the client side; the server
        side always passes ``None``. Errors raised by the ssl module while
        wrapping are converted by ``_error_converter``.
        """
        # Bypass __init__, which unconditionally raises TypeError.
        self = cls.__new__(cls)
        self._parent_context = parent_context
        self._ssl_context = ssl_context

        # We need this extra buffer to implement the peek/consume API, which
        # the MemoryBIO object does not allow.
        self._ciphertext_buffer = bytearray()

        # Set up the SSLObject we're going to back this with.
        self._in_bio = ssl.MemoryBIO()
        self._out_bio = ssl.MemoryBIO()

        if server_side is True:
            with _error_converter():
                self._object = ssl_context.wrap_bio(
                    self._in_bio, self._out_bio, server_side=True, server_hostname=None
                )
        else:
            with _error_converter():
                self._object = ssl_context.wrap_bio(
                    self._in_bio, self._out_bio, server_side=False, server_hostname=server_hostname
                )

        return self

    def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
        """
        Read up to ``amt`` bytes of data from the input buffer and return
        the result as a ``bytes`` instance. If an optional buffer is
        provided, the result is written into the buffer and the number of
        bytes is returned instead.

        When the ssl module reports ``SSLZeroReturnError`` (clean TLS
        shutdown), returns ``b""``, or ``0`` when ``buffer`` was provided, so
        the return type always matches the calling mode.

        Other ssl errors are converted by ``_error_converter`` into
        ``WantReadError``, ``WantWriteError``, ``RaggedEOF`` or ``TLSError``.
        """
        with _error_converter():
            try:
                # MyPy insists that buffer must be a bytearray
                return self._object.read(amt, buffer)  # type: ignore[arg-type]
            except ssl.SSLZeroReturnError:
                # EOF: zero bytes were read in either calling mode.
                return b"" if buffer is None else 0

    def write(self, buf: Buffer) -> int:
        """
        Write ``buf`` in encrypted form to the output buffer and return the
        number of bytes written. The ``buf`` argument must be an object
        supporting the buffer interface.

        ssl errors are converted by ``_error_converter`` into
        ``WantReadError``, ``WantWriteError``, ``RaggedEOF`` or ``TLSError``.
        When re-calling this method after such an error, users *should*
        re-use the exact same ``buf`` object.
        """
        with _error_converter():
            return self._object.write(buf)

    # Get rid and do handshake ourselves?
    def do_handshake(self) -> None:
        """
        Performs the TLS handshake. Also performs certificate validation
        and hostname verification.
        """
        with _error_converter():
            self._object.do_handshake()

    def shutdown(self) -> None:
        """
        Performs a clean TLS shut down. This should generally be used
        whenever possible to signal to the remote peer that the content is
        finished.
        """
        with _error_converter():
            self._object.unwrap()

    def process_incoming(self, data_from_network: bytes) -> None:
        """
        Receives some TLS data from the network and stores it in the
        incoming memory BIO, to be consumed by later TLS operations.
        """
        with _error_converter():
            written_len = self._in_bio.write(data_from_network)

        # Internal invariant: the memory BIO is expected to take all the data.
        assert written_len == len(data_from_network)

    def incoming_bytes_buffered(self) -> int:
        """
        Returns how many bytes are in the incoming buffer waiting to be processed.
        """
        return self._in_bio.pending

    def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
        """
        Returns the next ``amount_bytes_for_network`` bytes of data that
        should be written to the network from the outgoing data buffer,
        removing it from the internal buffer.
        """
        return self._out_bio.read(amount_bytes_for_network)

    def outgoing_bytes_buffered(self) -> int:
        """
        Returns how many bytes are in the outgoing buffer waiting to be sent.
        """
        return self._out_bio.pending

    @property
    def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
        """The ``Context`` object this buffer is tied to."""
        return self._parent_context

    def cipher(self) -> CipherSuite | int | None:
        """
        Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

        If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
        defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

        Raises ``TLSError`` if the negotiated cipher name cannot be found in
        the list of ciphers reported by the OpenSSL context.
        """
        ret = self._object.cipher()
        if ret is None:
            return None

        ossl_cipher, _, _ = ret
        match = next(
            (entry for entry in self._ssl_context.get_ciphers() if entry["name"] == ossl_cipher),
            None,
        )
        if match is None:
            # Should not happen, since the cipher was negotiated using this
            # context. Raise explicitly rather than rely on ``assert``, which
            # is stripped under ``-O``.
            raise TLSError(f"Negotiated cipher {ossl_cipher!r} is unknown to the OpenSSL context.")

        cipher_id = match["id"] & 0xFFFF
        try:
            return CipherSuite(cipher_id)
        except ValueError:
            return cipher_id

    def negotiated_protocol(self) -> NextProtocol | bytes | None:
        """
        Returns the protocol that was selected during the TLS handshake.

        If the negotiated protocol is one of the protocols defined in the
        ``NextProtocol`` enum, the value from that enum will be returned.
        Otherwise, the raw bytestring of the negotiated protocol will be
        returned. ``None`` is returned when the ssl module reports no
        selected ALPN protocol.
        """
        proto = self._object.selected_alpn_protocol()

        # The standard library returns this as a str, we want bytes.
        if proto is None:
            return None

        proto_bytes = proto.encode("ascii")

        try:
            return NextProtocol(proto_bytes)
        except ValueError:
            return proto_bytes

    @property
    def negotiated_tls_version(self) -> TLSVersion | None:
        """The version of TLS that has been negotiated on this connection."""
        ossl_version = self._object.version()
        if ossl_version is None:
            return None
        return TLSVersion(ossl_version)

    def getpeercert(self) -> bytes | None:
        """
        Return the raw DER bytes of the certificate provided by the peer
        during the handshake, if applicable.
        """
        # Only the binary form is exposed: the dict form is very specific to
        # the ssl module and may be difficult to implement for other
        # implementations, so it is not supported.
        with _error_converter():
            cert = self._object.getpeercert(True)

        return cert
A TLSBuffer implementation based on OpenSSL
def __init__(self, *args: tuple, **kwargs: tuple) -> None:
    """Always raise ``TypeError``: instances are not built by users.

    ``ClientContext.create_buffer()`` and ``ServerContext.create_buffer()``
    obtain instances through the private ``_create()`` class method.
    """
    # Use implicit concatenation: a backslash continuation inside a string
    # literal would embed the next line's indentation in the message.
    msg = (
        f"{self.__class__.__name__} does not have a public constructor. "
        "Instances are returned by ClientContext.create_buffer() "
        "or ServerContext.create_buffer()."
    )
    raise TypeError(msg)
OpenSSLTLSBuffer objects should not be constructed by the user. Instead, ClientContext.create_buffer() and ServerContext.create_buffer() create them through the _create() method.
def read(self, amt: int, buffer: Buffer | None = None) -> bytes | int:
    """
    Read up to ``amt`` bytes of data from the input buffer and return
    the result as a ``bytes`` instance. If an optional buffer is
    provided, the result is written into the buffer and the number of
    bytes is returned instead.

    When the ssl module reports ``SSLZeroReturnError`` (clean TLS shutdown),
    returns ``b""``, or ``0`` when ``buffer`` was provided, so the return
    type always matches the calling mode.

    Other ssl errors are converted by ``_error_converter`` into
    ``WantReadError``, ``WantWriteError``, ``RaggedEOF`` or ``TLSError``.
    """
    with _error_converter():
        try:
            # MyPy insists that buffer must be a bytearray
            return self._object.read(amt, buffer)  # type: ignore[arg-type]
        except ssl.SSLZeroReturnError:
            # EOF: zero bytes were read in either calling mode.
            return b"" if buffer is None else 0
Read up to amt bytes of data from the input buffer and return
the result as a bytes instance. If an optional buffer is
provided, the result is written into the buffer and the number of
bytes is returned instead.
Once EOF is reached, all further calls to this method return the
empty byte string b''.
May read "short": that is, fewer bytes may be returned than were requested.
Raise WantReadError or WantWriteError if there is
insufficient data in either the input or output buffer and the
operation would have caused data to be written or read.
May raise RaggedEOF if the connection has been closed without a
graceful TLS shutdown. Whether this is an exception that should be
ignored or not is up to the specific application.
As at any time a re-negotiation is possible, a call to read()
can also cause write operations.
def write(self, buf: Buffer) -> int:
    """
    Encrypt ``buf`` into the output buffer via the underlying SSLObject.

    Returns the value of the SSLObject's ``write`` call (the number of bytes
    written). ``buf`` must support the buffer interface.

    ssl errors are converted by ``_error_converter`` into ``WantReadError``,
    ``WantWriteError``, ``RaggedEOF`` or ``TLSError``. When retrying after
    such an error, callers *should* pass the exact same ``buf`` object.
    """
    with _error_converter():
        written = self._object.write(buf)
        return written
Write buf in encrypted form to the output buffer and return the
number of bytes written. The buf argument must be an object
supporting the buffer interface.
Raise WantReadError or WantWriteError if there is
insufficient data in either the input or output buffer and the
operation would have caused data to be written or read. In either
case, users should endeavour to resolve that situation and then
re-call this method. When re-calling this method users should
re-use the exact same buf object, as some TLS implementations
require that the exact same buffer be used.
This operation may write "short": that is, fewer bytes may be written than were in the buffer.
As at any time a re-negotiation is possible, a call to write()
can also cause read operations.
def do_handshake(self) -> None:
    """
    Run the TLS handshake on the underlying SSLObject.

    ssl errors raised during the handshake are converted by
    ``_error_converter``.
    """
    tls_object = self._object
    with _error_converter():
        tls_object.do_handshake()
Performs the TLS handshake. Also performs certificate validation and hostname verification.
def shutdown(self) -> None:
    """
    Shut the TLS session down cleanly by unwrapping the underlying ssl
    object. Prefer this whenever possible so that the remote peer is
    told the content is finished.
    """
    tls_object = self._object
    # Errors from the ssl module are translated by _error_converter.
    with _error_converter():
        tls_object.unwrap()
Performs a clean TLS shut down. This should generally be used whenever possible to signal to the remote peer that the content is finished.
def process_incoming(self, data_from_network: bytes) -> None:
    """
    Store TLS data received from the network in the internal incoming
    buffer.

    Errors from the ssl module are translated by ``_error_converter``.
    If ``WantReadError`` is raised because the internal buffer is
    overfull, no data is stored; call ``read`` to drain some of the
    buffer and then repeat this call.
    """
    with _error_converter():
        accepted_len = self._in_bio.write(data_from_network)

    # The incoming BIO is expected to take the whole chunk in a single call.
    assert accepted_len == len(data_from_network)
Receives some TLS data from the network and stores it in an internal buffer.
If the internal buffer is overfull, this method will raise
WantReadError and store no data. At this point, the user must
call read to remove some data from the internal buffer
before repeating this call.
def incoming_bytes_buffered(self) -> int:
    """
    Report the number of bytes sitting in the incoming buffer that have
    not been processed yet.
    """
    incoming_bio = self._in_bio
    return incoming_bio.pending
Returns how many bytes are in the incoming buffer waiting to be processed.
def process_outgoing(self, amount_bytes_for_network: int) -> bytes:
    """
    Take the next ``amount_bytes_for_network`` bytes out of the outgoing
    data buffer and return them so they can be written to the network.
    The returned data is removed from the internal buffer.
    """
    outgoing_chunk = self._out_bio.read(amount_bytes_for_network)
    return outgoing_chunk
Returns the next amount_bytes_for_network bytes of data that should be written to
the network from the outgoing data buffer, removing it from the
internal buffer.
def outgoing_bytes_buffered(self) -> int:
    """
    Report the number of bytes sitting in the outgoing buffer that have
    not been sent yet.
    """
    outgoing_bio = self._out_bio
    return outgoing_bio.pending
Returns how many bytes are in the outgoing buffer waiting to be sent.
@property
def context(self) -> OpenSSLClientContext | OpenSSLServerContext:
    """The ``Context`` object this object is tied to (its parent context)."""
    parent = self._parent_context
    return parent
The Context object this socket is tied to.
def cipher(self) -> CipherSuite | int | None:
    """
    Returns the CipherSuite entry for the cipher that has been negotiated on the connection.

    If no connection has been negotiated, returns ``None``. If the cipher negotiated is not
    defined in CipherSuite, returns the 16-bit integer representing that cipher directly.

    Raises ``TLSError`` if the negotiated cipher name is not present in the list of ciphers
    reported by the OpenSSL context.
    """
    negotiated = self._object.cipher()
    if negotiated is None:
        return None

    # Only the first element of the triple (the OpenSSL cipher name) is needed.
    ossl_cipher, _, _ = negotiated

    # Find the context's entry for that name to recover the numeric ID.
    # A default of None makes "no match" and "empty cipher list" explicit
    # cases, rather than relying on a loop variable and an assert (asserts
    # are stripped under ``python -O``).
    matching_entry = next(
        (entry for entry in self._ssl_context.get_ciphers() if entry["name"] == ossl_cipher),
        None,
    )
    if matching_entry is None:
        raise TLSError(
            f"Negotiated cipher {ossl_cipher!r} is not in the OpenSSL context's cipher list"
        )

    # Keep only the low 16 bits of the OpenSSL ID: that is the cipher suite number.
    cipher_id = matching_entry["id"] & 0xFFFF
    try:
        return CipherSuite(cipher_id)
    except ValueError:
        # Not a suite known to CipherSuite; hand back the raw number.
        return cipher_id
Returns the CipherSuite entry for the cipher that has been negotiated on the connection.
If no connection has been negotiated, returns None. If the cipher negotiated is not
defined in CipherSuite, returns the 16-bit integer representing that cipher directly.
def negotiated_protocol(self) -> NextProtocol | bytes | None:
    """
    Report which protocol was selected during the TLS handshake, whether
    through ALPN or some future negotiation mechanism.

    A protocol that is a member of the ``NextProtocol`` enum is returned
    as that enum value; any other protocol is returned as its raw
    bytestring.

    ``None`` is returned if ``Context.set_inner_protocols()`` was not
    called, if the other party does not support protocol negotiation, if
    this socket supports none of the peer's proposed protocols, or if
    the handshake has not happened yet.
    """
    selected = self._object.selected_alpn_protocol()
    if selected is None:
        return None

    # The standard library hands this back as a str; this API wants bytes.
    raw_protocol = selected.encode("ascii")
    try:
        known_protocol = NextProtocol(raw_protocol)
    except ValueError:
        return raw_protocol
    return known_protocol
Returns the protocol that was selected during the TLS handshake.
This selection may have been made using ALPN or some future negotiation mechanism.
If the negotiated protocol is one of the protocols defined in the
NextProtocol enum, the value from that enum will be returned.
Otherwise, the raw bytestring of the negotiated protocol will be
returned.
If Context.set_inner_protocols() was not called, if the other
party does not support protocol negotiation, if this socket does
not support any of the peer's proposed protocols, or if the
handshake has not happened yet, None is returned.
@property
def negotiated_tls_version(self) -> TLSVersion | None:
    """
    The version of TLS negotiated on this connection, or ``None`` when
    the underlying ssl object reports no version.
    """
    version_name = self._object.version()
    return None if version_name is None else TLSVersion(version_name)
The version of TLS that has been negotiated on this connection.
def getpeercert(self) -> bytes | None:
    """
    Return the peer's handshake certificate as raw DER bytes, if
    applicable.
    """
    # Only the binary form is requested. The dict form is very specific to
    # the ssl module and may be hard for other implementations to provide,
    # so it is deliberately not supported here.
    with _error_converter():
        return self._object.getpeercert(True)
Return the raw DER bytes of the certificate provided by the peer during the handshake, if applicable.
class OpenSSLClientContext:
    """Client-side TLS context built on the standard library bindings to OpenSSL.

    It keeps a ``TLSClientConfiguration`` and uses it to create wrapped
    sockets and in-memory TLS buffers for the client side of a network
    connection.
    """

    def __init__(self, configuration: TLSClientConfiguration) -> None:
        """Store the TLS configuration that this context will use."""
        self._configuration = configuration

    @property
    def configuration(self) -> TLSClientConfiguration:
        """The TLS configuration this context was created with."""
        return self._configuration

    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
        """Create a client-side socket-like object that can be used to do TLS.

        A new OpenSSL context is initialised from the stored configuration
        on every call.
        """
        return OpenSSLTLSSocket._create(
            parent_context=self,
            server_side=False,
            ssl_context=_init_context_client(self._configuration),
            address=address,
        )

    def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
        """Create a client-side TLSBuffer acting as an in-memory channel.

        The buffer also carries information about the TLS exchange
        (cipher, negotiated_protocol, negotiated_tls_version, etc.).
        A new OpenSSL context is initialised from the stored configuration
        on every call.
        """
        return OpenSSLTLSBuffer._create(
            server_hostname=server_hostname,
            parent_context=self,
            server_side=False,
            ssl_context=_init_context_client(self._configuration),
        )
This class controls and creates a socket that is wrapped using the standard library bindings to OpenSSL to perform TLS connections on the client side of a network connection.
def __init__(self, configuration: TLSClientConfiguration) -> None:
    """Store the client TLS configuration that this context will use."""
    self._configuration = configuration
Create a new context object from a given TLS configuration.
@property
def configuration(self) -> TLSClientConfiguration:
    """The client TLS configuration this context was created with."""
    stored_configuration = self._configuration
    return stored_configuration
Returns the TLS configuration that was used to create the context.
def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
    """Create a client-side socket-like object that can be used to do TLS.

    A new OpenSSL context is initialised from the stored configuration on
    every call.
    """
    return OpenSSLTLSSocket._create(
        parent_context=self,
        server_side=False,
        ssl_context=_init_context_client(self._configuration),
        address=address,
    )
Create a socket-like object that can be used to do TLS.
def create_buffer(self, server_hostname: str) -> OpenSSLTLSBuffer:
    """Create a client-side TLSBuffer acting as an in-memory channel.

    The buffer also carries information about the TLS exchange
    (cipher, negotiated_protocol, negotiated_tls_version, etc.).
    A new OpenSSL context is initialised from the stored configuration on
    every call.
    """
    return OpenSSLTLSBuffer._create(
        server_hostname=server_hostname,
        parent_context=self,
        server_side=False,
        ssl_context=_init_context_client(self._configuration),
    )
Creates a TLSBuffer that acts as an in-memory channel, and contains information about the TLS exchange (cipher, negotiated_protocol, negotiated_tls_version, etc.).
class OpenSSLServerContext:
    """Server-side TLS context built on the standard library bindings to OpenSSL.

    It keeps a ``TLSServerConfiguration`` and uses it to create wrapped
    sockets and in-memory TLS buffers for the server side of a network
    connection.
    """

    def __init__(self, configuration: TLSServerConfiguration) -> None:
        """Store the TLS configuration that this context will use."""
        self._configuration = configuration

    @property
    def configuration(self) -> TLSServerConfiguration:
        """The TLS configuration this context was created with."""
        return self._configuration

    def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
        """Create a server-side socket-like object that can be used to do TLS.

        A new OpenSSL context is initialised from the stored configuration
        on every call.
        """
        return OpenSSLTLSSocket._create(
            parent_context=self,
            server_side=True,
            ssl_context=_init_context_server(self._configuration),
            address=address,
        )

    def create_buffer(self) -> OpenSSLTLSBuffer:
        """Create a server-side TLSBuffer acting as an in-memory channel.

        The buffer also carries information about the TLS exchange
        (cipher, negotiated_protocol, negotiated_tls_version, etc.).
        No server hostname is passed on the server side. A new OpenSSL
        context is initialised from the stored configuration on every call.
        """
        return OpenSSLTLSBuffer._create(
            server_hostname=None,
            parent_context=self,
            server_side=True,
            ssl_context=_init_context_server(self._configuration),
        )
This class controls and creates a socket that is wrapped using the standard library bindings to OpenSSL to perform TLS connections on the server side of a network connection.
def __init__(self, configuration: TLSServerConfiguration) -> None:
    """Store the server TLS configuration that this context will use."""
    self._configuration = configuration
Create a new context object from a given TLS configuration.
@property
def configuration(self) -> TLSServerConfiguration:
    """The server TLS configuration this context was created with."""
    stored_configuration = self._configuration
    return stored_configuration
Returns the TLS configuration that was used to create the context.
def connect(self, address: tuple[str | None, int]) -> OpenSSLTLSSocket:
    """Create a server-side socket-like object that can be used to do TLS.

    A new OpenSSL context is initialised from the stored configuration on
    every call.
    """
    return OpenSSLTLSSocket._create(
        parent_context=self,
        server_side=True,
        ssl_context=_init_context_server(self._configuration),
        address=address,
    )
Create a socket-like object that can be used to do TLS.
def create_buffer(self) -> OpenSSLTLSBuffer:
    """Create a server-side TLSBuffer acting as an in-memory channel.

    The buffer also carries information about the TLS exchange
    (cipher, negotiated_protocol, negotiated_tls_version, etc.).
    No server hostname is passed on the server side. A new OpenSSL context
    is initialised from the stored configuration on every call.
    """
    return OpenSSLTLSBuffer._create(
        server_hostname=None,
        parent_context=self,
        server_side=True,
        ssl_context=_init_context_server(self._configuration),
    )
Creates a TLSBuffer that acts as an in-memory channel, and contains information about the TLS exchange (cipher, negotiated_protocol, negotiated_tls_version, etc.).
def validate_config(tls_config: TLSClientConfiguration | TLSServerConfiguration) -> None:
    """Check that the OpenSSL TLS implementation supports ``tls_config``.

    The trust store is checked first. After that, a client configuration
    has its single signing chain checked, and a server configuration has
    each of its signing chains checked in turn. A chain of ``None`` is
    skipped.
    """
    _check_trust_store(tls_config.trust_store)

    if isinstance(tls_config, TLSClientConfiguration):
        # A client configuration carries at most one signing chain.
        client_chain = tls_config.certificate_chain
        if client_chain is not None:
            _check_sign_chain(client_chain)
        return

    assert isinstance(tls_config, TLSServerConfiguration)
    # A server configuration carries a collection of signing chains.
    server_chains = tls_config.certificate_chain
    if server_chains is None:
        return
    for signing_chain in server_chains:
        _check_sign_chain(signing_chain)
Validates whether the OpenSSL TLS implementation supports this TLS configuration.