Merge commit '7a3adbd7a' into anoa/dinsic_release_1_23_1
This commit is contained in:
11
CHANGES.md
11
CHANGES.md
@@ -1,3 +1,14 @@
|
||||
Synapse 1.22.0rc2 (2020-10-26)
|
||||
==============================
|
||||
|
||||
Bugfixes
|
||||
--------
|
||||
|
||||
- Fix a bug introduced in v1.22.0rc1 which would cause ephemeral events to not be sent to appservices. ([\#8648](https://github.com/matrix-org/synapse/issues/8648))
|
||||
- Fix `user_daily_visits` to not have duplicate rows for UA. Broke in v1.22.0rc1. ([\#8654](https://github.com/matrix-org/synapse/issues/8654))
|
||||
- Fix a bug introduced in v1.22.0rc1 where presence events were not properly passed to application services. ([\#8656](https://github.com/matrix-org/synapse/issues/8656))
|
||||
|
||||
|
||||
Synapse 1.22.0rc1 (2020-10-22)
|
||||
==============================
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__version__ = "1.22.0rc1"
|
||||
__version__ = "1.22.0rc2"
|
||||
|
||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||
# We import here so that we don't have to install a bunch of deps when
|
||||
|
||||
@@ -236,16 +236,16 @@ class ApplicationServicesHandler:
|
||||
events = await self._handle_receipts(service)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
service, "read_receipt", new_token
|
||||
)
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
service, "read_receipt", new_token
|
||||
)
|
||||
elif stream_key == "presence_key":
|
||||
events = await self._handle_presence(service, users)
|
||||
if events:
|
||||
self.scheduler.submit_ephemeral_events_for_as(service, events)
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
service, "presence", new_token
|
||||
)
|
||||
await self.store.set_type_stream_id_for_appservice(
|
||||
service, "presence", new_token
|
||||
)
|
||||
|
||||
async def _handle_typing(self, service: ApplicationService, new_token: int):
|
||||
typing_source = self.event_sources.sources["typing"]
|
||||
@@ -271,7 +271,7 @@ class ApplicationServicesHandler:
|
||||
|
||||
async def _handle_presence(
|
||||
self, service: ApplicationService, users: Collection[Union[str, UserID]]
|
||||
):
|
||||
) -> List[JsonDict]:
|
||||
events = [] # type: List[JsonDict]
|
||||
presence_source = self.event_sources.sources["presence"]
|
||||
from_key = await self.store.get_type_stream_id_for_appservice(
|
||||
@@ -288,7 +288,7 @@ class ApplicationServicesHandler:
|
||||
user=user, service=service, from_key=from_key,
|
||||
)
|
||||
time_now = self.clock.time_msec()
|
||||
presence_events = [
|
||||
events.extend(
|
||||
{
|
||||
"type": "m.presence",
|
||||
"sender": event.user_id,
|
||||
@@ -297,8 +297,9 @@ class ApplicationServicesHandler:
|
||||
),
|
||||
}
|
||||
for event in presence_events
|
||||
]
|
||||
events = events + presence_events
|
||||
)
|
||||
|
||||
return events
|
||||
|
||||
async def query_user_exists(self, user_id):
|
||||
"""Check if any application service knows this user_id exists.
|
||||
|
||||
@@ -369,17 +369,25 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
async def get_type_stream_id_for_appservice(
|
||||
self, service: ApplicationService, type: str
|
||||
) -> int:
|
||||
if type not in ("read_receipt", "presence"):
|
||||
raise ValueError(
|
||||
"Expected type to be a valid application stream id type, got %s"
|
||||
% (type,)
|
||||
)
|
||||
|
||||
def get_type_stream_id_for_appservice_txn(txn):
|
||||
stream_id_type = "%s_stream_id" % type
|
||||
txn.execute(
|
||||
"SELECT ? FROM application_services_state WHERE as_id=?",
|
||||
(stream_id_type, service.id,),
|
||||
# We do NOT want to escape `stream_id_type`.
|
||||
"SELECT %s FROM application_services_state WHERE as_id=?"
|
||||
% stream_id_type,
|
||||
(service.id,),
|
||||
)
|
||||
last_txn_id = txn.fetchone()
|
||||
if last_txn_id is None or last_txn_id[0] is None: # no row exists
|
||||
last_stream_id = txn.fetchone()
|
||||
if last_stream_id is None or last_stream_id[0] is None: # no row exists
|
||||
return 0
|
||||
else:
|
||||
return int(last_txn_id[0])
|
||||
return int(last_stream_id[0])
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
|
||||
@@ -388,11 +396,18 @@ class ApplicationServiceTransactionWorkerStore(
|
||||
async def set_type_stream_id_for_appservice(
|
||||
self, service: ApplicationService, type: str, pos: int
|
||||
) -> None:
|
||||
if type not in ("read_receipt", "presence"):
|
||||
raise ValueError(
|
||||
"Expected type to be a valid application stream id type, got %s"
|
||||
% (type,)
|
||||
)
|
||||
|
||||
def set_type_stream_id_for_appservice_txn(txn):
|
||||
stream_id_type = "%s_stream_id" % type
|
||||
txn.execute(
|
||||
"UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
|
||||
(stream_id_type, pos, service.id),
|
||||
"UPDATE application_services_state SET %s = ? WHERE as_id=?"
|
||||
% stream_id_type,
|
||||
(pos, service.id),
|
||||
)
|
||||
|
||||
await self.db_pool.runInteraction(
|
||||
|
||||
@@ -282,9 +282,10 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
|
||||
now = self._clock.time_msec()
|
||||
|
||||
# A note on user_agent. Technically a given device can have multiple
|
||||
# user agents, so we need to decide which one to pick. We could have handled this
|
||||
# in number of ways, but given that we don't _that_ much have gone for MAX()
|
||||
# For more details of the other options considered see
|
||||
# user agents, so we need to decide which one to pick. We could have
|
||||
# handled this in number of ways, but given that we don't care
|
||||
# _that_ much we have gone for MAX(). For more details of the other
|
||||
# options considered see
|
||||
# https://github.com/matrix-org/synapse/pull/8503#discussion_r502306111
|
||||
sql = """
|
||||
INSERT INTO user_daily_visits (user_id, device_id, timestamp, user_agent)
|
||||
@@ -299,7 +300,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
|
||||
WHERE last_seen > ? AND last_seen <= ?
|
||||
AND udv.timestamp IS NULL AND users.is_guest=0
|
||||
AND users.appservice_id IS NULL
|
||||
GROUP BY u.user_id, u.device_id, u.user_agent
|
||||
GROUP BY u.user_id, u.device_id
|
||||
"""
|
||||
|
||||
# This means that the day has rolled over but there could still
|
||||
|
||||
@@ -410,6 +410,62 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase):
|
||||
def make_homeserver(self, reactor, clock):
|
||||
hs = self.setup_test_homeserver()
|
||||
return hs
|
||||
|
||||
def prepare(self, hs, reactor, clock):
|
||||
self.service = Mock(id="foo")
|
||||
self.store = self.hs.get_datastore()
|
||||
self.get_success(self.store.set_appservice_state(self.service, "up"))
|
||||
|
||||
def test_get_type_stream_id_for_appservice_no_value(self):
|
||||
value = self.get_success(
|
||||
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
|
||||
)
|
||||
self.assertEquals(value, 0)
|
||||
|
||||
value = self.get_success(
|
||||
self.store.get_type_stream_id_for_appservice(self.service, "presence")
|
||||
)
|
||||
self.assertEquals(value, 0)
|
||||
|
||||
def test_get_type_stream_id_for_appservice_invalid_type(self):
|
||||
self.get_failure(
|
||||
self.store.get_type_stream_id_for_appservice(self.service, "foobar"),
|
||||
ValueError,
|
||||
)
|
||||
|
||||
def test_set_type_stream_id_for_appservice(self):
|
||||
read_receipt_value = 1024
|
||||
self.get_success(
|
||||
self.store.set_type_stream_id_for_appservice(
|
||||
self.service, "read_receipt", read_receipt_value
|
||||
)
|
||||
)
|
||||
result = self.get_success(
|
||||
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
|
||||
)
|
||||
self.assertEqual(result, read_receipt_value)
|
||||
|
||||
self.get_success(
|
||||
self.store.set_type_stream_id_for_appservice(
|
||||
self.service, "presence", read_receipt_value
|
||||
)
|
||||
)
|
||||
result = self.get_success(
|
||||
self.store.get_type_stream_id_for_appservice(self.service, "presence")
|
||||
)
|
||||
self.assertEqual(result, read_receipt_value)
|
||||
|
||||
def test_set_type_stream_id_for_appservice_invalid_type(self):
|
||||
self.get_failure(
|
||||
self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024),
|
||||
ValueError,
|
||||
)
|
||||
|
||||
|
||||
# required for ApplicationServiceTransactionStoreTestCase tests
|
||||
class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore):
|
||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||
|
||||
Reference in New Issue
Block a user