Compare commits
6 Commits
anoa/log_e
...
erikj/acl_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
40affadaaa | ||
|
|
8a345190cc | ||
|
|
1229f2213e | ||
|
|
2e35a733cc | ||
|
|
413a4c289b | ||
|
|
4d6cb8814e |
@@ -229,12 +229,14 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _filter_events_for_server(self, server_name, room_id, events):
|
||||
states = yield self.store.get_state_for_events(
|
||||
room_id, [e.event_id for e in events],
|
||||
events_to_state = yield self.store.get_state_for_events(
|
||||
room_id, frozenset(e.event_id for e in events),
|
||||
types=(
|
||||
(EventTypes.RoomHistoryVisibility, ""),
|
||||
(EventTypes.Member, None),
|
||||
)
|
||||
)
|
||||
|
||||
events_and_states = zip(events, states)
|
||||
|
||||
def redact_disallowed(event_and_state):
|
||||
event, state = event_and_state
|
||||
|
||||
@@ -271,9 +273,10 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
return event
|
||||
|
||||
res = map(redact_disallowed, events_and_states)
|
||||
|
||||
logger.info("_filter_events_for_server %r", res)
|
||||
res = map(redact_disallowed, [
|
||||
(e, events_to_state[e.event_id])
|
||||
for e in events
|
||||
])
|
||||
|
||||
defer.returnValue(res)
|
||||
|
||||
|
||||
@@ -137,12 +137,14 @@ class MessageHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _filter_events_for_client(self, user_id, room_id, events):
|
||||
states = yield self.store.get_state_for_events(
|
||||
room_id, [e.event_id for e in events],
|
||||
event_id_to_state = yield self.store.get_state_for_events(
|
||||
room_id, frozenset(e.event_id for e in events),
|
||||
types=(
|
||||
(EventTypes.RoomHistoryVisibility, ""),
|
||||
(EventTypes.Member, user_id),
|
||||
)
|
||||
)
|
||||
|
||||
events_and_states = zip(events, states)
|
||||
|
||||
def allowed(event_and_state):
|
||||
event, state = event_and_state
|
||||
|
||||
@@ -175,10 +177,17 @@ class MessageHandler(BaseHandler):
|
||||
|
||||
return True
|
||||
|
||||
events_and_states = filter(allowed, events_and_states)
|
||||
event_and_state = filter(
|
||||
allowed,
|
||||
[
|
||||
(e, event_id_to_state[e.event_id])
|
||||
for e in events
|
||||
]
|
||||
)
|
||||
|
||||
defer.returnValue([
|
||||
ev
|
||||
for ev, _ in events_and_states
|
||||
for ev, _ in event_and_state
|
||||
])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -294,12 +294,14 @@ class SyncHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _filter_events_for_client(self, user_id, room_id, events):
|
||||
states = yield self.store.get_state_for_events(
|
||||
room_id, [e.event_id for e in events],
|
||||
event_id_to_state = yield self.store.get_state_for_events(
|
||||
room_id, frozenset(e.event_id for e in events),
|
||||
types=(
|
||||
(EventTypes.RoomHistoryVisibility, ""),
|
||||
(EventTypes.Member, user_id),
|
||||
)
|
||||
)
|
||||
|
||||
events_and_states = zip(events, states)
|
||||
|
||||
def allowed(event_and_state):
|
||||
event, state = event_and_state
|
||||
|
||||
@@ -331,10 +333,18 @@ class SyncHandler(BaseHandler):
|
||||
return membership == Membership.INVITE
|
||||
|
||||
return True
|
||||
events_and_states = filter(allowed, events_and_states)
|
||||
|
||||
event_and_state = filter(
|
||||
allowed,
|
||||
[
|
||||
(e, event_id_to_state[e.event_id])
|
||||
for e in events
|
||||
]
|
||||
)
|
||||
|
||||
defer.returnValue([
|
||||
ev
|
||||
for ev, _ in events_and_states
|
||||
for ev, _ in event_and_state
|
||||
])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
||||
@@ -71,6 +71,11 @@ class Cache(object):
|
||||
self.thread = None
|
||||
caches_by_name[name] = self.cache
|
||||
|
||||
class Sentinel(object):
|
||||
__slots__ = []
|
||||
|
||||
self.sentinel = Sentinel()
|
||||
|
||||
def check_thread(self):
|
||||
expected_thread = self.thread
|
||||
if expected_thread is None:
|
||||
@@ -85,9 +90,10 @@ class Cache(object):
|
||||
if len(keyargs) != self.keylen:
|
||||
raise ValueError("Expected a key to have %d items", self.keylen)
|
||||
|
||||
if keyargs in self.cache:
|
||||
val = self.cache.get(keyargs, self.sentinel)
|
||||
if val is not self.sentinel:
|
||||
cache_counter.inc_hits(self.name)
|
||||
return self.cache[keyargs]
|
||||
return val
|
||||
|
||||
cache_counter.inc_misses(self.name)
|
||||
raise KeyError()
|
||||
|
||||
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
import logging
|
||||
@@ -206,64 +207,119 @@ class StateStore(SQLBaseStore):
|
||||
events = yield self._get_events(event_ids, get_prev_content=False)
|
||||
defer.returnValue(events)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_for_events(self, room_id, event_ids):
|
||||
@cached(num_args=3, lru=True)
|
||||
def _get_state_groups_from_group(self, room_id, group, types):
|
||||
def f(txn):
|
||||
groups = set()
|
||||
event_to_group = {}
|
||||
for event_id in event_ids:
|
||||
# TODO: Remove this loop.
|
||||
group = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="event_to_state_groups",
|
||||
keyvalues={"event_id": event_id},
|
||||
retcol="state_group",
|
||||
allow_none=True,
|
||||
)
|
||||
if group:
|
||||
event_to_group[event_id] = group
|
||||
groups.add(group)
|
||||
sql = (
|
||||
"SELECT event_id FROM state_groups_state WHERE"
|
||||
" room_id = ? AND state_group = ? AND (%s)"
|
||||
) % (" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),)
|
||||
|
||||
group_to_state_ids = {}
|
||||
for group in groups:
|
||||
state_ids = self._simple_select_onecol_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
keyvalues={"state_group": group},
|
||||
retcol="event_id",
|
||||
)
|
||||
args = [room_id, group]
|
||||
args.extend([i for typ in types for i in typ])
|
||||
txn.execute(sql, args)
|
||||
|
||||
group_to_state_ids[group] = state_ids
|
||||
return group, [
|
||||
r[0]
|
||||
for r in txn.fetchall()
|
||||
]
|
||||
|
||||
return event_to_group, group_to_state_ids
|
||||
|
||||
res = yield self.runInteraction(
|
||||
"annotate_events_with_state_groups",
|
||||
return self.runInteraction(
|
||||
"_get_state_groups_from_group",
|
||||
f,
|
||||
)
|
||||
|
||||
event_to_group, group_to_state_ids = res
|
||||
@cached(num_args=3, lru=True, max_entries=10000)
|
||||
def _get_state_for_event_id(self, room_id, event_id, types):
|
||||
def f(txn):
|
||||
type_and_state_sql = " OR ".join([
|
||||
"(type = ? AND state_key = ?)"
|
||||
if typ[1] is not None
|
||||
else "type = ?"
|
||||
for typ in types
|
||||
])
|
||||
|
||||
state_list = yield defer.gatherResults(
|
||||
[
|
||||
self._fetch_events_for_group(group, vals)
|
||||
for group, vals in group_to_state_ids.items()
|
||||
],
|
||||
consumeErrors=True,
|
||||
sql = (
|
||||
"SELECT sg.event_id FROM state_groups_state as sg"
|
||||
" INNER JOIN event_to_state_groups as e"
|
||||
" ON e.state_group = sg.state_group"
|
||||
" WHERE e.event_id = ? AND (%s)"
|
||||
) % (type_and_state_sql,)
|
||||
|
||||
args = [event_id]
|
||||
for typ, state_key in types:
|
||||
args.extend(
|
||||
[typ, state_key] if state_key is not None else [typ]
|
||||
)
|
||||
txn.execute(sql, args)
|
||||
|
||||
return event_id, [
|
||||
r[0]
|
||||
for r in txn.fetchall()
|
||||
]
|
||||
|
||||
return self.runInteraction(
|
||||
"_get_state_for_event_id",
|
||||
f,
|
||||
)
|
||||
|
||||
state_dict = {
|
||||
group: {
|
||||
@defer.inlineCallbacks
|
||||
def get_state_for_events(self, room_id, event_ids, types):
|
||||
"""Given a list of event_ids and type tuples, return a list of state
|
||||
dicts for each event. The state dicts will only have the type/state_keys
|
||||
that are in the `types` list.
|
||||
|
||||
Args:
|
||||
room_id (str)
|
||||
event_ids (list)
|
||||
types (list): List of (type, state_key) tuples which are used to
|
||||
filter the state fetched. `state_key` may be None, which matches
|
||||
any `state_key`
|
||||
|
||||
Returns:
|
||||
deferred: A list of dicts corresponding to the event_ids given.
|
||||
The dicts are mappings from (type, state_key) -> state_events
|
||||
"""
|
||||
set_types = frozenset(types)
|
||||
res = yield defer.gatherResults(
|
||||
[
|
||||
self._get_state_for_event_id(
|
||||
room_id, event_id, set_types,
|
||||
)
|
||||
for event_id in event_ids
|
||||
],
|
||||
consumeErrors=True,
|
||||
).addErrback(unwrapFirstError)
|
||||
|
||||
event_to_state_ids = dict(res)
|
||||
|
||||
event_dict = yield self._get_events(
|
||||
[
|
||||
item
|
||||
for lst in event_to_state_ids.values()
|
||||
for item in lst
|
||||
],
|
||||
get_prev_content=False
|
||||
).addCallback(
|
||||
lambda evs: {ev.event_id: ev for ev in evs}
|
||||
)
|
||||
|
||||
event_to_state = {
|
||||
event_id: {
|
||||
(ev.type, ev.state_key): ev
|
||||
for ev in state
|
||||
for ev in [
|
||||
event_dict[state_id]
|
||||
for state_id in state_ids
|
||||
if state_id in event_dict
|
||||
]
|
||||
}
|
||||
for group, state in state_list
|
||||
for event_id, state_ids in event_to_state_ids.items()
|
||||
}
|
||||
|
||||
defer.returnValue([
|
||||
state_dict.get(event_to_group.get(event, None), None)
|
||||
defer.returnValue({
|
||||
event: event_to_state[event]
|
||||
for event in event_ids
|
||||
])
|
||||
})
|
||||
|
||||
|
||||
def _make_group_id(clock):
|
||||
|
||||
Reference in New Issue
Block a user