Compare commits

...

9 Commits

Author SHA1 Message Date
Eric Eastwood 0dbc7501ab Better test prefixes 2022-08-19 02:35:29 -05:00
Eric Eastwood fd941cdcbf Better benchmark 2022-08-19 02:09:16 -05:00
Eric Eastwood 2fdbca62e4 Add benchmark 2022-08-19 01:30:23 -05:00
Eric Eastwood 7cb07d3a03 Start of optimizing 2022-08-18 22:14:05 -05:00
Eric Eastwood 2c42673a9b Add metrics to track /messages response time by room size (#13545)
Follow-up to https://github.com/matrix-org/synapse/pull/13533

Part of https://github.com/matrix-org/synapse/issues/13356
2022-08-18 14:15:53 -05:00
Sean Quah b251cff819 Fix incorrect juggling of logging contexts in _PerHostRatelimiter (#13554)
Signed-off-by: Sean Quah <seanq@matrix.org>

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2022-08-18 16:26:26 +01:00
Eric Eastwood d64653d062 Track number of hosts affected by the rate limiter (#13541)
Track number of hosts affected by the rate limiter so we can differentiate one really noisy homeserver from a general ratelimit tuning problem across the federation.

Follow-up to https://github.com/matrix-org/synapse/pull/13534

Part of https://github.com/matrix-org/synapse/issues/13356
2022-08-18 10:05:07 -05:00
Ayush Anand 22ea51faf9 Add support for compression to federation responses (#13537)
Closes #13415.

Signed-off-by: Ayush Anand <iamayushanand@gmail.com>
2022-08-18 15:14:47 +01:00
Sean Quah 84169a82dc Avoid blocking lazy-loading /syncs during partial joins (#13477)
Use a state filter or accept partial state in a few places where we
request state, to avoid blocking.

To make lazy-loading `/sync`s work, we need to provide the memberships
of event senders, which are not guaranteed to be in the room state.
Instead we dig through auth events for memberships to present to
clients. The auth events of an event are guaranteed to contain a
passable membership event, otherwise the event would have been rejected.

Note that this only covers the common code paths encountered during
testing. There has been no exhaustive checking of all sync code paths.

Fixes #13146.

Signed-off-by: Sean Quah <seanq@matrix.org>
2022-08-18 11:53:02 +01:00
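The membership dig described in this commit message amounts to scanning a timeline event's auth events for its sender's membership. A rough sketch of the idea, with a hypothetical helper name (the real implementation is `_find_missing_partial_state_memberships`, shown in the diff below):

from typing import Mapping, Optional

from synapse.events import EventBase


def membership_from_auth_events(
    event: EventBase, auth_events_by_id: Mapping[str, EventBase]
) -> Optional[str]:
    # An accepted event must cite its sender's membership among its auth events,
    # so one of them is the membership event we want to lazy-load.
    for auth_event_id in event.auth_event_ids():
        auth_event = auth_events_by_id[auth_event_id]
        if auth_event.type == "m.room.member" and auth_event.state_key == event.sender:
            return auth_event.event_id
    return None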
14 changed files with 466 additions and 86 deletions
+1
@@ -0,0 +1 @@
Faster room joins: Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state.
+1
@@ -0,0 +1 @@
Add support for compression to federation responses.
+1
@@ -0,0 +1 @@
Add metrics to track how the rate limiter is affecting requests (sleep/reject).
+1
@@ -0,0 +1 @@
Update metrics to track `/messages` response time by room size.
+1
@@ -0,0 +1 @@
Instrument `FederationStateIdsServlet` (`/state_ids`) for understandable traces in Jaeger.
@@ -444,7 +444,7 @@ Sub-options for each listener include:
* `names`: a list of names of HTTP resources. See below for a list of valid resource names.
* `compress`: set to true to enable gzip compression on HTTP bodies for this resource. This is currently only supported with the
`client`, `consent` and `metrics` resources.
`client`, `consent`, `metrics` and `federation` resources.
* `additional_resources`: Only valid for an 'http' listener. A map of
additional endpoints which should be loaded via dynamic modules.
+4 -1
@@ -220,7 +220,10 @@ class SynapseHomeServer(HomeServer):
resources.update({"/_matrix/consent": consent_resource})
if name == "federation":
resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
federation_resource: Resource = TransportLayerServer(self)
if compress:
federation_resource = gz_wrap(federation_resource)
resources.update({FEDERATION_PREFIX: federation_resource})
if name == "openid":
resources.update(
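For context, `gz_wrap` is Synapse's helper for gzip-wrapping a Twisted resource; conceptually it is roughly the following sketch built on Twisted's stock encoder (not necessarily the exact helper):

from twisted.web.resource import EncodingResourceWrapper, Resource
from twisted.web.server import GzipEncoderFactory


def gz_wrap(resource: Resource) -> Resource:
    # Responses get gzip-compressed whenever the client sends
    # `Accept-Encoding: gzip`; otherwise the wrapped resource behaves as before.
    return EncodingResourceWrapper(resource, [GzipEncoderFactory()])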
+6
@@ -1031,6 +1031,12 @@ class FederationEventHandler:
InvalidResponseError: if the remote homeserver's response contains fields
of the wrong type.
"""
# It would be better if we could query the difference from our known
# state to the given `event_id` so the sending server doesn't have to
# send as much and we don't have to process so many events. For example
# in a room like #matrixhq, we get 200k events (77k state_events, 122k
# auth_events) from this and just the `have_seen_events` takes 20s.
(
state_event_ids,
auth_event_ids,
+224 -31
@@ -16,9 +16,11 @@ import logging
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
FrozenSet,
List,
Mapping,
Optional,
Sequence,
Set,
@@ -517,10 +519,17 @@ class SyncHandler:
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `recents`, so partial state is only a problem when a membership
# event turns up in `recents` but has not made it into the current
# state.
current_state_ids_map = (
await self._state_storage_controller.get_current_state_ids(
room_id
)
await self.store.get_partial_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())
@@ -589,7 +598,13 @@ class SyncHandler:
if any(e.is_state() for e in loaded_recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Is this the correct way of doing it?
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `loaded_recents`, so partial state is only a problem when a
# membership event turns up in `loaded_recents` but has not made it
# into the current state.
current_state_ids_map = (
await self.store.get_partial_current_state_ids(room_id)
)
@@ -637,7 +652,10 @@ class SyncHandler:
)
async def get_state_after_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the room state after the given event
@@ -645,9 +663,14 @@ class SyncHandler:
Args:
event_id: event of interest
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event and `state_filter` is not satisfied by partial state.
Defaults to `True`.
"""
state_ids = await self._state_storage_controller.get_state_ids_for_event(
event_id, state_filter=state_filter or StateFilter.all()
event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
# using get_metadata_for_events here (instead of get_event) sidesteps an issue
@@ -670,6 +693,7 @@ class SyncHandler:
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""Get the room state at a particular stream position
@@ -677,6 +701,9 @@ class SyncHandler:
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the last event in the room before `stream_position` and
`state_filter` is not satisfied by partial state. Defaults to `True`.
"""
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
@@ -688,7 +715,9 @@ class SyncHandler:
if last_event_id:
state = await self.get_state_after_event(
last_event_id, state_filter=state_filter or StateFilter.all()
last_event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
else:
@@ -891,7 +920,15 @@ class SyncHandler:
with Measure(self.clock, "compute_state_delta"):
# The memberships needed for events in the timeline.
# Only calculated when `lazy_load_members` is on.
members_to_fetch = None
members_to_fetch: Optional[Set[str]] = None
# A dictionary mapping user IDs to the first event in the timeline sent by
# them. Only calculated when `lazy_load_members` is on.
first_event_by_sender_map: Optional[Dict[str, EventBase]] = None
# The contribution to the room state from state events in the timeline.
# Only contains the last event for any given state key.
timeline_state: StateMap[str]
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
@@ -902,10 +939,23 @@ class SyncHandler:
# We only request state for the members needed to display the
# timeline:
members_to_fetch = {
event.sender # FIXME: we also care about invite targets etc.
for event in batch.events
}
timeline_state = {}
members_to_fetch = set()
first_event_by_sender_map = {}
for event in batch.events:
# Build the map from user IDs to the first timeline event they sent.
if event.sender not in first_event_by_sender_map:
first_event_by_sender_map[event.sender] = event
# We need the event's sender, unless their membership was in a
# previous timeline event.
if (EventTypes.Member, event.sender) not in timeline_state:
members_to_fetch.add(event.sender)
# FIXME: we also care about invite targets etc.
if event.is_state():
timeline_state[(event.type, event.state_key)] = event.event_id
if full_state:
# always make sure we LL ourselves so we know we're in the room
@@ -915,16 +965,21 @@ class SyncHandler:
members_to_fetch.add(sync_config.user.to_string())
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
else:
state_filter = StateFilter.all()
# The contribution to the room state from state events in the timeline.
# Only contains the last event for any given state key.
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events
if event.is_state()
}
# We are happy to use partial state to compute the `/sync` response.
# Since partial state may not include the lazy-loaded memberships we
# require, we fix up the state response afterwards with memberships from
# auth events.
await_full_state = False
else:
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events
if event.is_state()
}
state_filter = StateFilter.all()
await_full_state = True
# Now calculate the state to return in the sync response for the room.
# This is more or less the change in state between the end of the previous
@@ -936,19 +991,26 @@ class SyncHandler:
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
state_at_timeline_end = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_timeline_start = state_at_timeline_end
@@ -964,14 +1026,19 @@ class SyncHandler:
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id, state_filter=state_filter
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
# for now, we disable LL for gappy syncs - see
@@ -993,20 +1060,28 @@ class SyncHandler:
# is indeed the case.
assert since_token is not None
state_at_previous_sync = await self.get_state_at(
room_id, stream_position=since_token, state_filter=state_filter
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id, state_filter=state_filter
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_end = await self.get_state_at(
room_id, stream_position=now_token, state_filter=state_filter
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_ids = _calculate_state(
@@ -1036,8 +1111,23 @@ class SyncHandler:
(EventTypes.Member, member)
for member in members_to_fetch
),
await_full_state=False,
)
# If we only have partial state for the room, `state_ids` may be missing the
# memberships we wanted. We attempt to find some by digging through the auth
# events of timeline events.
if lazy_load_members and await self.store.is_partial_state_room(room_id):
assert members_to_fetch is not None
assert first_event_by_sender_map is not None
additional_state_ids = (
await self._find_missing_partial_state_memberships(
room_id, members_to_fetch, first_event_by_sender_map, state_ids
)
)
state_ids = {**state_ids, **additional_state_ids}
# At this point, if `lazy_load_members` is enabled, `state_ids` includes
# the memberships of all event senders in the timeline. This is because we
# may not have sent the memberships in a previous sync.
@@ -1086,6 +1176,99 @@ class SyncHandler:
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
}
async def _find_missing_partial_state_memberships(
self,
room_id: str,
members_to_fetch: Collection[str],
events_with_membership_auth: Mapping[str, EventBase],
found_state_ids: StateMap[str],
) -> StateMap[str]:
"""Finds missing memberships from a set of auth events and returns them as a
state map.
Args:
room_id: The partial state room to find the remaining memberships for.
members_to_fetch: The memberships to find.
events_with_membership_auth: A mapping from user IDs to events whose auth
events are known to contain their membership.
found_state_ids: A dict from (type, state_key) -> state_event_id, containing
memberships that have been previously found. Entries in
`members_to_fetch` that have a membership in `found_state_ids` are
ignored.
Returns:
A dict from ("m.room.member", state_key) -> state_event_id, containing the
memberships missing from `found_state_ids`.
Raises:
KeyError: if `events_with_membership_auth` does not have an entry for a
missing membership. Memberships in `found_state_ids` do not need an
entry in `events_with_membership_auth`.
"""
additional_state_ids: MutableStateMap[str] = {}
# Tracks the missing members for logging purposes.
missing_members = set()
# Identify memberships missing from `found_state_ids` and pick out the auth
# events in which to look for them.
auth_event_ids: Set[str] = set()
for member in members_to_fetch:
if (EventTypes.Member, member) in found_state_ids:
continue
missing_members.add(member)
event_with_membership_auth = events_with_membership_auth[member]
auth_event_ids.update(event_with_membership_auth.auth_event_ids())
auth_events = await self.store.get_events(auth_event_ids)
# Run through the missing memberships once more, picking out the memberships
# from the pile of auth events we have just fetched.
for member in members_to_fetch:
if (EventTypes.Member, member) in found_state_ids:
continue
event_with_membership_auth = events_with_membership_auth[member]
# Dig through the auth events to find the desired membership.
for auth_event_id in event_with_membership_auth.auth_event_ids():
# We only store events once we have all their auth events,
# so the auth event must be in the pile we have just
# fetched.
auth_event = auth_events[auth_event_id]
if (
auth_event.type == EventTypes.Member
and auth_event.state_key == member
):
missing_members.remove(member)
additional_state_ids[
(EventTypes.Member, member)
] = auth_event.event_id
break
if missing_members:
# There really shouldn't be any missing memberships now. Either:
# * we couldn't find an auth event, which shouldn't happen because we do
# not persist events without persisting their auth events first, or
# * the set of auth events did not contain a membership we wanted, which
# means our caller didn't compute the events in `members_to_fetch`
# correctly, or we somehow accepted an event whose auth events were
# dodgy.
logger.error(
"Failed to find memberships for %s in partial state room "
"%s in the auth events of %s.",
missing_members,
room_id,
[
events_with_membership_auth[member].event_id
for member in missing_members
],
)
return additional_state_ids
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> NotifCounts:
@@ -1730,7 +1913,11 @@ class SyncHandler:
continue
if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
if old_mem_ev_id:
@@ -1756,7 +1943,13 @@ class SyncHandler:
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types(
[(EventTypes.Member, user_id)]
),
)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
)
+53 -2
@@ -16,6 +16,7 @@
""" This module contains REST servlets to do with rooms: /rooms/<paths> """
import logging
import re
from enum import Enum
from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Tuple
from urllib import parse as urlparse
@@ -48,6 +49,7 @@ from synapse.http.servlet import (
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
@@ -62,6 +64,33 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class _RoomSize(Enum):
"""
Enum to differentiate sizes of rooms. This is a pretty good approximation
of how hard it will be to get events in the room. We could also look at
room "complexity".
"""
# This doesn't necessarily mean the room is a DM, just that there is a DM
# amount of people there.
DM_SIZE = "direct_message_size"
SMALL = "small"
SUBSTANTIAL = "substantial"
LARGE = "large"
@staticmethod
def from_member_count(member_count: int) -> "_RoomSize":
if member_count <= 2:
return _RoomSize.DM_SIZE
elif member_count < 100:
return _RoomSize.SMALL
elif member_count < 1000:
return _RoomSize.SUBSTANTIAL
else:
return _RoomSize.LARGE
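For reference, the bucketing above maps member counts onto label values as follows (illustrative assertions, not part of the diff):

# 1-2 members -> DM-sized, 3-99 -> small, 100-999 -> substantial, 1000+ -> large.
assert _RoomSize.from_member_count(2) is _RoomSize.DM_SIZE
assert _RoomSize.from_member_count(50) is _RoomSize.SMALL
assert _RoomSize.from_member_count(500) is _RoomSize.SUBSTANTIAL
assert _RoomSize.from_member_count(50_000) is _RoomSize.LARGE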
# This is an extra metric on top of `synapse_http_server_response_time_seconds`
# which times the same sort of thing but this one allows us to see values
# greater than 10s. We use a separate dedicated histogram with its own buckets
@@ -70,7 +99,11 @@ logger = logging.getLogger(__name__)
messsages_response_timer = Histogram(
"synapse_room_message_list_rest_servlet_response_time_seconds",
"sec",
[],
# We have a label for room size so we can try to see a more realistic
# picture of /messages response time for bigger rooms. We don't want the
# tiny rooms that can always respond fast skewing our results when we're trying
# to optimize the bigger cases.
["room_size"],
buckets=(
0.005,
0.01,
@@ -587,14 +620,26 @@ class RoomMessageListRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.clock = hs.get_clock()
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
@messsages_response_timer.time()
async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
processing_start_time = self.clock.time_msec()
# Fire off and hope that we get a result by the end.
#
# We're using the mypy type ignore comment because the `@cached`
# decorator on `get_number_joined_users_in_room` doesn't play well with
# the type system. Maybe in the future, it can use some ParamSpec
# wizardry to fix it up.
room_member_count_deferred = run_in_background( # type: ignore[call-arg]
self.store.get_number_joined_users_in_room,
room_id, # type: ignore[arg-type]
)
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = await PaginationConfig.from_request(
self.store, request, default_limit=10
@@ -625,6 +670,12 @@ class RoomMessageListRestServlet(RestServlet):
event_filter=event_filter,
)
processing_end_time = self.clock.time_msec()
room_member_count = await make_deferred_yieldable(room_member_count_deferred)
messsages_response_timer.labels(
room_size=_RoomSize.from_member_count(room_member_count)
).observe((processing_end_time - processing_start_time) / 1000)
return 200, msgs
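The member-count lookup above follows the usual Synapse pattern of starting work early with `run_in_background` and only blocking on the result at the end with `make_deferred_yieldable`, so the count is usually ready by the time the response is timed. A generic sketch of that pattern (`count_members` is a stand-in coroutine, not real Synapse code):

from synapse.logging.context import make_deferred_yieldable, run_in_background


async def count_members() -> int:
    # Stand-in for a slow storage call such as counting joined members.
    return 42


async def handle_request() -> int:
    # Start the lookup immediately, in its own logging context...
    count_deferred = run_in_background(count_members)
    # ... do the main work of the request here ...
    # ... then block on the result only at the end, re-entering our logging
    # context once the deferred resolves.
    return await make_deferred_yieldable(count_deferred)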
+20 -4
@@ -234,6 +234,7 @@ class StateStorageController:
self,
event_ids: Collection[str],
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> Dict[str, StateMap[str]]:
"""
Get the state dicts corresponding to a list of events, containing the event_ids
@@ -242,6 +243,9 @@ class StateStorageController:
Args:
event_ids: events whose state should be returned
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at these events and `state_filter` is not satisfied by partial state.
Defaults to `True`.
Returns:
A dict from event_id -> (type, state_key) -> event_id
@@ -250,8 +254,12 @@ class StateStorageController:
RuntimeError if we don't have a state group for one or more of the events
(ie they are outliers or unknown)
"""
await_full_state = True
if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
if (
await_full_state
and state_filter
and not state_filter.must_await_full_state(self._is_mine_id)
):
# Full state is not required if the state filter is restrictive enough.
await_full_state = False
event_to_groups = await self.get_state_group_for_events(
@@ -294,7 +302,10 @@ class StateStorageController:
@trace
async def get_state_ids_for_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the state dict corresponding to a particular event
@@ -302,6 +313,9 @@ class StateStorageController:
Args:
event_id: event whose state should be returned
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event and `state_filter` is not satisfied by partial state.
Defaults to `True`.
Returns:
A dict from (type, state_key) -> state_event_id
@@ -311,7 +325,9 @@ class StateStorageController:
outlier or is unknown)
"""
state_map = await self.get_state_ids_for_events(
[event_id], state_filter or StateFilter.all()
[event_id],
state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
return state_map[event_id]
+39 -33
@@ -54,7 +54,13 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.logging.opentracing import start_active_span, tag_args, trace
from synapse.logging.opentracing import (
SynapseTags,
set_tag,
start_active_span,
tag_args,
trace,
)
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -1449,7 +1455,7 @@ class EventsWorkerStore(SQLBaseStore):
@trace
@tag_args
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
self, room_id: str, event_ids: Collection[str]
) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.
@@ -1462,44 +1468,43 @@ class EventsWorkerStore(SQLBaseStore):
event_ids: events we are looking for
Returns:
The set of events we have already seen.
The remaining set of events we haven't seen.
"""
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
str(len(event_ids)),
)
# @cachedList chomps lots of memory if you call it with a big list, so
# we break it down. However, each batch requires its own index scan, so we make
# the batches as big as possible.
results: Set[str] = set()
for chunk in batch_iter(event_ids, 500):
r = await self._have_seen_events_dict(
[(room_id, event_id) for event_id in chunk]
)
results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
remaining_event_ids: Set[str] = set()
for chunk in batch_iter(event_ids, 1000):
remaining_event_ids_from_chunk = await self._have_seen_events_dict(chunk)
remaining_event_ids.update(remaining_event_ids_from_chunk)
return results
return remaining_event_ids
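`batch_iter` (from `synapse.util.iterutils`) simply yields fixed-size chunks of its input; conceptually it is something like this sketch:

from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    # Yield successive tuples of at most `size` items until the input is exhausted.
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, size))
        if not chunk:
            return
        yield chunk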
@cachedList(cached_method_name="have_seen_event", list_name="keys")
async def _have_seen_events_dict(
self, keys: Collection[Tuple[str, str]]
) -> Dict[Tuple[str, str], bool]:
# @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
async def _have_seen_events_dict(self, event_ids: Collection[str]) -> Set[str]:
"""Helper for have_seen_events
Returns:
a dict {(room_id, event_id)-> bool}
The remaining set of events we haven't seen.
"""
# if the event cache contains the event, obviously we've seen it.
cache_results = {
(rid, eid)
for (rid, eid) in keys
if await self._get_event_cache.contains((eid,))
# if the event cache contains the event, obviously we've seen it.
event_cache_entry_map = self._get_events_from_local_cache(event_ids)
event_ids_in_cache = event_cache_entry_map.keys()
remaining_event_ids = {
event_id for event_id in event_ids if event_id not in event_ids_in_cache
}
results = dict.fromkeys(cache_results, True)
remaining = [k for k in keys if k not in cache_results]
if not remaining:
return results
if not remaining_event_ids:
return set()
def have_seen_events_txn(txn: LoggingTransaction) -> None:
nonlocal remaining_event_ids
# we deliberately do *not* query the database for room_id, to make the
# query an index-only lookup on `events_event_id_key`.
#
@@ -1507,23 +1512,24 @@ class EventsWorkerStore(SQLBaseStore):
sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
txn.database_engine, "e.event_id", remaining_event_ids
)
txn.execute(sql + clause, args)
found_events = {eid for eid, in txn}
found_event_ids = {eid for eid, in txn}
# ... and then we can update the results for each key
results.update(
{(rid, eid): (eid in found_events) for (rid, eid) in remaining}
)
remaining_event_ids = {
event_id
for event_id in remaining_event_ids
if event_id not in found_event_ids
}
await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
return results
return remaining_event_ids
@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
res = await self._have_seen_events_dict(((room_id, event_id),))
return res[(room_id, event_id)]
remaining_event_ids = await self._have_seen_events_dict({event_id})
return event_id not in remaining_event_ids
def _get_current_state_event_counts_txn(
self, txn: LoggingTransaction, room_id: str
+46 -14
@@ -30,7 +30,7 @@ from synapse.logging.context import (
run_in_background,
)
from synapse.logging.opentracing import start_active_span
from synapse.metrics import Histogram
from synapse.metrics import Histogram, LaterGauge
from synapse.util import Clock
if typing.TYPE_CHECKING:
@@ -74,6 +74,27 @@ class FederationRateLimiter:
str, "_PerHostRatelimiter"
] = collections.defaultdict(new_limiter)
# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge(
"synapse_rate_limit_sleep_affected_hosts",
"Number of hosts that had requests put to sleep",
[],
lambda: sum(
ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values()
),
)
LaterGauge(
"synapse_rate_limit_reject_affected_hosts",
"Number of hosts that had requests rejected",
[],
lambda: sum(
ratelimiter.should_reject()
for ratelimiter in self.ratelimiters.values()
),
)
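`LaterGauge` is Synapse's wrapper for a gauge whose value comes from a callback evaluated at scrape time rather than a value set up front. With the plain `prometheus_client` API the same idea looks roughly like this (a sketch, not Synapse's implementation):

from prometheus_client import Gauge

# Stand-in for the per-host state tracked by FederationRateLimiter.
currently_sleeping_hosts: set = set()

sleep_affected_hosts = Gauge(
    "synapse_rate_limit_sleep_affected_hosts",
    "Number of hosts that had requests put to sleep",
)
# The callback runs on every Prometheus scrape, so the gauge always reflects
# the limiter's current state.
sleep_affected_hosts.set_function(lambda: len(currently_sleeping_hosts))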
def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
"""Used to ratelimit an incoming request from a given host
@@ -133,12 +154,33 @@ class _PerHostRatelimiter:
self.host = host
request_id = object()
ret = self._on_enter(request_id)
# Ideally we'd use `Deferred.fromCoroutine()` here, to save on redundant
# type-checking, but we'd need Twisted >= 21.2.
ret = defer.ensureDeferred(self._on_enter_with_tracing(request_id))
try:
yield ret
finally:
self._on_exit(request_id)
def should_reject(self) -> bool:
"""
Whether to reject the request if we already have too many queued up
(either sleeping or in the ready queue).
"""
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
return queue_size > self.reject_limit
def should_sleep(self) -> bool:
"""
Whether to sleep the request if we already have too many requests coming
through within the window.
"""
return len(self.request_times) > self.sleep_limit
async def _on_enter_with_tracing(self, request_id: object) -> None:
with start_active_span("ratelimit wait"), queue_wait_timer.time():
await self._on_enter(request_id)
def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
time_now = self.clock.time_msec()
@@ -149,8 +191,7 @@ class _PerHostRatelimiter:
# reject the request if we already have too many queued up (either
# sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
if queue_size > self.reject_limit:
if self.should_reject():
logger.debug("Ratelimiter(%s): rejecting request", self.host)
rate_limit_reject_counter.inc()
raise LimitExceededError(
@@ -180,7 +221,7 @@ class _PerHostRatelimiter:
len(self.request_times),
)
if len(self.request_times) > self.sleep_limit:
if self.should_sleep():
logger.debug(
"Ratelimiter(%s) [%s]: sleeping request for %f sec",
self.host,
@@ -222,17 +263,8 @@ class _PerHostRatelimiter:
# Ensure that we've properly cleaned up.
self.sleeping_requests.discard(request_id)
self.ready_request_queue.pop(request_id, None)
wait_span_scope.__exit__(None, None, None)
wait_timer_cm.__exit__(None, None, None)
return r
# Tracing
wait_span_scope = start_active_span("ratelimit wait")
wait_span_scope.__enter__()
# Metrics
wait_timer_cm = queue_wait_timer.time()
wait_timer_cm.__enter__()
ret_defer.addCallbacks(on_start, on_err)
ret_defer.addBoth(on_both)
return make_deferred_yieldable(ret_defer)
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json
from contextlib import contextmanager
from typing import Generator, List, Tuple
@@ -36,6 +37,8 @@ from synapse.util.async_helpers import yieldable_gather_results
from tests import unittest
logger = logging.getLogger(__name__)
class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
@@ -91,6 +94,71 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
)
self.event_ids.append(event_id)
# TODO: Remove me before merging
def test_benchmark(self):
import time
room_id = "room123"
event_ids = []
setup_start_time = time.time()
with LoggingContext(name="test-setup") as ctx:
for i in range(50000):
event_json = {"type": f"test {i}", "room_id": room_id}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id
event_ids.append(event_id)
self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": event_id,
"room_id": room_id,
"topological_ordering": i,
"stream_ordering": 123 + i,
"type": event.type,
"processed": True,
"outlier": False,
},
)
)
setup_end_time = time.time()
logger.info(
"Setup time: %s",
(setup_end_time - setup_start_time),
)
with LoggingContext(name="test") as ctx:
def time_have_seen_events(test_prefix: str, event_ids):
benchmark_start_time = time.time()
remaining_event_ids = self.get_success(
self.store.have_seen_events(room_id, event_ids)
)
benchmark_end_time = time.time()
logger.info(
"Benchmark time (%s): %s",
"{test_prefix: <13}".format(test_prefix=test_prefix),
(benchmark_end_time - benchmark_start_time),
)
self.assertIsNotNone(remaining_event_ids)
event_ids_odd = event_ids[::2]
event_ids_even = event_ids[1::2]
time_have_seen_events("1 cold cache", event_ids)
time_have_seen_events("2, warm cache", event_ids)
time_have_seen_events("3, warm cache", event_ids)
time_have_seen_events("4, odds", event_ids_odd)
time_have_seen_events("5, odds", event_ids_odd)
time_have_seen_events("6, evens", event_ids_even)
time_have_seen_events("7, evens", event_ids_even)
# that should result in many db queries
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(