Write union types as X | Y where possible (#19111)

aka PEP 604, added in Python 3.10
This commit is contained in:
Andrew Ferrazzutti
2025-11-06 15:02:33 -05:00
committed by GitHub
parent 6790312831
commit fcac7e0282
465 changed files with 4034 additions and 4555 deletions

View File

@@ -119,11 +119,11 @@ R = TypeVar("R")
P = ParamSpec("P")
# the type of thing that can be passed into `make_request` in the headers list
CustomHeaderType = tuple[Union[str, bytes], Union[str, bytes]]
CustomHeaderType = tuple[str | bytes, str | bytes]
# A pre-prepared SQLite DB that is used as a template when creating new SQLite
# DB each test run. This dramatically speeds up test set up when using SQLite.
PREPPED_SQLITE_DB_CONN: Optional[LoggingDatabaseConnection] = None
PREPPED_SQLITE_DB_CONN: LoggingDatabaseConnection | None = None
class TimedOutException(Exception):
@@ -146,9 +146,9 @@ class FakeChannel:
_reactor: MemoryReactorClock
result: dict = attr.Factory(dict)
_ip: str = "127.0.0.1"
_producer: Optional[Union[IPullProducer, IPushProducer]] = None
resource_usage: Optional[ContextResourceUsage] = None
_request: Optional[Request] = None
_producer: IPullProducer | IPushProducer | None = None
resource_usage: ContextResourceUsage | None = None
_request: Request | None = None
@property
def request(self) -> Request:
@@ -206,7 +206,7 @@ class FakeChannel:
version: bytes,
code: bytes,
reason: bytes,
headers: Union[Headers, list[tuple[bytes, bytes]]],
headers: Headers | list[tuple[bytes, bytes]],
) -> None:
self.result["version"] = version
self.result["code"] = code
@@ -248,7 +248,7 @@ class FakeChannel:
# TODO This should ensure that the IProducer is an IPushProducer or
# IPullProducer, unfortunately twisted.protocols.basic.FileSender does
# implement those, but doesn't declare it.
self._producer = cast(Union[IPushProducer, IPullProducer], producer)
self._producer = cast(IPushProducer | IPullProducer, producer)
self.producerStreaming = streaming
def _produce() -> None:
@@ -357,18 +357,18 @@ class FakeSite:
def make_request(
reactor: MemoryReactorClock,
site: Union[Site, FakeSite],
method: Union[bytes, str],
path: Union[bytes, str],
content: Union[bytes, str, JsonDict] = b"",
access_token: Optional[str] = None,
site: Site | FakeSite,
method: bytes | str,
path: bytes | str,
content: bytes | str | JsonDict = b"",
access_token: str | None = None,
request: type[Request] = SynapseRequest,
shorthand: bool = True,
federation_auth_origin: Optional[bytes] = None,
content_type: Optional[bytes] = None,
federation_auth_origin: bytes | None = None,
content_type: bytes | None = None,
content_is_form: bool = False,
await_result: bool = True,
custom_headers: Optional[Iterable[CustomHeaderType]] = None,
custom_headers: Iterable[CustomHeaderType] | None = None,
client_ip: str = "127.0.0.1",
) -> FakeChannel:
"""
@@ -497,7 +497,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
@implementer(IResolverSimple)
class FakeResolver:
def getHostByName(
self, name: str, timeout: Optional[Sequence[int]] = None
self, name: str, timeout: Sequence[int] | None = None
) -> "Deferred[str]":
if name not in lookups:
return fail(DNSLookupError("OH NO: unknown %s" % (name,)))
@@ -617,7 +617,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
port: int,
factory: ClientFactory,
timeout: float = 30,
bindAddress: Optional[tuple[str, int]] = None,
bindAddress: tuple[str, int] | None = None,
) -> IConnector:
"""Fake L{IReactorTCP.connectTCP}."""
@@ -788,7 +788,7 @@ class ThreadPool:
def callInThreadWithCallback(
self,
onResult: Callable[[bool, Union[Failure, R]], None],
onResult: Callable[[bool, Failure | R], None],
function: Callable[P, R],
*args: P.args,
**kwargs: P.kwargs,
@@ -841,17 +841,17 @@ class FakeTransport:
"""Test reactor
"""
_protocol: Optional[IProtocol] = None
_protocol: IProtocol | None = None
"""The Protocol which is producing data for this transport. Optional, but if set
will get called back for connectionLost() notifications etc.
"""
_peer_address: Union[IPv4Address, IPv6Address] = attr.Factory(
_peer_address: IPv4Address | IPv6Address = attr.Factory(
lambda: address.IPv4Address("TCP", "127.0.0.1", 5678)
)
"""The value to be returned by getPeer"""
_host_address: Union[IPv4Address, IPv6Address] = attr.Factory(
_host_address: IPv4Address | IPv6Address = attr.Factory(
lambda: address.IPv4Address("TCP", "127.0.0.1", 1234)
)
"""The value to be returned by getHost"""
@@ -860,13 +860,13 @@ class FakeTransport:
disconnected = False
connected = True
buffer: bytes = b""
producer: Optional[IPushProducer] = None
producer: IPushProducer | None = None
autoflush: bool = True
def getPeer(self) -> Union[IPv4Address, IPv6Address]:
def getPeer(self) -> IPv4Address | IPv6Address:
return self._peer_address
def getHost(self) -> Union[IPv4Address, IPv6Address]:
def getHost(self) -> IPv4Address | IPv6Address:
return self._host_address
def loseConnection(self) -> None:
@@ -955,7 +955,7 @@ class FakeTransport:
for x in seq:
self.write(x)
def flush(self, maxbytes: Optional[int] = None) -> None:
def flush(self, maxbytes: int | None = None) -> None:
if not self.buffer:
# nothing to do. Don't write empty buffers: it upsets the
# TLSMemoryBIOProtocol
@@ -1061,10 +1061,10 @@ def setup_test_homeserver(
*,
cleanup_func: Callable[[Callable[[], Optional["Deferred[None]"]]], None],
server_name: str = "test",
config: Optional[HomeServerConfig] = None,
reactor: Optional[ISynapseReactor] = None,
config: HomeServerConfig | None = None,
reactor: ISynapseReactor | None = None,
homeserver_to_use: type[HomeServer] = TestHomeServer,
db_txn_limit: Optional[int] = None,
db_txn_limit: int | None = None,
**extra_homeserver_attributes: Any,
) -> HomeServer:
"""