Compare commits
12 Commits
erikj/test
...
anoa/get_u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
88d8a7cd19 | ||
|
|
14a9d59edf | ||
|
|
fac30c0554 | ||
|
|
908d15e904 | ||
|
|
0a1ebe5442 | ||
|
|
9316091c8c | ||
|
|
7d54f2413f | ||
|
|
98a6910d94 | ||
|
|
c08273f529 | ||
|
|
edca4cf1bb | ||
|
|
9d7fcb2fda | ||
|
|
1786b1ee0d |
@@ -122,17 +122,17 @@ class DeviceWorkerHandler(BaseHandler):
|
||||
|
||||
# First we check if any devices have changed for users that we share
|
||||
# rooms with.
|
||||
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
||||
users_who_share_room = yield defer.ensureDeferred(self.store.get_users_who_share_room_with_user(
|
||||
user_id
|
||||
)
|
||||
))
|
||||
|
||||
tracked_users = set(users_who_share_room)
|
||||
#tracked_users = set(users_who_share_room)
|
||||
|
||||
# Always tell the user about their own devices
|
||||
tracked_users.add(user_id)
|
||||
#tracked_users.add(user_id)
|
||||
|
||||
changed = yield self.store.get_users_whose_devices_changed(
|
||||
from_token.device_list_key, tracked_users
|
||||
from_token.device_list_key, users_who_share_room
|
||||
)
|
||||
|
||||
# Then work out if any users have since joined
|
||||
@@ -444,9 +444,10 @@ class DeviceHandler(DeviceWorkerHandler):
|
||||
"""Notify that a user's device(s) has changed. Pokes the notifier, and
|
||||
remote servers if the user is local.
|
||||
"""
|
||||
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
|
||||
logger.info("get_users_who_share_room... called from notify_device_update")
|
||||
users_who_share_room = yield defer.ensureDeferred(self.store.get_users_who_share_room_with_user(
|
||||
user_id
|
||||
)
|
||||
))
|
||||
|
||||
hosts = set()
|
||||
if self.hs.is_mine_id(user_id):
|
||||
|
||||
@@ -172,6 +172,7 @@ class EventHandler(BaseHandler):
|
||||
if not event:
|
||||
return None
|
||||
|
||||
logger.info("get_users_in_room called from get_event!")
|
||||
users = await self.store.get_users_in_room(event.room_id)
|
||||
is_peeking = user.to_string() not in users
|
||||
|
||||
|
||||
@@ -1098,6 +1098,7 @@ class PresenceEventSource(object):
|
||||
users_interested_in = set()
|
||||
users_interested_in.add(user_id) # So that we receive our own presence
|
||||
|
||||
logger.info("get_users_who_share_room... _get_interested_in")
|
||||
users_who_share_room = await self.store.get_users_who_share_room_with_user(
|
||||
user_id, on_invalidate=cache_context.invalidate
|
||||
)
|
||||
|
||||
@@ -308,12 +308,14 @@ class SyncHandler(object):
|
||||
if timeout == 0 or since_token is None or full_state:
|
||||
# we are going to return immediately, so don't bother calling
|
||||
# notifier.wait_for_events.
|
||||
logger.info("_wait_for_sync_for_user1")
|
||||
result = await self.current_sync_for_user(
|
||||
sync_config, since_token, full_state=full_state
|
||||
)
|
||||
else:
|
||||
|
||||
def current_sync_callback(before_token, after_token):
|
||||
logger.info("_wait_for_sync_for_user2")
|
||||
return self.current_sync_for_user(sync_config, since_token)
|
||||
|
||||
result = await self.notifier.wait_for_events(
|
||||
@@ -340,6 +342,7 @@ class SyncHandler(object):
|
||||
) -> SyncResult:
|
||||
"""Get the sync for client needed to match what the server has now.
|
||||
"""
|
||||
logger.info("current_sync_for_user")
|
||||
return await self.generate_sync_result(sync_config, since_token, full_state)
|
||||
|
||||
async def push_rules_for_user(self, user: UserID) -> JsonDict:
|
||||
@@ -1139,6 +1142,8 @@ class SyncHandler(object):
|
||||
# room with by looking at all users that have left a room plus users
|
||||
# that were in a room we've left.
|
||||
|
||||
logger.info("get_users_who_share_room... called from _generate_sync_entry")
|
||||
logger.info("*Called with %s", user_id)
|
||||
users_who_share_room = await self.store.get_users_who_share_room_with_user(
|
||||
user_id
|
||||
)
|
||||
@@ -1146,15 +1151,15 @@ class SyncHandler(object):
|
||||
# Always tell the user about their own devices. We check as the user
|
||||
# ID is almost certainly already included (unless they're not in any
|
||||
# rooms) and taking a copy of the set is relatively expensive.
|
||||
if user_id not in users_who_share_room:
|
||||
users_who_share_room = set(users_who_share_room)
|
||||
users_who_share_room.add(user_id)
|
||||
#if user_id not in users_who_share_room:
|
||||
# users_who_share_room = set(users_who_share_room)
|
||||
# users_who_share_room.add(user_id)
|
||||
|
||||
tracked_users = users_who_share_room
|
||||
#tracked_users = users_who_share_room
|
||||
|
||||
# Step 1a, check for changes in devices of users we share a room with
|
||||
users_that_have_changed = await self.store.get_users_whose_devices_changed(
|
||||
since_token.device_list_key, tracked_users
|
||||
since_token.device_list_key, users_who_share_room
|
||||
)
|
||||
|
||||
# Step 1b, check for newly joined rooms
|
||||
|
||||
@@ -135,6 +135,7 @@ class SlavedEventStore(
|
||||
)
|
||||
|
||||
if data.type == EventTypes.Member:
|
||||
logger.info("INVALIDATING get_rooms_for_user_with_stream_ordering")
|
||||
self.get_rooms_for_user_with_stream_ordering.invalidate(
|
||||
(data.state_key,)
|
||||
)
|
||||
|
||||
@@ -171,6 +171,7 @@ class SyncRestServlet(RestServlet):
|
||||
user.to_string(), affect_presence=affect_presence
|
||||
)
|
||||
with context:
|
||||
logger.info("Sync servlet")
|
||||
sync_result = await self.sync_handler.wait_for_sync_for_user(
|
||||
sync_config,
|
||||
since_token=since_token,
|
||||
|
||||
@@ -605,6 +605,7 @@ class EventsStore(
|
||||
}
|
||||
|
||||
for member in members_changed:
|
||||
logger.info("INVALIDATING get_rooms_for_user_with_stream_ordering")
|
||||
txn.call_after(
|
||||
self.get_rooms_for_user_with_stream_ordering.invalidate, (member,)
|
||||
)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import traceback
|
||||
from typing import Iterable, List, Set
|
||||
|
||||
from six import iteritems, itervalues
|
||||
@@ -163,8 +164,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
|
||||
return hosts
|
||||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
@cached(max_entries=1000000, iterable=True)
|
||||
def get_users_in_room(self, room_id):
|
||||
logger.info("Traceback: %s", traceback.format_stack())
|
||||
return self.db.runInteraction(
|
||||
"get_users_in_room", self.get_users_in_room_txn, room_id
|
||||
)
|
||||
@@ -484,17 +486,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
)
|
||||
return frozenset(r.room_id for r in rooms)
|
||||
|
||||
@cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
|
||||
def get_users_who_share_room_with_user(self, user_id, cache_context):
|
||||
@cached(max_entries=500000, cache_context=True, iterable=True)
|
||||
async def get_users_who_share_room_with_user(self, user_id, cache_context):
|
||||
"""Returns the set of users who share a room with `user_id`
|
||||
"""
|
||||
room_ids = yield self.get_rooms_for_user(
|
||||
logger.info("Called with %s %s", user_id, cache_context)
|
||||
room_ids = await self.get_rooms_for_user(
|
||||
user_id, on_invalidate=cache_context.invalidate
|
||||
)
|
||||
|
||||
user_who_share_room = set()
|
||||
for room_id in room_ids:
|
||||
user_ids = yield self.get_users_in_room(
|
||||
user_ids = await self.get_users_in_room(
|
||||
room_id, on_invalidate=cache_context.invalidate
|
||||
)
|
||||
user_who_share_room.update(user_ids)
|
||||
|
||||
Reference in New Issue
Block a user