Compare commits
9 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 0dbc7501ab | |
| | fd941cdcbf | |
| | 2fdbca62e4 | |
| | 7cb07d3a03 | |
| | 2c42673a9b | |
| | b251cff819 | |
| | d64653d062 | |
| | 22ea51faf9 | |
| | 84169a82dc | |
@@ -0,0 +1 @@
Faster room joins: Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state.
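A rough standalone sketch of the idea (plain dicts stand in for Synapse's `EventBase`; the helper name is made up for illustration). The real change is the `_find_missing_partial_state_memberships` hunk in the sync handler further down:

```python
from typing import Iterable, Mapping, Optional

MEMBER_EVENT_TYPE = "m.room.member"


def membership_from_auth_events(
    member: str,
    auth_event_ids: Iterable[str],
    auth_events_by_id: Mapping[str, dict],
) -> Optional[str]:
    """Return the ID of `member`'s membership event if one of the auth events is it.

    `auth_event_ids` are the auth events of a timeline event sent by `member`, and
    `auth_events_by_id` is the pile of auth events already fetched from the store.
    """
    for auth_event_id in auth_event_ids:
        auth_event = auth_events_by_id[auth_event_id]
        if (
            auth_event.get("type") == MEMBER_EVENT_TYPE
            and auth_event.get("state_key") == member
        ):
            return auth_event_id
    return None
```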
@@ -0,0 +1 @@
Add support for compression to federation responses.
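What the `gz_wrap` call in the homeserver.py hunk below amounts to is wrapping the federation Twisted resource in an encoding wrapper; a minimal sketch (the wrapper and factory classes are standard Twisted, the function name is illustrative):

```python
from twisted.web.resource import EncodingResourceWrapper
from twisted.web.server import GzipEncoderFactory


def wrap_with_gzip(resource):
    """Wrap a Twisted resource so responses are gzipped for clients that
    advertise `Accept-Encoding: gzip`."""
    return EncodingResourceWrapper(resource, [GzipEncoderFactory()])
```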
@@ -0,0 +1 @@
Add metrics to track how the rate limiter is affecting requests (sleep/reject).
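A hedged sketch of the same idea using plain `prometheus_client` (the diff itself uses Synapse's `LaterGauge` wrapper; the per-host limiter objects and their `should_sleep()` / `should_reject()` checks mirror the ratelimitutils.py hunks further down):

```python
from prometheus_client import Gauge

# host -> per-host rate limiter; illustrative stand-in for FederationRateLimiter.ratelimiters
ratelimiters = {}

sleep_affected_hosts = Gauge(
    "synapse_rate_limit_sleep_affected_hosts",
    "Number of hosts that had requests put to sleep",
)
sleep_affected_hosts.set_function(
    lambda: sum(limiter.should_sleep() for limiter in ratelimiters.values())
)

reject_affected_hosts = Gauge(
    "synapse_rate_limit_reject_affected_hosts",
    "Number of hosts that had requests rejected",
)
reject_affected_hosts.set_function(
    lambda: sum(limiter.should_reject() for limiter in ratelimiters.values())
)
```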
@@ -0,0 +1 @@
Update metrics to track `/messages` response time by room size.
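The mechanism, sketched with `prometheus_client` directly: a histogram labelled by a coarse room-size bucket derived from the joined member count. The metric name, the `room_size` label, and the member-count thresholds follow the rooms.py hunk below; the helper names here are illustrative:

```python
from prometheus_client import Histogram

messages_response_timer = Histogram(
    "synapse_room_message_list_rest_servlet_response_time_seconds",
    "sec",
    ["room_size"],
)


def room_size_bucket(member_count: int) -> str:
    # Mirrors _RoomSize.from_member_count in the diff below.
    if member_count <= 2:
        return "direct_message_size"
    elif member_count < 100:
        return "small"
    elif member_count < 1000:
        return "substantial"
    return "large"


def record_messages_response_time(member_count: int, seconds: float) -> None:
    messages_response_timer.labels(room_size=room_size_bucket(member_count)).observe(seconds)
```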
@@ -0,0 +1 @@
Instrument `FederationStateIdsServlet` (`/state_ids`) for understandable traces in Jaeger.
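The pattern the instrumentation relies on, sketched with Synapse's opentracing helpers (the `@trace` / `@tag_args` decorators and `SynapseTags.FUNC_ARG_PREFIX` appear in the events_worker.py hunk below; the servlet function and response shape here are illustrative):

```python
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace


@trace
@tag_args
async def handle_state_ids_request(origin: str, room_id: str, event_id: str) -> dict:
    # Record cheap summaries of the arguments as span tags so the trace is
    # readable in Jaeger without dumping huge payloads into it.
    set_tag(SynapseTags.FUNC_ARG_PREFIX + "room_id", room_id)
    set_tag(SynapseTags.FUNC_ARG_PREFIX + "event_id", event_id)
    return {"pdu_ids": [], "auth_chain_ids": []}  # placeholder response
```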
@@ -444,7 +444,7 @@ Sub-options for each listener include:
* `names`: a list of names of HTTP resources. See below for a list of valid resource names.

* `compress`: set to true to enable gzip compression on HTTP bodies for this resource. This is currently only supported with the
`client`, `consent` and `metrics` resources.
`client`, `consent`, `metrics` and `federation` resources.

* `additional_resources`: Only valid for an 'http' listener. A map of
additional endpoints which should be loaded via dynamic modules.

@@ -220,7 +220,10 @@ class SynapseHomeServer(HomeServer):
resources.update({"/_matrix/consent": consent_resource})

if name == "federation":
resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
federation_resource: Resource = TransportLayerServer(self)
if compress:
federation_resource = gz_wrap(federation_resource)
resources.update({FEDERATION_PREFIX: federation_resource})

if name == "openid":
resources.update(

@@ -1031,6 +1031,12 @@ class FederationEventHandler:
InvalidResponseError: if the remote homeserver's response contains fields
of the wrong type.
"""

# It would be better if we could query the difference from our known
# state to the given `event_id` so the sending server doesn't have to
# send as much and we don't have to process so many events. For example
# in a room like #matrixhq, we get 200k events (77k state_events, 122k
# auth_events) from this and just the `have_seen_events` takes 20s.
(
state_event_ids,
auth_event_ids,

+224 -31
@@ -16,9 +16,11 @@ import logging
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
FrozenSet,
List,
Mapping,
Optional,
Sequence,
Set,
@@ -517,10 +519,17 @@ class SyncHandler:
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `recents`, so partial state is only a problem when a membership
# event turns up in `recents` but has not made it into the current
# state.
current_state_ids_map = (
await self._state_storage_controller.get_current_state_ids(
room_id
)
await self.store.get_partial_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())

@@ -589,7 +598,13 @@ class SyncHandler:
if any(e.is_state() for e in loaded_recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
# Is this the correct way of doing it?
# Which should be fine once
# https://github.com/matrix-org/synapse/issues/12989 is resolved,
# since we shouldn't reach here anymore?
# Note that we use the current state as a whitelist for filtering
# `loaded_recents`, so partial state is only a problem when a
# membership event turns up in `loaded_recents` but has not made it
# into the current state.
current_state_ids_map = (
await self.store.get_partial_current_state_ids(room_id)
)
@@ -637,7 +652,10 @@ class SyncHandler:
)

async def get_state_after_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the room state after the given event
@@ -645,9 +663,14 @@ class SyncHandler:
Args:
event_id: event of interest
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event and `state_filter` is not satisfied by partial state.
Defaults to `True`.
"""
state_ids = await self._state_storage_controller.get_state_ids_for_event(
event_id, state_filter=state_filter or StateFilter.all()
event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)

# using get_metadata_for_events here (instead of get_event) sidesteps an issue
@@ -670,6 +693,7 @@ class SyncHandler:
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""Get the room state at a particular stream position

@@ -677,6 +701,9 @@ class SyncHandler:
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the last event in the room before `stream_position` and
`state_filter` is not satisfied by partial state. Defaults to `True`.
"""
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
@@ -688,7 +715,9 @@ class SyncHandler:

if last_event_id:
state = await self.get_state_after_event(
last_event_id, state_filter=state_filter or StateFilter.all()
last_event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)

else:
@@ -891,7 +920,15 @@ class SyncHandler:
with Measure(self.clock, "compute_state_delta"):
# The memberships needed for events in the timeline.
# Only calculated when `lazy_load_members` is on.
members_to_fetch = None
members_to_fetch: Optional[Set[str]] = None

# A dictionary mapping user IDs to the first event in the timeline sent by
# them. Only calculated when `lazy_load_members` is on.
first_event_by_sender_map: Optional[Dict[str, EventBase]] = None

# The contribution to the room state from state events in the timeline.
# Only contains the last event for any given state key.
timeline_state: StateMap[str]

lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
@@ -902,10 +939,23 @@ class SyncHandler:
# We only request state for the members needed to display the
# timeline:

members_to_fetch = {
event.sender # FIXME: we also care about invite targets etc.
for event in batch.events
}
timeline_state = {}

members_to_fetch = set()
first_event_by_sender_map = {}
for event in batch.events:
# Build the map from user IDs to the first timeline event they sent.
if event.sender not in first_event_by_sender_map:
first_event_by_sender_map[event.sender] = event

# We need the event's sender, unless their membership was in a
# previous timeline event.
if (EventTypes.Member, event.sender) not in timeline_state:
members_to_fetch.add(event.sender)
# FIXME: we also care about invite targets etc.

if event.is_state():
timeline_state[(event.type, event.state_key)] = event.event_id

if full_state:
# always make sure we LL ourselves so we know we're in the room
|
||||
members_to_fetch.add(sync_config.user.to_string())
|
||||
|
||||
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
|
||||
else:
|
||||
state_filter = StateFilter.all()
|
||||
|
||||
# The contribution to the room state from state events in the timeline.
|
||||
# Only contains the last event for any given state key.
|
||||
timeline_state = {
|
||||
(event.type, event.state_key): event.event_id
|
||||
for event in batch.events
|
||||
if event.is_state()
|
||||
}
|
||||
# We are happy to use partial state to compute the `/sync` response.
|
||||
# Since partial state may not include the lazy-loaded memberships we
|
||||
# require, we fix up the state response afterwards with memberships from
|
||||
# auth events.
|
||||
await_full_state = False
|
||||
else:
|
||||
timeline_state = {
|
||||
(event.type, event.state_key): event.event_id
|
||||
for event in batch.events
|
||||
if event.is_state()
|
||||
}
|
||||
|
||||
state_filter = StateFilter.all()
|
||||
await_full_state = True
|
||||
|
||||
# Now calculate the state to return in the sync response for the room.
|
||||
# This is more or less the change in state between the end of the previous
|
||||
@@ -936,19 +991,26 @@ class SyncHandler:
|
||||
if batch:
|
||||
state_at_timeline_end = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[-1].event_id, state_filter=state_filter
|
||||
batch.events[-1].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
)
|
||||
|
||||
state_at_timeline_start = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[0].event_id, state_filter=state_filter
|
||||
batch.events[0].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
state_at_timeline_end = await self.get_state_at(
|
||||
room_id, stream_position=now_token, state_filter=state_filter
|
||||
room_id,
|
||||
stream_position=now_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
|
||||
state_at_timeline_start = state_at_timeline_end
|
||||
@@ -964,14 +1026,19 @@ class SyncHandler:
|
||||
if batch:
|
||||
state_at_timeline_start = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[0].event_id, state_filter=state_filter
|
||||
batch.events[0].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# We can get here if the user has ignored the senders of all
|
||||
# the recent events.
|
||||
state_at_timeline_start = await self.get_state_at(
|
||||
room_id, stream_position=now_token, state_filter=state_filter
|
||||
room_id,
|
||||
stream_position=now_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
|
||||
# for now, we disable LL for gappy syncs - see
|
||||
@@ -993,20 +1060,28 @@ class SyncHandler:
|
||||
# is indeed the case.
|
||||
assert since_token is not None
|
||||
state_at_previous_sync = await self.get_state_at(
|
||||
room_id, stream_position=since_token, state_filter=state_filter
|
||||
room_id,
|
||||
stream_position=since_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
|
||||
if batch:
|
||||
state_at_timeline_end = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[-1].event_id, state_filter=state_filter
|
||||
batch.events[-1].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# We can get here if the user has ignored the senders of all
|
||||
# the recent events.
|
||||
state_at_timeline_end = await self.get_state_at(
|
||||
room_id, stream_position=now_token, state_filter=state_filter
|
||||
room_id,
|
||||
stream_position=now_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
|
||||
state_ids = _calculate_state(
|
||||
@@ -1036,8 +1111,23 @@ class SyncHandler:
|
||||
(EventTypes.Member, member)
|
||||
for member in members_to_fetch
|
||||
),
|
||||
await_full_state=False,
|
||||
)
|
||||
|
||||
# If we only have partial state for the room, `state_ids` may be missing the
|
||||
# memberships we wanted. We attempt to find some by digging through the auth
|
||||
# events of timeline events.
|
||||
if lazy_load_members and await self.store.is_partial_state_room(room_id):
|
||||
assert members_to_fetch is not None
|
||||
assert first_event_by_sender_map is not None
|
||||
|
||||
additional_state_ids = (
|
||||
await self._find_missing_partial_state_memberships(
|
||||
room_id, members_to_fetch, first_event_by_sender_map, state_ids
|
||||
)
|
||||
)
|
||||
state_ids = {**state_ids, **additional_state_ids}
|
||||
|
||||
# At this point, if `lazy_load_members` is enabled, `state_ids` includes
|
||||
# the memberships of all event senders in the timeline. This is because we
|
||||
# may not have sent the memberships in a previous sync.
|
||||
@@ -1086,6 +1176,99 @@ class SyncHandler:
|
||||
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
|
||||
}
|
||||
|
||||
async def _find_missing_partial_state_memberships(
|
||||
self,
|
||||
room_id: str,
|
||||
members_to_fetch: Collection[str],
|
||||
events_with_membership_auth: Mapping[str, EventBase],
|
||||
found_state_ids: StateMap[str],
|
||||
) -> StateMap[str]:
|
||||
"""Finds missing memberships from a set of auth events and returns them as a
|
||||
state map.
|
||||
|
||||
Args:
|
||||
room_id: The partial state room to find the remaining memberships for.
|
||||
members_to_fetch: The memberships to find.
|
||||
events_with_membership_auth: A mapping from user IDs to events whose auth
|
||||
events are known to contain their membership.
|
||||
found_state_ids: A dict from (type, state_key) -> state_event_id, containing
|
||||
memberships that have been previously found. Entries in
|
||||
`members_to_fetch` that have a membership in `found_state_ids` are
|
||||
ignored.
|
||||
|
||||
Returns:
|
||||
A dict from ("m.room.member", state_key) -> state_event_id, containing the
|
||||
memberships missing from `found_state_ids`.
|
||||
|
||||
Raises:
|
||||
KeyError: if `events_with_membership_auth` does not have an entry for a
|
||||
missing membership. Memberships in `found_state_ids` do not need an
|
||||
entry in `events_with_membership_auth`.
|
||||
"""
|
||||
additional_state_ids: MutableStateMap[str] = {}
|
||||
|
||||
# Tracks the missing members for logging purposes.
|
||||
missing_members = set()
|
||||
|
||||
# Identify memberships missing from `found_state_ids` and pick out the auth
|
||||
# events in which to look for them.
|
||||
auth_event_ids: Set[str] = set()
|
||||
for member in members_to_fetch:
|
||||
if (EventTypes.Member, member) in found_state_ids:
|
||||
continue
|
||||
|
||||
missing_members.add(member)
|
||||
event_with_membership_auth = events_with_membership_auth[member]
|
||||
auth_event_ids.update(event_with_membership_auth.auth_event_ids())
|
||||
|
||||
auth_events = await self.store.get_events(auth_event_ids)
|
||||
|
||||
# Run through the missing memberships once more, picking out the memberships
|
||||
# from the pile of auth events we have just fetched.
|
||||
for member in members_to_fetch:
|
||||
if (EventTypes.Member, member) in found_state_ids:
|
||||
continue
|
||||
|
||||
event_with_membership_auth = events_with_membership_auth[member]
|
||||
|
||||
# Dig through the auth events to find the desired membership.
|
||||
for auth_event_id in event_with_membership_auth.auth_event_ids():
|
||||
# We only store events once we have all their auth events,
|
||||
# so the auth event must be in the pile we have just
|
||||
# fetched.
|
||||
auth_event = auth_events[auth_event_id]
|
||||
|
||||
if (
|
||||
auth_event.type == EventTypes.Member
|
||||
and auth_event.state_key == member
|
||||
):
|
||||
missing_members.remove(member)
|
||||
additional_state_ids[
|
||||
(EventTypes.Member, member)
|
||||
] = auth_event.event_id
|
||||
break
|
||||
|
||||
if missing_members:
# There really shouldn't be any missing memberships now. Either:
# * we couldn't find an auth event, which shouldn't happen because we do
# not persist events without persisting their auth events first, or
# * the set of auth events did not contain a membership we wanted, which
# means our caller didn't compute the events in `members_to_fetch`
# correctly, or we somehow accepted an event whose auth events were
# dodgy.
logger.error(
"Failed to find memberships for %s in partial state room "
"%s in the auth events of %s.",
missing_members,
room_id,
[
events_with_membership_auth[member].event_id
for member in missing_members
],
)

return additional_state_ids

async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> NotifCounts:
@@ -1730,7 +1913,11 @@ class SyncHandler:
continue

if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
if old_mem_ev_id:
@@ -1756,7 +1943,13 @@ class SyncHandler:
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = await self.get_state_at(room_id, since_token)
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types(
[(EventTypes.Member, user_id)]
),
)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
)

@@ -16,6 +16,7 @@
""" This module contains REST servlets to do with rooms: /rooms/<paths> """
import logging
import re
from enum import Enum
from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Tuple
from urllib import parse as urlparse

@@ -48,6 +49,7 @@ from synapse.http.servlet import (
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
@@ -62,6 +64,33 @@ if TYPE_CHECKING:

logger = logging.getLogger(__name__)


class _RoomSize(Enum):
"""
Enum to differentiate sizes of rooms. This is a pretty good approximation
about how hard it will be to get events in the room. We could also look at
room "complexity".
"""

# This doesn't necessarily mean the room is a DM, just that there is a DM
# amount of people there.
DM_SIZE = "direct_message_size"
SMALL = "small"
SUBSTANTIAL = "substantial"
LARGE = "large"

@staticmethod
def from_member_count(member_count: int) -> "_RoomSize":
if member_count <= 2:
return _RoomSize.DM_SIZE
elif member_count < 100:
return _RoomSize.SMALL
elif member_count < 1000:
return _RoomSize.SUBSTANTIAL
else:
return _RoomSize.LARGE


# This is an extra metric on top of `synapse_http_server_response_time_seconds`
# which times the same sort of thing but this one allows us to see values
# greater than 10s. We use a separate dedicated histogram with its own buckets
@@ -70,7 +99,11 @@ logger = logging.getLogger(__name__)
messsages_response_timer = Histogram(
"synapse_room_message_list_rest_servlet_response_time_seconds",
"sec",
[],
# We have a label for room size so we can try to see a more realistic
# picture of /messages response time for bigger rooms. We don't want the
# tiny rooms that can always respond fast skewing our results when we're trying
# to optimize the bigger cases.
["room_size"],
buckets=(
0.005,
0.01,
@@ -587,14 +620,26 @@ class RoomMessageListRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self._hs = hs
self.clock = hs.get_clock()
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main

@messsages_response_timer.time()
async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
processing_start_time = self.clock.time_msec()
# Fire off and hope that we get a result by the end.
#
# We're using the mypy type ignore comment because the `@cached`
# decorator on `get_number_joined_users_in_room` doesn't play well with
# the type system. Maybe in the future, it can use some ParamSpec
# wizardry to fix it up.
room_member_count_deferred = run_in_background( # type: ignore[call-arg]
self.store.get_number_joined_users_in_room,
room_id, # type: ignore[arg-type]
)

requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = await PaginationConfig.from_request(
self.store, request, default_limit=10
@@ -625,6 +670,12 @@ class RoomMessageListRestServlet(RestServlet):
event_filter=event_filter,
)

processing_end_time = self.clock.time_msec()
room_member_count = await make_deferred_yieldable(room_member_count_deferred)
messsages_response_timer.labels(
room_size=_RoomSize.from_member_count(room_member_count)
).observe((processing_end_time - processing_start_time) / 1000)

return 200, msgs

@@ -234,6 +234,7 @@ class StateStorageController:
self,
event_ids: Collection[str],
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> Dict[str, StateMap[str]]:
"""
Get the state dicts corresponding to a list of events, containing the event_ids
@@ -242,6 +243,9 @@ class StateStorageController:
Args:
event_ids: events whose state should be returned
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at these events and `state_filter` is not satisfied by partial state.
Defaults to `True`.

Returns:
A dict from event_id -> (type, state_key) -> event_id
@@ -250,8 +254,12 @@ class StateStorageController:
RuntimeError if we don't have a state group for one or more of the events
(ie they are outliers or unknown)
"""
await_full_state = True
if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
if (
await_full_state
and state_filter
and not state_filter.must_await_full_state(self._is_mine_id)
):
# Full state is not required if the state filter is restrictive enough.
await_full_state = False

event_to_groups = await self.get_state_group_for_events(
@@ -294,7 +302,10 @@ class StateStorageController:

@trace
async def get_state_ids_for_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
self,
event_id: str,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the state dict corresponding to a particular event
@@ -302,6 +313,9 @@ class StateStorageController:
Args:
event_id: event whose state should be returned
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the event and `state_filter` is not satisfied by partial state.
Defaults to `True`.

Returns:
A dict from (type, state_key) -> state_event_id
@@ -311,7 +325,9 @@ class StateStorageController:
outlier or is unknown)
"""
state_map = await self.get_state_ids_for_events(
[event_id], state_filter or StateFilter.all()
[event_id],
state_filter or StateFilter.all(),
await_full_state=await_full_state,
)
return state_map[event_id]

@@ -54,7 +54,13 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
from synapse.logging.opentracing import start_active_span, tag_args, trace
from synapse.logging.opentracing import (
SynapseTags,
set_tag,
start_active_span,
tag_args,
trace,
)
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -1449,7 +1455,7 @@ class EventsWorkerStore(SQLBaseStore):
@trace
@tag_args
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
self, room_id: str, event_ids: Collection[str]
) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.

@@ -1462,44 +1468,43 @@ class EventsWorkerStore(SQLBaseStore):
event_ids: events we are looking for

Returns:
The set of events we have already seen.
The remaining set of events we haven't seen.
"""
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
str(len(event_ids)),
)

# @cachedList chomps lots of memory if you call it with a big list, so
# we break it down. However, each batch requires its own index scan, so we make
# the batches as big as possible.

results: Set[str] = set()
for chunk in batch_iter(event_ids, 500):
r = await self._have_seen_events_dict(
[(room_id, event_id) for event_id in chunk]
)
results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
remaining_event_ids: Set[str] = set()
for chunk in batch_iter(event_ids, 1000):
remaining_event_ids_from_chunk = await self._have_seen_events_dict(chunk)
remaining_event_ids.update(remaining_event_ids_from_chunk)

return results
return remaining_event_ids

@cachedList(cached_method_name="have_seen_event", list_name="keys")
async def _have_seen_events_dict(
self, keys: Collection[Tuple[str, str]]
) -> Dict[Tuple[str, str], bool]:
# @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
async def _have_seen_events_dict(self, event_ids: Collection[str]) -> Set[str]:
"""Helper for have_seen_events

Returns:
a dict {(room_id, event_id)-> bool}
The remaining set of events we haven't seen.
"""
# if the event cache contains the event, obviously we've seen it.

cache_results = {
(rid, eid)
for (rid, eid) in keys
if await self._get_event_cache.contains((eid,))
# if the event cache contains the event, obviously we've seen it.
event_cache_entry_map = self._get_events_from_local_cache(event_ids)
event_ids_in_cache = event_cache_entry_map.keys()
remaining_event_ids = {
event_id for event_id in event_ids if event_id not in event_ids_in_cache
}
results = dict.fromkeys(cache_results, True)
remaining = [k for k in keys if k not in cache_results]
if not remaining:
return results
if not remaining_event_ids:
return set()

def have_seen_events_txn(txn: LoggingTransaction) -> None:
nonlocal remaining_event_ids
# we deliberately do *not* query the database for room_id, to make the
# query an index-only lookup on `events_event_id_key`.
#
@@ -1507,23 +1512,24 @@ class EventsWorkerStore(SQLBaseStore):

sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
txn.database_engine, "e.event_id", remaining_event_ids
)
txn.execute(sql + clause, args)
found_events = {eid for eid, in txn}
found_event_ids = {eid for eid, in txn}

# ... and then we can update the results for each key
results.update(
{(rid, eid): (eid in found_events) for (rid, eid) in remaining}
)
remaining_event_ids = {
event_id
for event_id in remaining_event_ids
if event_id not in found_event_ids
}

await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
return results
return remaining_event_ids

@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
res = await self._have_seen_events_dict(((room_id, event_id),))
return res[(room_id, event_id)]
remaining_event_ids = await self._have_seen_events_dict({event_id})
return event_id not in remaining_event_ids

def _get_current_state_event_counts_txn(
self, txn: LoggingTransaction, room_id: str

@@ -30,7 +30,7 @@ from synapse.logging.context import (
run_in_background,
)
from synapse.logging.opentracing import start_active_span
from synapse.metrics import Histogram
from synapse.metrics import Histogram, LaterGauge
from synapse.util import Clock

if typing.TYPE_CHECKING:
@@ -74,6 +74,27 @@ class FederationRateLimiter:
str, "_PerHostRatelimiter"
] = collections.defaultdict(new_limiter)

# We track the number of affected hosts per time-period so we can
# differentiate one really noisy homeserver from a general
# ratelimit tuning problem across the federation.
LaterGauge(
"synapse_rate_limit_sleep_affected_hosts",
"Number of hosts that had requests put to sleep",
[],
lambda: sum(
ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values()
),
)
LaterGauge(
"synapse_rate_limit_reject_affected_hosts",
"Number of hosts that had requests rejected",
[],
lambda: sum(
ratelimiter.should_reject()
for ratelimiter in self.ratelimiters.values()
),
)

def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
"""Used to ratelimit an incoming request from a given host

@@ -133,12 +154,33 @@ class _PerHostRatelimiter:
self.host = host

request_id = object()
ret = self._on_enter(request_id)
# Ideally we'd use `Deferred.fromCoroutine()` here, to save on redundant
# type-checking, but we'd need Twisted >= 21.2.
ret = defer.ensureDeferred(self._on_enter_with_tracing(request_id))
try:
yield ret
finally:
self._on_exit(request_id)

def should_reject(self) -> bool:
"""
Whether to reject the request if we already have too many queued up
(either sleeping or in the ready queue).
"""
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
return queue_size > self.reject_limit

def should_sleep(self) -> bool:
"""
Whether to sleep the request if we already have too many requests coming
through within the window.
"""
return len(self.request_times) > self.sleep_limit

async def _on_enter_with_tracing(self, request_id: object) -> None:
with start_active_span("ratelimit wait"), queue_wait_timer.time():
await self._on_enter(request_id)

def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
time_now = self.clock.time_msec()

@@ -149,8 +191,7 @@ class _PerHostRatelimiter:

# reject the request if we already have too many queued up (either
# sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
if queue_size > self.reject_limit:
if self.should_reject():
logger.debug("Ratelimiter(%s): rejecting request", self.host)
rate_limit_reject_counter.inc()
raise LimitExceededError(
@@ -180,7 +221,7 @@ class _PerHostRatelimiter:
len(self.request_times),
)

if len(self.request_times) > self.sleep_limit:
if self.should_sleep():
logger.debug(
"Ratelimiter(%s) [%s]: sleeping request for %f sec",
self.host,
@@ -222,17 +263,8 @@ class _PerHostRatelimiter:
# Ensure that we've properly cleaned up.
self.sleeping_requests.discard(request_id)
self.ready_request_queue.pop(request_id, None)
wait_span_scope.__exit__(None, None, None)
wait_timer_cm.__exit__(None, None, None)
return r

# Tracing
wait_span_scope = start_active_span("ratelimit wait")
wait_span_scope.__enter__()
# Metrics
wait_timer_cm = queue_wait_timer.time()
wait_timer_cm.__enter__()

ret_defer.addCallbacks(on_start, on_err)
ret_defer.addBoth(on_both)
return make_deferred_yieldable(ret_defer)

@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json
from contextlib import contextmanager
from typing import Generator, List, Tuple
@@ -36,6 +37,8 @@ from synapse.util.async_helpers import yieldable_gather_results

from tests import unittest

logger = logging.getLogger(__name__)


class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
@@ -91,6 +94,71 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
)
self.event_ids.append(event_id)

# TODO: Remove me before merging
def test_benchmark(self):
import time

room_id = "room123"
event_ids = []
setup_start_time = time.time()
with LoggingContext(name="test-setup") as ctx:
for i in range(50000):
event_json = {"type": f"test {i}", "room_id": room_id}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id

event_ids.append(event_id)

self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": event_id,
"room_id": room_id,
"topological_ordering": i,
"stream_ordering": 123 + i,
"type": event.type,
"processed": True,
"outlier": False,
},
)
)

setup_end_time = time.time()
logger.info(
"Setup time: %s",
(setup_end_time - setup_start_time),
)

with LoggingContext(name="test") as ctx:

def time_have_seen_events(test_prefix: str, event_ids):
benchmark_start_time = time.time()
remaining_event_ids = self.get_success(
self.store.have_seen_events(room_id, event_ids)
)
benchmark_end_time = time.time()
logger.info(
"Benchmark time (%s): %s",
"{test_prefix: <13}".format(test_prefix=test_prefix),
(benchmark_end_time - benchmark_start_time),
)
self.assertIsNotNone(remaining_event_ids)

event_ids_odd = event_ids[::2]
event_ids_even = event_ids[1::2]

time_have_seen_events("1 cold cache", event_ids)
time_have_seen_events("2, warm cache", event_ids)
time_have_seen_events("3, warm cache", event_ids)
time_have_seen_events("4, odds", event_ids_odd)
time_have_seen_events("5, odds", event_ids_odd)
time_have_seen_events("6, evens", event_ids_even)
time_have_seen_events("7, evens", event_ids_even)

# that should result in many db queries
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)

def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(