Fix linter errors
This commit is contained in:
@@ -598,7 +598,7 @@ class RelationsHandler:
|
||||
|
||||
async def fetch_thread_roots_and_aggregations(
|
||||
self,
|
||||
thread_ids: Iterable[str],
|
||||
thread_ids: Collection[str],
|
||||
user_id: str,
|
||||
) -> tuple[ThreadRootsMap, AggregationsMap]:
|
||||
"""Fetch thread root events and their bundled aggregations.
|
||||
|
||||
@@ -36,6 +36,10 @@ from synapse.api.constants import (
|
||||
)
|
||||
from synapse.events import EventBase
|
||||
from synapse.handlers.receipts import ReceiptEventSource
|
||||
from synapse.handlers.relations import (
|
||||
AggregationsMap,
|
||||
ThreadRootsMap,
|
||||
)
|
||||
from synapse.handlers.sliding_sync.room_lists import RoomsForUserType
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.storage.databases.main.receipts import ReceiptInRoom
|
||||
@@ -1209,8 +1213,8 @@ class SlidingSyncExtensionHandler:
|
||||
# the latest event for each thread.
|
||||
|
||||
# Optionally fetch thread root events and their bundled aggregations
|
||||
thread_root_event_map = {}
|
||||
aggregations_map = {}
|
||||
thread_root_event_map: ThreadRootsMap = {}
|
||||
aggregations_map: AggregationsMap = {}
|
||||
if threads_request.include_roots:
|
||||
(
|
||||
thread_root_event_map,
|
||||
|
||||
@@ -25,12 +25,18 @@ from typing import TYPE_CHECKING
|
||||
from synapse.api.constants import Direction
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.events.utils import SerializeEventConfig
|
||||
from synapse.handlers.relations import BundledAggregations, ThreadsListInclude
|
||||
from synapse.handlers.relations import ThreadsListInclude
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
|
||||
from synapse.http.servlet import (
|
||||
RestServlet,
|
||||
parse_and_validate_json_object_from_request,
|
||||
parse_boolean,
|
||||
parse_integer,
|
||||
parse_string,
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.rest.client._base import client_patterns
|
||||
from synapse.storage.databases.main.relations import ThreadsNextBatch, ThreadUpdateInfo
|
||||
from synapse.storage.databases.main.relations import ThreadsNextBatch
|
||||
from synapse.streams.config import (
|
||||
PaginationConfig,
|
||||
extract_stream_token_from_pagination_token,
|
||||
@@ -163,67 +169,11 @@ class ThreadUpdatesServlet(RestServlet):
|
||||
# TODO: Get sliding sync handler for filter_rooms logic
|
||||
# self.sliding_sync_handler = hs.get_sliding_sync_handler()
|
||||
|
||||
async def _serialize_thread_updates(
|
||||
self,
|
||||
thread_updates: list[ThreadUpdateInfo],
|
||||
bundled_aggregations: dict[str, BundledAggregations],
|
||||
time_now: int,
|
||||
serialize_options: SerializeEventConfig,
|
||||
) -> dict[str, dict[str, JsonDict]]:
|
||||
"""
|
||||
Serialize thread updates into the response format.
|
||||
|
||||
Args:
|
||||
thread_updates: List of thread update info from storage
|
||||
bundled_aggregations: Map of event_id to bundled aggregations
|
||||
time_now: Current time in milliseconds
|
||||
serialize_options: Serialization configuration
|
||||
|
||||
Returns:
|
||||
Nested dict mapping room_id -> thread_root_id -> thread update dict
|
||||
"""
|
||||
chunk: dict[str, dict[str, JsonDict]] = {}
|
||||
|
||||
for update in thread_updates:
|
||||
room_id = update.room_id
|
||||
thread_id = update.thread_id
|
||||
|
||||
if room_id not in chunk:
|
||||
chunk[room_id] = {}
|
||||
|
||||
update_dict: JsonDict = {}
|
||||
|
||||
# Serialize thread root if present
|
||||
if update.thread_root_event is not None:
|
||||
bundle_aggs_map = (
|
||||
{thread_id: bundled_aggregations[thread_id]}
|
||||
if thread_id in bundled_aggregations
|
||||
else None
|
||||
)
|
||||
serialized_events = await self.event_serializer.serialize_events(
|
||||
[update.thread_root_event],
|
||||
time_now,
|
||||
config=serialize_options,
|
||||
bundle_aggregations=bundle_aggs_map,
|
||||
)
|
||||
if serialized_events:
|
||||
update_dict["thread_root"] = serialized_events[0]
|
||||
|
||||
# Add per-thread prev_batch if present
|
||||
if update.prev_batch is not None:
|
||||
update_dict["prev_batch"] = await update.prev_batch.to_string(
|
||||
self.store
|
||||
)
|
||||
|
||||
chunk[room_id][thread_id] = update_dict
|
||||
|
||||
return chunk
|
||||
|
||||
async def on_POST(self, request: SynapseRequest) -> tuple[int, JsonDict]:
|
||||
requester = await self.auth.get_user_by_req(request)
|
||||
|
||||
# Parse request body
|
||||
body = ThreadUpdatesBody.model_validate_json(request.content.read())
|
||||
body = parse_and_validate_json_object_from_request(request, ThreadUpdatesBody)
|
||||
|
||||
# Parse query parameters
|
||||
dir_str = parse_string(request, "dir", default="b")
|
||||
|
||||
Reference in New Issue
Block a user