1
0

Annotate FakeTransport

This commit is contained in:
David Robertson
2022-08-21 23:02:58 +01:00
parent c9e80bc772
commit 9d4da69ffd
+24 -30
View File
@@ -594,7 +594,7 @@ def get_clock() -> Tuple[ThreadedMemoryReactorClock, Clock]:
@implementer(ITransport)
@attr.s(cmp=False)
@attr.s(cmp=False, auto_attribs=True)
class FakeTransport:
"""
A twisted.internet.interfaces.ITransport implementation which sends all its data
@@ -609,35 +609,29 @@ class FakeTransport:
If you want bidirectional communication, you'll need two instances.
"""
other = attr.ib()
"""The Protocol object which will receive any data written to this transport.
other: IProtocol
"""The Protocol object which will receive any data written to this transport."""
:type: twisted.internet.interfaces.IProtocol
"""
_reactor: IReactorTime
"""Test reactor """
_reactor = attr.ib()
"""Test reactor
:type: twisted.internet.interfaces.IReactorTime
"""
_protocol = attr.ib(default=None)
_protocol: Optional[IProtocol] = None
"""The Protocol which is producing data for this transport. Optional, but if set
will get called back for connectionLost() notifications etc.
"""
_peer_address: Optional[IAddress] = attr.ib(default=None)
_peer_address: Optional[IAddress] = None
"""The value to be returned by getPeer"""
_host_address: Optional[IAddress] = attr.ib(default=None)
_host_address: Optional[IAddress] = None
"""The value to be returned by getHost"""
disconnecting = False
disconnected = False
connected = True
buffer = attr.ib(default=b"")
producer = attr.ib(default=None)
autoflush = attr.ib(default=True)
disconnecting: bool = False
disconnected: bool = False
connected: bool = True
buffer: bytes = b""
producer: Optional[IPushProducer] = None
autoflush: bool = True
def getPeer(self) -> Optional[IAddress]:
return self._peer_address
@@ -645,7 +639,7 @@ class FakeTransport:
def getHost(self) -> Optional[IAddress]:
return self._host_address
def loseConnection(self, reason=None):
def loseConnection(self, reason: Optional[Failure] = None) -> None:
if not self.disconnecting:
logger.info("FakeTransport: loseConnection(%s)", reason)
self.disconnecting = True
@@ -661,7 +655,7 @@ class FakeTransport:
self.connected = False
self.disconnected = True
def abortConnection(self):
def abortConnection(self) -> None:
logger.info("FakeTransport: abortConnection()")
if not self.disconnecting:
@@ -671,28 +665,28 @@ class FakeTransport:
self.disconnected = True
def pauseProducing(self):
def pauseProducing(self) -> None:
if not self.producer:
return
self.producer.pauseProducing()
def resumeProducing(self):
def resumeProducing(self) -> None:
if not self.producer:
return
self.producer.resumeProducing()
def unregisterProducer(self):
def unregisterProducer(self) -> None:
if not self.producer:
return
self.producer = None
def registerProducer(self, producer, streaming):
def registerProducer(self, producer: IPushProducer, streaming: bool) -> None:
self.producer = producer
self.producerStreaming = streaming
def _produce():
def _produce() -> None:
if not self.producer:
# we've been unregistered
return
@@ -704,7 +698,7 @@ class FakeTransport:
if not streaming:
self._reactor.callLater(0.0, _produce)
def write(self, byt):
def write(self, byt: bytes) -> None:
if self.disconnecting:
raise Exception("Writing to disconnecting FakeTransport")
@@ -716,11 +710,11 @@ class FakeTransport:
if self.autoflush:
self._reactor.callLater(0.0, self.flush)
def writeSequence(self, seq):
def writeSequence(self, seq: Iterable[bytes]) -> None:
for x in seq:
self.write(x)
def flush(self, maxbytes=None):
def flush(self, maxbytes: Optional[int] = None) -> None:
if not self.buffer:
# nothing to do. Don't write empty buffers: it upsets the
# TLSMemoryBIOProtocol