Compare commits
31 Commits
anoa/test...anoa/conte
| SHA1 |
|---|
| 9f2434d494 |
| a6670b7257 |
| 7c232bd98b |
| d74054afda |
| bca3455b38 |
| 187dc6ad02 |
| e16521faab |
| 4e2a072a05 |
| 32ad2a3349 |
| 3889fcd9d7 |
| b064a41291 |
| 1adf27c82a |
| 3cf7d6d5b6 |
| cff1cb8685 |
| dd57715de2 |
| 91718b3f23 |
| be29ed7ad8 |
| 2b6b7f482a |
| 3675fb9bc6 |
| 7ba98a2874 |
| 4be582d7c8 |
| 01fbd95736 |
| 03edfc5850 |
| 391fb47791 |
| 3a86477162 |
| b7dec300b7 |
| 51b8a21f0c |
| 9279a2c4e4 |
| 9c59bc59c8 |
| dd2954f78d |
| 4efe1d4d3f |
@@ -6,7 +6,7 @@ Features
 
 - Add v2 APIs for the `send_join` and `send_leave` federation endpoints (as described in [MSC1802](https://github.com/matrix-org/matrix-doc/pull/1802)). ([\#6349](https://github.com/matrix-org/synapse/issues/6349))
 - Add a develop script to generate full SQL schemas. ([\#6394](https://github.com/matrix-org/synapse/issues/6394))
-- Add custom SAML username mapping functinality through an external provider plugin. ([\#6411](https://github.com/matrix-org/synapse/issues/6411))
+- Add custom SAML username mapping functionality through an external provider plugin. ([\#6411](https://github.com/matrix-org/synapse/issues/6411))
 - Automatically delete empty groups/communities. ([\#6453](https://github.com/matrix-org/synapse/issues/6453))
 - Add option `limit_profile_requests_to_users_who_share_rooms` to prevent requirement of a local user sharing a room with another user to query their profile information. ([\#6523](https://github.com/matrix-org/synapse/issues/6523))
 - Add an `export_signing_key` script to extract the public part of signing keys when rotating them. ([\#6546](https://github.com/matrix-org/synapse/issues/6546))

changelog.d/6621.doc (new file, 1 line)
@@ -0,0 +1 @@
Fix a typo in the configuration example for purge jobs in the sample configuration file.

changelog.d/6624.doc (new file, 1 line)
@@ -0,0 +1 @@
Add complete documentation of the message retention policies support.

changelog.d/6654.bugfix (new file, 1 line)
@@ -0,0 +1 @@
Correctly proxy HTTP errors due to API calls to remote group servers.

changelog.d/6656.doc (new file, 1 line)
@@ -0,0 +1 @@
No more overriding the entire /etc folder of the container in docker-compose.yaml. Contributed by Fabian Meyer.

changelog.d/6664.bugfix (new file, 1 line)
@@ -0,0 +1 @@
Fix media repo admin APIs when using a media worker.

changelog.d/6665.doc (new file, 1 line)
@@ -0,0 +1 @@
Add complete documentation of the message retention policies support.

@@ -18,7 +18,7 @@ services:
       - SYNAPSE_CONFIG_PATH=/etc/homeserver.yaml
     volumes:
       # You may either store all the files in a local folder
-      - ./matrix-config:/etc
+      - ./matrix-config/homeserver.yaml:/etc/homeserver.yaml
       - ./files:/data
       # .. or you may split this between different storage points
       # - ./files:/data

docs/message_retention_policies.md (new file, 191 lines)
@@ -0,0 +1,191 @@

# Message retention policies

Synapse admins can enable support for message retention policies on
their homeserver. Message retention policies exist at a room level,
follow the semantics described in
[MSC1763](https://github.com/matrix-org/matrix-doc/blob/matthew/msc1763/proposals/1763-configurable-retention-periods.md),
and allow server and room admins to configure how long messages should
be kept in a homeserver's database before being purged from it.
**Please note that, as this feature isn't part of the Matrix
specification yet, this implementation is to be considered as
experimental.**

A message retention policy is mainly defined by its `max_lifetime`
parameter, which defines how long a message can be kept around after
it was sent to the room. If a room doesn't have a message retention
policy, and there's no default one for a given server, then no message
sent in that room is ever purged on that server.

MSC1763 also specifies semantics for a `min_lifetime` parameter which
defines the amount of time after which an event _can_ get purged (after
it was sent to the room), but Synapse doesn't currently support it
beyond registering it.

Both `max_lifetime` and `min_lifetime` are optional parameters.

Note that message retention policies don't apply to state events.

Once an event reaches its expiry date (defined as the time it was sent
plus the value for `max_lifetime` in the room), two things happen:

* Synapse stops serving the event to clients via any endpoint.
* The message gets picked up by the next purge job (see the "Purge jobs"
  section) and is removed from Synapse's database.

Since purge jobs don't run continuously, this means that an event might
stay in a server's database for longer than the value for `max_lifetime`
in the room would allow, though hidden from clients.

Similarly, if a server (with support for message retention policies
enabled) receives from another server an event that should have been
purged according to its room's policy, then the receiving server will
process and store that event until it's picked up by the next purge job,
though it will always hide it from clients.


## Server configuration

Support for this feature can be enabled and configured in the
`retention` section of the Synapse configuration file (see the
[sample file](https://github.com/matrix-org/synapse/blob/v1.7.3/docs/sample_config.yaml#L332-L393)).

To enable support for message retention policies, set the setting
`enabled` in this section to `true`.
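
For example, a minimal `retention` section that does nothing more than
switch the feature on might look like the following sketch (every other
setting then keeps its default):

```yaml
retention:
  enabled: true
```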

### Default policy

A default message retention policy is a policy defined in Synapse's
configuration that is used by Synapse for every room that doesn't have a
message retention policy configured in its state. This allows server
admins to ensure that messages are never kept indefinitely in a server's
database.

A default policy can be defined as follows, in the `retention` section of
the configuration file:

```yaml
default_policy:
  min_lifetime: 1d
  max_lifetime: 1y
```

Here, `min_lifetime` and `max_lifetime` have the same meaning and level
of support as previously described. They can be expressed either as a
duration (using the units `s` (seconds), `m` (minutes), `h` (hours),
`d` (days), `w` (weeks) and `y` (years)) or as a number of milliseconds.
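
For illustration, the same default policy could equivalently be written
with millisecond values (assuming here that `1y` is counted as 365 days):

```yaml
default_policy:
  min_lifetime: 86400000     # 1 day
  max_lifetime: 31536000000  # 365 days
```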

### Purge jobs

Purge jobs are the jobs that Synapse runs in the background to purge
expired events from the database. They are only run if support for
message retention policies is enabled in the server's configuration. If
the server admin provides no configuration for purge jobs, Synapse will
use a default configuration, which is described in the
[sample configuration file](https://github.com/matrix-org/synapse/blob/master/docs/sample_config.yaml#L332-L393).

Some server admins might want finer control over when events are
removed, depending on an event's room's policy. This can be done by
setting the `purge_jobs` sub-section in the `retention` section of the
configuration file. An example of such a configuration could be:

```yaml
purge_jobs:
  - longest_max_lifetime: 3d
    interval: 12h
  - shortest_max_lifetime: 3d
    longest_max_lifetime: 1w
    interval: 1d
  - shortest_max_lifetime: 1w
    interval: 2d
```

In this example, we define three jobs:

* one that runs twice a day (every 12 hours) and purges events in rooms
  whose policy's `max_lifetime` is lower than or equal to 3 days.
* one that runs once a day and purges events in rooms whose policy's
  `max_lifetime` is between 3 days and a week.
* one that runs once every 2 days and purges events in rooms whose
  policy's `max_lifetime` is greater than a week.

Note that this example is tailored to show different configurations and
features slightly more jobs than are probably necessary (in practice, a
server admin would probably consider it better to replace the two last
jobs with one that runs once a day and handles rooms whose policy's
`max_lifetime` is greater than 3 days).
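
For reference, that simpler alternative might look like the following
sketch:

```yaml
purge_jobs:
  - longest_max_lifetime: 3d
    interval: 12h
  - shortest_max_lifetime: 3d
    interval: 1d
```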

Keep in mind, when configuring these jobs, that a purge job can become
quite heavy on the server if it targets many rooms, therefore prefer
having jobs with a low interval target a limited set of rooms. Also
make sure to include a job with no minimum and one with no maximum so
that your configuration handles every policy.

As previously mentioned in this documentation, while a purge job that
runs e.g. every day means that an expired event might stay in the
database for up to a day after its expiry, Synapse hides expired events
from clients as soon as they expire, so the event is not visible to
local users between its expiry date and the moment it gets purged from
the server's database.


### Lifetime limits

**Note: this feature is mainly useful within a closed federation or on
servers that don't federate, because there currently is no way to
enforce these limits in an open federation.**

Server admins can restrict the values their local users are allowed to
use for both `min_lifetime` and `max_lifetime`. These limits can be
defined as follows in the `retention` section of the configuration file:

```yaml
allowed_lifetime_min: 1d
allowed_lifetime_max: 1y
```

Here, `allowed_lifetime_min` is the lowest value a local user can set
for both `min_lifetime` and `max_lifetime`, and `allowed_lifetime_max`
is the highest value. Both parameters are optional (e.g. setting
`allowed_lifetime_min` but not `allowed_lifetime_max` only enforces a
minimum and no maximum).

Like other settings in this section, these parameters can be expressed
either as a duration or as a number of milliseconds.
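
Putting the settings of this section together, a complete `retention`
section could look like the sketch below. All values here are purely
illustrative, not recommendations:

```yaml
retention:
  enabled: true

  default_policy:
    min_lifetime: 1d
    max_lifetime: 1y

  allowed_lifetime_min: 1d
  allowed_lifetime_max: 1y

  purge_jobs:
    - longest_max_lifetime: 3d
      interval: 12h
    - shortest_max_lifetime: 3d
      interval: 1d
```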

## Room configuration

To configure a room's message retention policy, a room's admin or
moderator needs to send a state event in that room with the type
`m.room.retention` and the following content:

```json
{
    "max_lifetime": ...
}
```

In this event's content, the `max_lifetime` parameter has the same
meaning as previously described, and needs to be expressed in
milliseconds. The event's content can also include a `min_lifetime`
parameter, which has the same meaning and limited support as previously
described.
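
As a purely illustrative example, a policy that expires events after 28
days, and also registers (but, as noted above, doesn't enforce) a
one-day `min_lifetime`, would be expressed in milliseconds like this:

```json
{
    "max_lifetime": 2419200000,
    "min_lifetime": 86400000
}
```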

Note that, of all the servers in the room, only those with support for
message retention policies will actually remove expired events. This
support is currently not enabled by default in Synapse.


## Note on reclaiming disk space

While purge jobs actually delete data from the database, the disk space
used by the database might not decrease immediately on the database's
host. However, even though the database engine won't free up the disk
space, it will start writing new data into where the purged data was.

If you want to reclaim the freed disk space anyway and return it to the
operating system, the server admin needs to run `VACUUM FULL;` (or
`VACUUM;` for SQLite databases) on Synapse's database (see the related
[PostgreSQL documentation](https://www.postgresql.org/docs/current/sql-vacuum.html)).
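
As a rough sketch, this could be done with the commands below; the
database name and file path are assumptions and will differ per
deployment:

```sh
# PostgreSQL, assuming the Synapse database is named "synapse"
psql synapse -c 'VACUUM FULL;'

# SQLite, assuming the database file is homeserver.db
sqlite3 homeserver.db 'VACUUM;'
```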

@@ -387,17 +387,17 @@ retention:
 #
 # The rationale for this per-job configuration is that some rooms might have a
 # retention policy with a low 'max_lifetime', where history needs to be purged
-# of outdated messages on a very frequent basis (e.g. every 5min), but not want
-# that purge to be performed by a job that's iterating over every room it knows,
-# which would be quite heavy on the server.
+# of outdated messages on a more frequent basis than for the rest of the rooms
+# (e.g. every 12h), but not want that purge to be performed by a job that's
+# iterating over every room it knows, which could be heavy on the server.
 #
 #purge_jobs:
 #  - shortest_max_lifetime: 1d
 #    longest_max_lifetime: 3d
-#    interval: 5m:
+#    interval: 12h
 #  - shortest_max_lifetime: 3d
 #    longest_max_lifetime: 1y
-#    interval: 24h
+#    interval: 1d
 
 
 ## TLS ##

@@ -34,6 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.admin import register_servlets_for_media_repo
@@ -47,6 +48,7 @@ logger = logging.getLogger("synapse.app.media_repository")
 
 
 class MediaRepositorySlavedStore(
+    RoomStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedClientIpStore,

@@ -948,17 +948,17 @@ class ServerConfig(Config):
         #
         # The rationale for this per-job configuration is that some rooms might have a
         # retention policy with a low 'max_lifetime', where history needs to be purged
-        # of outdated messages on a very frequent basis (e.g. every 5min), but not want
-        # that purge to be performed by a job that's iterating over every room it knows,
-        # which would be quite heavy on the server.
+        # of outdated messages on a more frequent basis than for the rest of the rooms
+        # (e.g. every 12h), but not want that purge to be performed by a job that's
+        # iterating over every room it knows, which could be heavy on the server.
         #
         #purge_jobs:
         #  - shortest_max_lifetime: 1d
         #    longest_max_lifetime: 3d
-        #    interval: 5m:
+        #    interval: 12h
         #  - shortest_max_lifetime: 3d
         #    longest_max_lifetime: 1y
-        #    interval: 24h
+        #    interval: 1d
         """
             % locals()
         )

@@ -130,6 +130,8 @@ class GroupsLocalHandler(object):
             res = yield self.transport_client.get_group_summary(
                 get_domain_from_id(group_id), group_id, requester_user_id
             )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
         except RequestSendFailed:
             raise SynapseError(502, "Failed to contact group server")
 
@@ -190,6 +192,8 @@ class GroupsLocalHandler(object):
             res = yield self.transport_client.create_group(
                 get_domain_from_id(group_id), group_id, user_id, content
             )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
         except RequestSendFailed:
             raise SynapseError(502, "Failed to contact group server")
 
@@ -231,6 +235,8 @@ class GroupsLocalHandler(object):
             res = yield self.transport_client.get_users_in_group(
                 get_domain_from_id(group_id), group_id, requester_user_id
             )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
         except RequestSendFailed:
             raise SynapseError(502, "Failed to contact group server")
 
@@ -271,6 +277,8 @@ class GroupsLocalHandler(object):
             res = yield self.transport_client.join_group(
                 get_domain_from_id(group_id), group_id, user_id, content
             )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
         except RequestSendFailed:
             raise SynapseError(502, "Failed to contact group server")
 
@@ -315,6 +323,8 @@ class GroupsLocalHandler(object):
             res = yield self.transport_client.accept_group_invite(
                 get_domain_from_id(group_id), group_id, user_id, content
             )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
         except RequestSendFailed:
             raise SynapseError(502, "Failed to contact group server")
 
@@ -361,6 +371,8 @@ class GroupsLocalHandler(object):
                 requester_user_id,
                 content,
             )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
         except RequestSendFailed:
             raise SynapseError(502, "Failed to contact group server")
 
@@ -424,6 +436,8 @@ class GroupsLocalHandler(object):
                 user_id,
                 content,
             )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
         except RequestSendFailed:
             raise SynapseError(502, "Failed to contact group server")
 
@@ -460,6 +474,8 @@ class GroupsLocalHandler(object):
             bulk_result = yield self.transport_client.bulk_get_publicised_groups(
                 get_domain_from_id(user_id), [user_id]
             )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
         except RequestSendFailed:
             raise SynapseError(502, "Failed to contact group server")

@@ -52,15 +52,14 @@ class UploadResource(DirectServeResource):
         requester = await self.auth.get_user_by_req(request)
         # TODO: The checks here are a bit late. The content will have
         # already been uploaded to a tmp file at this point
-        content_length = request.getHeader(b"Content-Length").decode("ascii")
-        if content_length is None:
-            raise SynapseError(msg="Request must specify a Content-Length", code=400)
-        if int(content_length) > self.max_upload_size:
-            raise SynapseError(
-                msg="Upload request body is too large",
-                code=413,
-                errcode=Codes.TOO_LARGE,
-            )
+        if request.hasHeader(b"Content-Length"):
+            content_length = request.getHeader(b"Content-Length").decode("ascii")
+            if int(content_length) > self.max_upload_size:
+                raise SynapseError(
+                    msg="Upload request body is too large",
+                    code=413,
+                    errcode=Codes.TOO_LARGE,
+                )
 
         upload_name = parse_string(request, b"filename", encoding=None)
         if upload_name:

@@ -366,6 +366,134 @@ class RoomWorkerStore(SQLBaseStore):

        defer.returnValue(row)

    def get_media_mxcs_in_room(self, room_id):
        """Retrieves all the local and remote media MXC URIs in a given room

        Args:
            room_id (str)

        Returns:
            The local and remote media as a lists of tuples where the key is
            the hostname and the value is the media ID.
        """

        def _get_media_mxcs_in_room_txn(txn):
            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
            local_media_mxcs = []
            remote_media_mxcs = []

            # Convert the IDs to MXC URIs
            for media_id in local_mxcs:
                local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
            for hostname, media_id in remote_mxcs:
                remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))

            return local_media_mxcs, remote_media_mxcs

        return self.db.runInteraction(
            "get_media_ids_in_room", _get_media_mxcs_in_room_txn
        )

    def quarantine_media_ids_in_room(self, room_id, quarantined_by):
        """For a room loops through all events with media and quarantines
        the associated media
        """

        def _quarantine_media_in_room_txn(txn):
            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
            total_media_quarantined = 0

            # Now update all the tables to set the quarantined_by flag

            txn.executemany(
                """
                UPDATE local_media_repository
                SET quarantined_by = ?
                WHERE media_id = ?
                """,
                ((quarantined_by, media_id) for media_id in local_mxcs),
            )

            txn.executemany(
                """
                UPDATE remote_media_cache
                SET quarantined_by = ?
                WHERE media_origin = ? AND media_id = ?
                """,
                (
                    (quarantined_by, origin, media_id)
                    for origin, media_id in remote_mxcs
                ),
            )

            total_media_quarantined += len(local_mxcs)
            total_media_quarantined += len(remote_mxcs)

            return total_media_quarantined

        return self.db.runInteraction(
            "quarantine_media_in_room", _quarantine_media_in_room_txn
        )

    def _get_media_mxcs_in_room_txn(self, txn, room_id):
        """Retrieves all the local and remote media MXC URIs in a given room

        Args:
            txn (cursor)
            room_id (str)

        Returns:
            The local and remote media as a lists of tuples where the key is
            the hostname and the value is the media ID.
        """
        mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")

        sql = """
            SELECT stream_ordering, json FROM events
            JOIN event_json USING (room_id, event_id)
            WHERE room_id = ?
                %(where_clause)s
                AND contains_url = ? AND outlier = ?
            ORDER BY stream_ordering DESC
            LIMIT ?
        """
        txn.execute(sql % {"where_clause": ""}, (room_id, True, False, 100))

        local_media_mxcs = []
        remote_media_mxcs = []

        while True:
            next_token = None
            for stream_ordering, content_json in txn:
                next_token = stream_ordering
                event_json = json.loads(content_json)
                content = event_json["content"]
                content_url = content.get("url")
                thumbnail_url = content.get("info", {}).get("thumbnail_url")

                for url in (content_url, thumbnail_url):
                    if not url:
                        continue
                    matches = mxc_re.match(url)
                    if matches:
                        hostname = matches.group(1)
                        media_id = matches.group(2)
                        if hostname == self.hs.hostname:
                            local_media_mxcs.append(media_id)
                        else:
                            remote_media_mxcs.append((hostname, media_id))

            if next_token is None:
                # We've gone through the whole room, so we're finished.
                break

            txn.execute(
                sql % {"where_clause": "AND stream_ordering < ?"},
                (room_id, next_token, True, False, 100),
            )

        return local_media_mxcs, remote_media_mxcs


class RoomBackgroundUpdateStore(SQLBaseStore):
    REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"

@@ -810,126 +938,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
            (room_id,),
        )

    def get_media_mxcs_in_room(self, room_id):
        """Retrieves all the local and remote media MXC URIs in a given room

        Args:
            room_id (str)

        Returns:
            The local and remote media as a lists of tuples where the key is
            the hostname and the value is the media ID.
        """

        def _get_media_mxcs_in_room_txn(txn):
            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
            local_media_mxcs = []
            remote_media_mxcs = []

            # Convert the IDs to MXC URIs
            for media_id in local_mxcs:
                local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
            for hostname, media_id in remote_mxcs:
                remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))

            return local_media_mxcs, remote_media_mxcs

        return self.db.runInteraction(
            "get_media_ids_in_room", _get_media_mxcs_in_room_txn
        )

    def quarantine_media_ids_in_room(self, room_id, quarantined_by):
        """For a room loops through all events with media and quarantines
        the associated media
        """

        def _quarantine_media_in_room_txn(txn):
            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
            total_media_quarantined = 0

            # Now update all the tables to set the quarantined_by flag

            txn.executemany(
                """
                UPDATE local_media_repository
                SET quarantined_by = ?
                WHERE media_id = ?
                """,
                ((quarantined_by, media_id) for media_id in local_mxcs),
            )

            txn.executemany(
                """
                UPDATE remote_media_cache
                SET quarantined_by = ?
                WHERE media_origin = ? AND media_id = ?
                """,
                (
                    (quarantined_by, origin, media_id)
                    for origin, media_id in remote_mxcs
                ),
            )

            total_media_quarantined += len(local_mxcs)
            total_media_quarantined += len(remote_mxcs)

            return total_media_quarantined

        return self.db.runInteraction(
            "quarantine_media_in_room", _quarantine_media_in_room_txn
        )

    def _get_media_mxcs_in_room_txn(self, txn, room_id):
        """Retrieves all the local and remote media MXC URIs in a given room

        Args:
            txn (cursor)
            room_id (str)

        Returns:
            The local and remote media as a lists of tuples where the key is
            the hostname and the value is the media ID.
        """
        mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")

        next_token = self.get_current_events_token() + 1
        local_media_mxcs = []
        remote_media_mxcs = []

        while next_token:
            sql = """
                SELECT stream_ordering, json FROM events
                JOIN event_json USING (room_id, event_id)
                WHERE room_id = ?
                    AND stream_ordering < ?
                    AND contains_url = ? AND outlier = ?
                ORDER BY stream_ordering DESC
                LIMIT ?
            """
            txn.execute(sql, (room_id, next_token, True, False, 100))

            next_token = None
            for stream_ordering, content_json in txn:
                next_token = stream_ordering
                event_json = json.loads(content_json)
                content = event_json["content"]
                content_url = content.get("url")
                thumbnail_url = content.get("info", {}).get("thumbnail_url")

                for url in (content_url, thumbnail_url):
                    if not url:
                        continue
                    matches = mxc_re.match(url)
                    if matches:
                        hostname = matches.group(1)
                        media_id = matches.group(2)
                        if hostname == self.hs.hostname:
                            local_media_mxcs.append(media_id)
                        else:
                            remote_media_mxcs.append((hostname, media_id))

        return local_media_mxcs, remote_media_mxcs

    @defer.inlineCallbacks
    def get_rooms_for_retention_period_in_range(
        self, min_ms, max_ms, include_null=False