1
0

Compare commits

...

2 Commits

Author SHA1 Message Date
Erik Johnston c1c644db55 Fix backfill 2018-06-01 16:01:26 +01:00
Erik Johnston 4ebf003644 Fix clamp leave and disable backfill 2018-06-01 16:01:26 +01:00
3 changed files with 104 additions and 48 deletions
+8 -30
View File
@@ -714,37 +714,15 @@ class FederationHandler(BaseHandler):
defer.returnValue(events)
@defer.inlineCallbacks
def maybe_backfill(self, room_id, current_depth):
def maybe_backfill(self, room_id, extremities):
"""Checks the database to see if we should backfill before paginating,
and if so do.
Args:
room_id (str)
extremities (list[str]): List of event_ids to backfill from. These
should be event IDs that we don't yet have.
"""
extremities = yield self.store.get_oldest_events_with_depth_in_room(
room_id
)
if not extremities:
logger.debug("Not backfilling as no extremeties found.")
return
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(
extremities.items(),
key=lambda e: -int(e[1])
)
max_depth = sorted_extremeties_tuple[0][1]
# We don't want to specify too many extremities as it causes the backfill
# request URI to be too long.
extremities = dict(sorted_extremeties_tuple[:5])
if current_depth > max_depth:
logger.debug(
"Not backfilling as we don't need to. %d < %d",
max_depth, current_depth,
)
return
# Now we need to decide which hosts to hit first.
# First we try hosts that are already in the room
# TODO: HEURISTIC ALERT.
@@ -844,7 +822,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
event_ids = list(extremities.iterkeys())
event_ids = list(extremities)
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@@ -871,7 +849,7 @@ class FederationHandler(BaseHandler):
} for key, state_dict in states.iteritems()
}
for e_id, _ in sorted_extremeties_tuple:
for e_id in event_ids:
likely_domains = get_domains_from_state(states[e_id])
success = yield try_backfill([
+20 -18
View File
@@ -211,29 +211,17 @@ class MessageHandler(BaseHandler):
)
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
leave_token = yield self.store.get_topological_token_for_event(
member_event_id,
)
source_config.from_key = yield self.store.clamp_token_before(
room_id, source_config.from_key, leave_token,
)
events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,
@@ -244,6 +232,20 @@ class MessageHandler(BaseHandler):
event_filter=event_filter,
)
if source_config.direction == 'b' and extremities:
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, extremities
)
events, next_key, extremities = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
direction=source_config.direction,
limit=source_config.limit,
event_filter=event_filter,
)
next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)
+76
View File
@@ -915,6 +915,82 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue((events, token, extremities))
def clamp_token_before(self, room_id, token_str, clamp_to_token_str):
    """Return whichever of two stream tokens comes first in a room.

    For a given room returns the given token if it's before clamp_to,
    otherwise returns clamp_to. Used to avoid paginating past a point
    (e.g. the event at which a user left the room).

    Args:
        room_id (str)
        token_str (str): Token to clamp; may be a stream-only token.
        clamp_to_token_str (str): Must be a topological token.

    Returns:
        Deferred[str]: Either ``token_str`` or ``clamp_to_token_str``,
            whichever refers to the earlier position in the room.
    """
    token = RoomStreamToken.parse(token_str)
    clamp_to_token = RoomStreamToken.parse(clamp_to_token_str)

    def clamp_token_before_txn(txn, token):
        # If we're given a stream-only token (no chunk), look up the event
        # at that stream position and convert to a topological token.
        if not token.chunk:
            row = self._simple_select_one_txn(
                txn,
                table="events",
                keyvalues={
                    "stream_ordering": token.stream,
                },
                retcols=("chunk_id", "topological_ordering", "stream_ordering",),
            )
            # NOTE(review): `_simple_select_one_txn` returns a dict keyed by
            # column name, so the previous `RoomStreamToken(*row)` unpacked
            # the column *names*, not their values. Build the token from the
            # row explicitly. (Confirm `_simple_select_one_txn`'s return type
            # against the base store.)
            token = RoomStreamToken(
                row["chunk_id"],
                row["topological_ordering"],
                row["stream_ordering"],
            )

        # If both tokens have chunk_ids, we can use that.
        if token.chunk and clamp_to_token.chunk:
            if token.chunk == clamp_to_token.chunk:
                # Same chunk: topological orderings are directly comparable.
                if token.topological < clamp_to_token.topological:
                    return token_str
                else:
                    return clamp_to_token_str

            # Different chunks: consult the chunk ordering to decide which
            # chunk (and hence which token) comes first.
            table = ChunkDBOrderedListStore(
                txn, room_id, self.clock,
            )

            if table.is_before(token.chunk, clamp_to_token.chunk):
                return token_str
            else:
                return clamp_to_token_str

        # Ok, so we're dealing with events that haven't been chunked yet,
        # lets just cheat and fallback to depth.
        token_depth = self._simple_select_one_onecol_txn(
            txn,
            table="events",
            keyvalues={
                "stream_ordering": token.stream,
            },
            retcol="depth",
        )

        clamp_depth = self._simple_select_one_onecol_txn(
            txn,
            table="events",
            keyvalues={
                "stream_ordering": clamp_to_token.stream,
            },
            retcol="depth",
        )

        if token_depth < clamp_depth:
            return token_str
        else:
            return clamp_to_token_str

    return self.runInteraction(
        "clamp_token_before", clamp_token_before_txn, token
    )
class StreamStore(StreamWorkerStore):
def get_room_max_stream_ordering(self):