Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
49eb11530c | ||
|
|
0546126cc5 | ||
|
|
2aa87305c0 | ||
|
|
e441c10a73 | ||
|
|
8c652a2b5f | ||
|
|
6375abcdac | ||
|
|
c09493d7aa | ||
|
|
74626a8de4 | ||
|
|
55e0916ffc | ||
|
|
f22646efcc | ||
|
|
16c6b860ac | ||
|
|
789251afa7 | ||
|
|
38df10b99e | ||
|
|
93d07c87dc | ||
|
|
5f6e6530d0 | ||
|
|
29805213d1 | ||
|
|
860b1b4841 | ||
|
|
58d848adc0 | ||
|
|
963256638d | ||
|
|
92d850fc87 | ||
|
|
a268c31737 | ||
|
|
48fbe79f71 | ||
|
|
6b186a57ba | ||
|
|
717687e1fc |
@@ -97,8 +97,11 @@ def lookup(destination, path):
|
||||
if ":" in destination:
|
||||
return "https://%s%s" % (destination, path)
|
||||
else:
|
||||
srv = srvlookup.lookup("matrix", "tcp", destination)[0]
|
||||
return "https://%s:%d%s" % (srv.host, srv.port, path)
|
||||
try:
|
||||
srv = srvlookup.lookup("matrix", "tcp", destination)[0]
|
||||
return "https://%s:%d%s" % (srv.host, srv.port, path)
|
||||
except:
|
||||
return "https://%s:%d%s" % (destination, 8448, path)
|
||||
|
||||
def get_json(origin_name, origin_key, destination, path):
|
||||
request_json = {
|
||||
|
||||
@@ -16,4 +16,4 @@
|
||||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.7.0"
|
||||
__version__ = "0.7.0c"
|
||||
|
||||
@@ -50,8 +50,11 @@ class FederationBase(object):
|
||||
Returns:
|
||||
Deferred : A list of PDUs that have valid signatures and hashes.
|
||||
"""
|
||||
|
||||
signed_pdus = []
|
||||
for pdu in pdus:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def do(pdu):
|
||||
try:
|
||||
new_pdu = yield self._check_sigs_and_hash(pdu)
|
||||
signed_pdus.append(new_pdu)
|
||||
@@ -61,25 +64,37 @@ class FederationBase(object):
|
||||
# Check local db.
|
||||
new_pdu = yield self.store.get_event(
|
||||
pdu.event_id,
|
||||
allow_rejected=True
|
||||
allow_rejected=True,
|
||||
allow_none=True,
|
||||
)
|
||||
if new_pdu:
|
||||
signed_pdus.append(new_pdu)
|
||||
continue
|
||||
return
|
||||
|
||||
# Check pdu.origin
|
||||
if pdu.origin != origin:
|
||||
new_pdu = yield self.get_pdu(
|
||||
destinations=[pdu.origin],
|
||||
event_id=pdu.event_id,
|
||||
outlier=outlier,
|
||||
)
|
||||
try:
|
||||
new_pdu = yield self.get_pdu(
|
||||
destinations=[pdu.origin],
|
||||
event_id=pdu.event_id,
|
||||
outlier=outlier,
|
||||
)
|
||||
|
||||
if new_pdu:
|
||||
signed_pdus.append(new_pdu)
|
||||
continue
|
||||
if new_pdu:
|
||||
signed_pdus.append(new_pdu)
|
||||
return
|
||||
except:
|
||||
pass
|
||||
|
||||
logger.warn("Failed to find copy of %s with valid signature")
|
||||
logger.warn(
|
||||
"Failed to find copy of %s with valid signature",
|
||||
pdu.event_id,
|
||||
)
|
||||
|
||||
yield defer.gatherResults(
|
||||
[do(pdu) for pdu in pdus],
|
||||
consumeErrors=True
|
||||
)
|
||||
|
||||
defer.returnValue(signed_pdus)
|
||||
|
||||
|
||||
@@ -411,9 +411,12 @@ class FederationServer(FederationBase):
|
||||
"_handle_new_pdu getting state for %s",
|
||||
pdu.room_id
|
||||
)
|
||||
state, auth_chain = yield self.get_state_for_room(
|
||||
origin, pdu.room_id, pdu.event_id,
|
||||
)
|
||||
try:
|
||||
state, auth_chain = yield self.get_state_for_room(
|
||||
origin, pdu.room_id, pdu.event_id,
|
||||
)
|
||||
except:
|
||||
logger.warn("Failed to get state for event: %s", pdu.event_id)
|
||||
|
||||
ret = yield self.handler.on_receive_pdu(
|
||||
origin,
|
||||
|
||||
@@ -858,6 +858,40 @@ class FederationHandler(BaseHandler):
|
||||
# Do auth conflict res.
|
||||
logger.debug("Different auth: %s", different_auth)
|
||||
|
||||
different_events = yield defer.gatherResults(
|
||||
[
|
||||
self.store.get_event(
|
||||
d,
|
||||
allow_none=True,
|
||||
allow_rejected=False,
|
||||
)
|
||||
for d in different_auth
|
||||
if d in have_events and not have_events[d]
|
||||
],
|
||||
consumeErrors=True
|
||||
)
|
||||
|
||||
if different_events:
|
||||
local_view = dict(auth_events)
|
||||
remote_view = dict(auth_events)
|
||||
remote_view.update({
|
||||
(d.type, d.state_key) for d in different_events
|
||||
})
|
||||
|
||||
new_state, prev_state = self.state.resolve_events(
|
||||
[local_view, remote_view],
|
||||
event
|
||||
)
|
||||
|
||||
auth_events.update(new_state)
|
||||
|
||||
current_state = set(e.event_id for e in auth_events.values())
|
||||
different_auth = event_auth_events - current_state
|
||||
|
||||
context.current_state.update(auth_events)
|
||||
context.state_group = None
|
||||
|
||||
if different_auth and not event.internal_metadata.is_outlier():
|
||||
# Only do auth resolution if we have something new to say.
|
||||
# We can't rove an auth failure.
|
||||
do_resolution = False
|
||||
|
||||
@@ -372,6 +372,7 @@ class MessageHandler(BaseHandler):
|
||||
room_members = [
|
||||
m for m in current_state.values()
|
||||
if m.type == EventTypes.Member
|
||||
and m.content["membership"] == Membership.JOIN
|
||||
]
|
||||
|
||||
presence_handler = self.hs.get_handlers().presence_handler
|
||||
@@ -384,17 +385,10 @@ class MessageHandler(BaseHandler):
|
||||
as_event=True,
|
||||
)
|
||||
presence.append(member_presence)
|
||||
except SynapseError as e:
|
||||
if e.code == 404:
|
||||
# FIXME: We are doing this as a warn since this gets hit a
|
||||
# lot and spams the logs. Why is this happening?
|
||||
logger.warn(
|
||||
"Failed to get member presence of %r", m.user_id
|
||||
)
|
||||
else:
|
||||
logger.exception(
|
||||
"Failed to get member presence of %r", m.user_id
|
||||
)
|
||||
except SynapseError:
|
||||
logger.exception(
|
||||
"Failed to get member presence of %r", m.user_id
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
|
||||
@@ -259,13 +259,37 @@ class StateHandler(object):
|
||||
|
||||
defer.returnValue((name, state, prev_states))
|
||||
|
||||
new_state, prev_states = self._resolve_events(
|
||||
state_groups.values(), event_type, state_key
|
||||
)
|
||||
|
||||
if self._state_cache is not None:
|
||||
cache = _StateCacheEntry(
|
||||
state=new_state,
|
||||
state_group=None,
|
||||
ts=self.clock.time_msec()
|
||||
)
|
||||
|
||||
self._state_cache[frozenset(event_ids)] = cache
|
||||
|
||||
defer.returnValue((None, new_state, prev_states))
|
||||
|
||||
def resolve_events(self, state_sets, event):
|
||||
if event.is_state():
|
||||
return self._resolve_events(
|
||||
state_sets, event.type, event.state_key
|
||||
)
|
||||
else:
|
||||
return self._resolve_events(state_sets)
|
||||
|
||||
def _resolve_events(self, state_sets, event_type=None, state_key=""):
|
||||
state = {}
|
||||
for group, g_state in state_groups.items():
|
||||
for s in g_state:
|
||||
for st in state_sets:
|
||||
for e in st:
|
||||
state.setdefault(
|
||||
(s.type, s.state_key),
|
||||
(e.type, e.state_key),
|
||||
{}
|
||||
)[s.event_id] = s
|
||||
)[e.event_id] = e
|
||||
|
||||
unconflicted_state = {
|
||||
k: v.values()[0] for k, v in state.items()
|
||||
@@ -302,16 +326,7 @@ class StateHandler(object):
|
||||
new_state = unconflicted_state
|
||||
new_state.update(resolved_state)
|
||||
|
||||
if self._state_cache is not None:
|
||||
cache = _StateCacheEntry(
|
||||
state=new_state,
|
||||
state_group=None,
|
||||
ts=self.clock.time_msec()
|
||||
)
|
||||
|
||||
self._state_cache[frozenset(event_ids)] = cache
|
||||
|
||||
defer.returnValue((None, new_state, prev_states))
|
||||
return new_state, prev_states
|
||||
|
||||
@log_function
|
||||
def _resolve_state_events(self, conflicted_state, auth_events):
|
||||
|
||||
@@ -55,17 +55,16 @@ class EventFederationStore(SQLBaseStore):
|
||||
results = set()
|
||||
|
||||
base_sql = (
|
||||
"SELECT auth_id FROM event_auth WHERE %s"
|
||||
"SELECT auth_id FROM event_auth WHERE event_id = ?"
|
||||
)
|
||||
|
||||
front = set(event_ids)
|
||||
while front:
|
||||
sql = base_sql % (
|
||||
" OR ".join(["event_id=?"] * len(front)),
|
||||
)
|
||||
|
||||
txn.execute(sql, list(front))
|
||||
front = [r[0] for r in txn.fetchall()]
|
||||
new_front = set()
|
||||
for f in front:
|
||||
txn.execute(base_sql, (f,))
|
||||
new_front.update([r[0] for r in txn.fetchall()])
|
||||
front = new_front
|
||||
results.update(front)
|
||||
|
||||
return list(results)
|
||||
|
||||
Reference in New Issue
Block a user