Add concept of internal events
This commit is contained in:
@@ -44,6 +44,9 @@ class _EventInternalMetadata(object):
|
||||
def is_invite_from_remote(self):
|
||||
return getattr(self, "invite_from_remote", False)
|
||||
|
||||
def is_internal_event(self):
|
||||
return getattr(self, "internal_event", False)
|
||||
|
||||
def get_send_on_behalf_of(self):
|
||||
"""Whether this server should send the event on behalf of another server.
|
||||
This is used by the federation "send_join" API to forward the initial join
|
||||
|
||||
@@ -175,6 +175,9 @@ class TransactionQueue(object):
|
||||
if not is_mine and send_on_behalf_of is None:
|
||||
return
|
||||
|
||||
if event.internal_metadata.is_internal_event():
|
||||
return
|
||||
|
||||
try:
|
||||
# Get the state from before the event.
|
||||
# We need to make sure that this is the state from before
|
||||
|
||||
@@ -55,13 +55,14 @@ from synapse.replication.http.federation import (
|
||||
)
|
||||
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
|
||||
from synapse.state import StateResolutionStore, resolve_events_with_store
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.types import UserID, create_requester, get_domain_from_id
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_joined_room
|
||||
from synapse.util.frozenutils import unfreeze
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.visibility import filter_events_for_server
|
||||
|
||||
from ._base import BaseHandler
|
||||
@@ -223,9 +224,11 @@ class FederationHandler(BaseHandler):
|
||||
state = None
|
||||
auth_chain = []
|
||||
|
||||
new_thread = False
|
||||
if thread_id is None:
|
||||
# FIXME: Pick something better?
|
||||
thread_id = random.randint(0, 999999999)
|
||||
new_thread = True
|
||||
|
||||
# Get missing pdus if necessary.
|
||||
if not pdu.internal_metadata.is_outlier():
|
||||
@@ -425,6 +428,7 @@ class FederationHandler(BaseHandler):
|
||||
pass
|
||||
else:
|
||||
thread_id = 0
|
||||
new_thread = False
|
||||
|
||||
logger.info("Thread ID %r", thread_id)
|
||||
|
||||
@@ -436,6 +440,32 @@ class FederationHandler(BaseHandler):
|
||||
thread_id=thread_id,
|
||||
)
|
||||
|
||||
if new_thread:
|
||||
builder = self.event_builder_factory.new({
|
||||
"type": "org.matrix.new_thread",
|
||||
"content": {
|
||||
"thread_id": thread_id,
|
||||
"latest_event": pdu.event_id,
|
||||
},
|
||||
"event_id": random_string(24),
|
||||
"origin_server_ts": self.clock.time_msec(),
|
||||
"sender": "@server:server",
|
||||
"room_id": pdu.room_id,
|
||||
})
|
||||
|
||||
event, context = yield self.event_creation_handler.create_new_client_event(
|
||||
builder=builder,
|
||||
)
|
||||
event.internal_metadata.internal_event = True
|
||||
yield self.event_creation_handler.handle_new_client_event(
|
||||
create_requester(UserID("server", "server")),
|
||||
event,
|
||||
context,
|
||||
ratelimit=True,
|
||||
extra_users=[],
|
||||
do_auth=False,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth, thread_id):
|
||||
"""
|
||||
|
||||
@@ -588,6 +588,7 @@ class EventCreationHandler(object):
|
||||
context,
|
||||
ratelimit=True,
|
||||
extra_users=[],
|
||||
do_auth=True,
|
||||
):
|
||||
"""Processes a new event. This includes checking auth, persisting it,
|
||||
notifying users, sending to remote servers, etc.
|
||||
@@ -604,7 +605,8 @@ class EventCreationHandler(object):
|
||||
"""
|
||||
|
||||
try:
|
||||
yield self.auth.check_from_context(event, context)
|
||||
if do_auth:
|
||||
yield self.auth.check_from_context(event, context)
|
||||
except AuthError as err:
|
||||
logger.warn("Denying new event %r because %s", event, err)
|
||||
raise err
|
||||
|
||||
@@ -32,7 +32,6 @@ from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.handlers.sync import SyncConfig
|
||||
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
|
||||
from synapse.types import StreamToken
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
from ._base import client_v2_patterns, set_timeline_upper_limit
|
||||
|
||||
@@ -387,22 +386,10 @@ class SyncRestServlet(RestServlet):
|
||||
|
||||
if exclude_threaded:
|
||||
serialized_timeline = []
|
||||
seen_threads = set()
|
||||
for e in reversed(timeline_events):
|
||||
thread_id = e.internal_metadata.thread_id
|
||||
if thread_id != 0:
|
||||
if thread_id not in seen_threads:
|
||||
serialized_timeline.append({
|
||||
"type": "org.matrix.new_thread",
|
||||
"content": {
|
||||
"thread_id": thread_id,
|
||||
"latest_event": e.event_id,
|
||||
},
|
||||
"event_id": random_string(24),
|
||||
"origin_server_ts": e.origin_server_ts,
|
||||
"sender": "@server",
|
||||
})
|
||||
seen_threads.add(thread_id)
|
||||
pass
|
||||
else:
|
||||
serialized_timeline.append(serialize(e))
|
||||
|
||||
|
||||
@@ -537,6 +537,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
|
||||
new_events = [
|
||||
event for event, ctx in event_contexts
|
||||
if not event.internal_metadata.is_outlier() and not ctx.rejected
|
||||
and not event.internal_metadata.is_internal_event()
|
||||
]
|
||||
|
||||
# start with the existing forward extremities
|
||||
|
||||
Reference in New Issue
Block a user