Compare commits
6 Commits
travis/msc ... quenting/l
| Author | SHA1 | Date |
|---|---|---|
| | 724a48ecdd | |
| | 4cee8c7b99 | |
| | 4ac656073d | |
| | 3212526673 | |
| | c0878ac9e6 | |
| | 76c9f09e09 | |
@@ -61,7 +61,7 @@ poetry run update_synapse_database --database-config .ci/postgres-config-unporte
echo "+++ Comparing ported schema with unported schema"
# Ignore the tables that portdb creates. (Should it tidy them up when the porting is completed?)
psql synapse -c "DROP TABLE port_from_sqlite3;"
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner synapse_unported > unported.sql
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner synapse > ported.sql
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner --restrict-key=TESTING synapse_unported > unported.sql
pg_dump --format=plain --schema-only --no-tablespaces --no-acl --no-owner --restrict-key=TESTING synapse > ported.sql
# By default, `diff` returns zero if there are no changes and nonzero otherwise
diff -u unported.sql ported.sql | tee schema_diff
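The hunk above corresponds to the `changelog.d/18824.misc` entry below: pg_dump builds containing the CVE-2025-8714 fix wrap plain-format dumps in `\restrict`/`\unrestrict` commands keyed by a randomly generated token, so two dumps of identical schemas no longer compare equal; passing a fixed `--restrict-key` makes the output deterministic. As a purely illustrative sketch of the same idea (the CI step itself is shell; the helper below is hypothetical and not part of the script):

```python
# Hypothetical sketch of the CI schema comparison with a fixed pg_dump restrict key.
import subprocess

def dump_schema(dbname: str, outfile: str) -> None:
    """Write a schema-only, ownerless dump of `dbname` to `outfile`."""
    with open(outfile, "w") as f:
        subprocess.run(
            [
                "pg_dump",
                "--format=plain",
                "--schema-only",
                "--no-tablespaces",
                "--no-acl",
                "--no-owner",
                "--restrict-key=TESTING",  # fixed key => stable, diffable preamble
                dbname,
            ],
            stdout=f,
            check=True,
        )

dump_schema("synapse_unported", "unported.sql")
dump_schema("synapse", "ported.sql")
# Any remaining difference is a real schema difference, not restrict-key noise.
subprocess.run(["diff", "-u", "unported.sql", "ported.sql"])
```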
changelog.d/18746.bugfix (new file)
@@ -0,0 +1 @@
Fix a bug which could corrupt auth chains making it impossible to perform state resolution.

changelog.d/18815.misc (new file)
@@ -0,0 +1 @@
Instrument the `encode_response` part of Sliding Sync requests for more complete traces in Jaeger.

changelog.d/18816.misc (new file)
@@ -0,0 +1 @@
Tag Sliding Sync traces when we `wait_for_events`.

changelog.d/18824.misc (new file)
@@ -0,0 +1 @@
Fix portdb CI by hardcoding the new pg_dump restrict key that was added due to CVE-2025-8714.

changelog.d/18832.bugfix (new file)
@@ -0,0 +1 @@
Allow enabling MSC4108 when the stable Matrix Authentication Service integration is enabled.
@@ -535,11 +535,15 @@ class ExperimentalConfig(Config):
"msc4108_delegation_endpoint", None
)
auth_delegated = self.msc3861.enabled or (
config.get("matrix_authentication_service") or {}
).get("enabled", False)
if (
self.msc4108_enabled or self.msc4108_delegation_endpoint is not None
) and not self.msc3861.enabled:
) and not auth_delegated:
raise ConfigError(
"MSC4108 requires MSC3861 to be enabled",
"MSC4108 requires MSC3861 or matrix_authentication_service to be enabled",
("experimental", "msc4108_delegation_endpoint"),
)
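With this change, MSC4108 no longer requires the MSC3861 flag specifically: the stable `matrix_authentication_service` block also counts as delegated auth (see `changelog.d/18832.bugfix` above). A hypothetical config fragment that now passes validation, shown as the parsed dict `ExperimentalConfig` would receive; `matrix_authentication_service.enabled` appears verbatim in the hunk, while the `experimental_features` section name and `msc4108_enabled` key are assumptions based on Synapse's usual config layout and the attribute names in the hunk:

```python
# Hypothetical parsed homeserver config fragment (not a verbatim example from the repo).
config = {
    "matrix_authentication_service": {
        "enabled": True,  # stable MAS integration
    },
    "experimental_features": {
        "msc4108_enabled": True,  # previously this required msc3861
    },
}

# The condition added in the hunk above, applied to this fragment:
auth_delegated = (config.get("matrix_authentication_service") or {}).get("enabled", False)
assert auth_delegated  # so enabling MSC4108 no longer raises ConfigError
```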
@@ -26,8 +26,8 @@ from typing import (
Any,
Awaitable,
Callable,
Collection,
Dict,
Iterable,
List,
Mapping,
Match,

@@ -49,6 +49,7 @@ from synapse.api.constants import (
)
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.logging.opentracing import SynapseTags, set_tag, trace
from synapse.types import JsonDict, Requester
from . import EventBase, StrippedStateEvent, make_event_from_dict

@@ -710,9 +711,10 @@ class EventClientSerializer:
"m.relations", {}
).update(serialized_aggregations)
@trace
async def serialize_events(
self,
events: Iterable[Union[JsonDict, EventBase]],
events: Collection[Union[JsonDict, EventBase]],
time_now: int,
*,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,

@@ -731,6 +733,11 @@ class EventClientSerializer:
Returns:
The list of serialized events
"""
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "events.length",
str(len(events)),
)
return [
await self.serialize_event(
event,
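The hunks above instrument `serialize_events` for tracing, part of the Sliding Sync tracing work noted in the changelog entries above: the coroutine gets its own span and records the batch size as a tag. A minimal sketch of the same pattern, using only the helpers already imported in the diff (`trace`, `set_tag`, `SynapseTags`); the function name is made up for illustration:

```python
from typing import Collection, List

from synapse.logging.opentracing import SynapseTags, set_tag, trace


@trace  # gives the coroutine its own span in Jaeger traces
async def serialize_batch(events: Collection[dict]) -> List[dict]:
    # `Collection` (rather than `Iterable`) guarantees len() is available, which
    # is why the hunk also tightens the `events` annotation.
    set_tag(SynapseTags.FUNC_ARG_PREFIX + "events.length", str(len(events)))
    return [dict(e) for e in events]
```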
@@ -870,7 +877,7 @@ def maybe_upsert_event_field(
return upsert_okay
def strip_event(event: EventBase, for_federation: Optional[bool] = False) -> JsonDict:
def strip_event(event: EventBase) -> JsonDict:
"""
Used for "stripped state" events which provide a simplified view of the state of a
room intended to help a potential joiner identify the room (relevant when the user

@@ -879,10 +886,13 @@ def strip_event(event: EventBase, for_federation: Optional[bool] = False) -> Jso
Stripped state events can only have the `sender`, `type`, `state_key` and `content`
properties present.
"""
# MSC4311 makes all stripped state events fully-formed PDUs over federation,
# especially the `m.room.create` event.
# TODO: Implement the validation component of MSC4311
if for_federation:
# MSC4311: Ensure the create event is available on invites and knocks.
# TODO: Implement the rest of MSC4311
if (
event.room_version.msc4291_room_ids_as_hashes
and event.type == EventTypes.Create
and event.get_state_key() == ""
):
return event.get_pdu_json()
return {
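On one side of this hunk `strip_event` takes a `for_federation` parameter; on the other it drops that parameter and instead special-cases the create event in MSC4291 room versions unconditionally. A condensed, hypothetical sketch of the latter behaviour (duck-typed `event`, a string literal instead of `EventTypes.Create`; not the real `EventBase` API beyond the attributes visible in the hunk):

```python
def strip_event_sketch(event) -> dict:
    # MSC4311: in room versions with MSC4291 room-IDs-as-hashes, send the
    # m.room.create event as a fully-formed PDU so receivers can verify it.
    if (
        event.room_version.msc4291_room_ids_as_hashes
        and event.type == "m.room.create"
        and event.get_state_key() == ""
    ):
        return event.get_pdu_json()

    # Classic stripped state: only these four properties are allowed.
    return {
        "type": event.type,
        "state_key": event.get_state_key(),
        "content": event.content,
        "sender": event.sender,
    }
```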
@@ -1336,16 +1336,10 @@ class FederationClient(FederationBase):
room_id: str,
event_id: str,
pdu: EventBase,
stripped_state: List[JsonDict],
) -> EventBase:
room_version = await self.store.get_room_version(room_id)
content = await self._do_send_invite(
destination,
pdu,
room_version,
stripped_state,
)
content = await self._do_send_invite(destination, pdu, room_version)
pdu_dict = content["event"]

@@ -1366,11 +1360,7 @@ class FederationClient(FederationBase):
return pdu
async def _do_send_invite(
self,
destination: str,
pdu: EventBase,
room_version: RoomVersion,
stripped_state: List[JsonDict],
self, destination: str, pdu: EventBase, room_version: RoomVersion
) -> JsonDict:
"""Actually sends the invite, first trying v2 API and falling back to
v1 API if necessary.

@@ -1393,7 +1383,7 @@ class FederationClient(FederationBase):
content={
"event": pdu.get_pdu_json(time_now),
"room_version": room_version.identifier,
"invite_room_state": stripped_state,
"invite_room_state": pdu.unsigned.get("invite_room_state", []),
},
)
except HttpResponseException as e:
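The net effect of these hunks is that `_do_send_invite` no longer takes a separate `stripped_state` argument: the stripped room state travels in the event's `unsigned` section and is read back out when building the v2 invite body. A rough sketch of the request content assembled above (field names mirror the hunk; the helper and timestamp handling are made up for illustration):

```python
import time

def build_invite_content(pdu, room_version_identifier: str) -> dict:
    """Hypothetical helper mirroring the v2 invite body built in the hunk above."""
    time_now = int(time.time() * 1000)  # Synapse uses its own clock here
    return {
        "event": pdu.get_pdu_json(time_now),
        "room_version": room_version_identifier,
        # Stripped state comes from the PDU's unsigned data, so callers no
        # longer pass it in separately.
        "invite_room_state": pdu.unsigned.get("invite_room_state", []),
    }
```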
@@ -907,7 +907,7 @@ class FederationServer(FederationBase):
# related to the room while the knock request is pending.
stripped_room_state = (
await self.store.get_stripped_room_state_from_event_context(
context, self._room_prejoin_state_types, for_federation=True,
context, self._room_prejoin_state_types
)
)
return {"knock_room_state": stripped_room_state}

@@ -547,7 +547,7 @@ class FederationHandler:
return False
async def send_invite(self, target_host: str, event: EventBase, stripped_state: List[JsonDict]) -> EventBase:
async def send_invite(self, target_host: str, event: EventBase) -> EventBase:
"""Sends the invite to the remote server for signing.
Invites must be signed by the invitee's server before distribution.

@@ -558,7 +558,6 @@ class FederationHandler:
room_id=event.room_id,
event_id=event.event_id,
pdu=event,
stripped_state=stripped_state,
)
except RequestSendFailed:
raise SynapseError(502, f"Can't connect to server {target_host}")

@@ -1728,6 +1728,9 @@ class FederationEventHandler:
event,
auth_event_id,
)
# Drop the event from the auth_map too, else we may incorrectly persist
# events which depend on this dropped event.
auth_map.pop(event.event_id, None)
return
auth.append(ae)
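The new `auth_map.pop()` call appears to be the fix described by `changelog.d/18746.bugfix`: once an event is dropped for failing auth, anything that still references it via `auth_map` could otherwise be persisted pointing at an event that never made it into the database, corrupting the auth chain. A toy illustration with plain dicts (not Synapse's real types):

```python
# Toy model: event IDs mapped to their auth_events references.
auth_map = {
    "$create": {"auth_events": []},
    "$bad_member": {"auth_events": ["$create"]},
    "$message": {"auth_events": ["$create", "$bad_member"]},
}

# "$bad_member" fails its auth checks and is dropped...
auth_map.pop("$bad_member", None)  # ...and, with the fix, forgotten entirely

# Events are only safe to persist if all of their auth_events are still known.
persistable = {
    event_id: event
    for event_id, event in auth_map.items()
    if all(dep in auth_map for dep in event["auth_events"])
}
assert "$message" not in persistable  # would have been a dangling reference
```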
@@ -1973,17 +1973,8 @@ class EventCreationHandler:
# way? If we have been invited by a remote server, we need
# to get them to sign the event.
# As of MSC4311, "stripped" state events are formatted differently
# over federation.
stripped_state_fed = await self.store.get_stripped_room_state_from_event_context(
context,
self.room_prejoin_state_types,
membership_user_id=event.sender,
for_federation=True,
)
returned_invite = await federation_handler.send_invite(
invitee.domain, event, stripped_state_fed,
invitee.domain, event
)
event.unsigned.pop("room_state", None)
@@ -116,7 +116,7 @@ class SlidingSyncHandler:
sync_config: SlidingSyncConfig,
from_token: Optional[SlidingSyncStreamToken] = None,
timeout_ms: int = 0,
) -> SlidingSyncResult:
) -> Tuple[SlidingSyncResult, bool]:
"""
Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then

@@ -128,9 +128,16 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
timeout_ms: The time in milliseconds to wait for new data to arrive. If 0,
we will immediately but there might not be any new data so we just return an
empty response.
we will respond immediately but there might not be any new data so we just
return an empty response.
Returns:
A tuple containing the `SlidingSyncResult` and whether we waited for new
activity before responding. Knowing whether we waited is useful in traces
to filter out long-running requests where we were just waiting.
"""
did_wait = False
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
# auth_blocking will occur)

@@ -149,7 +156,7 @@ class SlidingSyncHandler:
logger.warning(
"Timed out waiting for worker to catch up. Returning empty response"
)
return SlidingSyncResult.empty(from_token)
return SlidingSyncResult.empty(from_token), did_wait
# If we've spent significant time waiting to catch up, take it off
# the timeout.

@@ -185,8 +192,9 @@ class SlidingSyncHandler:
current_sync_callback,
from_token=from_token.stream_token,
)
did_wait = True
return result
return result, did_wait
@trace
async def current_sync_for_user(
@@ -34,6 +34,7 @@ import logging
import threading
import typing
import warnings
from contextvars import ContextVar
from types import TracebackType
from typing import (
TYPE_CHECKING,

@@ -660,13 +661,12 @@ class PreserveLoggingContext:
)
_thread_local = threading.local()
_thread_local.current_context = SENTINEL_CONTEXT
_current_context: ContextVar[LoggingContextOrSentinel] = ContextVar("current_context")
def current_context() -> LoggingContextOrSentinel:
"""Get the current logging context from thread local storage"""
return getattr(_thread_local, "current_context", SENTINEL_CONTEXT)
return _current_context.get(SENTINEL_CONTEXT)
def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:

@@ -687,7 +687,7 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe
if current is not context:
rusage = get_thread_resource_usage()
current.stop(rusage)
_thread_local.current_context = context
_current_context.set(context)
context.start(rusage)
return current
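This hunk moves the "current logging context" from a module-level `threading.local` slot to a `ContextVar`. One property that buys (illustrated below with asyncio purely for brevity; Synapse itself runs on Twisted): a `ContextVar` is tracked per task, so concurrently running coroutines each see their own value, which a single thread-local slot cannot provide within one thread. A minimal, stdlib-only sketch:

```python
import asyncio
from contextvars import ContextVar

# Hypothetical stand-in for Synapse's current logging context.
current_context: ContextVar[str] = ContextVar("current_context", default="sentinel")


async def handle(name: str) -> str:
    current_context.set(name)
    await asyncio.sleep(0)  # yield so the other task runs in between
    return current_context.get()  # still `name`: sets are isolated per task


async def main() -> None:
    results = await asyncio.gather(handle("request-1"), handle("request-2"))
    assert results == ["request-1", "request-2"]


asyncio.run(main())
```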
@@ -803,7 +803,6 @@ def run_in_background(
CRITICAL error about an unhandled error will be logged without much
indication about where it came from.
"""
current = current_context()
try:
res = f(*args, **kwargs)
except Exception:

@@ -832,23 +831,6 @@ def run_in_background(
# optimise out the messing about
return d
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d

@@ -868,65 +850,19 @@ def run_coroutine_in_background(
cannot change the log contexts.
"""
current = current_context()
d = defer.ensureDeferred(coroutine)
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d
return defer.ensureDeferred(coroutine)
T = TypeVar("T")
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
"""Given a deferred, make it follow the Synapse logcontext rules:
If the deferred has completed, essentially does nothing (just returns another
completed deferred with the result/failure).
If the deferred has not yet completed, resets the logcontext before
returning a deferred. Then, when the deferred completes, restores the
current logcontext before running callbacks/errbacks.
(This is more-or-less the opposite operation to run_in_background.)
"""
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
return deferred
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
prev_context = set_current_context(SENTINEL_CONTEXT)
deferred.addBoth(_set_context_cb, prev_context)
return deferred
ResultT = TypeVar("ResultT")
def _set_context_cb(result: ResultT, context: LoggingContextOrSentinel) -> ResultT:
"""A callback function which just sets the logging context"""
set_current_context(context)
return result
def defer_to_thread(
reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
) -> "defer.Deferred[R]":

@@ -998,18 +934,6 @@ def defer_to_threadpool(
A Deferred which fires a callback with the result of `f`, or an
errback if `f` throws an exception.
"""
curr_context = current_context()
if not curr_context:
logger.warning(
"Calling defer_to_threadpool from sentinel context: metrics will be lost"
)
parent_context = None
else:
assert isinstance(curr_context, LoggingContext)
parent_context = curr_context
def g() -> R:
with LoggingContext(str(curr_context), parent_context=parent_context):
return f(*args, **kwargs)
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
return make_deferred_yieldable(
threads.deferToThreadPool(reactor, threadpool, f, *args, **kwargs)
)
@@ -994,12 +994,18 @@ class SlidingSyncRestServlet(RestServlet):
extensions=body.extensions,
)
sliding_sync_results = await self.sliding_sync_handler.wait_for_sync_for_user(
(
sliding_sync_results,
did_wait,
) = await self.sliding_sync_handler.wait_for_sync_for_user(
requester,
sync_config,
from_token,
timeout,
)
# Knowing whether we waited is useful in traces to filter out long-running
# requests where we were just waiting.
set_tag("sliding_sync.did_wait", str(did_wait))
# The client may have disconnected by now; don't bother to serialize the
# response if so.

@@ -1011,6 +1017,7 @@ class SlidingSyncRestServlet(RestServlet):
return 200, response_content
@trace_with_opname("sliding_sync.encode_response")
async def encode_response(
self,
requester: Requester,

@@ -1031,6 +1038,7 @@ class SlidingSyncRestServlet(RestServlet):
return response
@trace_with_opname("sliding_sync.encode_lists")
def encode_lists(
self, lists: Mapping[str, SlidingSyncResult.SlidingWindowList]
) -> JsonDict:

@@ -1052,6 +1060,7 @@ class SlidingSyncRestServlet(RestServlet):
return serialized_lists
@trace_with_opname("sliding_sync.encode_rooms")
async def encode_rooms(
self,
requester: Requester,

@@ -1172,6 +1181,7 @@ class SlidingSyncRestServlet(RestServlet):
return serialized_rooms
@trace_with_opname("sliding_sync.encode_extensions")
async def encode_extensions(
self, requester: Requester, extensions: SlidingSyncResult.Extensions
) -> JsonDict:
@@ -1094,7 +1094,6 @@ class EventsWorkerStore(SQLBaseStore):
context: EventContext,
state_keys_to_include: StateFilter,
membership_user_id: Optional[str] = None,
for_federation: Optional[bool] = False,
) -> List[JsonDict]:
"""
Retrieve the stripped state from a room, given an event context to retrieve state

@@ -1111,8 +1110,6 @@ class EventsWorkerStore(SQLBaseStore):
events of. This is useful when generating the stripped state of a room for
invites. We want to send membership events of the inviter, so that the
invitee can display the inviter's profile information if the room lacks any.
for_federation: When True, the stripped state events will be returned as PDUs
as per MSC4311. When False, the stripped client format is used.
Returns:
A list of dictionaries, each representing a stripped state event from the room.

@@ -1137,7 +1134,7 @@ class EventsWorkerStore(SQLBaseStore):
state_to_include = await self.get_events(selected_state_ids.values())
return [strip_event(e, for_federation) for e in state_to_include.values()]
return [strip_event(e) for e in state_to_include.values()]
def _maybe_start_fetch_thread(self) -> None:
"""Starts an event fetch thread if we are not yet at the maximum number."""