Compare commits

70 Commits

Author SHA1 Message Date
Erik Johnston
230474b620 Actually fix exceptions 2018-11-29 11:46:28 +00:00
Erik Johnston
cf09912280 Don't log ERROR when no profile exists 2018-11-29 11:32:03 +00:00
Matthew Hodgson
cd317a1910 Merge pull request #4235 from matrix-org/travis/fix-auto-invite-errors
Catch room profile errors and anything else that can go wrong
2018-11-28 18:53:22 -08:00
Travis Ralston
11a168442d Catch room profile errors and anything else that can go wrong
Fixes an issue where things become unhappy when the room profile for a user is missing.
2018-11-28 08:57:56 -07:00
Travis Ralston
e8d99369bc Merge pull request #4218 from matrix-org/travis/account-merging
Proof of concept for auto-accepting invites on merged accounts
2018-11-22 09:00:25 -07:00
Travis Ralston
921469383e Use run_as_background_process 2018-11-22 08:50:05 -07:00
Travis Ralston
ccbf6bb222 Safer execution 2018-11-22 08:47:35 -07:00
Travis Ralston
c68d510564 Preserve log contexts in the room_member_handler 2018-11-21 13:21:21 -07:00
Travis Ralston
ce1b393682 Proof of concept for auto-accepting invites
This is for demonstration purposes only. In practice this would actually look up the right profile and use the right thing, not to mention be in a more reasonable location.
2018-11-21 13:03:35 -07:00
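The follow-up commits above ("Use run_as_background_process", "Preserve log contexts in the room_member_handler") are about how the auto-accept is kicked off rather than what it does: fire-and-forget work in Synapse should be wrapped in `run_as_background_process` so it runs in its own log context and is tracked as a background process. A rough, hypothetical sketch of that pattern (the handler call and arguments are stand-ins, not the proof-of-concept's actual code):

```python
from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process


@defer.inlineCallbacks
def _auto_accept_invite(room_member_handler, requester, room_id):
    # Stand-in for the real work: join the room on behalf of the invited
    # (merged) account. The exact handler method and arguments are
    # illustrative only.
    yield room_member_handler.update_membership(
        requester, requester.user, room_id, "join",
    )


def on_invite(hs, requester, room_id):
    # Kick the join off as a background process so it gets its own
    # logcontext and shows up in background-process metrics, instead of
    # leaking the caller's log context.
    run_as_background_process(
        "auto_accept_invite",
        _auto_accept_invite,
        hs.get_room_member_handler(), requester, room_id,
    )
```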
Neil Johnson
78ba0e7ab8 Remove riot.im from the list of trusted Identity Servers in the default configuration (#4207) 2018-11-20 12:29:25 +01:00
Richard van der Hoff
416c671474 Merge pull request #4204 from matrix-org/rav/logcontext_leak_fixes
Fix some logcontext leaks
2018-11-20 12:19:19 +01:00
Amber Brown
31425d82a3 Merge remote-tracking branch 'origin/master' into develop 2018-11-19 12:55:25 -06:00
Amber Brown
678ad155a2 Merge tag 'v0.33.9'
Features
--------

- Include flags to optionally add `m.login.terms` to the registration flow when consent tracking is enabled.
([\#4004](https://github.com/matrix-org/synapse/issues/4004), [\#4133](https://github.com/matrix-org/synapse/issues/4133),
[\#4142](https://github.com/matrix-org/synapse/issues/4142), [\#4184](https://github.com/matrix-org/synapse/issues/4184))
- Support for replacing rooms with new ones ([\#4091](https://github.com/matrix-org/synapse/issues/4091), [\#4099](https://github.com/matrix-org/synapse/issues/4099),
[\#4100](https://github.com/matrix-org/synapse/issues/4100), [\#4101](https://github.com/matrix-org/synapse/issues/4101))

Bugfixes
--------

- Fix exceptions when using the email mailer on Python 3. ([\#4095](https://github.com/matrix-org/synapse/issues/4095))
- Fix e2e key backup with more than 9 backup versions ([\#4113](https://github.com/matrix-org/synapse/issues/4113))
- Searches that request profile info now no longer fail with a 500. ([\#4122](https://github.com/matrix-org/synapse/issues/4122))
- fix return code of empty key backups ([\#4123](https://github.com/matrix-org/synapse/issues/4123))
- If the typing stream ID goes backwards (as on a worker when the master restarts), the worker's typing handler will no longer erroneously report rooms containing new
typing events. ([\#4127](https://github.com/matrix-org/synapse/issues/4127))
- Fix table lock of device_lists_remote_cache which could freeze the application ([\#4132](https://github.com/matrix-org/synapse/issues/4132))
- Fix exception when using state res v2 algorithm ([\#4135](https://github.com/matrix-org/synapse/issues/4135))
- Generating the user consent URI no longer fails on Python 3. ([\#4140](https://github.com/matrix-org/synapse/issues/4140),
[\#4163](https://github.com/matrix-org/synapse/issues/4163))
- Loading URL previews from the DB cache on Postgres will no longer cause Unicode type errors when responding to the request, and URL previews will no longer fail if
the remote server returns a Content-Type header with the chartype in quotes. ([\#4157](https://github.com/matrix-org/synapse/issues/4157))
- The hash_password script now works on Python 3. ([\#4161](https://github.com/matrix-org/synapse/issues/4161))
- Fix noop checks when updating device keys, reducing spurious device list update notifications. ([\#4164](https://github.com/matrix-org/synapse/issues/4164))

Deprecations and Removals
-------------------------

- The disused and un-specced identicon generator has been removed. ([\#4106](https://github.com/matrix-org/synapse/issues/4106))
- The obsolete and non-functional /pull federation endpoint has been removed. ([\#4118](https://github.com/matrix-org/synapse/issues/4118))
- The deprecated v1 key exchange endpoints have been removed. ([\#4119](https://github.com/matrix-org/synapse/issues/4119))
- Synapse will no longer fetch keys using the fallback deprecated v1 key exchange method and will now always use v2.
([\#4120](https://github.com/matrix-org/synapse/issues/4120))

Internal Changes
----------------

- Fix build of Docker image with docker-compose ([\#3778](https://github.com/matrix-org/synapse/issues/3778))
- Delete unreferenced state groups during history purge ([\#4006](https://github.com/matrix-org/synapse/issues/4006))
- The "Received rdata" log messages on workers is now logged at DEBUG, not INFO. ([\#4108](https://github.com/matrix-org/synapse/issues/4108))
- Reduce replication traffic for device lists ([\#4109](https://github.com/matrix-org/synapse/issues/4109))
- Fix `synapse_replication_tcp_protocol_*_commands` metric label to be full command name, rather than just the first character
([\#4110](https://github.com/matrix-org/synapse/issues/4110))
- Log some bits about room creation ([\#4121](https://github.com/matrix-org/synapse/issues/4121))
- Fix `tox` failure on old systems ([\#4124](https://github.com/matrix-org/synapse/issues/4124))
- Add STATE_V2_TEST room version ([\#4128](https://github.com/matrix-org/synapse/issues/4128))
- Clean up event accesses and tests ([\#4137](https://github.com/matrix-org/synapse/issues/4137))
- The default logging config will now set an explicit log file encoding of UTF-8. ([\#4138](https://github.com/matrix-org/synapse/issues/4138))
- Add helper functions for getting prev and auth events of an event ([\#4139](https://github.com/matrix-org/synapse/issues/4139))
- Add some tests for the HTTP pusher. ([\#4149](https://github.com/matrix-org/synapse/issues/4149))
- add purge_history.sh and purge_remote_media.sh scripts to contrib/ ([\#4155](https://github.com/matrix-org/synapse/issues/4155))
- HTTP tests have been refactored to contain less boilerplate. ([\#4156](https://github.com/matrix-org/synapse/issues/4156))
- Drop incoming events from federation for unknown rooms ([\#4165](https://github.com/matrix-org/synapse/issues/4165))
2018-11-19 12:54:29 -06:00
Amber Brown
47e26f5a4d towncrier 2018-11-19 12:43:14 -06:00
Amber Brown
d102e19e47 version 2018-11-19 12:42:49 -06:00
Amber Brown
80cac86b2c Fix fallback auth on Python 3 (#4197) 2018-11-19 12:27:33 -06:00
Richard van der Hoff
0c05da2e2e changelog 2018-11-19 17:07:42 +00:00
Richard van der Hoff
828f18bd8b Fix logcontext leak in test_url_preview 2018-11-19 17:07:01 +00:00
Richard van der Hoff
a267c2e3ed Fix logcontext leak in http pusher test 2018-11-19 17:07:01 +00:00
Richard van der Hoff
884a561447 Fix some tests which leaked logcontexts 2018-11-19 17:07:01 +00:00
Richard van der Hoff
f5faf6bc14 Fix logcontext leak in EmailPusher 2018-11-19 17:07:01 +00:00
Richard van der Hoff
10cdf519aa Merge pull request #4182 from aaronraimist/update-issue-template
Add a pull request template and add multiple issue templates
2018-11-19 14:24:30 +01:00
Richard van der Hoff
65b793c5a1 Merge pull request #4200 from aaronraimist/vacuum-full-note
Add a note saying you need to manually reclaim disk space
2018-11-19 14:19:51 +01:00
Aaron Raimist
cc2cf2da97 Add changelog
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2018-11-18 12:42:08 -06:00
Aaron Raimist
f6cbef6332 Add a note saying you need to manually reclaim disk space
People keep asking why their database hasn't gotten smaller after using this API.

Signed-off-by: Aaron Raimist <aaron@raim.ist>
2018-11-18 12:38:04 -06:00
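The note added by this commit boils down to: deleting rows (for example via the purge history admin API) does not shrink the files on disk; on PostgreSQL the space is only returned to the operating system after a manual `VACUUM FULL`. A minimal sketch of that manual step with psycopg2 (connection details are placeholders, not Synapse configuration):

```python
import psycopg2

# Hypothetical connection parameters; substitute your own database settings.
conn = psycopg2.connect(dbname="synapse", user="synapse_user", host="localhost")
conn.autocommit = True  # VACUUM cannot run inside a transaction block

with conn.cursor() as cur:
    # VACUUM FULL rewrites the tables and returns freed space to the OS,
    # but it takes exclusive locks, so run it during a maintenance window.
    cur.execute("VACUUM FULL")

conn.close()
```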
Amber Brown
4285c818ec Merge pull request #4193 from kivikakk/add-openbsd-prereq
add jpeg to OpenBSD prereq list
2018-11-17 14:27:53 -06:00
Ashe Connor
ceca3b2f30 add changelog.d entry 2018-11-17 15:01:02 +11:00
Ashe Connor
9548dd9586 add jpeg to OpenBSD prereq list
Signed-off-by: Ashe Connor <ashe@kivikakk.ee>
2018-11-17 14:57:20 +11:00
Travis Ralston
0bb273db07 Merge pull request #4192 from matrix-org/travis/fix-consent-urls
Remove duplicate slashes in generated consent URLs
2018-11-16 09:40:50 -07:00
Travis Ralston
3da9781c98 Fix the terms UI auth tests
By setting the config value directly, we skip the block that adds the slash automatically for us.
2018-11-15 23:00:28 -07:00
Travis Ralston
d75db3df59 Changelog 2018-11-15 20:44:57 -07:00
Travis Ralston
ab4526a153 Remove duplicate slashes in generated consent URLs 2018-11-15 20:41:53 -07:00
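The duplicate-slash issue fixed here is the usual URL-joining pitfall: a trailing slash on the configured base URL plus a leading slash on the consent path yields `//` in the generated link. A hedged sketch of the normalisation (function name and behaviour are illustrative, not Synapse's actual helper):

```python
def build_consent_url(public_baseurl: str, path: str) -> str:
    # Normalise both sides so exactly one slash separates them, regardless
    # of whether the configured base URL already ends in "/".
    return public_baseurl.rstrip("/") + "/" + path.lstrip("/")


assert build_consent_url("https://example.com/", "/_matrix/consent") == \
    "https://example.com/_matrix/consent"
assert build_consent_url("https://example.com", "_matrix/consent") == \
    "https://example.com/_matrix/consent"
```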
Amber Brown
8b1affe7d5 Fix Content-Disposition in media repository (#4176) 2018-11-15 15:55:58 -06:00
Travis Ralston
835779f7fb Add option to track MAU stats (but not limit people) (#3830) 2018-11-15 18:08:27 +00:00
Amber Brown
df758e155d Use <meta> tags to discover the per-page encoding of html previews (#4183) 2018-11-15 11:05:08 -06:00
Amber Brown
a51288e5d6 Add a coveragerc (#4180) 2018-11-15 10:50:08 -06:00
Neil Johnson
b5d92d4d46 Merge pull request #4188 from matrix-org/rav/readme-update-1
Update README for #1491 fix
2018-11-15 13:06:41 +00:00
Richard van der Hoff
4f8bb633c7 Update README for #1491 fix 2018-11-15 10:03:36 +00:00
Neil Johnson
bf648c37e7 release 0.33.9rc1 2018-11-14 11:45:52 +00:00
Richard van der Hoff
4b60c969d8 Merge pull request #4184 from matrix-org/rav/fix_public_consent
Fix an internal server error when viewing the public privacy policy
2018-11-14 11:32:43 +00:00
Richard van der Hoff
0c4dc6fd76 changelog 2018-11-14 10:48:08 +00:00
Richard van der Hoff
c1efcd7c6a Add a test for the public T&Cs form 2018-11-14 10:46:27 +00:00
Richard van der Hoff
83a5f459aa Fix an internal server error when viewing the public privacy policy 2018-11-14 10:21:07 +00:00
David Baker
0869566ad3 Merge pull request #4113 from matrix-org/dbkr/e2e_backup_versions_are_numbers
Make e2e backup versions numeric in the DB
2018-11-14 07:55:48 +00:00
Aaron Raimist
924c82ca16 Fix case
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2018-11-13 22:12:07 -06:00
Aaron Raimist
5d02704822 Add SUPPORT.md
https://help.github.com/articles/adding-support-resources-to-your-project/
2018-11-13 21:57:10 -06:00
Aaron Raimist
9ca1215582 Add changelog
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2018-11-13 21:46:48 -06:00
Aaron Raimist
d86826277d Add a pull request template and add multiple issue templates
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2018-11-13 21:43:40 -06:00
David Baker
bca3b91c2d Merge remote-tracking branch 'origin/develop' into dbkr/e2e_backup_versions_are_numbers 2018-11-09 18:35:02 +00:00
Erik Johnston
db5a1c059a Merge pull request #4166 from matrix-org/erikj/drop_unknown_events
Drop incoming events from federation for unknown rooms
2018-11-09 17:59:34 +00:00
David Baker
d44dea0223 pep8 2018-11-09 14:38:31 +00:00
David Baker
4f93abd62d add docs 2018-11-09 13:25:38 +00:00
David Baker
d3fa6194f7 Remove unnecessary str() 2018-11-09 11:11:31 +00:00
Brendan Abolivier
0f3f0a64bf Merge pull request #4168 from matrix-org/babolivier/federation-client-content-type
Add a Content-Type header on POST requests to the federation client script
2018-11-09 11:00:55 +00:00
Brendan Abolivier
91d96759c9 Add a Content-Type header on POST requests to the federation client 2018-11-09 10:41:34 +00:00
Erik Johnston
7b22421a7b Merge pull request #4164 from matrix-org/erikj/fix_device_comparison
Fix noop checks when updating device keys
2018-11-08 14:37:20 +00:00
Erik Johnston
abaa93c158 Add test to assert set_e2e_device_keys correctly returns False on no-op 2018-11-08 14:06:44 +00:00
Richard van der Hoff
c70809a275 Merge pull request #4163 from matrix-org/rav/fix_consent_on_py3
Fix encoding error for consent form on python3
2018-11-08 12:48:51 +00:00
Erik Johnston
5ebed18692 Lets convert bytes to unicode instead 2018-11-08 12:33:13 +00:00
Erik Johnston
06c3d8050f Newsfile 2018-11-08 12:18:41 +00:00
Erik Johnston
b1a22b24ab Fix noop checks when updating device keys
Clients often reupload their device keys (for some reason), so it's
important for the server to check for no-ops before sending out device
list update notifications.

The check is broken on Python 3 because comparing bytes and unicode
always fails, and because we write bytes to the DB but get unicode back
when we read.
2018-11-08 12:18:38 +00:00
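The underlying Python 3 gotcha: `bytes` and `str` never compare equal, so a no-op check that compares the JSON blob read back from the database (as text) with the newly supplied blob (as bytes) always reports a change. A minimal, self-contained illustration of the failure and the fix (names are illustrative, not Synapse's actual code):

```python
import json

new_keys = {"algorithms": ["m.olm.v1.curve25519-aes-sha2"]}

# What effectively happened: bytes were written to the DB, but the column
# came back as unicode text, so the "did anything change?" comparison was
# str vs bytes and always said "yes".
stored = json.dumps(new_keys)             # str, as read back from the DB
supplied = json.dumps(new_keys).encode()  # bytes, as newly uploaded

print(stored == supplied)                  # False on Python 3 -> spurious update

# Comparing like with like (decode before comparing) restores the no-op check.
print(stored == supplied.decode("utf-8"))  # True -> correctly treated as a no-op
```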
Richard van der Hoff
0a1fc52971 fix parse_string docstring 2018-11-08 11:12:29 +00:00
Richard van der Hoff
de6223836e changelog 2018-11-08 11:06:28 +00:00
hera
2b075fb03a Fix encoding error for consent form on python3
The form was rendering this as "b'01234....'".

-- richvdh
2018-11-08 11:05:39 +00:00
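The "b'01234....'" symptom is what you get on Python 3 when a `bytes` value is interpolated into a template as if it were text: `str()` of bytes keeps the `b'...'` wrapper. A tiny illustration (not the actual consent template code):

```python
value = b"01234"

# Interpolating bytes directly reproduces the bug: the literal b'...'
# wrapper ends up in the rendered page.
print("token: %s" % (value,))                  # token: b'01234'

# Decoding to unicode first renders as intended.
print("token: %s" % (value.decode("ascii"),))  # token: 01234
```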
David Baker
e0934acdbb Cast to int here too 2018-10-30 11:12:23 +00:00
David Baker
12941f5f8b Cast backup version to int when querying 2018-10-30 11:01:07 +00:00
David Baker
2f0f911c52 Convert version back to a string 2018-10-30 10:35:18 +00:00
David Baker
4eacf0f200 news fragment 2018-10-30 10:05:51 +00:00
David Baker
64fa557f80 Try & make it work on postgres 2018-10-30 09:51:04 +00:00
David Baker
563f9b61b1 Make e2e backup versions numeric in the DB
We were doing max(version) which does not do what we wanted
on a column of type TEXT.
2018-10-29 21:01:22 +00:00
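The problem with `max(version)` on a TEXT column is that the comparison is lexicographic, so "9" sorts after "10" and the wrong backup version is picked once a user reaches double digits. A quick, self-contained illustration using SQLite (the principle is the same on Postgres, which this change targets):

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# TEXT column: MAX() compares strings lexicographically, so '9' beats '10'.
conn.execute("CREATE TABLE versions_text (version TEXT)")
conn.executemany("INSERT INTO versions_text VALUES (?)", [("9",), ("10",)])
print(conn.execute("SELECT MAX(version) FROM versions_text").fetchone()[0])  # 9

# Numeric column (what the migration moves to): MAX() behaves as expected.
conn.execute("CREATE TABLE versions_int (version INTEGER)")
conn.executemany("INSERT INTO versions_int VALUES (?)", [(9,), (10,)])
print(conn.execute("SELECT MAX(version) FROM versions_int").fetchone()[0])   # 10
```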
54 changed files with 315 additions and 2100 deletions

View File

@@ -1,26 +1,21 @@
ARG PYTHON_VERSION=3
ARG PYTHON_VERSION=2
###
### Stage 0: builder
###
FROM docker.io/python:${PYTHON_VERSION}-slim-stretch as builder
FROM docker.io/python:${PYTHON_VERSION}-alpine3.8 as builder
# install the OS build deps
RUN apt-get update && apt-get install -y \
build-essential \
RUN apk add \
build-base \
libffi-dev \
sqlite3 \
libssl-dev \
libjpeg-dev \
libxslt1-dev \
libxml2-dev \
libpq-dev
# for ksm_preload
RUN apt-get install -y \
git \
cmake
libjpeg-turbo-dev \
libressl-dev \
libxslt-dev \
linux-headers \
postgresql-dev \
zlib-dev
# build things which have slow build steps, before we copy synapse, so that
# the layer can be cached.
@@ -39,57 +34,30 @@ RUN pip install --prefix="/install" --no-warn-script-location \
COPY . /synapse
RUN pip install --prefix="/install" --no-warn-script-location \
lxml \
psycopg2-binary \
psycopg2 \
/synapse
# N.B. to work, this needs:
# echo 1 > /sys/kernel/mm/ksm/run
# echo 31250 > /sys/kernel/mm/ksm/pages_to_scan # 128MB of 4KB pages at a time
# echo 10000 > /sys/kernel/mm/ksm/pages_to_scan # 40MB of pages at a time
# ...to be run in the Docker host
RUN git clone https://github.com/unbrice/ksm_preload && \
cd ksm_preload && \
cmake . && \
make && \
cp libksm_preload.so /install/lib
###
### Stage 1: runtime
###
FROM docker.io/python:${PYTHON_VERSION}-slim-stretch
FROM docker.io/python:${PYTHON_VERSION}-alpine3.8
RUN apt-get update && apt-get install -y \
procps \
net-tools \
iproute2 \
tcpdump \
traceroute \
mtr-tiny \
inetutils-ping \
less \
lsof \
supervisor \
netcat
# for topologiser
RUN pip install flask
RUN apk add --no-cache --virtual .runtime_deps \
libffi \
libjpeg-turbo \
libressl \
libxslt \
libpq \
zlib \
su-exec
COPY --from=builder /install /usr/local
COPY ./docker/start.py /start.py
COPY ./docker/conf /conf
COPY ./docker/proxy/proxy /proxy/proxy
COPY ./docker/proxy/maps /proxy/maps
COPY ./docker/supervisord.conf /etc/supervisor/conf.d/supervisord.conf
VOLUME ["/data"]
EXPOSE 8008/tcp 8448/tcp 3000/tcp 5683/udp
EXPOSE 8008/tcp 8448/tcp
ENV LD_PRELOAD=/usr/local/lib/libksm_preload.so
# default is 32768 (8 4KB pages)
ENV KSMP_MERGE_THRESHOLD=16384
ENTRYPOINT ["/usr/bin/supervisord"]
ENTRYPOINT ["/start.py"]

View File

@@ -55,7 +55,7 @@ database:
database: "{{ POSTGRES_DB or "synapse" }}"
host: "{{ POSTGRES_HOST or "db" }}"
port: "{{ POSTGRES_PORT or "5432" }}"
cp_min: 1
cp_min: 5
cp_max: 10
{% else %}
database:
@@ -73,7 +73,7 @@ log_config: "/compiled/log.config"
## Ratelimiting ##
rc_messages_per_second: 50
rc_messages_per_second: 0.2
rc_message_burst_count: 10.0
federation_rc_window_size: 1000
federation_rc_sleep_limit: 10

View File

@@ -15,13 +15,6 @@ handlers:
formatter: precise
filters: [context]
{% if SYNAPSE_LOG_HOST %}
http_meshsim:
class: logging.handlers.HTTPHandler
host: {{ SYNAPSE_LOG_HOST }}:3000
url: "/log"
{% endif %}
loggers:
synapse:
level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
@@ -31,12 +24,6 @@ loggers:
# information such as access tokens.
level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
{% if SYNAPSE_LOG_HOST %}
synapse.federation.pdu_destination_logger:
level: INFO
handlers: [http_meshsim,console]
{% endif %}
root:
level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
handlers: [console]

View File

@@ -1,35 +0,0 @@
[
"errcode",
"error",
"msgtype",
"body",
"formatted_body",
"format",
"avatar_url",
"displayname",
"membership",
"url",
"name",
"origin",
"origin_server_ts",
"pdus",
"edus",
"sender",
"type",
"auth_events",
"unsigned",
"prev_events",
"depth",
"redacts",
"event_id",
"room_id",
"state_key",
"content",
"edu_type",
"sha256",
"typing",
"user_id",
"m.read",
"data",
"event_ids"
]

View File

@@ -1,5 +0,0 @@
[
"m.text",
"m.emote",
"m.notice"
]

View File

@@ -1,8 +0,0 @@
[
"m.presence_invite",
"m.presence_accept",
"m.presence_deny",
"m.presence",
"m.typing",
"m.receipt"
]

View File

@@ -1,36 +0,0 @@
[
"M_BAD_JSON",
"M_BAD_STATE",
"M_CANNOT_LEAVE_SERVER_NOTICE_ROOM",
"M_CAPTCHA_INVALID",
"M_CAPTCHA_NEEDED",
"M_CONSENT_NOT_GIVEN",
"M_EXAMPLE_ERROR",
"M_EXCLUSIVE",
"M_FORBIDDEN",
"M_GUEST_ACCESS_FORBIDDEN",
"M_INCOMPATIBLE_ROOM_VERSION",
"M_INVALID_PARAM",
"M_INVALID_ROOM_STATE",
"M_INVALID_USERNAME",
"M_LIMIT_EXCEEDED",
"M_MISSING_PARAM",
"M_MISSING_TOKEN",
"M_NOT_FOUND",
"M_NOT_JSON",
"M_RESOURCE_LIMIT_EXCEEDED",
"M_ROOM_IN_USE",
"M_SERVER_NOT_TRUSTED",
"M_THREEPID_AUTH_FAILED",
"M_THREEPID_DENIED",
"M_THREEPID_IN_USE",
"M_THREEPID_NOT_FOUND",
"M_TOO_LARGE",
"M_UNAUTHORIZED",
"M_UNKNOWN",
"M_UNKNOWN_TOKEN",
"M_UNRECOGNIZED",
"M_UNSUPPORTED_ROOM_VERSION",
"M_USER_IN_USE",
"M_WEAK_PASSWORD"
]

View File

@@ -1,26 +0,0 @@
[
"m.call.answer",
"m.call.candidates",
"m.call.hangup",
"m.call.invite",
"m.direct",
"m.presence",
"m.receipt",
"m.room.aliases",
"m.room.avatar",
"m.room.canonical_alias",
"m.room.create",
"m.room.guest_access",
"m.room.history_visibility",
"m.room.join_rules",
"m.room.member",
"m.room.message",
"m.room.message.feedback",
"m.room.name",
"m.room.power_levels",
"m.room.redaction",
"m.room.third_party_invite",
"m.room.topic",
"m.tag",
"m.typing"
]

View File

@@ -1,80 +0,0 @@
kdisplayname
javatar_url
mdid_not_relay
fage_ts
dpdus
gmsgtypefm.text
gcontent¢dbody
dbody
bhunsigned
ddtab
kauth_events
edepth
foriginhsynapse
porigin_server_ts
fsender
typenm.room.message
hevent_ido$
kprev_events
groom_idn!
cXJGX%
dtypenm.room.message
dtypemm.room.member
events
:synapse0
:synapse1
:synapse2
:synapse3
:synapse4
:synapse5
:synapse7
:synapse8
&exclude_threaded=true
chunk
start
end
thread_id
=%7B%22thread_id%22%3A0%7D&2=20&dir=b&from=
transaction_id
m.room.room_version
m.room.power_levels
m.room.join_rule
m.room.guest_access
user_id
dtypenm.room.aliases
dtypevm.room.canonical_aliasfsender
device_id
home_server
access_token
"dpdus\x81\xaadtypenm.room.messageedepth"
"gcontent\xa2dbody"
"gmsgtypefm.textgroom_idn"
"porigin_server_ts\x1b\x00\x00\x01gZ\xe3\xfd\x1c"
"\x11*\xd1\x02\x06\xd1\x14"
"B\x03\x00:\x8d\x87\xb10\x021Q\x00\x11* \xd1\x00\x06\xd2\x14\x01\x06\xff\xa1dpdus\x81\xaadtypenm.room.messageedepth\x18"
"bE\x00\x01\x1dr\xc1*\xb1\x06Q\x07\xff\xa1dpdus\xa0"
"\xa1dpdus\xa0"
"\xa8erooms\xa3djoin\xa0eleave\xa0finvite\xa0fgroups\xa3djoin\xa0eleave\xa0finvite\xa0hpresence\xa1fevents\x80ito_device\xa1fevents\x80jnext_batchts58_10_0_1_1_1_1_3_1laccount_data\xa1fevents\x80ldevice_lists\xa2dleft\x80gchanged\x80x\x1adevice_one_time_keys_count\xa1qsigned_curve25519\x182"
"\xa8erooms\xa3djoin\xa1n!DQQ0:synapse0\xa6estate\xa1fevents\x80gsummary\xa0htimeline\xa3fevents\x81\xa6dtypenm.room.messagefsender"
"`Zy\x1eglimited\xf4jprev_batchss16_3_0_1_1_1_1_3_1iephemeral\xa1fevents\x80laccount_data\xa1fevents\x80tunread_notifications\xa0eleave"
"ephemeral\xa1fevents\x80laccount_data\xa1fevents\x81\xa2dtypelm.fully_readgcontent\xa1hevent_idk"
"chunk\x8a\xaacage\x0cdtypemm.room.memberfsenderqgcontent\xa3javatar_url\xf6jmembershipdjoinkdisplayname\xf6groom_id"
"gcontent\xa1rhistory_visibilityfsharedgroom_id"
"dtypex\x19m.room.history_visibility"
"gcontent\xa1ijoin_rulefpublicgroom_idn"
"dtypesm.room.power_levelsfsenderq"
"gcontent\xa9cban\x182dkick\x182eusers\xa1q"
"\x18dfevents\xa5km.room.name\x182mm.room.avatar\x182sm.room.power_levels\x18dvm.room.canonical_alias\x182x\x19m.room.history_visibility\x18dfinvite\x00fredact\x182mstate_default\x182musers_default\x00nevents_default\x00groom_idn"
"gcontent\xa2gcreatorqlroom_versiona1groom_idn"
"\xa1eflows\x81\xa1dtypepm.login.password"
"\xa2eerroroNo backup foundgerrcodekM_NOT_FOUND"
"xa1kdevice_keys\xa5dkeys\xa2red25519:J"
"jalgorithms\x82x\x1cm.olm.v1.curve25519-aes-sha2tm.megolm.v1.aes-sha2jsignatures\xa1"
"\xa2fdevice\xa0fglobal\xa5droom\x80fsender\x80gcontent\x81\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa1iset_tweakihighlightgdefault\xf5genabled\xf5gpatternggrule_idx\x1a.m.rule.contains_user_namehoverride\x86\xa5gactions\x81kdont_notifygdefault\xf5genabled\xf4grule_idn.m.rule.masterjconditions\x80\xa5gactions\x81kdont_notifygdefault\xf5genabled\xf5grule_idx\x18.m.rule.suppress_noticesjconditions\x81\xa3ckeyocontent.msgtypedkindkevent_matchgpatternhm.notice\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idu.m.rule.invite_for_mejconditions\x83\xa3ckeydtypedkindkevent_matchgpatternmm.room.member\xa3ckeyrcontent.membershipdkindkevent_matchgpatternfinvite\xa3ckeyistate_keydkindkevent_matchgpatternq\xa5gactions\x81kdont_notifygdefault\xf5genabled\xf5grule_idt.m.rule.member_eventjconditions\x81\xa3ckeydtypedkindkevent_matchgpatternmm.room.member\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa1iset_tweakihighlightgdefault\xf5genabled\xf5grule_idx\x1d.m.rule.contains_display_namejconditions\x81\xa1dkinducontains_display_name\xa5gactions\x82fnotify\xa2evalue\xf5iset_tweakihighlightgdefault\xf5genabled\xf5grule_idq.m.rule.roomnotifjconditions\x82\xa3ckeylcontent.bodydkindkevent_matchgpatterne@room\xa2ckeydroomdkindx\x1esender_notification_permissioniunderride\x85\xa5gactions\x83fnotify\xa2evaluedringiset_tweakesound\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idl.m.rule.calljconditions\x81\xa3ckeydtypedkindkevent_matchgpatternmm.call.invite\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idw.m.rule.room_one_to_onejconditions\x82\xa2bisa2dkindqroom_member_count\xa3ckeydtypedkindkevent_matchgpatternnm.room.message\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idx!.m.rule.encrypted_room_one_to_onejconditions\x82\xa2bisa2dkindqroom_member_count\xa3ckeydtypedkindkevent_matchgpatternpm.room.encrypted\xa5gactions\x82fnotify\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_ido.m.rule.messagejconditions\x81\xa3ckeydtypedkindkevent_matchgpatternnm.room.message\xa5gactions\x82fnotify\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idq.m.rule.encryptedjconditions\x81\xa3ckeydtypedkindkevent_matchgpatternpm.room.encrypted"
"\xa1droom\xa1htimeline\xa1elimit\x14"
"\xa8erooms\xa3djoin\xa1n!eYB0:synapse0\xa6estate\xa1fevents\x80gsummary\xa0htimeline\xa3fevents\x8e\xa7dtypemm.room.createfsenderq"
"\xa1mone_time_keys\xa5x\x18signed_curve25519:"
"\xa1sone_time_key_counts\xa1qsigned_curve25519\x05"
"\xa4jexpires_in\xfb@\xac \x00\x00\x00\x00\x00jtoken_typefBearerlaccess_tokenxrmatrix_server_namehsynapse"

View File

@@ -1,18 +0,0 @@
[
"minimum_valid_until_ts",
"v",
"limit",
"event_id",
"ver",
"limit",
"since",
"include_all_networks",
"third_party_instance_id",
"room_alias",
"user_id",
"field",
"minimum_valid_until_ts",
"filter",
"access_token",
"timeout"
]

View File

@@ -1,519 +0,0 @@
[
{
"path": "/_matrix/federation/v1/send/{txnId}",
"method": "put",
"name": "send_transaction"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/send/{eventType}/{txnId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/profile/{userId}/displayname",
"method": "get"
},
{
"path": "/_matrix/client/r0/profile/{userId}/displayname",
"method": "put"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/join",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/kick",
"method": "post"
},
{
"path": "/_matrix/client/r0/admin/whois/{userId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/receipt/{receiptType}/{eventId}",
"method": "post"
},
{
"path": "/_matrix/client/versions",
"method": "get"
},
{
"path": "/_matrix/media/r0/config",
"method": "get"
},
{
"path": "/_matrix/media/r0/download/{serverName}/{mediaId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/invite ",
"method": "post"
},
{
"path": "/_matrix/client/r0/join/{roomIdOrAlias}",
"method": "post"
},
{
"path": "/_matrix/client/r0/presence/list/{userId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/presence/list/{userId}",
"method": "post"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}",
"method": "delete"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state/{eventType}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state/{eventType}",
"method": "put"
},
{
"path": "/_matrix/client/r0/account/whoami",
"method": "get"
},
{
"path": "/_matrix/client/r0/devices",
"method": "get"
},
{
"path": "/_matrix/client/r0/keys/claim",
"method": "post"
},
{
"path": "/_matrix/client/r0/login",
"method": "get"
},
{
"path": "/_matrix/client/r0/login",
"method": "post"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}/actions",
"method": "put"
},
{
"path": "/_matrix/client/r0/register/available",
"method": "get"
},
{
"path": "/_matrix/client/r0/user/{userId}/filter",
"method": "post"
},
{
"path": "/_matrix/client/r0/user_directory/search",
"method": "post"
},
{
"path": "/_matrix/client/r0/account/3pid",
"method": "get"
},
{
"path": "/_matrix/client/r0/account/3pid",
"method": "post"
},
{
"path": "/_matrix/client/r0/publicRooms",
"method": "get"
},
{
"path": "/_matrix/client/r0/publicRooms",
"method": "post"
},
{
"path": "/_matrix/client/r0/register",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/ban",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/typing/{userId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/search",
"method": "post"
},
{
"path": "/_matrix/client/r0/account/password",
"method": "post"
},
{
"path": "/_matrix/client/r0/initialSync",
"method": "get"
},
{
"path": "/_matrix/client/r0/logout/all",
"method": "post"
},
{
"path": "/_matrix/client/r0/account/deactivate",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/forget",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/redact/{eventId}/{txnId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/unban",
"method": "post"
},
{
"path": "/_matrix/client/r0/keys/query",
"method": "post"
},
{
"path": "/_matrix/client/r0/user/{userId}/account_data/{type}",
"method": "put"
},
{
"path": "/_matrix/client/r0/user/{userId}/rooms/{roomId}/tags/{tag}",
"method": "delete"
},
{
"path": "/_matrix/client/r0/user/{userId}/rooms/{roomId}/tags/{tag}",
"method": "put"
},
{
"path": "/_matrix/media/r0/upload",
"method": "post"
},
{
"path": "/_matrix/client/r0/events",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/context/{eventId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/invite",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/messages",
"method": "get"
},
{
"path": "/_matrix/client/r0/account/3pid/delete",
"method": "post"
},
{
"path": "/_matrix/client/r0/createRoom",
"method": "post"
},
{
"path": "/_matrix/client/r0/profile/{userId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}/enabled",
"method": "put"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/leave",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/members",
"method": "get"
},
{
"path": "/_matrix/client/r0/sendToDevice/{eventType}/{txnId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/voip/turnServer",
"method": "get"
},
{
"path": "/.well-known/matrix/client",
"method": "get"
},
{
"path": "/_matrix/client/r0/directory/list/appservice/{networkId}/{roomId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/keys/upload",
"method": "post"
},
{
"path": "/_matrix/client/r0/pushrules/",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/initialSync",
"method": "get"
},
{
"path": "/_matrix/client/r0/user/{userId}/openid/request_token",
"method": "post"
},
{
"path": "/_matrix/client/r0/user/{userId}/rooms/{roomId}/tags",
"method": "get"
},
{
"path": "/_matrix/media/r0/download/{serverName}/{mediaId}/{fileName}",
"method": "get"
},
{
"path": "/_matrix/client/r0/delete_devices",
"method": "post"
},
{
"path": "/_matrix/client/r0/events/{eventId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/profile/{userId}/avatar_url",
"method": "get"
},
{
"path": "/_matrix/client/r0/profile/{userId}/avatar_url",
"method": "put"
},
{
"path": "/_matrix/client/r0/pushers/set",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/event/{eventId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/report/{eventId}",
"method": "post"
},
{
"path": "/_matrix/media/r0/preview_url",
"method": "get"
},
{
"path": "/_matrix/client/r0/directory/room/{roomAlias}",
"method": "delete"
},
{
"path": "/_matrix/client/r0/directory/room/{roomAlias}",
"method": "get"
},
{
"path": "/_matrix/client/r0/directory/room/{roomAlias}",
"method": "put"
},
{
"path": "/_matrix/client/r0/sync",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/read_markers",
"method": "post"
},
{
"path": "/_matrix/client/r0/logout",
"method": "post"
},
{
"path": "/_matrix/client/r0/notifications",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/joined_members",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state/{eventType}/{stateKey}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state/{eventType}/{stateKey}",
"method": "put"
},
{
"path": "/_matrix/client/r0/user/{userId}/filter/{filterId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/user/{userId}/rooms/{roomId}/account_data/{type}",
"method": "put"
},
{
"path": "/_matrix/client/r0/joined_rooms",
"method": "get"
},
{
"path": "/_matrix/client/r0/keys/changes",
"method": "get"
},
{
"path": "/_matrix/client/r0/presence/{userId}/status",
"method": "get"
},
{
"path": "/_matrix/client/r0/presence/{userId}/status",
"method": "put"
},
{
"path": "/_matrix/client/r0/pushers",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state",
"method": "get"
},
{
"path": "/_matrix/media/r0/thumbnail/{serverName}/{mediaId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/devices/{deviceId}",
"method": "delete"
},
{
"path": "/_matrix/client/r0/devices/{deviceId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/devices/{deviceId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/backfill/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/get_missing_events/{roomId}",
"method": "post"
},
{
"path": "/_matrix/federation/v1/event_auth/{roomId}/{eventId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query_auth/{roomId}/{eventId}",
"method": "post"
},
{
"path": "/_matrix/federation/v1/state/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/state_ids/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/event/{eventId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/invite/{roomId}/{eventId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/make_join/{roomId}/{userId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/send_join/{roomId}/{eventId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/query/{serverName}/{keyId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query",
"method": "post"
},
{
"path": "/_matrix/federation/v1/server/{keyId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/make_leave/{roomId}/{userId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/send_leave/{roomId}/{eventId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/openid/userinfo",
"method": "get"
},
{
"path": "/_matrix/federation/v1/publicRooms",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query/directory",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query/profile",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query/{queryType}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query_auth/{roomId}/{eventId}",
"method": "post"
},
{
"path": "/_matrix/federation/v1/state/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/state_ids/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/event/{eventId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/event_auth/{roomId}/{eventId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/exchange_third_party_invite/{roomId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/3pid/onbind",
"method": "put"
},
{
"path": "/_matrix/federation/v1/user/devices/{userId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/user/keys/claim",
"method": "post"
},
{
"path": "/_matrix/federation/v1/user/keys/query",
"method": "post"
}
]

Binary file not shown.

View File

@@ -6,7 +6,6 @@ import sys
import subprocess
import glob
import codecs
import time
# Utility functions
convert = lambda src, dst, environ: open(dst, "w").write(jinja2.Template(open(src).read()).render(**environ))
@@ -32,10 +31,6 @@ def generate_secrets(environ, secrets):
# Prepare the configuration
mode = sys.argv[1] if len(sys.argv) > 1 else None
environ = os.environ.copy()
for e in environ:
print("%s:%s" % (e, environ[e]))
ownership = "{}:{}".format(environ.get("UID", 991), environ.get("GID", 991))
args = ["python", "-m", "synapse.app.homeserver"]
@@ -69,9 +64,4 @@ else:
args += ["--config-path", "/compiled/homeserver.yaml"]
# Generate missing keys and start synapse
subprocess.check_output(args + ["--generate-keys"])
# we register our test users in add_accounts.sh now to avoid having to wait for HS launch
#os.system("(sleep 10; /usr/local/bin/register_new_matrix_user -u matthew -p secret -c /compiled/homeserver.yaml -a) &");
os.execv("/usr/local/bin/python", args)
os.execv("/sbin/su-exec", ["su-exec", ownership] + args)

View File

@@ -1,8 +0,0 @@
[supervisord]
nodaemon=true
[program:synapse]
command=/start.py
[program:proxy]
command=/proxy/proxy --maps-dir /proxy/maps --debug-log

View File

@@ -120,9 +120,6 @@ ROOM_EVENT_FILTER_SCHEMA = {
"include_redundant_members": {
"type": "boolean"
},
"thread_id": {
"type": "number",
}
}
}
@@ -334,8 +331,6 @@ class Filter(object):
self.contains_url = self.filter_json.get("contains_url", None)
self.thread_id = self.filter_json.get("thread_id", None)
def filters_all_types(self):
return "*" in self.not_types

View File

@@ -38,7 +38,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
AuthError if the checks fail
Returns:
if the auth checks pass.
if the auth checks pass.
"""
if do_size_check:
_check_size_limits(event)
@@ -46,7 +46,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
if not hasattr(event, "room_id"):
raise AuthError(500, "Event has no room_id: %s" % event)
if False and do_sig_check: # Disable all sig checks
if do_sig_check:
sender_domain = get_domain_from_id(event.sender)
event_id_domain = get_domain_from_id(event.event_id)
@@ -103,9 +103,6 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
"No create event in auth events",
)
if event.type == "org.matrix.server_presence":
return
creating_domain = get_domain_from_id(event.room_id)
originating_domain = get_domain_from_id(event.sender)
if creating_domain != originating_domain:

View File

@@ -44,9 +44,6 @@ class _EventInternalMetadata(object):
def is_invite_from_remote(self):
return getattr(self, "invite_from_remote", False)
def is_internal_event(self):
return getattr(self, "internal_event", False)
def get_send_on_behalf_of(self):
"""Whether this server should send the event on behalf of another server.
This is used by the federation "send_join" API to forward the initial join

View File

@@ -14,9 +14,9 @@
# limitations under the License.
import copy
import string
from synapse.types import EventID
from synapse.util.stringutils import random_string
from . import EventBase, FrozenEvent, _event_dict_property
@@ -49,10 +49,10 @@ class EventBuilderFactory(object):
self.event_id_count = 0
def create_event_id(self):
i = self.event_id_count
i = str(self.event_id_count)
self.event_id_count += 1
local_part = _encode_id(i)
local_part = str(int(self.clock.time())) + i + random_string(5)
e_id = EventID(local_part, self.hostname)
@@ -73,19 +73,3 @@ class EventBuilderFactory(object):
key_values["signatures"] = {}
return EventBuilder(key_values=key_values,)
def _numberToBase(n, b):
if n == 0:
return [0]
digits = []
while n:
digits.append(int(n % b))
n //= b
return digits[::-1]
def _encode_id(i):
digits = string.digits + string.ascii_letters
val_slice = _numberToBase(i, len(digits))
return "".join(digits[x] for x in val_slice)

View File

@@ -74,7 +74,6 @@ class EventContext(object):
"delta_ids",
"prev_state_events",
"app_service",
"thread_id",
"_current_state_ids",
"_prev_state_ids",
"_prev_state_id",
@@ -90,9 +89,8 @@ class EventContext(object):
@staticmethod
def with_state(state_group, current_state_ids, prev_state_ids,
thread_id, prev_group=None, delta_ids=None):
prev_group=None, delta_ids=None):
context = EventContext()
context.thread_id = thread_id
# The current state including the current event
context._current_state_ids = current_state_ids
@@ -143,8 +141,7 @@ class EventContext(object):
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None,
"thread_id": self.thread_id,
"app_service_id": self.app_service.id if self.app_service else None
})
@staticmethod
@@ -161,8 +158,6 @@ class EventContext(object):
"""
context = EventContext()
context.thread_id = input["thread_input"]
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
context._prev_state_id = input["prev_state_id"]

View File

@@ -26,7 +26,7 @@ from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.http.servlet import assert_params_in_dict
from synapse.types import get_domain_from_id, EventID
from synapse.types import get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
logger = logging.getLogger(__name__)
@@ -79,16 +79,16 @@ class FederationBase(object):
allow_none=True,
)
# if not res and pdu.origin != origin:
# try:
# res = yield self.get_pdu(
# destinations=[pdu.origin],
# event_id=pdu.event_id,
# outlier=outlier,
# timeout=10000,
# )
# except SynapseError:
# pass
if not res and pdu.origin != origin:
try:
res = yield self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
outlier=outlier,
timeout=10000,
)
except SynapseError:
pass
if not res:
logger.warn(
@@ -136,7 +136,6 @@ class FederationBase(object):
* throws a SynapseError if the signature check failed.
The deferreds run their callbacks in the sentinel logcontext.
"""
return [defer.succeed(p) for p in pdus]
deferreds = _check_sigs_on_pdus(self.keyring, pdus)
ctx = logcontext.LoggingContext.current_context()
@@ -318,7 +317,7 @@ def event_from_pdu_json(pdu_json, outlier=False):
depth = pdu_json['depth']
if not isinstance(depth, six.integer_types):
raise SynapseError(400, "Depth %r not an integer" % (depth, ),
raise SynapseError(400, "Depth %r not an intger" % (depth, ),
Codes.BAD_JSON)
if depth < 0:
@@ -326,40 +325,6 @@ def event_from_pdu_json(pdu_json, outlier=False):
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
event_id = pdu_json["event_id"]
if event_id[0] != "$":
pdu_json["event_id"] = EventID(
event_id,
get_domain_from_id(pdu_json["sender"]),
).to_string()
event_id = pdu_json["event_id"]
dtab = pdu_json.get("unsigned", {}).pop("dtab", None)
if dtab:
pdu_json.setdefault("unsigned", {})["destinations"] = {
dest: cost
for cost, destinations in dtab
for dest in destinations
}
if "auth_events" in pdu_json:
pdu_json["auth_events"] = [
(e, {}) if isinstance(e, six.string_types) else e
for e in pdu_json["auth_events"]
]
if "prev_events" in pdu_json:
pdu_json["prev_events"] = [
(e, {}) if isinstance(e, six.string_types) else e
for e in pdu_json["prev_events"]
]
if "origin" not in pdu_json:
pdu_json["origin"] = get_domain_from_id(pdu_json["sender"])
logger.info("Unmangled event to: %s", pdu_json)
event = FrozenEvent(
pdu_json
)

View File

@@ -39,7 +39,6 @@ from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from synapse.federation.units import _mangle_pdu
logger = logging.getLogger(__name__)
@@ -487,22 +486,6 @@ class FederationClient(FederationBase):
RuntimeError if no servers were reachable.
"""
healths = yield self.store.get_destination_healths(destinations)
with_healths = []
without_healths = []
for d in destinations:
if healths.get(d):
with_healths.append(d)
else:
without_healths.append(d)
with_healths.sort(key=lambda d: healths[d])
destinations = with_healths + without_healths
logger.info("Trying destinations: %r", destinations)
for destination in destinations:
if destination == self.server_name:
continue
@@ -713,7 +696,7 @@ class FederationClient(FederationBase):
destination=destination,
room_id=room_id,
event_id=event_id,
content=_mangle_pdu(pdu.get_pdu_json(time_now)),
content=pdu.get_pdu_json(time_now),
)
except HttpResponseException as e:
if e.code == 403:

View File

@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
import six
from six import iteritems
@@ -37,19 +36,19 @@ from synapse.api.errors import (
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction, _mangle_pdu
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.util import glob_to_regex
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -72,7 +71,6 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self.clock = hs.get_clock()
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -85,8 +83,6 @@ class FederationServer(FederationBase):
# come in waves.
self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
self.tracer = hs.get_tracer()
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
@@ -104,7 +100,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_incoming_transaction(self, origin, transaction_data, span):
def on_incoming_transaction(self, origin, transaction_data):
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = self._clock.time_msec()
@@ -122,13 +118,13 @@ class FederationServer(FederationBase):
(origin, transaction.transaction_id),
)):
result = yield self._handle_incoming_transaction(
origin, transaction, request_time, span,
origin, transaction, request_time,
)
defer.returnValue(result)
@defer.inlineCallbacks
def _handle_incoming_transaction(self, origin, transaction, request_time, span):
def _handle_incoming_transaction(self, origin, transaction, request_time):
""" Process an incoming transaction and return the HTTP response
Args:
@@ -212,48 +208,24 @@ class FederationServer(FederationBase):
pdu_results[event_id] = e.error_dict()
return
thread_id = random.randint(1, 999999999)
pdu_to_thread = {}
first_in_thread = True
for pdu in reversed(pdus_by_room[room_id]):
if self.handler.should_start_thread(pdu):
pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
first_in_thread = False
else:
pdu_to_thread[pdu.event_id] = (0, False)
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
with nested_logging_context(event_id):
thread_id, new_thread = pdu_to_thread[pdu.event_id]
logger.info("Assigning thread %d to %s", thread_id, pdu.event_id)
child_span = self.tracer.start_span('handle_pdu', child_of=span)
with child_span:
child_span.set_tag("event_id", event_id)
try:
ret = yield self._handle_received_pdu(
origin, pdu, thread_id=thread_id,
new_thread=new_thread,
span=child_span,
)
if ret:
pdu_results[event_id] = ret
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
child_span.set_tag("error", True)
child_span.log_kv({"error", e})
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
child_span.set_tag("error", True)
child_span.log_kv({"error", e})
child_span.log_kv({"pdu_result": pdu_results.get(event_id)})
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
@@ -261,16 +233,7 @@ class FederationServer(FederationBase):
)
if hasattr(transaction, "edus"):
logger.info("Got edus: %s", transaction.edus)
edus = []
for x in transaction.edus:
try:
edus.append(Edu(**x))
except Exception:
logger.exception("Failed to handle EDU: %s", x)
for edu in edus:
for edu in (Edu(**x) for x in transaction.edus):
yield self.received_edu(
origin,
edu.edu_type,
@@ -281,7 +244,7 @@ class FederationServer(FederationBase):
"pdus": pdu_results,
}
logger.info("Returning: %s", str(response))
logger.debug("Returning: %s", str(response))
yield self.transaction_actions.set_response(
origin,
@@ -366,8 +329,8 @@ class FederationServer(FederationBase):
)
defer.returnValue({
"pdus": [_mangle_pdu(pdu.get_pdu_json()) for pdu in pdus],
"auth_chain": [_mangle_pdu(pdu.get_pdu_json()) for pdu in auth_chain],
"pdus": [pdu.get_pdu_json() for pdu in pdus],
"auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
})
@defer.inlineCallbacks
@@ -412,7 +375,7 @@ class FederationServer(FederationBase):
yield self.check_server_matches_acl(origin_host, pdu.room_id)
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {"event": _mangle_pdu(ret_pdu.get_pdu_json(time_now))}))
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
@defer.inlineCallbacks
def on_send_join_request(self, origin, content):
@@ -426,9 +389,9 @@ class FederationServer(FederationBase):
res_pdus = yield self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {
"state": [_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["state"]],
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"auth_chain": [
_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["auth_chain"]
p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
],
}))
@@ -461,7 +424,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
res = {
"auth_chain": [_mangle_pdu(a.get_pdu_json(time_now)) for a in auth_pdus],
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
}
defer.returnValue((200, res))
@@ -510,7 +473,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
send_content = {
"auth_chain": [
_mangle_pdu(e.get_pdu_json(time_now))
e.get_pdu_json(time_now)
for e in ret["auth_chain"]
],
"rejects": ret.get("rejects", []),
@@ -586,7 +549,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
defer.returnValue({
"events": [_mangle_pdu(ev.get_pdu_json(time_now)) for ev in missing_events],
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
})
@log_function
@@ -608,7 +571,7 @@ class FederationServer(FederationBase):
)
@defer.inlineCallbacks
def _handle_received_pdu(self, origin, pdu, thread_id, new_thread, span):
def _handle_received_pdu(self, origin, pdu):
""" Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError.
@@ -638,6 +601,30 @@ class FederationServer(FederationBase):
if the event was unacceptable for any other reason (eg, too large,
too many prev_events, couldn't find the prev_events)
"""
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if origin != get_domain_from_id(pdu.event_id):
# We continue to accept join events from any server; this is
# necessary for the federation join dance to work correctly.
# (When we join over federation, the "helper" server is
# responsible for sending out the join event, rather than the
# origin. See bug #1893).
if not (
pdu.type == 'm.room.member' and
pdu.content and
pdu.content.get("membership", None) == 'join'
):
logger.info(
"Discarding PDU %s from invalid origin %s",
pdu.event_id, origin
)
return
else:
logger.info(
"Accepting join PDU %s from %s",
pdu.event_id, origin
)
# Check signature.
try:
pdu = yield self._check_sigs_and_hash(pdu)
@@ -649,35 +636,10 @@ class FederationServer(FederationBase):
affected=pdu.event_id,
)
destinations = pdu.unsigned.get("destinations", {})
costs = yield self.store.get_destination_healths(list(destinations))
logger.info("Destinations: %s", destinations)
logger.info("Costs: %s", costs)
dont_relay = set()
for dest, their_cost in destinations.items():
our_cost = costs.get(dest)
if our_cost and their_cost and their_cost < our_cost:
dont_relay.add(dest)
if destinations:
pdu.unsigned["destinations"] = {
d: c for d, c in destinations.items() if d not in dont_relay
}
yield self.handler.on_receive_pdu(
origin, pdu, sent_to_us_directly=True,
thread_id=thread_id, new_thread=new_thread,
span=span,
)
ret = {}
if dont_relay:
ret = {"did_not_relay": list(dont_relay)}
defer.returnValue(ret)
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name

View File

@@ -14,10 +14,6 @@
# limitations under the License.
import datetime
import logging
import random
import json
import opentracing
import string
from six import itervalues
@@ -26,9 +22,7 @@ from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
from synapse.api.constants import EventTypes
from synapse.api.errors import FederationDeniedError, HttpResponseException
from synapse.events import FrozenEvent
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import (
LaterGauge,
@@ -48,8 +42,6 @@ from .units import Edu, Transaction
logger = logging.getLogger(__name__)
pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger")
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count", ""
)
@@ -136,7 +128,7 @@ class TransactionQueue(object):
self.last_device_list_stream_id_by_dest = {}
# HACK to get unique tx id
self._next_txn_id = 1
self._next_txn_id = int(self.clock.time_msec())
self._order = 1
@@ -145,8 +137,6 @@ class TransactionQueue(object):
self._processing_pending_presence = False
self.tracer = hs.get_tracer()
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
@@ -179,9 +169,10 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def handle_event(event):
should_relay = yield self._should_relay(event, False)
logger.info("Should relay event %s: %s", event.event_id, should_relay)
if not should_relay:
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
return
try:
@@ -203,9 +194,15 @@ class TransactionQueue(object):
destinations = set(destinations)
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations)
yield self._send_pdu(event, destinations)
self._send_pdu(event, destinations)
@defer.inlineCallbacks
def handle_room_events(events):
@@ -251,54 +248,24 @@ class TransactionQueue(object):
finally:
self._is_processing = False
@defer.inlineCallbacks
def received_new_event(self, origin, event, span):
should_relay = yield self._should_relay(event, True)
logger.info("Should relay event %s: %s", event.event_id, should_relay)
if not should_relay:
return
destinations = event.unsigned.get("destinations")
destinations = set(destinations)
logger.debug("Sending %s to %r", event, destinations)
yield self._send_pdu(event, destinations, span)
@defer.inlineCallbacks
def _send_pdu(self, pdu, destinations, span=None):
def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
references = []
if span:
references = [opentracing.follows_from(span.context)]
with self.tracer.start_span('_send_pdu', references=references) as span:
span.set_tag("event_id", pdu.event_id)
span.set_tag("room_id", pdu.room_id)
span.set_tag("sender", pdu.sender)
destinations = yield self._compute_relay_destinations(
pdu, joined_hosts=destinations,
)
order = self._order
self._order += 1
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
pdu_logger.info(
"Relaying PDU %s in %s to %s",
pdu.event_id, pdu.room_id, destinations,
)
if not destinations:
return
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
# XXX: Should we decide where to route here.
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
(pdu, order)
@@ -306,36 +273,6 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination)
def _compute_relay_destinations(self, pdu, joined_hosts):
"""Compute where we should send an event. Returning an empty set stops
PDU from being sent anywhere.
"""
# XXX: Hook for routing shenanigans
send_on_behalf_of = pdu.internal_metadata.get_send_on_behalf_of()
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
joined_hosts.discard(send_on_behalf_of)
return joined_hosts
def _should_relay(self, event, from_federation):
"""Whether we should consider relaying this event.
"""
# XXX: Hook for routing shenanigans
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
return False
if event.internal_metadata.is_internal_event():
return False
return True
@logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
@@ -475,7 +412,6 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):
pdu_spans = {}
pending_pdus = []
try:
self.pending_transactions[destination] = 1
@@ -487,12 +423,6 @@ class TransactionQueue(object):
pending_pdus = []
while True:
txn_id = _encode_id(self._next_txn_id)
self._next_txn_id += 1
for s in pdu_spans.values():
s.set_tag("txn-id", txn_id)
device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages(destination)
)
@@ -509,22 +439,16 @@ class TransactionQueue(object):
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
# We can only include at most 50 PDUs per transaction
pending_pdus, leftover_pdus = pending_pdus[-5:], pending_pdus[:-5]
pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
if leftover_pdus:
# self.pending_pdus_by_dest[destination] = leftover_pdus
for _, _, p_span in leftover_pdus:
p_span.set_tag("success", False)
p_span.log_kv({"result": "dropped"})
p_span.finish()
logger.info("TX [%s] Sending PDUs: %s", destination, pending_pdus)
self.pending_pdus_by_dest[destination] = leftover_pdus
pending_edus = self.pending_edus_by_dest.pop(destination, [])
# We can only include at most 100 EDUs per transaction
pending_edus, leftover_edus = pending_edus[-5:], pending_edus[:-5]
# if leftover_edus:
# self.pending_edus_by_dest[destination] = leftover_edus
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
if leftover_edus:
self.pending_edus_by_dest[destination] = leftover_edus
pending_presence = self.pending_presence_by_dest.pop(destination, {})
@@ -561,70 +485,28 @@ class TransactionQueue(object):
)
return
pdu_span_references = []
for pdu, _, p_span in pending_pdus:
pdu_spans[pdu.event_id] = p_span
p_span.set_tag("txn-id", txn_id)
pdu_span_references.append(opentracing.follows_from(p_span.context))
# END CRITICAL SECTION
span = self.tracer.start_span(
'_send_new_transaction', references=pdu_span_references,
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus,
)
with span:
span.set_tag("destination", destination)
span.set_tag("txn-id", txn_id)
try:
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, span,
pdu_spans, txn_id,
if success:
sent_transactions_counter.inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
logger.info("Marking as sent %r %r", destination, dev_list_id)
yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id
)
except Exception as e:
success = False
span.set_tag("error", True)
span.log_kv({"error": e})
for s in pdu_spans.values():
s.set_tag("error", True)
s.log_kv({"transaction_error": e})
raise
finally:
if not success:
for p, _, _ in pending_pdus:
yield self._pdu_send_txn_failed(
destination, txn_id, p,
span=pdu_spans[p.event_id],
)
# We want to be *very* sure we delete this after we stop
# processing
self.pending_transactions.pop(destination, None)
for s in pdu_spans.values():
s.finish()
span.set_tag("success", success)
if success:
sent_transactions_counter.inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
logger.info(
"Marking as sent %r %r", destination, dev_list_id,
)
yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id
)
self.last_device_stream_id_by_dest[destination] = device_stream_id
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
self.last_device_stream_id_by_dest[destination] = device_stream_id
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
@@ -637,14 +519,17 @@ class TransactionQueue(object):
except FederationDeniedError as e:
logger.info(e)
except Exception as e:
logger.exception(
logger.warn(
"TX [%s] Failed to send transaction: %s",
destination,
e,
)
for p, _, _ in pending_pdus:
for p, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
destination)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)
@defer.inlineCallbacks
def _get_new_device_messages(self, destination):
@@ -680,8 +565,7 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
span, pdu_spans, txn_id):
def _send_new_transaction(self, destination, pending_pdus, pending_edus):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
@@ -690,13 +574,9 @@ class TransactionQueue(object):
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
logger.debug("TX [%s] _attempt_new_transaction", destination)
span.log_kv({
"pdus": len(pdus),
"edus": len(edus),
})
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction"
@@ -717,6 +597,8 @@ class TransactionQueue(object):
edus=edus,
)
self._next_txn_id += 1
yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
@@ -746,16 +628,13 @@ class TransactionQueue(object):
try:
response = yield self.transport_layer.send_transaction(
transaction, json_data_cb, span,
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
span.set_tag("error", True)
span.log_kv({"error": e})
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
@@ -775,111 +654,18 @@ class TransactionQueue(object):
logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
if code == 200:
logger.info(
"TX [%s] {%s} got response json %s",
destination, txn_id, response
)
pdu_results = response.get("pdus", {})
for p in pdus:
yield self._pdu_send_result(
destination, txn_id, p,
response=pdu_results.get(p.event_id, {}),
span=pdu_spans[p.event_id],
)
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, e_id, r,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, p.event_id,
)
success = False
defer.returnValue(success)
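Both the old per-PDU _pdu_send_result path and the inlined loop it is replaced with key off the same structure: the /send response carries a "pdus" map of event_id to per-event result, with an "error" entry for anything the remote rejected. A tiny illustration of pulling out the failures (failed_event_ids is a hypothetical helper name, not part of the code above):

def failed_event_ids(pdu_results):
    """pdu_results is the "pdus" map from a federation /send response:
    event_id -> {} on success, or {"error": ...} on rejection."""
    return [e_id for e_id, r in pdu_results.items() if "error" in r]

print(failed_event_ids({"$ok:hs1.example": {}, "$bad:hs1.example": {"error": "Forbidden"}}))
# ['$bad:hs1.example']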
@defer.inlineCallbacks
def _pdu_send_result(self, destination, txn_id, pdu, response, span):
"""Gets called after sending the event in a transaction, with the
result for the event from the remote server.
"""
# XXX: Hook for routing shenanigans
if "error" in response:
span.set_tag("error", True)
span.log_kv({
"error.kind": "pdu",
"response.error": response["error"],
})
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, pdu.event_id, response,
)
pdu_logger.info(
"SendErrorPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destination": destination,
"server": self.server_name,
},
)
new_destinations = set(pdu.unsigned.get("destinations", []))
new_destinations.discard(destination)
yield self._send_pdu(pdu, list(new_destinations), span)
elif "did_not_relay" in response and response["did_not_relay"]:
new_destinations = set(response["did_not_relay"])
new_destinations.discard(destination)
pdu_logger.info(
"DidNotRelayPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destination": destination,
"new_destinations": json.dumps(list(new_destinations)),
"server": self.server_name,
},
)
span.log_kv({
"did_not_relay_to": list(new_destinations),
})
yield self._send_pdu(pdu, list(new_destinations), span)
# @defer.inlineCallbacks
def _pdu_send_txn_failed(self, destination, txn_id, pdu, span):
"""Gets called when sending a transaction failed (after retries)
"""
# XXX: Hook for routing shenanigans
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, pdu.event_id,
)
span.set_tag("error", True)
span.log_kv({
"error.kind": "transaction",
})
pdu_logger.info(
"SendFailPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destination": destination,
"server": self.server_name,
},
)
# new_destinations = set(pdu.unsigned.get("destinations", []))
# new_destinations.discard(destination)
# yield self._send_pdu(pdu, list(new_destinations), span)
def _numberToBase(n, b):
if n == 0:
return [0]
digits = []
while n:
digits.append(int(n % b))
n //= b
return digits[::-1]
def _encode_id(i):
digits = string.digits + string.ascii_letters
val_slice = _numberToBase(i, len(digits))
return "".join(digits[x] for x in val_slice)

View File

@@ -136,7 +136,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_transaction(self, transaction, json_data_callback=None, span=None):
def send_transaction(self, transaction, json_data_callback=None):
""" Sends the given Transaction to its destination
Args:
@@ -174,9 +174,8 @@ class TransportLayerClient(object):
path=path,
data=json_data,
json_data_callback=json_data_callback,
long_retries=False,
long_retries=True,
backoff_on_404=True, # If we get a 404 the other side has gone
span=span,
)
defer.returnValue(response)

View File

@@ -15,11 +15,8 @@
# limitations under the License.
import functools
import inspect
import logging
import re
import opentracing
from opentracing.ext import tags
from twisted.internet import defer
@@ -123,12 +120,12 @@ class Authenticator(object):
):
raise FederationDeniedError(origin)
# if not json_request["signatures"]:
# raise NoAuthenticationError(
# 401, "Missing Authorization headers", Codes.UNAUTHORIZED,
# )
if not json_request["signatures"]:
raise NoAuthenticationError(
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
)
# yield self.keyring.verify_json_for_server(origin, json_request)
yield self.keyring.verify_json_for_server(origin, json_request)
logger.info("Request from %s", origin)
request.authenticated_entity = origin
@@ -230,22 +227,15 @@ class BaseFederationServlet(object):
"""
REQUIRE_AUTH = True
def __init__(self, handler, authenticator, ratelimiter, server_name, hs):
def __init__(self, handler, authenticator, ratelimiter, server_name):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
self.tracer = hs.get_tracer()
def _wrap(self, func):
authenticator = self.authenticator
ratelimiter = self.ratelimiter
arg_spec = inspect.signature(func)
all_args = arg_spec.parameters
include_span = "request_span" in all_args
logger.info("include_span: %s for %s", include_span, self)
@defer.inlineCallbacks
@functools.wraps(func)
def new_func(request, *args, **kwargs):
@@ -261,67 +251,32 @@ class BaseFederationServlet(object):
Deferred[(int, object)|None]: (response code, response object) as returned
by the callback method. None if the request has already been handled.
"""
content = None
if request.method in [b"PUT", b"POST"]:
# TODO: Handle other method types? other content types?
content = parse_json_object_from_request(request)
try:
carrier = {}
for key, value in request.requestHeaders.getAllRawHeaders():
carrier[key.decode("ascii")] = value[0].decode("ascii")
parent_ctx = self.tracer.extract(
format=opentracing.Format.HTTP_HEADERS, carrier=carrier
origin = yield authenticator.authenticate_request(request, content)
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
logger.warn("authenticate_request failed: missing authentication")
raise
except Exception as e:
logger.warn("authenticate_request failed: %s", e)
raise
if origin:
with ratelimiter.ratelimit(origin) as d:
yield d
response = yield func(
origin, content, request.args, *args, **kwargs
)
else:
response = yield func(
origin, content, request.args, *args, **kwargs
)
except Exception:
logger.exception("trace extract failed")
parent_ctx = None
tags_dict = {
tags.HTTP_METHOD: request.method.decode('ascii'),
tags.HTTP_URL: request.uri.decode('ascii'),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
}
span = self.tracer.start_span(
operation_name="federation-server",
child_of=parent_ctx,
tags=tags_dict,
)
with span:
content = None
if request.method in [b"PUT", b"POST"]:
# TODO: Handle other method types? other content types?
content = parse_json_object_from_request(request)
try:
origin = yield authenticator.authenticate_request(request, content)
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
logger.warn("authenticate_request failed: missing authentication")
raise
except Exception as e:
logger.warn("authenticate_request failed: %s", e)
raise
if include_span:
kwargs["request_span"] = span
try:
if origin:
span.set_tag("origin", origin)
with ratelimiter.ratelimit(origin) as d:
yield d
response = yield func(
origin, content, request.args, *args, **kwargs
)
else:
response = yield func(
origin, content, request.args, *args, **kwargs
)
except Exception as e:
span.set_tag("error", True)
span.log_kv({"error": e})
raise
span.set_tag(tags.HTTP_STATUS_CODE, response[0])
defer.returnValue(response)
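The block removed here paired with the client-side changes in the federation HTTP client: the sender injected its span context into the request headers, and the servlet rebuilt a parent context from them before starting a "federation-server" span. A minimal sketch of that inject/extract round trip, assuming only the stock opentracing package (its default no-op tracer accepts the HTTP_HEADERS format, so this runs without Jaeger configured; the client-side span name is illustrative):

import opentracing

tracer = opentracing.tracer  # no-op tracer unless a real one (e.g. Jaeger) is installed

# Client side: serialise the current span context into carrier headers.
with tracer.start_span("federation-client-request") as client_span:
    carrier = {}
    tracer.inject(client_span.context, opentracing.Format.HTTP_HEADERS, carrier)
    # carrier now holds the propagation headers to copy onto the outbound request

# Server side: rebuild the parent context from the inbound headers and start
# a child span for the incoming federation request.
parent_ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, carrier)
with tracer.start_span("federation-server", child_of=parent_ctx) as server_span:
    server_span.set_tag("origin", "hs2.example")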
@@ -352,7 +307,7 @@ class FederationSendServlet(BaseFederationServlet):
# This is when someone is trying to send us a bunch of data.
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, transaction_id, request_span):
def on_PUT(self, origin, content, query, transaction_id):
""" Called on PUT /send/<transaction_id>/
Args:
@@ -398,7 +353,7 @@ class FederationSendServlet(BaseFederationServlet):
try:
code, response = yield self.handler.on_incoming_transaction(
origin, transaction_data, request_span,
origin, transaction_data,
)
except Exception:
logger.exception("on_incoming_transaction failed")
@@ -1367,7 +1322,6 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in ROOM_LIST_CLASSES:
@@ -1376,7 +1330,6 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in GROUP_SERVER_SERVLET_CLASSES:
@@ -1385,7 +1338,6 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in GROUP_LOCAL_SERVLET_CLASSES:
@@ -1394,7 +1346,6 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES:
@@ -1403,5 +1354,4 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)

View File

@@ -17,18 +17,13 @@
server protocol.
"""
import itertools
import logging
from synapse.types import get_localpart_from_id, get_domain_from_id
from synapse.util.jsonobject import JsonEncodedObject
logger = logging.getLogger(__name__)
BUCKETS = [0, 50, 100, 200, 350, 500, 750, 1000, 2000, 5000, 10000, 100000]
class Edu(JsonEncodedObject):
""" An Edu represents a piece of data sent from one homeserver to another.
@@ -82,13 +77,14 @@ class Transaction(JsonEncodedObject):
internal_keys = [
"transaction_id",
"origin",
"destination",
"origin_server_ts",
"previous_ids",
]
required_keys = [
"transaction_id",
"origin",
"destination",
"origin_server_ts",
"pdus",
]
@@ -112,54 +108,15 @@ class Transaction(JsonEncodedObject):
""" Used to create a new transaction. Will auto fill out
transaction_id and origin_server_ts keys.
"""
kwargs["pdus"] = [
_mangle_pdu(p.get_pdu_json())
for p in pdus
]
if "origin_server_ts" not in kwargs:
raise KeyError(
"Require 'origin_server_ts' to construct a Transaction"
)
if "transaction_id" not in kwargs:
raise KeyError(
"Require 'transaction_id' to construct a Transaction"
)
kwargs["pdus"] = [p.get_pdu_json() for p in pdus]
return Transaction(**kwargs)
def _mangle_pdu(pdu_json):
pdu_json.pop("origin", None)
pdu_json.pop("hashes", None)
pdu_json.pop("signatures", None)
pdu_json.get("unsigned", {}).pop("age_ts", None)
pdu_json.get("unsigned", {}).pop("age", None)
pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"]))
pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"]))
if get_domain_from_id(pdu_json["event_id"]) == get_domain_from_id(pdu_json["sender"]):
pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
destinations = pdu_json["unsigned"].pop("destinations", None)
if destinations:
new_destinations = {}
for dest, cost in destinations.items():
for first, second in pairwise(BUCKETS):
if first <= cost <= second:
b = first if cost - first < second - cost else second
new_destinations.setdefault(b, []).append(dest)
break
else:
new_destinations.setdefault(BUCKETS[-1], []).append(dest)
pdu_json["unsigned"]["dtab"] = list(new_destinations.items())
logger.info("Mangled PDU: %s", pdu_json)
return pdu_json
def _strip_hashes(iterable):
return (
e for e, hashes in iterable
)
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
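The destination handling in _mangle_pdu above coarsens per-host costs onto the BUCKETS boundaries: pairwise walks consecutive boundary pairs, each cost is snapped to whichever of the two boundaries is nearer, and anything beyond the largest boundary lands in the final bucket. A standalone sketch of that grouping step, under a hypothetical bucket_destinations name:

import itertools

BUCKETS = [0, 50, 100, 200, 350, 500, 750, 1000, 2000, 5000, 10000, 100000]

def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)

def bucket_destinations(destination_costs):
    grouped = {}
    for dest, cost in destination_costs.items():
        for first, second in pairwise(BUCKETS):
            if first <= cost <= second:
                # snap to the nearer of the two boundaries
                nearest = first if cost - first < second - cost else second
                grouped.setdefault(nearest, []).append(dest)
                break
        else:
            # cost exceeds the largest boundary
            grouped.setdefault(BUCKETS[-1], []).append(dest)
    return grouped

print(bucket_destinations({"a.example": 30, "b.example": 180, "c.example": 250000}))
# {50: ['a.example'], 200: ['b.example'], 100000: ['c.example']}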

View File

@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import secrets
import logging
import unicodedata
@@ -750,9 +748,7 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def issue_access_token(self, user_id, device_id=None):
# access_token = self.macaroon_gen.generate_access_token(user_id)
access_token = base64.b64encode(secrets.token_bytes(8))
access_token = self.macaroon_gen.generate_access_token(user_id)
yield self.store.add_access_token_to_user(user_id, access_token,
device_id)
defer.returnValue(access_token)

View File

@@ -278,7 +278,6 @@ class DeviceHandler(BaseHandler):
"device_list_key", position, rooms=room_ids,
)
return
if hosts:
logger.info("Sending device list update notif to: %r", hosts)
for host in hosts:

View File

@@ -263,8 +263,9 @@ class DirectoryHandler(BaseHandler):
"Room alias %s not found" % (room_alias.to_string(),),
)
hosts = yield self.state.get_current_hosts_in_room(room_id)
servers = set(hosts) | set(servers)
users = yield self.state.get_current_user_in_room(room_id)
extra_servers = set(get_domain_from_id(u) for u in users)
servers = set(extra_servers) | set(servers)
# If this server is in the list of servers, return it first.
if self.server_name in servers:
@@ -275,8 +276,6 @@ class DirectoryHandler(BaseHandler):
else:
servers = list(servers)
logger.info("Returning servers %s", servers)
defer.returnValue({
"room_id": room_id,
"servers": servers,
@@ -291,14 +290,14 @@ class DirectoryHandler(BaseHandler):
400, "Room Alias is not hosted on this Home Server"
)
result = yield self.get_association(
result = yield self.get_association_from_room_alias(
room_alias
)
if result is not None:
defer.returnValue({
"room_id": result["room_id"],
"servers": result["servers"],
"room_id": result.room_id,
"servers": result.servers,
})
else:
raise NotFoundError(

View File

@@ -18,7 +18,6 @@
import itertools
import logging
import random
import six
from six import iteritems, itervalues
@@ -48,7 +47,6 @@ from synapse.crypto.event_signing import (
add_hashes_and_signatures,
compute_event_signature,
)
from synapse.events import FrozenEvent
from synapse.events.validator import EventValidator
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.federation import (
@@ -64,15 +62,12 @@ from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_server
from ._base import BaseHandler
logger = logging.getLogger(__name__)
pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger")
def shortstr(iterable, maxitems=5):
"""If iterable has maxitems or fewer, return the stringification of a list
@@ -114,7 +109,6 @@ class FederationHandler(BaseHandler):
self.clock = hs.get_clock()
self.store = hs.get_datastore() # type: synapse.storage.DataStore
self.federation_client = hs.get_federation_client()
self.federation_sender = hs.get_federation_sender()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
@@ -141,14 +135,9 @@ class FederationHandler(BaseHandler):
self.room_queues = {}
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
# Always start a new thread for events that have an origin_server_ts
# from before this
self.force_thread_ts = 0
@defer.inlineCallbacks
def on_receive_pdu(
self, origin, pdu, sent_to_us_directly=False, thread_id=None,
new_thread=False, span=None,
self, origin, pdu, sent_to_us_directly=False,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -189,17 +178,8 @@ class FederationHandler(BaseHandler):
)
if already_seen:
logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
pdu_logger.info(
"Received already seen event %s in room %s from %s",
pdu.event_id, pdu.room_id, origin,
)
return
pdu_logger.info(
"Received unseen event %s in room %s from %s",
pdu.event_id, pdu.room_id, origin,
)
# do some initial sanity-checking of the event. In particular, make
# sure it doesn't have hundreds of prev_events or auth_events, which
# could cause a huge state resolution or cascade of event fetches.
@@ -281,8 +261,7 @@ class FederationHandler(BaseHandler):
)
yield self._get_missing_events_for_pdu(
origin, pdu, prevs, min_depth,
thread_id=thread_id,
origin, pdu, prevs, min_depth
)
# Update the set of things we've seen after trying to
@@ -325,20 +304,20 @@ class FederationHandler(BaseHandler):
# but there is an interaction with min_depth that I'm not really
# following.
# if sent_to_us_directly:
# logger.warn(
# "[%s %s] Rejecting: failed to fetch %d prev events: %s",
# room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
# )
# raise FederationError(
# "ERROR",
# 403,
# (
# "Your server isn't divulging details about prev_events "
# "referenced in this event."
# ),
# affected=pdu.event_id,
# )
if sent_to_us_directly:
logger.warn(
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
raise FederationError(
"ERROR",
403,
(
"Your server isn't divulging details about prev_events "
"referenced in this event."
),
affected=pdu.event_id,
)
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
@@ -437,51 +416,15 @@ class FederationHandler(BaseHandler):
affected=event_id,
)
logger.info("Thread ID %r", thread_id)
# Remove destinations field before persisting
event_copy = FrozenEvent.from_event(pdu)
pdu.unsigned.pop("destinations", None)
yield self._process_received_pdu(
origin,
pdu,
state=state,
auth_chain=auth_chain,
thread_id=thread_id,
)
if sent_to_us_directly:
yield self.federation_sender.received_new_event(origin, event_copy, span)
if new_thread:
builder = self.event_builder_factory.new({
"type": "org.matrix.new_thread",
"content": {
"thread_id": thread_id,
"latest_event": pdu.event_id,
},
"event_id": random_string(24),
"origin_server_ts": self.clock.time_msec(),
"sender": "@server:server",
"room_id": pdu.room_id,
})
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
event.internal_metadata.internal_event = True
yield self.event_creation_handler.handle_new_client_event(
create_requester(UserID("server", "server")),
event,
context,
ratelimit=False,
extra_users=[],
do_auth=False,
)
@defer.inlineCallbacks
def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth, thread_id):
def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
"""
Args:
origin (str): Origin of the pdu. Will be called to get the missing events
@@ -563,9 +506,9 @@ class FederationHandler(BaseHandler):
room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=5,
limit=10,
min_depth=min_depth,
timeout=15000,
timeout=60000,
)
logger.info(
@@ -577,20 +520,6 @@ class FederationHandler(BaseHandler):
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
pdu_to_thread = {}
if not thread_id:
thread_id = random.randint(1, 999999999)
first_in_thread = True
for pdu in reversed(missing_events):
if self.should_start_thread(pdu):
pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
first_in_thread = False
else:
pdu_to_thread[pdu.event_id] = (0, False)
else:
for pdu in reversed(missing_events):
pdu_to_thread[pdu.event_id] = (thread_id, False)
for ev in missing_events:
logger.info(
"[%s %s] Handling received prev_event %s",
@@ -602,8 +531,6 @@ class FederationHandler(BaseHandler):
origin,
ev,
sent_to_us_directly=False,
thread_id=pdu_to_thread[ev.event_id][0],
new_thread=pdu_to_thread[ev.event_id][1],
)
except FederationError as e:
if e.code == 403:
@@ -615,7 +542,7 @@ class FederationHandler(BaseHandler):
raise
@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain, thread_id):
def _process_received_pdu(self, origin, event, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
"""
@@ -667,7 +594,6 @@ class FederationHandler(BaseHandler):
origin,
event,
state=state,
thread_id=thread_id,
)
except AuthError as e:
raise FederationError(
@@ -750,12 +676,7 @@ class FederationHandler(BaseHandler):
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(
itertools.chain.from_iterable(
itertools.chain([e.event_id], e.prev_event_ids(),)
for e in events
)
)
set(e.event_id for e in events)
)
events = [e for e in events if e.event_id not in seen_events]
@@ -770,7 +691,7 @@ class FederationHandler(BaseHandler):
edges = [
ev.event_id
for ev in events
if set(ev.prev_event_ids()) - event_ids - seen_events
if set(ev.prev_event_ids()) - event_ids
]
logger.info(
@@ -804,26 +725,18 @@ class FederationHandler(BaseHandler):
})
missing_auth = required_auth - set(auth_events)
failed_to_fetch = set()
not_in_db = set()
# Try and fetch any missing auth events from both DB and remote servers.
# We repeatedly do this until we stop finding new auth events.
while missing_auth - failed_to_fetch:
logger.info("Missing auth for backfill: %r", missing_auth)
ret_events = yield self.store.get_events(missing_auth - failed_to_fetch)
auth_events.update(ret_events)
to_fetch_from_db = missing_auth - failed_to_fetch
while to_fetch_from_db - not_in_db:
ret_events = yield self.store.get_events(missing_auth - failed_to_fetch)
auth_events.update(ret_events)
required_auth.update(
a_id
for event in ret_events.values()
for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
to_fetch_from_db = required_auth - set(auth_events) - not_in_db
required_auth.update(
a_id for event in ret_events.values() for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
if missing_auth - failed_to_fetch:
logger.info(
@@ -892,25 +805,6 @@ class FederationHandler(BaseHandler):
events.sort(key=lambda e: e.depth)
event_id_to_thread = {}
event_to_parents = {}
for event in reversed(events):
threads = yield self.store.get_threads_for_backfill_event(event.event_id)
parents = event_to_parents.get(event.event_id, [])
for p in parents:
t = event_id_to_thread.get(p)
if t is not None:
threads.append(t)
if threads:
thread_id = min(threads)
else:
thread_id = 0
event_id_to_thread[event.event_id] = thread_id
for c in event.prev_event_ids():
event_to_parents.setdefault(c, set()).add(event.event_id)
for event in events:
if event in events_to_state:
continue
@@ -920,7 +814,6 @@ class FederationHandler(BaseHandler):
# TODO: We can probably do something more clever here.
yield self._handle_new_event(
dest, event, backfilled=True,
thread_id=event_id_to_thread[event.event_id],
)
defer.returnValue(events)
@@ -930,13 +823,12 @@ class FederationHandler(BaseHandler):
"""Checks the database to see if we should backfill before paginating,
and if so, do so.
"""
logger.info("Backfilling")
extremities = yield self.store.get_oldest_events_with_depth_in_room(
room_id
)
if not extremities:
logger.info("Not backfilling as no extremeties found.")
logger.debug("Not backfilling as no extremeties found.")
return
# Check if we reached a point where we should start backfilling.
@@ -951,7 +843,7 @@ class FederationHandler(BaseHandler):
extremities = dict(sorted_extremeties_tuple[:5])
if current_depth > max_depth:
logger.info(
logger.debug(
"Not backfilling as we don't need to. %d < %d",
max_depth, current_depth,
)
@@ -1169,7 +1061,6 @@ class FederationHandler(BaseHandler):
have finished processing the join.
"""
logger.debug("Joining %s to %s", joinee, room_id)
logger.info("Target hosts %s", target_hosts)
origin, event = yield self._make_and_verify_event(
target_hosts,
@@ -1420,8 +1311,6 @@ class FederationHandler(BaseHandler):
sender, target, event.room_id,
)
FrozenEvent.from_event(event)
event.unsigned.pop("invite_room_state", None)
defer.returnValue(event)
@defer.inlineCallbacks
@@ -1699,12 +1588,11 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _handle_new_event(self, origin, event, state=None, auth_events=None,
backfilled=False, thread_id=0):
backfilled=False):
context = yield self._prep_event(
origin, event,
state=state,
auth_events=auth_events,
thread_id=thread_id,
)
# reraise does not allow inlineCallbacks to preserve the stacktrace, so we
@@ -1863,7 +1751,7 @@ class FederationHandler(BaseHandler):
)
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None, thread_id=0):
def _prep_event(self, origin, event, state=None, auth_events=None):
"""
Args:
@@ -1876,7 +1764,7 @@ class FederationHandler(BaseHandler):
Deferred, which resolves to synapse.events.snapshot.EventContext
"""
context = yield self.state_handler.compute_event_context(
event, old_state=state, thread_id=thread_id,
event, old_state=state,
)
if not auth_events:
@@ -2686,10 +2574,3 @@ class FederationHandler(BaseHandler):
)
else:
return user_joined_room(self.distributor, user, room_id)
def should_start_thread(self, event):
now = self.clock.time_msec()
forced = event.origin_server_ts <= self.force_thread_ts
old = now - event.origin_server_ts > 1 * 60 * 1000
return forced or old
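The should_start_thread helper removed here encodes the heuristic the receive path used when deciding whether a fetched prev_event opens a new thread: anything older than a minute, or anything at or before the admin-set force_thread_ts cut-off (see the ForceThreadServlet further down), qualifies. A small standalone restatement of the rule for illustration:

import time

def should_start_thread(origin_server_ts, now_ms, force_thread_ts=0):
    forced = origin_server_ts <= force_thread_ts
    old = now_ms - origin_server_ts > 1 * 60 * 1000
    return forced or old

now_ms = int(time.time() * 1000)
print(should_start_thread(now_ms - 5 * 1000, now_ms))       # False: only 5s old
print(should_start_thread(now_ms - 5 * 60 * 1000, now_ms))  # True: over a minute old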

View File

@@ -588,7 +588,6 @@ class EventCreationHandler(object):
context,
ratelimit=True,
extra_users=[],
do_auth=True,
):
"""Processes a new event. This includes checking auth, persisting it,
notifying users, sending to remote servers, etc.
@@ -605,8 +604,7 @@ class EventCreationHandler(object):
"""
try:
if do_auth:
yield self.auth.check_from_context(event, context)
yield self.auth.check_from_context(event, context)
except AuthError as err:
logger.warn("Denying new event %r because %s", event, err)
raise err

View File

@@ -347,7 +347,7 @@ class PresenceHandler(object):
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
# logger.info("Handling presence timeouts")
logger.info("Handling presence timeouts")
now = self.clock.time_msec()
try:
@@ -626,7 +626,6 @@ class PresenceHandler(object):
Args:
states (list(UserPresenceState))
"""
return
self.federation.send_presence(states)
@defer.inlineCallbacks
@@ -817,7 +816,6 @@ class PresenceHandler(object):
if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
return
yield self.federation.send_edu(
destination=observed_user.domain,
edu_type="m.presence_invite",
@@ -838,7 +836,6 @@ class PresenceHandler(object):
if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
return
self.federation.send_edu(
destination=observer_user.domain,
edu_type="m.presence_accept",

View File

@@ -147,8 +147,6 @@ class ReceiptsHandler(BaseHandler):
logger.debug("Sending receipt to: %r", remotedomains)
return
for domain in remotedomains:
self.federation.send_edu(
destination=domain,

View File

@@ -79,8 +79,6 @@ class RoomCreationHandler(BaseHandler):
# linearizer to stop two upgrades happening at once
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
self._next_room_id = 0
@defer.inlineCallbacks
def upgrade_room(self, requester, old_room_id, new_version):
"""Replace a room with a new room with a different version
@@ -743,9 +741,7 @@ class RoomCreationHandler(BaseHandler):
attempts = 0
while attempts < 5:
try:
i = self._next_room_id
self._next_room_id += 1
random_string = stringutils.random_string(3) + str(i)
random_string = stringutils.random_string(18)
gen_room_id = RoomID(
random_string,
self.hs.hostname,

View File

@@ -35,7 +35,6 @@ from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
@@ -212,7 +211,6 @@ class SyncHandler(object):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.builder_factory = hs.get_event_builder_factory()
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
@@ -711,6 +709,7 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
members_to_fetch = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
@@ -859,28 +858,6 @@ class SyncHandler(object):
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
hosts_in_room = yield self.store.get_hosts_in_room(room_id)
destination_states = yield self.store.get_destination_states()
for host in hosts_in_room:
if host not in destination_states:
continue
if ("org.matrix.server_presence", host) in timeline_state:
continue
state[("org.matrix.server_presence", host)] = self.builder_factory.new({
"type": "org.matrix.server_presence",
"content": {
"state": "connected" if destination_states[host] else "disconnected",
},
"state_key": host,
"event_id": random_string(24),
"origin_server_ts": self.clock.time_msec(),
"sender": "@server:server",
"room_id": room_id,
})
defer.returnValue({
(e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(list(state.values()))
@@ -954,7 +931,10 @@ class SyncHandler(object):
newly_joined_rooms, newly_joined_users, _, _ = res
_, _, newly_left_rooms, newly_left_users = res
block_all_presence_data = True
block_all_presence_data = (
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if self.hs_config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
@@ -1251,7 +1231,10 @@ class SyncHandler(object):
`(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
"""
user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = True
block_all_room_ephemeral = (
sync_result_builder.since_token is None and
sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
)
if block_all_room_ephemeral:
ephemeral_by_room = {}

View File

@@ -86,7 +86,7 @@ class TypingHandler(object):
self._room_typing = {}
def _handle_timeouts(self):
# logger.info("Checking for typing timeouts")
logger.info("Checking for typing timeouts")
now = self.clock.time_msec()
@@ -231,7 +231,6 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
return
self.federation.send_edu(
destination=domain,
edu_type="m.typing",

View File

@@ -194,11 +194,8 @@ class _WrappedConnection(object):
# In Twisted >18.4, the TLS connection will be None if it has closed
# which will make abortConnection() throw. Check that the TLS connection
# is not None before trying to close it.
try:
if self.transport.getHandle() is not None:
self.transport.abortConnection()
except Exception:
logger.warning("Failed to abort connection")
if self.transport.getHandle() is not None:
self.transport.abortConnection()
def request(self, request):
self.last_request = time.time()

View File

@@ -18,12 +18,8 @@ import logging
import random
import sys
from io import BytesIO
import os
import opentracing
from opentracing.ext import tags
from six import PY3, string_types, iteritems
from six import PY3, string_types
from six.moves import urllib
import attr
@@ -59,7 +55,6 @@ outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_request
incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
"", ["method", "code"])
USE_PROXY = "SYNAPSE_USE_PROXY" in os.environ
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
@@ -70,18 +65,6 @@ else:
MAXINT = sys.maxint
class ProxyMatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.reactor = hs.get_reactor()
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
return matrix_federation_endpoint(
self.reactor, "localhost:8888", timeout=10,
tls_client_options_factory=None
)
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.reactor = hs.get_reactor()
@@ -207,36 +190,19 @@ class MatrixFederationHttpClient(object):
pool.retryAutomatically = False
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
if USE_PROXY:
self.agent = Agent.usingEndpointFactory(
reactor, ProxyMatrixFederationEndpointFactory(hs), pool=pool
)
else:
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
)
file_pool = HTTPConnectionPool(reactor)
file_pool.retryAutomatically = False
file_pool.maxPersistentPerHost = 5
file_pool.cachedConnectionTimeout = 10
self.file_agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=file_pool
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string_bytes = hs.version_string.encode('ascii')
self.default_timeout = 20
self.default_timeout = 60
def schedule(x):
reactor.callLater(_EPSILON, x)
self._cooperator = Cooperator(scheduler=schedule)
self.tracer = hs.get_tracer()
@defer.inlineCallbacks
def _send_request(
self,
@@ -245,9 +211,7 @@ class MatrixFederationHttpClient(object):
timeout=None,
long_retries=False,
ignore_backoff=False,
backoff_on_404=False,
span=None,
agent=None,
backoff_on_404=False
):
"""
Sends a request to the given server.
@@ -289,13 +253,13 @@ class MatrixFederationHttpClient(object):
):
raise FederationDeniedError(request.destination)
# limiter = yield synapse.util.retryutils.get_retry_limiter(
# request.destination,
# self.clock,
# self._store,
# backoff_on_404=backoff_on_404,
# ignore_backoff=ignore_backoff,
# )
limiter = yield synapse.util.retryutils.get_retry_limiter(
request.destination,
self.clock,
self._store,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
)
method_bytes = request.method.encode("ascii")
destination_bytes = request.destination.encode("ascii")
@@ -310,8 +274,7 @@ class MatrixFederationHttpClient(object):
b"Host": [destination_bytes],
}
# with limiter:
if True:
with limiter:
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
@@ -357,53 +320,26 @@ class MatrixFederationHttpClient(object):
url_str,
)
with self.tracer.start_span('request', child_of=span) as child_span:
carrier = {}
opentracing.tracer.inject(
span_context=child_span.context,
format=opentracing.Format.HTTP_HEADERS,
carrier=carrier,
)
for key, value in iteritems(carrier):
headers_dict[key.encode("ascii")] = [value.encode("ascii")]
# we don't want all the fancy cookie and redirect handling that
# treq.request gives: just use the raw Agent.
request_deferred = self.agent.request(
method_bytes,
url_bytes,
headers=Headers(headers_dict),
bodyProducer=producer,
)
if not agent:
agent = self.agent
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
# we don't want all the fancy cookie and redirect handling that
# treq.request gives: just use the raw Agent.
request_deferred = agent.request(
method_bytes,
url_bytes,
headers=Headers(headers_dict),
bodyProducer=producer,
)
request_deferred = timeout_deferred(
with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
child_span.set_tag(tags.HTTP_METHOD, request.method)
child_span.set_tag(tags.HTTP_URL, url_str)
child_span.set_tag(
tags.SPAN_KIND,
tags.SPAN_KIND_RPC_CLIENT,
)
try:
with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
)
except Exception as e:
child_span.set_tag("error", True)
child_span.log_kv({"error": e})
raise
child_span.set_tag(tags.HTTP_STATUS_CODE, response.code)
break
except Exception as e:
logger.warn(
@@ -518,8 +454,7 @@ class MatrixFederationHttpClient(object):
json_data_callback=None,
long_retries=False, timeout=None,
ignore_backoff=False,
backoff_on_404=False,
span=None):
backoff_on_404=False):
""" Sends the specifed json data using PUT
Args:
@@ -570,7 +505,6 @@ class MatrixFederationHttpClient(object):
timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
span=span,
)
body = yield _handle_json_response(
@@ -770,7 +704,6 @@ class MatrixFederationHttpClient(object):
request,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
agent=self.file_agent,
)
headers = dict(response.headers.getAllRawHeaders())

View File

@@ -66,9 +66,6 @@ REQUIREMENTS = {
# we use attr.s(slots), which arrived in 16.0.0
"attrs>=16.0.0": ["attr>=16.0.0"],
"netaddr>=0.7.18": ["netaddr"],
"jaeger_client": ["jaeger_client"],
"opentracing<2,>=1.2.2": ["opentracing"],
}
CONDITIONAL_REQUIREMENTS = {

View File

@@ -32,7 +32,6 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.types import UserID, create_requester
from synapse.util.stringutils import random_string
from .base import ClientV1RestServlet, client_path_patterns
@@ -741,114 +740,6 @@ class SearchUsersRestServlet(ClientV1RestServlet):
defer.returnValue((200, ret))
class ServerHealth(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/server_health")
def __init__(self, hs):
super(ServerHealth, self).__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
self.builder_factory = hs.get_event_builder_factory()
self.clock = hs.get_clock()
def on_GET(self, request):
return self.do_update()
def on_POST(self, request):
return self.do_update()
@defer.inlineCallbacks
def do_update(self):
hosts = yield self.store.get_all_destination_healths()
up_servers = set(h for h, c in hosts.items() if c is not None)
down_servers = set(h for h, c in hosts.items() if c is None)
rooms_to_hosts = yield self.store.get_all_hosts_and_room()
requester = create_requester(UserID("server", "server"))
state = yield self.store.get_destination_states()
new_up = set()
new_down = set()
for host in up_servers:
if state.get(host, True):
continue
new_up.add(host)
yield self.store.store_destination_state(host, True)
for host in down_servers:
if not state.get(host, True):
continue
new_down.add(host)
yield self.store.store_destination_state(host, False)
for room_id, hosts in rooms_to_hosts.items():
for host in hosts:
if host in new_up:
new_state = "connected"
elif host in new_down:
new_state = "disconnected"
else:
continue
logger.info("Marking %s as %r", host, new_state)
builder = self.builder_factory.new({
"type": "org.matrix.server_presence",
"content": {
"state": new_state,
},
"state_key": host,
"event_id": random_string(24),
"origin_server_ts": self.clock.time_msec(),
"sender": "@server:server",
"room_id": room_id,
})
event, context = yield (
self.event_creation_handler.create_new_client_event(
builder=builder,
)
)
event.internal_metadata.internal_event = True
yield self.event_creation_handler.handle_new_client_event(
requester,
event,
context,
ratelimit=False,
extra_users=[],
do_auth=False,
)
defer.returnValue((200, {}))
class ForceThreadServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/force_thread")
def __init__(self, hs):
super(ForceThreadServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.clock = hs.get_clock()
def on_GET(self, request):
return self.do_force_thread()
def on_POST(self, request):
return self.do_force_thread()
@defer.inlineCallbacks
def do_force_thread(self):
yield self.clock.sleep(0)
self.federation_handler.force_thread_ts = self.clock.time_msec()
defer.returnValue((200, {}))
def register_servlets(hs, http_server):
WhoisRestServlet(hs).register(http_server)
PurgeMediaCacheRestServlet(hs).register(http_server)
@@ -863,5 +754,3 @@ def register_servlets(hs, http_server):
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
UserRegisterServlet(hs).register(http_server)
ServerHealth(hs).register(http_server)
ForceThreadServlet(hs).register(http_server)

View File

@@ -471,11 +471,6 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
logger.info("filter_bytes: %s", filter_bytes)
logger.info("Event filter: %s", event_filter)
if event_filter:
logger.info("Event filter: %s", event_filter.thread_id)
msgs = yield self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
@@ -830,30 +825,6 @@ class JoinedRoomsRestServlet(ClientV1RestServlet):
defer.returnValue((200, {"joined_rooms": list(room_ids)}))
class TimestampLookupRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")
def __init__(self, hs):
super(TimestampLookupRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
yield self.auth.check_joined_room(room_id, requester.user.to_string())
timestamp = parse_integer(request, "ts")
thread_id = parse_integer(request, "thread_id", 0)
event_id = yield self.store.get_event_for_timestamp(
room_id, thread_id, timestamp,
)
defer.returnValue((200, {
"event_id": event_id,
}))
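For reference, the servlet removed here was reachable with an ordinary authenticated GET, passing the target timestamp (and optionally a thread_id) as query parameters. A rough client-side sketch, assuming the usual /_matrix/client/r0 prefix that client_path_patterns registers, and placeholder host and credentials:

import requests  # hedged example; any HTTP client works

resp = requests.get(
    "https://hs1.example/_matrix/client/r0/rooms/!room:hs1.example/timestamp_to_event",
    params={"ts": 1543400000000, "thread_id": 0},
    headers={"Authorization": "Bearer <access_token>"},  # placeholder token
)
print(resp.json())  # e.g. {"event_id": "$closest_event:hs1.example"}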
def register_txn_path(servlet, regex_string, http_server, with_get=False):
"""Registers a transaction-based path.
@@ -903,7 +874,6 @@ def register_servlets(hs, http_server):
JoinedRoomsRestServlet(hs).register(http_server)
RoomEventServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
TimestampLookupRestServlet(hs).register(http_server)
def register_deprecated_servlets(hs, http_server):

View File

@@ -118,8 +118,6 @@ class SyncRestServlet(RestServlet):
)
)
exclude_threaded = b"exclude_threaded" in request.args
request_key = (user, timeout, since, filter_id, full_state, device_id)
if filter_id:
@@ -171,14 +169,13 @@ class SyncRestServlet(RestServlet):
time_now = self.clock.time_msec()
response_content = self.encode_response(
time_now, sync_result, requester.access_token_id, filter,
exclude_threaded=exclude_threaded,
time_now, sync_result, requester.access_token_id, filter
)
defer.returnValue((200, response_content))
@staticmethod
def encode_response(time_now, sync_result, access_token_id, filter, exclude_threaded):
def encode_response(time_now, sync_result, access_token_id, filter):
if filter.event_format == 'client':
event_formatter = format_event_for_client_v2_without_room_id
elif filter.event_format == 'federation':
@@ -190,7 +187,6 @@ class SyncRestServlet(RestServlet):
sync_result.joined, time_now, access_token_id,
filter.event_fields,
event_formatter,
exclude_threaded=exclude_threaded,
)
invited = SyncRestServlet.encode_invited(
@@ -244,8 +240,7 @@ class SyncRestServlet(RestServlet):
}
@staticmethod
def encode_joined(rooms, time_now, token_id, event_fields, event_formatter,
exclude_threaded):
def encode_joined(rooms, time_now, token_id, event_fields, event_formatter):
"""
Encode the joined rooms in a sync result
@@ -268,7 +263,7 @@ class SyncRestServlet(RestServlet):
for room in rooms:
joined[room.room_id] = SyncRestServlet.encode_room(
room, time_now, token_id, joined=True, only_fields=event_fields,
event_formatter=event_formatter, exclude_threaded=exclude_threaded,
event_formatter=event_formatter,
)
return joined
@@ -335,7 +330,6 @@ class SyncRestServlet(RestServlet):
room, time_now, token_id, joined=False,
only_fields=event_fields,
event_formatter=event_formatter,
exclude_threaded=False,
)
return joined
@@ -343,7 +337,7 @@ class SyncRestServlet(RestServlet):
@staticmethod
def encode_room(
room, time_now, token_id, joined,
only_fields, event_formatter, exclude_threaded,
only_fields, event_formatter,
):
"""
Args:
@@ -383,19 +377,7 @@ class SyncRestServlet(RestServlet):
)
serialized_state = [serialize(e) for e in state_events]
if exclude_threaded:
serialized_timeline = []
for e in reversed(timeline_events):
thread_id = e.internal_metadata.thread_id
if thread_id != 0:
pass
else:
serialized_timeline.append(serialize(e))
serialized_timeline.reverse()
else:
serialized_timeline = [serialize(e) for e in timeline_events]
serialized_timeline = [serialize(e) for e in timeline_events]
account_data = room.account_data

View File

@@ -21,9 +21,6 @@
# Imports required for the default HomeServer() implementation
import abc
import logging
import os
from jaeger_client import Config
from twisted.enterprise import adbapi
from twisted.mail.smtp import sendmail
@@ -179,7 +176,6 @@ class HomeServer(object):
'pagination_handler',
'room_context_handler',
'sendmail',
'tracer',
]
# This is overridden in derived application classes
@@ -476,39 +472,6 @@ class HomeServer(object):
def build_room_context_handler(self):
return RoomContextHandler(self)
def build_tracer(self):
# TODO: Make optional
jaeger_host = os.environ.get("SYNAPSE_JAEGER_HOST", None)
if jaeger_host:
config_dict = {
'sampler': {
'type': 'const',
'param': 1,
},
'logging': True,
'local_agent': {
'reporting_host': jaeger_host,
},
}
else:
config_dict = {
'sampler': {
'type': 'const',
'param': 0,
},
'logging': True,
}
config = Config(
config=config_dict,
service_name="synapse-" + self.config.server_name,
validate=True,
)
# this call also sets opentracing.tracer
tracer = config.initialize_tracer()
return tracer
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

View File

@@ -178,7 +178,7 @@ class StateHandler(object):
defer.returnValue(joined_hosts)
@defer.inlineCallbacks
def compute_event_context(self, event, old_state=None, thread_id=0):
def compute_event_context(self, event, old_state=None):
"""Build an EventContext structure for the event.
This works out what the current state should be for the event, and
@@ -194,8 +194,6 @@ class StateHandler(object):
synapse.events.snapshot.EventContext:
"""
event.internal_metadata.thread_id = thread_id
if event.internal_metadata.is_outlier():
# If this is an outlier, then we know it shouldn't have any current
# state. Certainly store.get_current_state won't return any, and
@@ -217,7 +215,6 @@ class StateHandler(object):
# We don't store state for outliers, so we don't generate a state
# group for it.
context = EventContext.with_state(
thread_id=0, # outlier, don't care
state_group=None,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
@@ -254,7 +251,6 @@ class StateHandler(object):
)
context = EventContext.with_state(
thread_id=thread_id,
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
@@ -323,7 +319,6 @@ class StateHandler(object):
state_group = entry.state_group
context = EventContext.with_state(
thread_id=thread_id,
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,

View File

@@ -34,24 +34,6 @@ logger = logging.getLogger(__name__)
class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
SQLBaseStore):
def get_threads_for_backfill_event(self, event_id):
def _get_thread_for_backfill_event_txn(txn):
sql = """
SELECT thread_id
FROM event_edges
INNER JOIN events USING (event_id)
WHERE prev_event_id = ?
"""
txn.execute(sql, (event_id,))
return [thread_id for thread_id, in txn]
return self.runInteraction(
"get_thread_for_backfill_event",
_get_thread_for_backfill_event_txn,
)
def get_auth_chain(self, event_ids, include_given=False):
"""Get auth events for given event_ids. The events *must* be state events.

View File

@@ -402,12 +402,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# No change in extremities, so no change in state
continue
logger.info(
"Forward extremities for %s: %s -> %s",
room_id, latest_event_ids, new_latest_event_ids,
)
logger.info("Events: %s", [e.event_id for e, _ in ev_ctx_rm])
# there should always be at least one forward extremity.
# (except during the initial persistence of the send_join
# results, in which case there will be no existing
@@ -543,7 +537,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
new_events = [
event for event, ctx in event_contexts
if not event.internal_metadata.is_outlier() and not ctx.rejected
and not event.internal_metadata.is_internal_event()
]
# start with the existing forward extremities
@@ -590,7 +583,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
prev_event_id IN (%s)
AND NOT events.outlier
AND rejections.event_id IS NULL
AND NOT events.internal_event
""" % (
",".join("?" for _ in batch),
)
@@ -1290,10 +1282,8 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"url" in event.content
and isinstance(event.content["url"], text_type)
),
"thread_id": ctx.thread_id,
"internal_event": event.internal_metadata.is_internal_event(),
}
for event, ctx in events_and_contexts
for event, _ in events_and_contexts
],
)

View File

@@ -352,7 +352,7 @@ class EventsWorkerStore(SQLBaseStore):
run_in_background(
self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"], thread_id=row["thread_id"],
rejected_reason=row["rejects"],
)
for row in rows
],
@@ -378,10 +378,8 @@ class EventsWorkerStore(SQLBaseStore):
" e.internal_metadata,"
" e.json,"
" r.redacts as redacts,"
" rej.event_id as rejects, "
" ev.thread_id as thread_id"
" rej.event_id as rejects "
" FROM event_json as e"
" INNER JOIN events as ev USING (event_id)"
" LEFT JOIN rejections as rej USING (event_id)"
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
" WHERE e.event_id IN (%s)"
@@ -394,7 +392,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
thread_id, rejected_reason=None):
rejected_reason=None):
with Measure(self._clock, "_get_event_from_row"):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -528,43 +526,3 @@ class EventsWorkerStore(SQLBaseStore):
return res
return self.runInteraction("get_rejection_reasons", f)
def get_event_for_timestamp(self, room_id, thread_id, timestamp):
sql_template = """
SELECT event_id, origin_server_ts FROM events
WHERE
origin_server_ts %s ?
AND room_id = ?
AND thread_id = ?
ORDER BY origin_server_ts
LIMIT 1;
"""
def f(txn):
txn.execute(sql_template % ("<=",), (timestamp, room_id, thread_id))
row = txn.fetchone()
if row:
event_id_before, ts_before = row
else:
event_id_before, ts_before = None, None
txn.execute(sql_template % (">=",), (timestamp, room_id, thread_id))
row = txn.fetchone()
if row:
event_id_after, ts_after = row
else:
event_id_after, ts_after = None, None
if event_id_before and event_id_after:
# Return the closest one
if (timestamp - ts_before) < (ts_after - timestamp):
return event_id_before
else:
return event_id_after
if event_id_before:
return event_id_before
return event_id_after
return self.runInteraction("get_event_for_timestamp", f)

View File

@@ -406,6 +406,11 @@ class RegistrationStore(RegistrationWorkerStore,
)
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
for token, _, _ in tokens_and_devices:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
txn.execute(
"DELETE FROM access_tokens WHERE %s" % where_clause,
values
@@ -427,6 +432,10 @@ class RegistrationStore(RegistrationWorkerStore,
},
)
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (access_token,)
)
return self.runInteraction("delete_access_token", f)
@cachedInlineCallbacks()
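
The two registration hunks follow the same pattern: whenever an access-token row is removed, the matching `get_user_by_access_token` cache entry is invalidated in the same transaction, which also streams the invalidation to workers. A minimal sketch of that pattern, using the names shown above but not the exact method body:

```python
def delete_access_token(self, access_token):
    def f(txn):
        # Drop the cached lookup in the same transaction, so other
        # workers pick the invalidation up via the cache stream too.
        self._invalidate_cache_and_stream(
            txn, self.get_user_by_access_token, (access_token,)
        )
        txn.execute(
            "DELETE FROM access_tokens WHERE token = ?", (access_token,)
        )

    return self.runInteraction("delete_access_token", f)
```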

View File

@@ -88,23 +88,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return [to_ascii(r[0]) for r in txn]
return self.runInteraction("get_users_in_room", f)
def get_all_hosts_and_room(self):
def f(txn):
sql = """
SELECT DISTINCT room_id, regexp_replace(state_key, '^[^:]*:', '') AS host
FROM current_state_events
INNER JOIN room_memberships USING (event_id, room_id)
WHERE
type = 'm.room.member' AND membership = 'join'
"""
txn.execute(sql)
results = {}
for r in txn:
results.setdefault(to_ascii(r[0]), set()).add(to_ascii(r[1]))
return results
return self.runInteraction("get_users_in_room", f)
@cached(max_entries=100000)
def get_room_summary(self, room_id):
""" Get the details of a room roughly suitable for use by the room

View File

@@ -1,16 +0,0 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE events ADD COLUMN IF NOT EXISTS internal_event BOOLEAN NOT NULL DEFAULT false;

View File

@@ -1,7 +0,0 @@
CREATE TABLE IF NOT EXISTS destination_health(
destination TEXT NOT NULL PRIMARY KEY,
cost DOUBLE PRECISION
);

View File

@@ -1,6 +0,0 @@
CREATE TABLE IF NOT EXISTS destination_state(
destination TEXT NOT NULL PRIMARY KEY,
connected BOOLEAN NOT NULL
);

View File

@@ -1,23 +0,0 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE events ADD COLUMN IF NOT EXISTS thread_id BIGINT NOT NULL DEFAULT 0;
CREATE INDEX IF NOT EXISTS events_room_idx ON events (room_id, thread_id);
-- CREATE SEQUENCE thread_id_seq;
CREATE INDEX IF NOT EXISTS event_room_thread_ts ON events (room_id, thread_id, origin_server_ts);
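
For illustration, the composite `(room_id, thread_id, origin_server_ts)` index created by this (now removed) delta is shaped for lookups like the timestamp search earlier in the diff. A query of that shape, with placeholder parameter names:

```python
# Illustrative only: fetch the next few events in a thread at or after a
# given timestamp, which the event_room_thread_ts index can serve directly.
sql = (
    "SELECT event_id, origin_server_ts FROM events"
    " WHERE room_id = ? AND thread_id = ? AND origin_server_ts >= ?"
    " ORDER BY origin_server_ts"
    " LIMIT ?"
)
txn.execute(sql, (room_id, thread_id, since_ts, limit))
```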

View File

@@ -149,13 +149,7 @@ def filter_to_clause(event_filter):
clauses.append("contains_url = ?")
args.append(event_filter.contains_url)
if event_filter.thread_id is not None:
clauses.append("thread_id = ?")
args.append(event_filter.thread_id)
filter = " AND ".join(clauses), args
return filter
return " AND ".join(clauses), args
class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
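
For context, `filter_to_clause` returns a `(where_fragment, args)` pair that callers splice into their own query. An illustrative, hypothetical use, assuming a base query over `events` and a `room_id` bind parameter:

```python
clause, filter_args = filter_to_clause(event_filter)

sql = "SELECT event_id FROM events WHERE room_id = ?"
args = [room_id]
if clause:
    # Append the filter's WHERE fragment and its bind parameters.
    sql += " AND " + clause
    args.extend(filter_args)

txn.execute(sql, args)
```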
@@ -814,8 +808,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows, token = yield self.runInteraction(
"paginate_room_events", self._paginate_room_events_txn,
room_id, from_key, to_key, direction, limit,
event_filter=event_filter,
room_id, from_key, to_key, direction, limit, event_filter,
)
events = yield self._get_events(