1
0

Merge commit 'c73cc2c2a' into anoa/dinsic_release_1_31_0

This commit is contained in:
Andrew Morgan
2021-04-23 18:14:20 +01:00
30 changed files with 452 additions and 166 deletions

1
changelog.d/9639.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind.

1
changelog.d/9653.feature Normal file
View File

@@ -0,0 +1 @@
Add initial experimental support for a "space summary" API.

1
changelog.d/9674.misc Normal file
View File

@@ -0,0 +1 @@
Increase default join ratelimiting burst rate.

1
changelog.d/9675.misc Normal file
View File

@@ -0,0 +1 @@
Add additional type hints to the Homeserver object.

1
changelog.d/9676.misc Normal file
View File

@@ -0,0 +1 @@
Add type hints to third party event rules and visibility modules.

1
changelog.d/9678.misc Normal file
View File

@@ -0,0 +1 @@
Bump mypy-zope to 0.2.13 to fix "Cannot determine consistent method resolution order (MRO)" errors when running mypy a second time.

1
changelog.d/9679.doc Normal file
View File

@@ -0,0 +1 @@
Improve worker documentation for fallback/web auth endpoints.

View File

@@ -943,10 +943,10 @@ log_config: "CONFDIR/SERVERNAME.log.config"
#rc_joins:
# local:
# per_second: 0.1
# burst_count: 3
# burst_count: 10
# remote:
# per_second: 0.01
# burst_count: 3
# burst_count: 10
#
#rc_3pid_validation:
# per_second: 0.003

View File

@@ -232,7 +232,6 @@ expressions:
# Registration/login requests
^/_matrix/client/(api/v1|r0|unstable)/login$
^/_matrix/client/(r0|unstable)/register$
^/_matrix/client/(r0|unstable)/auth/.*/fallback/web$
# Event sending requests
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/redact
@@ -276,7 +275,7 @@ using):
Ensure that all SSO logins go to a single process.
For multiple workers not handling the SSO endpoints properly, see
[#7530](https://github.com/matrix-org/synapse/issues/7530) and
[#7530](https://github.com/matrix-org/synapse/issues/7530) and
[#9427](https://github.com/matrix-org/synapse/issues/9427).
Note that a HTTP listener with `client` and `federation` resources must be

View File

@@ -20,8 +20,9 @@ files =
synapse/crypto,
synapse/event_auth.py,
synapse/events/builder.py,
synapse/events/validator.py,
synapse/events/spamcheck.py,
synapse/events/third_party_rules.py,
synapse/events/validator.py,
synapse/federation,
synapse/groups,
synapse/handlers,
@@ -38,6 +39,7 @@ files =
synapse/push,
synapse/replication,
synapse/rest,
synapse/secrets.py,
synapse/server.py,
synapse/server_notices,
synapse/spam_checker_api,
@@ -71,6 +73,7 @@ files =
synapse/util/metrics.py,
synapse/util/macaroons.py,
synapse/util/stringutils.py,
synapse/visibility.py,
tests/replication,
tests/test_utils,
tests/handlers/test_password_providers.py,

View File

@@ -103,7 +103,7 @@ CONDITIONAL_REQUIREMENTS["lint"] = [
"flake8",
]
CONDITIONAL_REQUIREMENTS["mypy"] = ["mypy==0.812", "mypy-zope==0.2.11"]
CONDITIONAL_REQUIREMENTS["mypy"] = ["mypy==0.812", "mypy-zope==0.2.13"]
# Dependencies which are exclusively required by unit test code. This is
# NOT a list of all modules that are necessary to run the unit tests.

View File

@@ -98,11 +98,11 @@ class RatelimitConfig(Config):
self.rc_joins_local = RateLimitConfig(
config.get("rc_joins", {}).get("local", {}),
defaults={"per_second": 0.1, "burst_count": 3},
defaults={"per_second": 0.1, "burst_count": 10},
)
self.rc_joins_remote = RateLimitConfig(
config.get("rc_joins", {}).get("remote", {}),
defaults={"per_second": 0.01, "burst_count": 3},
defaults={"per_second": 0.01, "burst_count": 10},
)
# Ratelimit cross-user key requests:
@@ -196,10 +196,10 @@ class RatelimitConfig(Config):
#rc_joins:
# local:
# per_second: 0.1
# burst_count: 3
# burst_count: 10
# remote:
# per_second: 0.01
# burst_count: 3
# burst_count: 10
#
#rc_3pid_validation:
# per_second: 0.003

View File

@@ -13,12 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, Union
from typing import TYPE_CHECKING, Union
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.types import Requester, StateMap
if TYPE_CHECKING:
from synapse.server import HomeServer
class ThirdPartyEventRules:
"""Allows server admins to provide a Python module implementing an extra
@@ -28,7 +31,7 @@ class ThirdPartyEventRules:
behaviours.
"""
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.third_party_rules = None
self.store = hs.get_datastore()
@@ -95,10 +98,9 @@ class ThirdPartyEventRules:
if self.third_party_rules is None:
return True
ret = await self.third_party_rules.on_create_room(
return await self.third_party_rules.on_create_room(
requester, config, is_requester_admin
)
return ret
async def check_threepid_can_be_invited(
self, medium: str, address: str, room_id: str
@@ -119,10 +121,9 @@ class ThirdPartyEventRules:
state_events = await self._get_state_map_for_room(room_id)
ret = await self.third_party_rules.check_threepid_can_be_invited(
return await self.third_party_rules.check_threepid_can_be_invited(
medium, address, state_events
)
return ret
async def check_visibility_can_be_modified(
self, room_id: str, new_visibility: str
@@ -143,7 +144,7 @@ class ThirdPartyEventRules:
check_func = getattr(
self.third_party_rules, "check_visibility_can_be_modified", None
)
if not check_func or not isinstance(check_func, Callable):
if not check_func or not callable(check_func):
return True
state_events = await self._get_state_map_for_room(room_id)

View File

@@ -29,11 +29,13 @@ from typing import (
List,
Mapping,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)
import attr
from prometheus_client import Counter
from twisted.internet import defer
@@ -457,6 +459,7 @@ class FederationClient(FederationBase):
description: str,
destinations: Iterable[str],
callback: Callable[[str], Awaitable[T]],
failover_on_unknown_endpoint: bool = False,
) -> T:
"""Try an operation on a series of servers, until it succeeds
@@ -476,6 +479,10 @@ class FederationClient(FederationBase):
next server tried. Normally the stacktrace is logged but this is
suppressed if the exception is an InvalidResponseError.
failover_on_unknown_endpoint: if True, we will try other servers if it looks
like a server doesn't support the endpoint. This is typically useful
if the endpoint in question is new or experimental.
Returns:
The result of callback, if it succeeds
@@ -495,16 +502,31 @@ class FederationClient(FederationBase):
except UnsupportedRoomVersionError:
raise
except HttpResponseException as e:
if not 500 <= e.code < 600:
raise e.to_synapse_error()
else:
logger.warning(
"Failed to %s via %s: %i %s",
description,
destination,
e.code,
e.args[0],
)
synapse_error = e.to_synapse_error()
failover = False
if 500 <= e.code < 600:
failover = True
elif failover_on_unknown_endpoint:
# there is no good way to detect an "unknown" endpoint. Dendrite
# returns a 404 (with no body); synapse returns a 400
# with M_UNRECOGNISED.
if e.code == 404 or (
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
):
failover = True
if not failover:
raise synapse_error from e
logger.warning(
"Failed to %s via %s: %i %s",
description,
destination,
e.code,
e.args[0],
)
except Exception:
logger.warning(
"Failed to %s via %s", description, destination, exc_info=True
@@ -1100,3 +1122,141 @@ class FederationClient(FederationBase):
# If we don't manage to find it, return None. It's not an error if a
# server doesn't give it to us.
return None
async def get_space_summary(
self,
destinations: Iterable[str],
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> "FederationSpaceSummaryResult":
"""
Call other servers to get a summary of the given space
Args:
destinations: The remote servers. We will try them in turn, omitting any
that have been blacklisted.
room_id: ID of the space to be queried
suggested_only: If true, ask the remote server to only return children
with the "suggested" flag set
max_rooms_per_space: A limit on the number of children to return for each
space
exclude_rooms: A list of room IDs to tell the remote server to skip
Returns:
a parsed FederationSpaceSummaryResult
Raises:
SynapseError if we were unable to get a valid summary from any of the
remote servers
"""
async def send_request(destination: str) -> FederationSpaceSummaryResult:
res = await self.transport_layer.get_space_summary(
destination=destination,
room_id=room_id,
suggested_only=suggested_only,
max_rooms_per_space=max_rooms_per_space,
exclude_rooms=exclude_rooms,
)
try:
return FederationSpaceSummaryResult.from_json_dict(res)
except ValueError as e:
raise InvalidResponseError(str(e))
return await self._try_destination_list(
"fetch space summary",
destinations,
send_request,
failover_on_unknown_endpoint=True,
)
@attr.s(frozen=True, slots=True)
class FederationSpaceSummaryEventResult:
"""Represents a single event in the result of a successful get_space_summary call.
It's essentially just a serialised event object, but we do a bit of parsing and
validation in `from_json_dict` and store some of the validated properties in
object attributes.
"""
event_type = attr.ib(type=str)
state_key = attr.ib(type=str)
via = attr.ib(type=Sequence[str])
# the raw data, including the above keys
data = attr.ib(type=JsonDict)
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult":
"""Parse an event within the result of a /spaces/ request
Args:
d: json object to be parsed
Raises:
ValueError if d is not a valid event
"""
event_type = d.get("type")
if not isinstance(event_type, str):
raise ValueError("Invalid event: 'event_type' must be a str")
state_key = d.get("state_key")
if not isinstance(state_key, str):
raise ValueError("Invalid event: 'state_key' must be a str")
content = d.get("content")
if not isinstance(content, dict):
raise ValueError("Invalid event: 'content' must be a dict")
via = content.get("via")
if not isinstance(via, Sequence):
raise ValueError("Invalid event: 'via' must be a list")
if any(not isinstance(v, str) for v in via):
raise ValueError("Invalid event: 'via' must be a list of strings")
return cls(event_type, state_key, via, d)
@attr.s(frozen=True, slots=True)
class FederationSpaceSummaryResult:
"""Represents the data returned by a successful get_space_summary call."""
rooms = attr.ib(type=Sequence[JsonDict])
events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult])
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult":
"""Parse the result of a /spaces/ request
Args:
d: json object to be parsed
Raises:
ValueError if d is not a valid /spaces/ response
"""
rooms = d.get("rooms")
if not isinstance(rooms, Sequence):
raise ValueError("'rooms' must be a list")
if any(not isinstance(r, dict) for r in rooms):
raise ValueError("Invalid room in 'rooms' list")
events = d.get("events")
if not isinstance(events, Sequence):
raise ValueError("'events' must be a list")
if any(not isinstance(e, dict) for e in events):
raise ValueError("Invalid event in 'events' list")
parsed_events = [
FederationSpaceSummaryEventResult.from_json_dict(e) for e in events
]
return cls(rooms, parsed_events)

View File

@@ -1031,6 +1031,38 @@ class TransportLayerClient:
return self.client.get_json(destination=destination, path=path)
async def get_space_summary(
self,
destination: str,
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> JsonDict:
"""
Args:
destination: The remote server
room_id: The room ID to ask about.
suggested_only: if True, only suggested rooms will be returned
max_rooms_per_space: an optional limit to the number of children to be
returned per space
exclude_rooms: a list of any rooms we can skip
"""
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
)
params = {
"suggested_only": suggested_only,
"exclude_rooms": exclude_rooms,
}
if max_rooms_per_space is not None:
params["max_rooms_per_space"] = max_rooms_per_space
return await self.client.post_json(
destination=destination, path=path, data=params
)
def get_info_of_users(self, destination: str, user_ids: List[str]):
"""
Args:

View File

@@ -16,7 +16,7 @@
import itertools
import logging
from collections import deque
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple, cast
import attr
@@ -38,6 +38,9 @@ MAX_ROOMS = 50
# max number of events to return per room.
MAX_ROOMS_PER_SPACE = 50
# max number of federation servers to hit per room
MAX_SERVERS_PER_SPACE = 3
class SpaceSummaryHandler:
def __init__(self, hs: "HomeServer"):
@@ -47,6 +50,8 @@ class SpaceSummaryHandler:
self._state_handler = hs.get_state_handler()
self._store = hs.get_datastore()
self._event_serializer = hs.get_event_client_serializer()
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()
async def get_space_summary(
self,
@@ -78,35 +83,81 @@ class SpaceSummaryHandler:
await self._auth.check_user_in_room_or_world_readable(room_id, requester)
# the queue of rooms to process
room_queue = deque((_RoomQueueEntry(room_id),))
room_queue = deque((_RoomQueueEntry(room_id, ()),))
# rooms we have already processed
processed_rooms = set() # type: Set[str]
# events we have already processed. We don't necessarily have their event ids,
# so instead we key on (room id, state key)
processed_events = set() # type: Set[Tuple[str, str]]
rooms_result = [] # type: List[JsonDict]
events_result = [] # type: List[JsonDict]
while room_queue and len(rooms_result) < MAX_ROOMS:
queue_entry = room_queue.popleft()
room_id = queue_entry.room_id
if room_id in processed_rooms:
# already done this room
continue
logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id)
is_in_room = await self._store.is_host_joined(room_id, self._server_name)
# The client-specified max_rooms_per_space limit doesn't apply to the
# room_id specified in the request, so we ignore it if this is the
# first room we are processing.
max_children = max_rooms_per_space if processed_rooms else None
rooms, events = await self._summarize_local_room(
requester, room_id, suggested_only, max_children
if is_in_room:
rooms, events = await self._summarize_local_room(
requester, room_id, suggested_only, max_children
)
else:
rooms, events = await self._summarize_remote_room(
queue_entry,
suggested_only,
max_children,
exclude_rooms=processed_rooms,
)
logger.debug(
"Query of %s returned rooms %s, events %s",
queue_entry.room_id,
[room.get("room_id") for room in rooms],
["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
)
rooms_result.extend(rooms)
events_result.extend(events)
# add any children that we haven't already processed to the queue
for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
room_queue.append(_RoomQueueEntry(edge_event["state_key"]))
# any rooms returned don't need visiting again
processed_rooms.update(cast(str, room.get("room_id")) for room in rooms)
# the room we queried may or may not have been returned, but don't process
# it again, anyway.
processed_rooms.add(room_id)
# XXX: is it ok that we blindly iterate through any events returned by
# a remote server, whether or not they actually link to any rooms in our
# tree?
for ev in events:
# remote servers might return events we have already processed
# (eg, Dendrite returns inward pointers as well as outward ones), so
# we need to filter them out, to avoid returning duplicate links to the
# client.
ev_key = (ev["room_id"], ev["state_key"])
if ev_key in processed_events:
continue
events_result.append(ev)
# add the child to the queue. we have already validated
# that the vias are a list of server names.
room_queue.append(
_RoomQueueEntry(ev["state_key"], ev["content"]["via"])
)
processed_events.add(ev_key)
return {"rooms": rooms_result, "events": events_result}
@@ -149,20 +200,23 @@ class SpaceSummaryHandler:
while room_queue and len(rooms_result) < MAX_ROOMS:
room_id = room_queue.popleft()
if room_id in processed_rooms:
# already done this room
continue
logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id)
rooms, events = await self._summarize_local_room(
None, room_id, suggested_only, max_rooms_per_space
)
processed_rooms.add(room_id)
rooms_result.extend(rooms)
events_result.extend(events)
# add any children that we haven't already processed to the queue
for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
room_queue.append(edge_event["state_key"])
# add any children to the queue
room_queue.extend(edge_event["state_key"] for edge_event in events)
return {"rooms": rooms_result, "events": events_result}
@@ -200,6 +254,43 @@ class SpaceSummaryHandler:
)
return (room_entry,), events_result
async def _summarize_remote_room(
self,
room: "_RoomQueueEntry",
suggested_only: bool,
max_children: Optional[int],
exclude_rooms: Iterable[str],
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
room_id = room.room_id
logger.info("Requesting summary for %s via %s", room_id, room.via)
# we need to make the exclusion list json-serialisable
exclude_rooms = list(exclude_rooms)
via = itertools.islice(room.via, MAX_SERVERS_PER_SPACE)
try:
res = await self._federation_client.get_space_summary(
via,
room_id,
suggested_only=suggested_only,
max_rooms_per_space=max_children,
exclude_rooms=exclude_rooms,
)
except Exception as e:
logger.warning(
"Unable to get summary of %s via federation: %s",
room_id,
e,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
return (), ()
return res.rooms, tuple(
ev.data
for ev in res.events
if ev.event_type == EventTypes.MSC1772_SPACE_CHILD
)
async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool:
# if we have an authenticated requesting user, first check if they are in the
# room
@@ -276,12 +367,24 @@ class SpaceSummaryHandler:
)
# filter out any events without a "via" (which implies it has been redacted)
return (e for e in events if e.content.get("via"))
return (e for e in events if _has_valid_via(e))
@attr.s(frozen=True, slots=True)
class _RoomQueueEntry:
room_id = attr.ib(type=str)
via = attr.ib(type=Sequence[str])
def _has_valid_via(e: EventBase) -> bool:
via = e.content.get("via")
if not via or not isinstance(via, Sequence):
return False
for v in via:
if not isinstance(v, str):
logger.debug("Ignoring edge event %s with invalid via entry", e.event_id)
return False
return True
def _is_suggested_child_event(edge_event: EventBase) -> bool:

View File

@@ -80,7 +80,7 @@ class SyncConfig:
filter_collection = attr.ib(type=FilterCollection)
is_guest = attr.ib(type=bool)
request_key = attr.ib(type=Tuple[Any, ...])
device_id = attr.ib(type=str)
device_id = attr.ib(type=Optional[str])
@attr.s(slots=True, frozen=True)
@@ -737,7 +737,9 @@ class SyncHandler:
return summary
def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache:
def get_lazy_loaded_members_cache(
self, cache_key: Tuple[str, Optional[str]]
) -> LruCache:
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)

View File

@@ -14,7 +14,7 @@
# limitations under the License.
import itertools
import logging
from typing import Any, Callable, Dict, List
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple
from synapse.api.constants import Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
@@ -26,11 +26,15 @@ from synapse.events.utils import (
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.sync import KnockedSyncResult, SyncConfig
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
from synapse.types import StreamToken
from synapse.http.site import SynapseRequest
from synapse.types import JsonDict, StreamToken
from synapse.util import json_decoder
from ._base import client_patterns, set_timeline_upper_limit
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -73,7 +77,7 @@ class SyncRestServlet(RestServlet):
PATTERNS = client_patterns("/sync$")
ALLOWED_PRESENCE = {"online", "offline", "unavailable"}
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
@@ -85,7 +89,7 @@ class SyncRestServlet(RestServlet):
self._server_notices_sender = hs.get_server_notices_sender()
self._event_serializer = hs.get_event_client_serializer()
async def on_GET(self, request):
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if b"from" in request.args:
# /events used to use 'from', but /sync uses 'since'.
# Lets be helpful and whine if we see a 'from'.

View File

@@ -26,10 +26,10 @@ if sys.version_info[0:2] >= (3, 6):
import secrets
class Secrets:
def token_bytes(self, nbytes=32):
def token_bytes(self, nbytes: int = 32) -> bytes:
return secrets.token_bytes(nbytes)
def token_hex(self, nbytes=32):
def token_hex(self, nbytes: int = 32) -> str:
return secrets.token_hex(nbytes)
@@ -38,8 +38,8 @@ else:
import os
class Secrets:
def token_bytes(self, nbytes=32):
def token_bytes(self, nbytes: int = 32) -> bytes:
return os.urandom(nbytes)
def token_hex(self, nbytes=32):
def token_hex(self, nbytes: int = 32) -> str:
return binascii.hexlify(self.token_bytes(nbytes)).decode("ascii")

View File

@@ -650,13 +650,13 @@ class HomeServer(metaclass=abc.ABCMeta):
return FederationHandlerRegistry(self)
@cache_in_self
def get_server_notices_manager(self):
def get_server_notices_manager(self) -> ServerNoticesManager:
if self.config.worker_app:
raise Exception("Workers cannot send server notices")
return ServerNoticesManager(self)
@cache_in_self
def get_server_notices_sender(self):
def get_server_notices_sender(self) -> WorkerServerNoticesSender:
if self.config.worker_app:
return WorkerServerNoticesSender(self)
return ServerNoticesSender(self)

View File

@@ -13,13 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any
from typing import TYPE_CHECKING, Any, Set
from synapse.api.errors import SynapseError
from synapse.api.urls import ConsentURIBuilder
from synapse.config import ConfigError
from synapse.types import get_localpart_from_id
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -28,16 +31,11 @@ class ConsentServerNotices:
privacy policy consent, and sends one if we do.
"""
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
def __init__(self, hs: "HomeServer"):
self._server_notices_manager = hs.get_server_notices_manager()
self._store = hs.get_datastore()
self._users_in_progress = set()
self._users_in_progress = set() # type: Set[str]
self._current_consent_version = hs.config.user_consent_version
self._server_notice_content = hs.config.user_consent_server_notice_content
@@ -73,6 +71,10 @@ class ConsentServerNotices:
try:
u = await self._store.get_user_by_id(user_id)
# The user doesn't exist.
if u is None:
return
if u["is_guest"] and not self._send_to_guests:
# don't send to guests
return

View File

@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Tuple
from typing import TYPE_CHECKING, List, Tuple
from synapse.api.constants import (
EventTypes,
@@ -24,6 +24,9 @@ from synapse.api.constants import (
from synapse.api.errors import AuthError, ResourceLimitError, SynapseError
from synapse.server_notices.server_notices_manager import SERVER_NOTICE_ROOM_TAG
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -32,11 +35,7 @@ class ResourceLimitsServerNotices:
ensures that the client is kept up to date.
"""
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
def __init__(self, hs: "HomeServer"):
self._server_notices_manager = hs.get_server_notices_manager()
self._store = hs.get_datastore()
self._auth = hs.get_auth()

View File

@@ -58,7 +58,7 @@ class ServerNoticesManager:
user_id: str,
event_content: dict,
type: str = EventTypes.Message,
state_key: Optional[bool] = None,
state_key: Optional[str] = None,
) -> EventBase:
"""Send a notice to the given user

View File

@@ -12,25 +12,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterable, Union
from typing import TYPE_CHECKING, Iterable, Union
from synapse.server_notices.consent_server_notices import ConsentServerNotices
from synapse.server_notices.resource_limits_server_notices import (
ResourceLimitsServerNotices,
)
from synapse.server_notices.worker_server_notices_sender import (
WorkerServerNoticesSender,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
class ServerNoticesSender:
class ServerNoticesSender(WorkerServerNoticesSender):
"""A centralised place which sends server notices automatically when
Certain Events take place
"""
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._server_notices = (
ConsentServerNotices(hs),
ResourceLimitsServerNotices(hs),

View File

@@ -12,16 +12,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from synapse.server import HomeServer
class WorkerServerNoticesSender:
"""Stub impl of ServerNoticesSender which does nothing"""
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer):
"""
def __init__(self, hs: "HomeServer"):
pass
async def on_user_syncing(self, user_id: str) -> None:
"""Called when the user performs a sync operation.

View File

@@ -14,7 +14,7 @@
# limitations under the License.
import logging
from typing import List, Tuple
from typing import List, Optional, Tuple
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.replication.tcp.streams import ToDeviceStream
@@ -115,7 +115,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
async def get_new_messages_for_device(
self,
user_id: str,
device_id: str,
device_id: Optional[str],
last_stream_id: int,
current_stream_id: int,
limit: int = 100,
@@ -163,7 +163,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def delete_messages_for_device(
self, user_id: str, device_id: str, up_to_stream_id: int
self, user_id: str, device_id: Optional[str], up_to_stream_id: int
) -> int:
"""
Args:

View File

@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, List
from typing import Dict, List, Optional
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
@@ -109,7 +109,7 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
return users
@cached(num_args=1)
async def user_last_seen_monthly_active(self, user_id: str) -> int:
async def user_last_seen_monthly_active(self, user_id: str) -> Optional[int]:
"""
Checks if a given user is part of the monthly active user group

View File

@@ -22,7 +22,6 @@ from canonicaljson import encode_canonical_json
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache
@@ -312,49 +311,23 @@ class TransactionStore(TransactionWorkerStore):
stream_ordering: the stream_ordering of the event
"""
return await self.db_pool.runInteraction(
"store_destination_rooms_entries",
self._store_destination_rooms_entries_txn,
destinations,
room_id,
stream_ordering,
await self.db_pool.simple_upsert_many(
table="destinations",
key_names=("destination",),
key_values=[(d,) for d in destinations],
value_names=[],
value_values=[],
desc="store_destination_rooms_entries_dests",
)
def _store_destination_rooms_entries_txn(
self,
txn: LoggingTransaction,
destinations: Iterable[str],
room_id: str,
stream_ordering: int,
) -> None:
# ensure we have a `destinations` row for this destination, as there is
# a foreign key constraint.
if isinstance(self.database_engine, PostgresEngine):
q = """
INSERT INTO destinations (destination)
VALUES (?)
ON CONFLICT DO NOTHING;
"""
elif isinstance(self.database_engine, Sqlite3Engine):
q = """
INSERT OR IGNORE INTO destinations (destination)
VALUES (?);
"""
else:
raise RuntimeError("Unknown database engine")
txn.execute_batch(q, ((destination,) for destination in destinations))
rows = [(destination, room_id) for destination in destinations]
self.db_pool.simple_upsert_many_txn(
txn,
await self.db_pool.simple_upsert_many(
table="destination_rooms",
key_names=("destination", "room_id"),
key_values=rows,
value_names=["stream_ordering"],
value_values=[(stream_ordering,)] * len(rows),
desc="store_destination_rooms_entries_rooms",
)
async def get_destination_last_successful_stream_ordering(

View File

@@ -449,7 +449,7 @@ class StateGroupStorage:
return self.stores.state._get_state_groups_from_groups(groups, state_filter)
async def get_state_for_events(
self, event_ids: List[str], state_filter: StateFilter = StateFilter.all()
self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all()
) -> Dict[str, StateMap[EventBase]]:
"""Given a list of event_ids and type tuples, return a list of state
dicts for each event.
@@ -485,7 +485,7 @@ class StateGroupStorage:
return {event: event_to_state[event] for event in event_ids}
async def get_state_ids_for_events(
self, event_ids: List[str], state_filter: StateFilter = StateFilter.all()
self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all()
) -> Dict[str, StateMap[str]]:
"""
Get the state dicts corresponding to a list of events, containing the event_ids

View File

@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import operator
from typing import Dict, FrozenSet, List, Optional
from synapse.api.constants import (
AccountDataTypes,
@@ -21,10 +21,11 @@ from synapse.api.constants import (
HistoryVisibility,
Membership,
)
from synapse.events import EventBase
from synapse.events.utils import prune_event
from synapse.storage import Storage
from synapse.storage.state import StateFilter
from synapse.types import get_domain_from_id
from synapse.types import StateMap, get_domain_from_id
logger = logging.getLogger(__name__)
@@ -48,32 +49,32 @@ MEMBERSHIP_PRIORITY = (
async def filter_events_for_client(
storage: Storage,
user_id,
events,
is_peeking=False,
always_include_ids=frozenset(),
filter_send_to_client=True,
):
user_id: str,
events: List[EventBase],
is_peeking: bool = False,
always_include_ids: FrozenSet[str] = frozenset(),
filter_send_to_client: bool = True,
) -> List[EventBase]:
"""
Check which events a user is allowed to see. If the user can see the event but its
sender asked for their data to be erased, prune the content of the event.
Args:
storage
user_id(str): user id to be checked
events(list[synapse.events.EventBase]): sequence of events to be checked
is_peeking(bool): should be True if:
user_id: user id to be checked
events: sequence of events to be checked
is_peeking: should be True if:
* the user is not currently a member of the room, and:
* the user has not been a member of the room since the given
events
always_include_ids (set(event_id)): set of event ids to specifically
always_include_ids: set of event ids to specifically
include (unless sender is ignored)
filter_send_to_client (bool): Whether we're checking an event that's going to be
filter_send_to_client: Whether we're checking an event that's going to be
sent to a client. This might not always be the case since this function can
also be called to check whether a user can see the state at a given point.
Returns:
list[synapse.events.EventBase]
The filtered events.
"""
# Filter out events that have been soft failed so that we don't relay them
# to clients.
@@ -90,7 +91,7 @@ async def filter_events_for_client(
AccountDataTypes.IGNORED_USER_LIST, user_id
)
ignore_list = frozenset()
ignore_list = frozenset() # type: FrozenSet[str]
if ignore_dict_content:
ignored_users_dict = ignore_dict_content.get("ignored_users", {})
if isinstance(ignored_users_dict, dict):
@@ -107,19 +108,18 @@ async def filter_events_for_client(
room_id
] = await storage.main.get_retention_policy_for_room(room_id)
def allowed(event):
def allowed(event: EventBase) -> Optional[EventBase]:
"""
Args:
event (synapse.events.EventBase): event to check
event: event to check
Returns:
None|EventBase:
None if the user cannot see this event at all
None if the user cannot see this event at all
a redacted copy of the event if they can only see a redacted
version
a redacted copy of the event if they can only see a redacted
version
the original event if they can see it as normal.
the original event if they can see it as normal.
"""
# Only run some checks if these events aren't about to be sent to clients. This is
# because, if this is not the case, we're probably only checking if the users can
@@ -252,48 +252,46 @@ async def filter_events_for_client(
return event
# check each event: gives an iterable[None|EventBase]
# Check each event: gives an iterable of None or (a potentially modified)
# EventBase.
filtered_events = map(allowed, events)
# remove the None entries
filtered_events = filter(operator.truth, filtered_events)
# we turn it into a list before returning it.
return list(filtered_events)
# Turn it into a list and remove None entries before returning.
return [ev for ev in filtered_events if ev]
async def filter_events_for_server(
storage: Storage,
server_name,
events,
redact=True,
check_history_visibility_only=False,
):
server_name: str,
events: List[EventBase],
redact: bool = True,
check_history_visibility_only: bool = False,
) -> List[EventBase]:
"""Filter a list of events based on whether given server is allowed to
see them.
Args:
storage
server_name (str)
events (iterable[FrozenEvent])
redact (bool): Whether to return a redacted version of the event, or
server_name
events
redact: Whether to return a redacted version of the event, or
to filter them out entirely.
check_history_visibility_only (bool): Whether to only check the
check_history_visibility_only: Whether to only check the
history visibility, rather than things like if the sender has been
erased. This is used e.g. during pagination to decide whether to
backfill or not.
Returns
list[FrozenEvent]
The filtered events.
"""
def is_sender_erased(event, erased_senders):
def is_sender_erased(event: EventBase, erased_senders: Dict[str, bool]) -> bool:
if erased_senders and erased_senders[event.sender]:
logger.info("Sender of %s has been erased, redacting", event.event_id)
return True
return False
def check_event_is_visible(event, state):
def check_event_is_visible(event: EventBase, state: StateMap[EventBase]) -> bool:
history = state.get((EventTypes.RoomHistoryVisibility, ""), None)
if history:
visibility = history.content.get(