
Compare commits

..

126 Commits

Author SHA1 Message Date
Erik Johnston
92dbb8a62f update proxy 2019-02-13 15:16:05 +00:00
Erik Johnston
816f5a502f Update proxy 2019-02-13 15:16:05 +00:00
Erik Johnston
561fa571c3 Fix fetching media when using proxy 2019-02-13 15:16:05 +00:00
Erik Johnston
b76f1e2cb8 Fix bug in DTG 2019-02-13 15:16:05 +00:00
Erik Johnston
549e580dc9 Reduce send invite request size 2019-02-13 15:16:05 +00:00
Erik Johnston
c3f36414bf Update proxy 2019-02-13 15:16:05 +00:00
Erik Johnston
17eb4504a8 Update flate file 2019-02-13 15:16:05 +00:00
Erik Johnston
a066b00487 Compress some client data 2019-02-13 15:16:05 +00:00
Erik Johnston
4e0ac33053 Handle slow/lossy connections better when sending transactions 2019-02-13 15:16:05 +00:00
Erik Johnston
76d888cf48 pep8 2019-02-13 15:16:05 +00:00
Erik Johnston
dde7110c0d Reduce transaction response size 2019-02-13 15:16:05 +00:00
Erik Johnston
5e6b5ccd26 Actually fix exceptions 2019-02-13 15:16:04 +00:00
Erik Johnston
1d7420ed2f Don't log ERROR when no profile exists 2019-02-13 15:16:04 +00:00
Travis Ralston
a527fbaae6 Catch room profile errors and anything else that can go wrong
Fixes an issue where things become unhappy when the room profile for a user is missing.
2019-02-13 15:16:04 +00:00
Erik Johnston
b951f35572 Reduce size of fed transaction IDs 2019-02-13 15:16:04 +00:00
Brendan Abolivier
6eca7dc3e8 Update maps and proxy 2019-02-13 15:16:04 +00:00
Erik Johnston
1466adf427 Make event_ids smaller 2019-02-13 15:16:04 +00:00
Erik Johnston
a99c2f56b5 Mangle some more PDU fields 2019-02-13 15:16:04 +00:00
Brendan Abolivier
306b670371 Update proxy maps 2019-02-13 15:16:04 +00:00
Brendan Abolivier
31825c10d6 Update proxy & maps 2019-02-13 15:16:04 +00:00
Erik Johnston
a01468c1a8 Change access tokens to be base64'ed 4 bytes 2019-02-13 15:16:04 +00:00
Brendan Abolivier
31c910a9a2 Update proxy 2019-02-13 15:16:04 +00:00
Erik Johnston
62fa8570ec Route full mesh if message contains 'mesh' 2019-02-13 15:16:04 +00:00
Brendan Abolivier
5f52a2c25e Update proxy 2019-02-13 15:16:04 +00:00
Travis Ralston
645d5c8c35 Use run_as_background_process 2019-02-13 15:16:04 +00:00
Travis Ralston
0463d9ba75 Safer execution 2019-02-13 15:16:04 +00:00
Travis Ralston
b26d8cea66 Preserve log contexts in the room_member_handler 2019-02-13 15:16:04 +00:00
Travis Ralston
de6d002d01 Proof of concept for auto-accepting invites
This is for demonstration purposes only. In practice this would actually look up the right profile and use the right thing, not to mention be in a more reasonable location.
2019-02-13 15:16:04 +00:00
Neil Johnson
2b77c8d50e Remove riot.im from the list of trusted Identity Servers in the default configuration (#4207) 2019-02-13 15:16:04 +00:00
Richard van der Hoff
fa78a83ac3 changelog 2019-02-13 15:16:04 +00:00
Richard van der Hoff
5eceb4dc0f Fix logcontext leak in test_url_preview 2019-02-13 15:16:04 +00:00
Richard van der Hoff
07577e0542 Fix logcontext leak in http pusher test 2019-02-13 15:16:04 +00:00
Richard van der Hoff
3cda7da827 Fix some tests which leaked logcontexts 2019-02-13 15:16:04 +00:00
Richard van der Hoff
cb7c2ad85a Fix logcontext leak in EmailPusher 2019-02-13 15:16:04 +00:00
Amber Brown
a29da814c6 towncrier 2019-02-13 15:16:04 +00:00
Amber Brown
5e499c58fd version 2019-02-13 15:16:03 +00:00
Neil Johnson
fa574331fb release 0.33.9rc1 2019-02-13 15:16:03 +00:00
Amber Brown
ca05b679e3 Fix fallback auth on Python 3 (#4197) 2019-02-13 15:16:03 +00:00
Aaron Raimist
83ed2c494b Fix case
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-02-13 15:16:03 +00:00
Aaron Raimist
5d4dfc0313 Add SUPPORT.md
https://help.github.com/articles/adding-support-resources-to-your-project/
2019-02-13 15:16:03 +00:00
Aaron Raimist
7d4b700204 Add changelog
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-02-13 15:16:03 +00:00
Aaron Raimist
c01605da24 Add a pull request template and add multiple issue templates
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-02-13 15:16:03 +00:00
Aaron Raimist
76b251c599 Add changelog
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-02-13 15:16:03 +00:00
Aaron Raimist
26708be7f8 Add a note saying you need to manually reclaim disk space
People keep asking why their database hasn't gotten smaller after using this API.

Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-02-13 15:16:03 +00:00
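
Note: purging history only deletes rows; the database file does not shrink until the space is reclaimed. A minimal sketch, assuming a SQLite-backed homeserver (Postgres deployments would run VACUUM FULL instead; the path is an assumption):

import sqlite3

# VACUUM rewrites the database file, returning freed pages to the OS.
conn = sqlite3.connect("homeserver.db")
conn.execute("VACUUM")
conn.close()
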
Erik Johnston
6baa32fd65 Update coap-proxy 2019-02-13 15:16:03 +00:00
Matthew Hodgson
32df91cbcb de-hardcode IP for jaeger 2019-02-13 15:16:03 +00:00
Erik Johnston
42f555393a GAH FILES WHY 2019-02-13 15:16:03 +00:00
Erik Johnston
224df403ea Fixup opentracing error logging 2019-02-13 15:16:03 +00:00
Erik Johnston
2d8da62feb Only relay 'live' events 2019-02-13 15:16:03 +00:00
Erik Johnston
e2230b28fb Mangle PDUs some more. Disable presence/typing/receipts. Don't die if we can't parse an EDU 2019-02-13 15:16:03 +00:00
Erik Johnston
28c3a43a7e Make using proxy optional 2019-02-13 15:16:03 +00:00
Brendan Abolivier
c11388b1ce Update proxy version and maps 2019-02-13 15:16:03 +00:00
Erik Johnston
9dfb6b6c52 Drop unnecessary keys from transactions 2019-02-13 15:16:03 +00:00
Erik Johnston
caa0004466 Track PDU in opentracing 2019-02-13 15:16:03 +00:00
Erik Johnston
ae7460b9f4 Make room ID smaller 2019-02-13 15:16:03 +00:00
Erik Johnston
fe6e221cfa Opentracing. Reduce event ID size 2019-02-13 15:16:03 +00:00
Erik Johnston
b071101729 Strip signatures and hashes on outgoing events 2019-02-13 15:16:03 +00:00
Matthew Hodgson
3405156c4b use right script 2019-02-13 15:16:03 +00:00
Matthew Hodgson
1682ee95ac switch to registering users via add_users.sh 2019-02-13 15:16:03 +00:00
Erik Johnston
e7b70e272f Fix jaeger over federation 2019-02-13 15:16:03 +00:00
Erik Johnston
ec288b48fd add basic jaeger support 2019-02-13 15:16:03 +00:00
Erik Johnston
27ca009b0a Reenable retries for sending transactions 2019-02-13 15:16:03 +00:00
Erik Johnston
6284acf910 Add API to force new threads 2019-02-13 15:16:03 +00:00
Erik Johnston
93db2124ec Add timestamp lookup API 2019-02-13 15:16:03 +00:00
Brendan Abolivier
ca0e0892ca Fix proxy 2019-02-13 15:16:03 +00:00
Brendan Abolivier
8cccbc6f47 Use UDP-able proxy 2019-02-13 15:16:02 +00:00
Brendan Abolivier
799112b0fd Fix cbor encoding in the proxy and enable it by default 2019-02-13 15:16:02 +00:00
Brendan Abolivier
4f7b42c20f Update proxy 2019-02-13 15:16:02 +00:00
Brendan Abolivier
72779ec93f Start synapse + proxy in the same container 2019-02-13 15:16:02 +00:00
Brendan Abolivier
fc99d3dab3 Make the Docker image run both synapse and the proxy 2019-02-13 15:16:02 +00:00
Brendan Abolivier
55bfb3caa8 Make synapse talk HTTP to the local proxy only when federating 2019-02-13 15:16:02 +00:00
Erik Johnston
e8be4ca1ad Join via closest server 2019-02-13 15:16:02 +00:00
Erik Johnston
781bd4fb96 FILES 2019-02-13 15:16:02 +00:00
Erik Johnston
4b5ad3dd12 Add SYNAPSE_LOG_HOST to enable HTTP logging for PDU tracking 2019-02-13 15:15:56 +00:00
Matthew Hodgson
a688d10bca secret password; more timeout 2019-02-13 14:24:42 +00:00
Matthew Hodgson
fe3b9d085f meshsim Dockerfile 2019-02-13 14:24:42 +00:00
Ashe Connor
dad89a4902 add changelog.d entry 2019-02-13 14:24:42 +00:00
Ashe Connor
d5243f0ff3 add jpeg to OpenBSD prereq list
Signed-off-by: Ashe Connor <ashe@kivikakk.ee>
2019-02-13 14:24:42 +00:00
Travis Ralston
037a5b48a6 Fix the terms UI auth tests
By setting the config value directly, we skip the block that adds the slash automatically for us.
2019-02-13 14:24:42 +00:00
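
For illustration, the slash handling at issue (a hypothetical sketch, not the actual synapse config code): config loading normally appends a trailing slash to the base URL, so a join that also adds one produces a double slash.

public_baseurl = "https://example.com"
if not public_baseurl.endswith("/"):
    public_baseurl += "/"                   # the block that adds the slash for us
print(public_baseurl + "/_matrix/consent")  # https://example.com//_matrix/consent
print(public_baseurl + "_matrix/consent")   # https://example.com/_matrix/consent
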
Travis Ralston
5abcb455b2 Changelog 2019-02-13 14:24:42 +00:00
Travis Ralston
8d98c4e3e3 Remove duplicate slashes in generated consent URLs 2019-02-13 14:24:42 +00:00
Amber Brown
e0581ccf0e Fix Content-Disposition in media repository (#4176) 2019-02-13 14:24:42 +00:00
Travis Ralston
ac9b734e31 Add option to track MAU stats (but not limit people) (#3830) 2019-02-13 14:24:42 +00:00
Amber Brown
dc768f208e Use <meta> tags to discover the per-page encoding of html previews (#4183) 2019-02-13 14:24:42 +00:00
Amber Brown
404cee9853 Add a coveragerc (#4180) 2019-02-13 14:24:42 +00:00
Richard van der Hoff
166cc35a48 Update README for #1491 fix 2019-02-13 14:24:42 +00:00
Richard van der Hoff
0d934b9ae1 changelog 2019-02-13 14:24:42 +00:00
Richard van der Hoff
ba2b6229c1 Add a test for the public T&Cs form 2019-02-13 14:24:42 +00:00
Richard van der Hoff
7cee15c47d Fix an internal server error when viewing the public privacy policy 2019-02-13 14:24:42 +00:00
David Baker
71f866d54d pep8 2019-02-13 14:24:42 +00:00
David Baker
785f5ef0f3 add docs 2019-02-13 14:24:42 +00:00
David Baker
daf28668d0 Remove unnecessary str() 2019-02-13 14:24:42 +00:00
David Baker
e750d031c8 Cast to int here too 2019-02-13 14:24:42 +00:00
David Baker
efb77b87d1 Cast backup version to int when querying 2019-02-13 14:24:42 +00:00
David Baker
b0ac23319a Convert version back to a string 2019-02-13 14:24:42 +00:00
David Baker
515a6cb0d3 news fragment 2019-02-13 14:24:42 +00:00
David Baker
8f46b61aed Try & make it work on postgres 2019-02-13 14:24:42 +00:00
David Baker
f814a1ec5a Make e2e backup versions numeric in the DB
We were doing max(version) which does not do what we wanted
on a column of type TEXT.
2019-02-13 14:24:42 +00:00
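
A quick illustration of why max(version) misbehaves on a TEXT column: strings compare lexicographically, so "9" sorts after "10".

versions = ["1", "2", "9", "10"]
print(max(versions))           # "9"  -- lexicographic max, what TEXT gives you
print(max(versions, key=int))  # "10" -- the latest version we actually wanted
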
Brendan Abolivier
7b28b058e1 Add a Content-Type header on POST requests to the federation client 2019-02-13 14:24:42 +00:00
Erik Johnston
06132f1f0b Add test to assert set_e2e_device_keys correctly returns False on no-op 2019-02-13 14:24:42 +00:00
Erik Johnston
b8077ca8cd Lets convert bytes to unicode instead 2019-02-13 14:24:42 +00:00
Erik Johnston
e56d0456cb Newsfile 2019-02-13 14:24:42 +00:00
Erik Johnston
755f42d769 Fix noop checks when updating device keys
Clients often reupload their device keys (for some reason), so it's
important for the server to check for no-ops before sending out device
list update notifications.

The check is broken in Python 3 because comparing bytes and unicode
always fails, and because we write bytes to the DB but read back
unicode.
2019-02-13 14:24:42 +00:00
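
A minimal illustration of the Python 3 pitfall described above (values made up):

stored = b'{"ed25519:ABC": "key"}'   # bytes read back from the DB
uploaded = '{"ed25519:ABC": "key"}'  # unicode uploaded by the client
print(stored == uploaded)                  # False: bytes never equal str in Python 3
print(stored.decode("utf-8") == uploaded)  # True once both sides are unicode
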
Richard van der Hoff
ef77ab59a7 fix parse_string docstring 2019-02-13 14:24:42 +00:00
Richard van der Hoff
d26852e9d8 changelog 2019-02-13 14:24:42 +00:00
hera
1f8a82077e Fix encoding error for consent form on python3
The form was rendering this as "b'01234....'".

-- richvdh
2019-02-13 14:24:42 +00:00
Erik Johnston
14059e2300 pep8 2019-02-13 14:24:41 +00:00
Erik Johnston
3223f415e2 Add server health apis and server presence 2019-02-13 14:23:21 +00:00
Erik Johnston
f57e71645a Missing file 2019-02-13 14:22:59 +00:00
Erik Johnston
c400d9dcca Add backchatter 2019-02-13 14:22:58 +00:00
Erik Johnston
ed43a63fcf Don't verify stuff 2019-02-13 14:22:18 +00:00
Erik Johnston
e6896040c7 Merge branch 'erikj/thread_demo' of github.com:matrix-org/synapse into erikj/add_routing_hooks 2018-11-21 11:45:11 +00:00
Erik Johnston
d0d3c63705 Fix threading when pulling in via get_missing_events 2018-11-21 10:45:35 +00:00
Erik Johnston
5ae1644d3d Send down new thread marker 2018-11-20 17:42:43 +00:00
Erik Johnston
115e4bb4c6 Fix threading 2018-11-20 17:04:19 +00:00
Erik Johnston
607ac7ea37 Lower all the timeouts 2018-11-20 13:32:47 +00:00
Erik Johnston
775441105a Reduce timeouts for sending transaction 2018-11-20 11:30:43 +00:00
Erik Johnston
e644f49b46 Delta file 2018-11-19 15:09:07 +00:00
Erik Johnston
712caeba60 Add hooks in federation for funky event routing 2018-11-14 16:12:33 +00:00
Erik Johnston
956b47da2b Don't log so aggressively 2018-11-14 15:32:33 +00:00
Erik Johnston
822fcc3bb8 Add concept of internal events 2018-11-13 15:33:54 +00:00
Erik Johnston
5daa2b9dbc Fix sync for archived rooms 2018-11-13 15:13:03 +00:00
Erik Johnston
08395c7f89 Implemented thread support for backfills 2018-11-13 14:56:38 +00:00
Erik Johnston
c67953748d Add thread_id to filter 2018-11-13 10:34:38 +00:00
Erik Johnston
78fec6b3c9 Add flag to sync to exclude threads 2018-11-12 16:20:14 +00:00
Erik Johnston
dfa830e61a Store and fetch thread IDs 2018-11-12 15:44:22 +00:00
54 changed files with 2104 additions and 319 deletions

View File

@@ -1,21 +1,26 @@
ARG PYTHON_VERSION=2
ARG PYTHON_VERSION=3
###
### Stage 0: builder
###
FROM docker.io/python:${PYTHON_VERSION}-alpine3.8 as builder
FROM docker.io/python:${PYTHON_VERSION}-slim-stretch as builder
# install the OS build deps
RUN apk add \
build-base \
RUN apt-get update && apt-get install -y \
build-essential \
libffi-dev \
libjpeg-turbo-dev \
libressl-dev \
libxslt-dev \
linux-headers \
postgresql-dev \
zlib-dev
sqlite3 \
libssl-dev \
libjpeg-dev \
libxslt1-dev \
libxml2-dev \
libpq-dev
# for ksm_preload
RUN apt-get install -y \
git \
cmake
# build things which have slow build steps, before we copy synapse, so that
# the layer can be cached.
@@ -34,30 +39,57 @@ RUN pip install --prefix="/install" --no-warn-script-location \
COPY . /synapse
RUN pip install --prefix="/install" --no-warn-script-location \
lxml \
psycopg2 \
psycopg2-binary \
/synapse
# N.B. to work, this needs:
# echo 1 > /sys/kernel/mm/ksm/run
# echo 31250 > /sys/kernel/mm/ksm/pages_to_scan # 128MB of 4KB pages at a time
# echo 10000 > /sys/kernel/mm/ksm/pages_to_scan # 40MB of pages at a time
# ...to be run in the Docker host
RUN git clone https://github.com/unbrice/ksm_preload && \
cd ksm_preload && \
cmake . && \
make && \
cp libksm_preload.so /install/lib
###
### Stage 1: runtime
###
FROM docker.io/python:${PYTHON_VERSION}-alpine3.8
FROM docker.io/python:${PYTHON_VERSION}-slim-stretch
RUN apk add --no-cache --virtual .runtime_deps \
libffi \
libjpeg-turbo \
libressl \
libxslt \
libpq \
zlib \
su-exec
RUN apt-get update && apt-get install -y \
procps \
net-tools \
iproute2 \
tcpdump \
traceroute \
mtr-tiny \
inetutils-ping \
less \
lsof \
supervisor \
netcat
# for topologiser
RUN pip install flask
COPY --from=builder /install /usr/local
COPY ./docker/start.py /start.py
COPY ./docker/conf /conf
COPY ./docker/proxy/proxy /proxy/proxy
COPY ./docker/proxy/maps /proxy/maps
COPY ./docker/supervisord.conf /etc/supervisor/conf.d/supervisord.conf
VOLUME ["/data"]
EXPOSE 8008/tcp 8448/tcp
EXPOSE 8008/tcp 8448/tcp 3000/tcp 5683/udp
ENTRYPOINT ["/start.py"]
ENV LD_PRELOAD=/usr/local/lib/libksm_preload.so
# default is 32768 (8 4KB pages)
ENV KSMP_MERGE_THRESHOLD=16384
ENTRYPOINT ["/usr/bin/supervisord"]

View File

@@ -55,7 +55,7 @@ database:
database: "{{ POSTGRES_DB or "synapse" }}"
host: "{{ POSTGRES_HOST or "db" }}"
port: "{{ POSTGRES_PORT or "5432" }}"
cp_min: 5
cp_min: 1
cp_max: 10
{% else %}
database:
@@ -73,7 +73,7 @@ log_config: "/compiled/log.config"
## Ratelimiting ##
rc_messages_per_second: 0.2
rc_messages_per_second: 50
rc_message_burst_count: 10.0
federation_rc_window_size: 1000
federation_rc_sleep_limit: 10

View File

@@ -15,6 +15,13 @@ handlers:
formatter: precise
filters: [context]
{% if SYNAPSE_LOG_HOST %}
http_meshsim:
class: logging.handlers.HTTPHandler
host: {{ SYNAPSE_LOG_HOST }}:3000
url: "/log"
{% endif %}
loggers:
synapse:
level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
@@ -24,6 +31,12 @@ loggers:
# information such as access tokens.
level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
{% if SYNAPSE_LOG_HOST %}
synapse.federation.pdu_destination_logger:
level: INFO
handlers: [http_meshsim,console]
{% endif %}
root:
level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
handlers: [console]
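
For reference, the http_meshsim handler above is Python's stock logging.handlers.HTTPHandler, which sends each log record's attributes to http://$SYNAPSE_LOG_HOST:3000/log (via GET unless a method is configured). A minimal sketch of the equivalent wiring in code (host name assumed):

import logging
import logging.handlers

handler = logging.handlers.HTTPHandler("meshsim-host:3000", "/log")
pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger")
pdu_logger.setLevel(logging.INFO)
pdu_logger.addHandler(handler)
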

View File

@@ -0,0 +1,35 @@
[
"errcode",
"error",
"msgtype",
"body",
"formatted_body",
"format",
"avatar_url",
"displayname",
"membership",
"url",
"name",
"origin",
"origin_server_ts",
"pdus",
"edus",
"sender",
"type",
"auth_events",
"unsigned",
"prev_events",
"depth",
"redacts",
"event_id",
"room_id",
"state_key",
"content",
"edu_type",
"sha256",
"typing",
"user_id",
"m.read",
"data",
"event_ids"
]

View File

@@ -0,0 +1,5 @@
[
"m.text",
"m.emote",
"m.notice"
]

View File

@@ -0,0 +1,8 @@
[
"m.presence_invite",
"m.presence_accept",
"m.presence_deny",
"m.presence",
"m.typing",
"m.receipt"
]

View File

@@ -0,0 +1,36 @@
[
"M_BAD_JSON",
"M_BAD_STATE",
"M_CANNOT_LEAVE_SERVER_NOTICE_ROOM",
"M_CAPTCHA_INVALID",
"M_CAPTCHA_NEEDED",
"M_CONSENT_NOT_GIVEN",
"M_EXAMPLE_ERROR",
"M_EXCLUSIVE",
"M_FORBIDDEN",
"M_GUEST_ACCESS_FORBIDDEN",
"M_INCOMPATIBLE_ROOM_VERSION",
"M_INVALID_PARAM",
"M_INVALID_ROOM_STATE",
"M_INVALID_USERNAME",
"M_LIMIT_EXCEEDED",
"M_MISSING_PARAM",
"M_MISSING_TOKEN",
"M_NOT_FOUND",
"M_NOT_JSON",
"M_RESOURCE_LIMIT_EXCEEDED",
"M_ROOM_IN_USE",
"M_SERVER_NOT_TRUSTED",
"M_THREEPID_AUTH_FAILED",
"M_THREEPID_DENIED",
"M_THREEPID_IN_USE",
"M_THREEPID_NOT_FOUND",
"M_TOO_LARGE",
"M_UNAUTHORIZED",
"M_UNKNOWN",
"M_UNKNOWN_TOKEN",
"M_UNRECOGNIZED",
"M_UNSUPPORTED_ROOM_VERSION",
"M_USER_IN_USE",
"M_WEAK_PASSWORD"
]

View File

@@ -0,0 +1,26 @@
[
"m.call.answer",
"m.call.candidates",
"m.call.hangup",
"m.call.invite",
"m.direct",
"m.presence",
"m.receipt",
"m.room.aliases",
"m.room.avatar",
"m.room.canonical_alias",
"m.room.create",
"m.room.guest_access",
"m.room.history_visibility",
"m.room.join_rules",
"m.room.member",
"m.room.message",
"m.room.message.feedback",
"m.room.name",
"m.room.power_levels",
"m.room.redaction",
"m.room.third_party_invite",
"m.room.topic",
"m.tag",
"m.typing"
]

View File

@@ -0,0 +1,80 @@
kdisplayname
javatar_url
mdid_not_relay
fage_ts
dpdus
gmsgtypefm.text
gcontent¢dbody
dbody
bhunsigned
ddtab
kauth_events
edepth
foriginhsynapse
porigin_server_ts
fsender
typenm.room.message
hevent_ido$
kprev_events
groom_idn!
cXJGX%
dtypenm.room.message
dtypemm.room.member
events
:synapse0
:synapse1
:synapse2
:synapse3
:synapse4
:synapse5
:synapse7
:synapse8
&exclude_threaded=true
chunk
start
end
thread_id
=%7B%22thread_id%22%3A0%7D&2=20&dir=b&from=
transaction_id
m.room.room_version
m.room.power_levels
m.room.join_rule
m.room.guest_access
user_id
dtypenm.room.aliases
dtypevm.room.canonical_aliasfsender
device_id
home_server
access_token
"dpdus\x81\xaadtypenm.room.messageedepth"
"gcontent\xa2dbody"
"gmsgtypefm.textgroom_idn"
"porigin_server_ts\x1b\x00\x00\x01gZ\xe3\xfd\x1c"
"\x11*\xd1\x02\x06\xd1\x14"
"B\x03\x00:\x8d\x87\xb10\x021Q\x00\x11* \xd1\x00\x06\xd2\x14\x01\x06\xff\xa1dpdus\x81\xaadtypenm.room.messageedepth\x18"
"bE\x00\x01\x1dr\xc1*\xb1\x06Q\x07\xff\xa1dpdus\xa0"
"\xa1dpdus\xa0"
"\xa8erooms\xa3djoin\xa0eleave\xa0finvite\xa0fgroups\xa3djoin\xa0eleave\xa0finvite\xa0hpresence\xa1fevents\x80ito_device\xa1fevents\x80jnext_batchts58_10_0_1_1_1_1_3_1laccount_data\xa1fevents\x80ldevice_lists\xa2dleft\x80gchanged\x80x\x1adevice_one_time_keys_count\xa1qsigned_curve25519\x182"
"\xa8erooms\xa3djoin\xa1n!DQQ0:synapse0\xa6estate\xa1fevents\x80gsummary\xa0htimeline\xa3fevents\x81\xa6dtypenm.room.messagefsender"
"`Zy\x1eglimited\xf4jprev_batchss16_3_0_1_1_1_1_3_1iephemeral\xa1fevents\x80laccount_data\xa1fevents\x80tunread_notifications\xa0eleave"
"ephemeral\xa1fevents\x80laccount_data\xa1fevents\x81\xa2dtypelm.fully_readgcontent\xa1hevent_idk"
"chunk\x8a\xaacage\x0cdtypemm.room.memberfsenderqgcontent\xa3javatar_url\xf6jmembershipdjoinkdisplayname\xf6groom_id"
"gcontent\xa1rhistory_visibilityfsharedgroom_id"
"dtypex\x19m.room.history_visibility"
"gcontent\xa1ijoin_rulefpublicgroom_idn"
"dtypesm.room.power_levelsfsenderq"
"gcontent\xa9cban\x182dkick\x182eusers\xa1q"
"\x18dfevents\xa5km.room.name\x182mm.room.avatar\x182sm.room.power_levels\x18dvm.room.canonical_alias\x182x\x19m.room.history_visibility\x18dfinvite\x00fredact\x182mstate_default\x182musers_default\x00nevents_default\x00groom_idn"
"gcontent\xa2gcreatorqlroom_versiona1groom_idn"
"\xa1eflows\x81\xa1dtypepm.login.password"
"\xa2eerroroNo backup foundgerrcodekM_NOT_FOUND"
"xa1kdevice_keys\xa5dkeys\xa2red25519:J"
"jalgorithms\x82x\x1cm.olm.v1.curve25519-aes-sha2tm.megolm.v1.aes-sha2jsignatures\xa1"
"\xa2fdevice\xa0fglobal\xa5droom\x80fsender\x80gcontent\x81\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa1iset_tweakihighlightgdefault\xf5genabled\xf5gpatternggrule_idx\x1a.m.rule.contains_user_namehoverride\x86\xa5gactions\x81kdont_notifygdefault\xf5genabled\xf4grule_idn.m.rule.masterjconditions\x80\xa5gactions\x81kdont_notifygdefault\xf5genabled\xf5grule_idx\x18.m.rule.suppress_noticesjconditions\x81\xa3ckeyocontent.msgtypedkindkevent_matchgpatternhm.notice\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idu.m.rule.invite_for_mejconditions\x83\xa3ckeydtypedkindkevent_matchgpatternmm.room.member\xa3ckeyrcontent.membershipdkindkevent_matchgpatternfinvite\xa3ckeyistate_keydkindkevent_matchgpatternq\xa5gactions\x81kdont_notifygdefault\xf5genabled\xf5grule_idt.m.rule.member_eventjconditions\x81\xa3ckeydtypedkindkevent_matchgpatternmm.room.member\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa1iset_tweakihighlightgdefault\xf5genabled\xf5grule_idx\x1d.m.rule.contains_display_namejconditions\x81\xa1dkinducontains_display_name\xa5gactions\x82fnotify\xa2evalue\xf5iset_tweakihighlightgdefault\xf5genabled\xf5grule_idq.m.rule.roomnotifjconditions\x82\xa3ckeylcontent.bodydkindkevent_matchgpatterne@room\xa2ckeydroomdkindx\x1esender_notification_permissioniunderride\x85\xa5gactions\x83fnotify\xa2evaluedringiset_tweakesound\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idl.m.rule.calljconditions\x81\xa3ckeydtypedkindkevent_matchgpatternmm.call.invite\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idw.m.rule.room_one_to_onejconditions\x82\xa2bisa2dkindqroom_member_count\xa3ckeydtypedkindkevent_matchgpatternnm.room.message\xa5gactions\x83fnotify\xa2evaluegdefaultiset_tweakesound\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idx!.m.rule.encrypted_room_one_to_onejconditions\x82\xa2bisa2dkindqroom_member_count\xa3ckeydtypedkindkevent_matchgpatternpm.room.encrypted\xa5gactions\x82fnotify\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_ido.m.rule.messagejconditions\x81\xa3ckeydtypedkindkevent_matchgpatternnm.room.message\xa5gactions\x82fnotify\xa2evalue\xf4iset_tweakihighlightgdefault\xf5genabled\xf5grule_idq.m.rule.encryptedjconditions\x81\xa3ckeydtypedkindkevent_matchgpatternpm.room.encrypted"
"\xa1droom\xa1htimeline\xa1elimit\x14"
"\xa8erooms\xa3djoin\xa1n!eYB0:synapse0\xa6estate\xa1fevents\x80gsummary\xa0htimeline\xa3fevents\x8e\xa7dtypemm.room.createfsenderq"
"\xa1mone_time_keys\xa5x\x18signed_curve25519:"
"\xa1sone_time_key_counts\xa1qsigned_curve25519\x05"
"\xa4jexpires_in\xfb@\xac \x00\x00\x00\x00\x00jtoken_typefBearerlaccess_tokenxrmatrix_server_namehsynapse"

View File

@@ -0,0 +1,18 @@
[
"minimum_valid_until_ts",
"v",
"limit",
"event_id",
"ver",
"limit",
"since",
"include_all_networks",
"third_party_instance_id",
"room_alias",
"user_id",
"field",
"minimum_valid_until_ts",
"filter",
"access_token",
"timeout"
]

View File

@@ -0,0 +1,519 @@
[
{
"path": "/_matrix/federation/v1/send/{txnId}",
"method": "put",
"name": "send_transaction"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/send/{eventType}/{txnId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/profile/{userId}/displayname",
"method": "get"
},
{
"path": "/_matrix/client/r0/profile/{userId}/displayname",
"method": "put"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/join",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/kick",
"method": "post"
},
{
"path": "/_matrix/client/r0/admin/whois/{userId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/receipt/{receiptType}/{eventId}",
"method": "post"
},
{
"path": "/_matrix/client/versions",
"method": "get"
},
{
"path": "/_matrix/media/r0/config",
"method": "get"
},
{
"path": "/_matrix/media/r0/download/{serverName}/{mediaId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/invite ",
"method": "post"
},
{
"path": "/_matrix/client/r0/join/{roomIdOrAlias}",
"method": "post"
},
{
"path": "/_matrix/client/r0/presence/list/{userId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/presence/list/{userId}",
"method": "post"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}",
"method": "delete"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state/{eventType}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state/{eventType}",
"method": "put"
},
{
"path": "/_matrix/client/r0/account/whoami",
"method": "get"
},
{
"path": "/_matrix/client/r0/devices",
"method": "get"
},
{
"path": "/_matrix/client/r0/keys/claim",
"method": "post"
},
{
"path": "/_matrix/client/r0/login",
"method": "get"
},
{
"path": "/_matrix/client/r0/login",
"method": "post"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}/actions",
"method": "put"
},
{
"path": "/_matrix/client/r0/register/available",
"method": "get"
},
{
"path": "/_matrix/client/r0/user/{userId}/filter",
"method": "post"
},
{
"path": "/_matrix/client/r0/user_directory/search",
"method": "post"
},
{
"path": "/_matrix/client/r0/account/3pid",
"method": "get"
},
{
"path": "/_matrix/client/r0/account/3pid",
"method": "post"
},
{
"path": "/_matrix/client/r0/publicRooms",
"method": "get"
},
{
"path": "/_matrix/client/r0/publicRooms",
"method": "post"
},
{
"path": "/_matrix/client/r0/register",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/ban",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/typing/{userId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/search",
"method": "post"
},
{
"path": "/_matrix/client/r0/account/password",
"method": "post"
},
{
"path": "/_matrix/client/r0/initialSync",
"method": "get"
},
{
"path": "/_matrix/client/r0/logout/all",
"method": "post"
},
{
"path": "/_matrix/client/r0/account/deactivate",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/forget",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/redact/{eventId}/{txnId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/unban",
"method": "post"
},
{
"path": "/_matrix/client/r0/keys/query",
"method": "post"
},
{
"path": "/_matrix/client/r0/user/{userId}/account_data/{type}",
"method": "put"
},
{
"path": "/_matrix/client/r0/user/{userId}/rooms/{roomId}/tags/{tag}",
"method": "delete"
},
{
"path": "/_matrix/client/r0/user/{userId}/rooms/{roomId}/tags/{tag}",
"method": "put"
},
{
"path": "/_matrix/media/r0/upload",
"method": "post"
},
{
"path": "/_matrix/client/r0/events",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/context/{eventId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/invite",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/messages",
"method": "get"
},
{
"path": "/_matrix/client/r0/account/3pid/delete",
"method": "post"
},
{
"path": "/_matrix/client/r0/createRoom",
"method": "post"
},
{
"path": "/_matrix/client/r0/profile/{userId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/pushrules/{scope}/{kind}/{ruleId}/enabled",
"method": "put"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/leave",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/members",
"method": "get"
},
{
"path": "/_matrix/client/r0/sendToDevice/{eventType}/{txnId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/voip/turnServer",
"method": "get"
},
{
"path": "/.well-known/matrix/client",
"method": "get"
},
{
"path": "/_matrix/client/r0/directory/list/appservice/{networkId}/{roomId}",
"method": "put"
},
{
"path": "/_matrix/client/r0/keys/upload",
"method": "post"
},
{
"path": "/_matrix/client/r0/pushrules/",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/initialSync",
"method": "get"
},
{
"path": "/_matrix/client/r0/user/{userId}/openid/request_token",
"method": "post"
},
{
"path": "/_matrix/client/r0/user/{userId}/rooms/{roomId}/tags",
"method": "get"
},
{
"path": "/_matrix/media/r0/download/{serverName}/{mediaId}/{fileName}",
"method": "get"
},
{
"path": "/_matrix/client/r0/delete_devices",
"method": "post"
},
{
"path": "/_matrix/client/r0/events/{eventId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/profile/{userId}/avatar_url",
"method": "get"
},
{
"path": "/_matrix/client/r0/profile/{userId}/avatar_url",
"method": "put"
},
{
"path": "/_matrix/client/r0/pushers/set",
"method": "post"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/event/{eventId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/report/{eventId}",
"method": "post"
},
{
"path": "/_matrix/media/r0/preview_url",
"method": "get"
},
{
"path": "/_matrix/client/r0/directory/room/{roomAlias}",
"method": "delete"
},
{
"path": "/_matrix/client/r0/directory/room/{roomAlias}",
"method": "get"
},
{
"path": "/_matrix/client/r0/directory/room/{roomAlias}",
"method": "put"
},
{
"path": "/_matrix/client/r0/sync",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/read_markers",
"method": "post"
},
{
"path": "/_matrix/client/r0/logout",
"method": "post"
},
{
"path": "/_matrix/client/r0/notifications",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/joined_members",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state/{eventType}/{stateKey}",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state/{eventType}/{stateKey}",
"method": "put"
},
{
"path": "/_matrix/client/r0/user/{userId}/filter/{filterId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/user/{userId}/rooms/{roomId}/account_data/{type}",
"method": "put"
},
{
"path": "/_matrix/client/r0/joined_rooms",
"method": "get"
},
{
"path": "/_matrix/client/r0/keys/changes",
"method": "get"
},
{
"path": "/_matrix/client/r0/presence/{userId}/status",
"method": "get"
},
{
"path": "/_matrix/client/r0/presence/{userId}/status",
"method": "put"
},
{
"path": "/_matrix/client/r0/pushers",
"method": "get"
},
{
"path": "/_matrix/client/r0/rooms/{roomId}/state",
"method": "get"
},
{
"path": "/_matrix/media/r0/thumbnail/{serverName}/{mediaId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/devices/{deviceId}",
"method": "delete"
},
{
"path": "/_matrix/client/r0/devices/{deviceId}",
"method": "get"
},
{
"path": "/_matrix/client/r0/devices/{deviceId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/backfill/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/get_missing_events/{roomId}",
"method": "post"
},
{
"path": "/_matrix/federation/v1/event_auth/{roomId}/{eventId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query_auth/{roomId}/{eventId}",
"method": "post"
},
{
"path": "/_matrix/federation/v1/state/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/state_ids/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/event/{eventId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/invite/{roomId}/{eventId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/make_join/{roomId}/{userId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/send_join/{roomId}/{eventId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/query/{serverName}/{keyId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query",
"method": "post"
},
{
"path": "/_matrix/federation/v1/server/{keyId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/make_leave/{roomId}/{userId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/send_leave/{roomId}/{eventId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/openid/userinfo",
"method": "get"
},
{
"path": "/_matrix/federation/v1/publicRooms",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query/directory",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query/profile",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query/{queryType}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/query_auth/{roomId}/{eventId}",
"method": "post"
},
{
"path": "/_matrix/federation/v1/state/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/state_ids/{roomId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/event/{eventId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/event_auth/{roomId}/{eventId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/exchange_third_party_invite/{roomId}",
"method": "put"
},
{
"path": "/_matrix/federation/v1/3pid/onbind",
"method": "put"
},
{
"path": "/_matrix/federation/v1/user/devices/{userId}",
"method": "get"
},
{
"path": "/_matrix/federation/v1/user/keys/claim",
"method": "post"
},
{
"path": "/_matrix/federation/v1/user/keys/query",
"method": "post"
}
]

BIN
docker/proxy/proxy Executable file

Binary file not shown.

View File

@@ -6,6 +6,7 @@ import sys
import subprocess
import glob
import codecs
import time
# Utility functions
convert = lambda src, dst, environ: open(dst, "w").write(jinja2.Template(open(src).read()).render(**environ))
@@ -31,6 +32,10 @@ def generate_secrets(environ, secrets):
# Prepare the configuration
mode = sys.argv[1] if len(sys.argv) > 1 else None
environ = os.environ.copy()
for e in environ:
print("%s:%s" % (e, environ[e]))
ownership = "{}:{}".format(environ.get("UID", 991), environ.get("GID", 991))
args = ["python", "-m", "synapse.app.homeserver"]
@@ -64,4 +69,9 @@ else:
args += ["--config-path", "/compiled/homeserver.yaml"]
# Generate missing keys and start synapse
subprocess.check_output(args + ["--generate-keys"])
os.execv("/sbin/su-exec", ["su-exec", ownership] + args)
# we register our test users in add_accounts.sh now to avoid having to wait for HS launch
#os.system("(sleep 10; /usr/local/bin/register_new_matrix_user -u matthew -p secret -c /compiled/homeserver.yaml -a) &");
os.execv("/usr/local/bin/python", args)

8
docker/supervisord.conf Normal file
View File

@@ -0,0 +1,8 @@
[supervisord]
nodaemon=true
[program:synapse]
command=/start.py
[program:proxy]
command=/proxy/proxy --maps-dir /proxy/maps --debug-log

View File

@@ -120,6 +120,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
"include_redundant_members": {
"type": "boolean"
},
"thread_id": {
"type": "number",
}
}
}
@@ -331,6 +334,8 @@ class Filter(object):
self.contains_url = self.filter_json.get("contains_url", None)
self.thread_id = self.filter_json.get("thread_id", None)
def filters_all_types(self):
return "*" in self.not_types

View File

@@ -38,7 +38,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
AuthError if the checks fail
Returns:
if the auth checks pass.
"""
if do_size_check:
_check_size_limits(event)
@@ -46,7 +46,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
if not hasattr(event, "room_id"):
raise AuthError(500, "Event has no room_id: %s" % event)
if do_sig_check:
if False and do_sig_check: # Disable all sig checks
sender_domain = get_domain_from_id(event.sender)
event_id_domain = get_domain_from_id(event.event_id)
@@ -103,6 +103,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
"No create event in auth events",
)
if event.type == "org.matrix.server_presence":
return
creating_domain = get_domain_from_id(event.room_id)
originating_domain = get_domain_from_id(event.sender)
if creating_domain != originating_domain:

View File

@@ -44,6 +44,9 @@ class _EventInternalMetadata(object):
def is_invite_from_remote(self):
return getattr(self, "invite_from_remote", False)
def is_internal_event(self):
return getattr(self, "internal_event", False)
def get_send_on_behalf_of(self):
"""Whether this server should send the event on behalf of another server.
This is used by the federation "send_join" API to forward the initial join

View File

@@ -14,9 +14,9 @@
# limitations under the License.
import copy
import string
from synapse.types import EventID
from synapse.util.stringutils import random_string
from . import EventBase, FrozenEvent, _event_dict_property
@@ -49,10 +49,10 @@ class EventBuilderFactory(object):
self.event_id_count = 0
def create_event_id(self):
i = str(self.event_id_count)
i = self.event_id_count
self.event_id_count += 1
local_part = str(int(self.clock.time())) + i + random_string(5)
local_part = _encode_id(i)
e_id = EventID(local_part, self.hostname)
@@ -73,3 +73,19 @@ class EventBuilderFactory(object):
key_values["signatures"] = {}
return EventBuilder(key_values=key_values,)
def _numberToBase(n, b):
if n == 0:
return [0]
digits = []
while n:
digits.append(int(n % b))
n //= b
return digits[::-1]
def _encode_id(i):
digits = string.digits + string.ascii_letters
val_slice = _numberToBase(i, len(digits))
return "".join(digits[x] for x in val_slice)

View File

@@ -74,6 +74,7 @@ class EventContext(object):
"delta_ids",
"prev_state_events",
"app_service",
"thread_id",
"_current_state_ids",
"_prev_state_ids",
"_prev_state_id",
@@ -89,8 +90,9 @@ class EventContext(object):
@staticmethod
def with_state(state_group, current_state_ids, prev_state_ids,
prev_group=None, delta_ids=None):
thread_id, prev_group=None, delta_ids=None):
context = EventContext()
context.thread_id = thread_id
# The current state including the current event
context._current_state_ids = current_state_ids
@@ -141,7 +143,8 @@ class EventContext(object):
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
"app_service_id": self.app_service.id if self.app_service else None,
"thread_id": self.thread_id,
})
@staticmethod
@@ -158,6 +161,8 @@ class EventContext(object):
"""
context = EventContext()
context.thread_id = input["thread_id"]
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
context._prev_state_id = input["prev_state_id"]

View File

@@ -26,7 +26,7 @@ from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.http.servlet import assert_params_in_dict
from synapse.types import get_domain_from_id
from synapse.types import get_domain_from_id, EventID
from synapse.util import logcontext, unwrapFirstError
logger = logging.getLogger(__name__)
@@ -79,16 +79,16 @@ class FederationBase(object):
allow_none=True,
)
if not res and pdu.origin != origin:
try:
res = yield self.get_pdu(
destinations=[pdu.origin],
event_id=pdu.event_id,
outlier=outlier,
timeout=10000,
)
except SynapseError:
pass
# if not res and pdu.origin != origin:
# try:
# res = yield self.get_pdu(
# destinations=[pdu.origin],
# event_id=pdu.event_id,
# outlier=outlier,
# timeout=10000,
# )
# except SynapseError:
# pass
if not res:
logger.warn(
@@ -136,6 +136,7 @@ class FederationBase(object):
* throws a SynapseError if the signature check failed.
The deferreds run their callbacks in the sentinel logcontext.
"""
return [defer.succeed(p) for p in pdus]
deferreds = _check_sigs_on_pdus(self.keyring, pdus)
ctx = logcontext.LoggingContext.current_context()
@@ -317,7 +318,7 @@ def event_from_pdu_json(pdu_json, outlier=False):
depth = pdu_json['depth']
if not isinstance(depth, six.integer_types):
raise SynapseError(400, "Depth %r not an intger" % (depth, ),
raise SynapseError(400, "Depth %r not an integer" % (depth, ),
Codes.BAD_JSON)
if depth < 0:
@@ -325,6 +326,40 @@ def event_from_pdu_json(pdu_json, outlier=False):
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
event_id = pdu_json["event_id"]
if event_id[0] != "$":
pdu_json["event_id"] = EventID(
event_id,
get_domain_from_id(pdu_json["sender"]),
).to_string()
event_id = pdu_json["event_id"]
dtab = pdu_json.get("unsigned", {}).pop("dtab", None)
if dtab:
pdu_json.setdefault("unsigned", {})["destinations"] = {
dest: cost
for cost, destinations in dtab
for dest in destinations
}
if "auth_events" in pdu_json:
pdu_json["auth_events"] = [
(e, {}) if isinstance(e, six.string_types) else e
for e in pdu_json["auth_events"]
]
if "prev_events" in pdu_json:
pdu_json["prev_events"] = [
(e, {}) if isinstance(e, six.string_types) else e
for e in pdu_json["prev_events"]
]
if "origin" not in pdu_json:
pdu_json["origin"] = get_domain_from_id(pdu_json["sender"])
logger.info("Unmangled event to: %s", pdu_json)
event = FrozenEvent(
pdu_json
)
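
For illustration (shapes inferred from the comprehension above), a compact dtab of [cost, [destinations...]] pairs unpacks into a flat {destination: cost} map:

dtab = [[1, ["a.example", "b.example"]], [3, ["c.example"]]]
print({dest: cost for cost, dests in dtab for dest in dests})
# {'a.example': 1, 'b.example': 1, 'c.example': 3}
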

View File

@@ -39,6 +39,7 @@ from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from synapse.federation.units import _mangle_pdu
logger = logging.getLogger(__name__)
@@ -486,6 +487,22 @@ class FederationClient(FederationBase):
RuntimeError if no servers were reachable.
"""
healths = yield self.store.get_destination_healths(destinations)
with_healths = []
without_healths = []
for d in destinations:
if healths.get(d):
with_healths.append(d)
else:
without_healths.append(d)
with_healths.sort(key=lambda d: healths[d])
destinations = with_healths + without_healths
logger.info("Trying destinations: %r", destinations)
for destination in destinations:
if destination == self.server_name:
continue
@@ -696,7 +713,7 @@ class FederationClient(FederationBase):
destination=destination,
room_id=room_id,
event_id=event_id,
content=pdu.get_pdu_json(time_now),
content=_mangle_pdu(pdu.get_pdu_json(time_now)),
)
except HttpResponseException as e:
if e.code == 403:

View File

@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
import six
from six import iteritems
@@ -36,19 +37,19 @@ from synapse.api.errors import (
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.federation.units import Edu, Transaction, _mangle_pdu
from synapse.http.endpoint import parse_server_name
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.util import glob_to_regex
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -71,6 +72,7 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self.clock = hs.get_clock()
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -83,6 +85,8 @@ class FederationServer(FederationBase):
# come in waves.
self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
self.tracer = hs.get_tracer()
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
@@ -100,7 +104,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_incoming_transaction(self, origin, transaction_data):
def on_incoming_transaction(self, origin, transaction_data, span):
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = self._clock.time_msec()
@@ -118,13 +122,13 @@ class FederationServer(FederationBase):
(origin, transaction.transaction_id),
)):
result = yield self._handle_incoming_transaction(
origin, transaction, request_time,
origin, transaction, request_time, span,
)
defer.returnValue(result)
@defer.inlineCallbacks
def _handle_incoming_transaction(self, origin, transaction, request_time):
def _handle_incoming_transaction(self, origin, transaction, request_time, span):
""" Process an incoming transaction and return the HTTP response
Args:
@@ -208,24 +212,48 @@ class FederationServer(FederationBase):
pdu_results[event_id] = e.error_dict()
return
thread_id = random.randint(1, 999999999)
pdu_to_thread = {}
first_in_thread = True
for pdu in reversed(pdus_by_room[room_id]):
if self.handler.should_start_thread(pdu):
pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
first_in_thread = False
else:
pdu_to_thread[pdu.event_id] = (0, False)
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
with nested_logging_context(event_id):
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
thread_id, new_thread = pdu_to_thread[pdu.event_id]
logger.info("Assigning thread %d to %s", thread_id, pdu.event_id)
child_span = self.tracer.start_span('handle_pdu', child_of=span)
with child_span:
child_span.set_tag("event_id", event_id)
try:
ret = yield self._handle_received_pdu(
origin, pdu, thread_id=thread_id,
new_thread=new_thread,
span=child_span,
)
if ret:
pdu_results[event_id] = ret
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
child_span.set_tag("error", True)
child_span.log_kv({"error", e})
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
child_span.set_tag("error", True)
child_span.log_kv({"error", e})
child_span.log_kv({"pdu_result": pdu_results.get(event_id)})
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
@@ -233,7 +261,16 @@ class FederationServer(FederationBase):
)
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
logger.info("Got edus: %s", transaction.edus)
edus = []
for x in transaction.edus:
try:
edus.append(Edu(**x))
except Exception:
logger.exception("Failed to handle EDU: %s", x)
for edu in edus:
yield self.received_edu(
origin,
edu.edu_type,
@@ -244,7 +281,7 @@ class FederationServer(FederationBase):
"pdus": pdu_results,
}
logger.debug("Returning: %s", str(response))
logger.info("Returning: %s", str(response))
yield self.transaction_actions.set_response(
origin,
@@ -329,8 +366,8 @@ class FederationServer(FederationBase):
)
defer.returnValue({
"pdus": [pdu.get_pdu_json() for pdu in pdus],
"auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
"pdus": [_mangle_pdu(pdu.get_pdu_json()) for pdu in pdus],
"auth_chain": [_mangle_pdu(pdu.get_pdu_json()) for pdu in auth_chain],
})
@defer.inlineCallbacks
@@ -375,7 +412,7 @@ class FederationServer(FederationBase):
yield self.check_server_matches_acl(origin_host, pdu.room_id)
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
defer.returnValue((200, {"event": _mangle_pdu(ret_pdu.get_pdu_json(time_now))}))
@defer.inlineCallbacks
def on_send_join_request(self, origin, content):
@@ -389,9 +426,9 @@ class FederationServer(FederationBase):
res_pdus = yield self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {
"state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
"state": [_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["state"]],
"auth_chain": [
p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["auth_chain"]
],
}))
@@ -424,7 +461,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
res = {
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
"auth_chain": [_mangle_pdu(a.get_pdu_json(time_now)) for a in auth_pdus],
}
defer.returnValue((200, res))
@@ -473,7 +510,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
send_content = {
"auth_chain": [
e.get_pdu_json(time_now)
_mangle_pdu(e.get_pdu_json(time_now))
for e in ret["auth_chain"]
],
"rejects": ret.get("rejects", []),
@@ -549,7 +586,7 @@ class FederationServer(FederationBase):
time_now = self._clock.time_msec()
defer.returnValue({
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
"events": [_mangle_pdu(ev.get_pdu_json(time_now)) for ev in missing_events],
})
@log_function
@@ -571,7 +608,7 @@ class FederationServer(FederationBase):
)
@defer.inlineCallbacks
def _handle_received_pdu(self, origin, pdu):
def _handle_received_pdu(self, origin, pdu, thread_id, new_thread, span):
""" Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError.
@@ -601,30 +638,6 @@ class FederationServer(FederationBase):
if the event was unacceptable for any other reason (eg, too large,
too many prev_events, couldn't find the prev_events)
"""
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if origin != get_domain_from_id(pdu.event_id):
# We continue to accept join events from any server; this is
# necessary for the federation join dance to work correctly.
# (When we join over federation, the "helper" server is
# responsible for sending out the join event, rather than the
# origin. See bug #1893).
if not (
pdu.type == 'm.room.member' and
pdu.content and
pdu.content.get("membership", None) == 'join'
):
logger.info(
"Discarding PDU %s from invalid origin %s",
pdu.event_id, origin
)
return
else:
logger.info(
"Accepting join PDU %s from %s",
pdu.event_id, origin
)
# Check signature.
try:
pdu = yield self._check_sigs_and_hash(pdu)
@@ -636,10 +649,35 @@ class FederationServer(FederationBase):
affected=pdu.event_id,
)
destinations = pdu.unsigned.get("destinations", {})
costs = yield self.store.get_destination_healths(list(destinations))
logger.info("Destinations: %s", destinations)
logger.info("Costs: %s", costs)
dont_relay = set()
for dest, their_cost in destinations.items():
our_cost = costs.get(dest)
if our_cost and their_cost and their_cost < our_cost:
dont_relay.add(dest)
if destinations:
pdu.unsigned["destinations"] = {
d: c for d, c in destinations.items() if d not in dont_relay
}
yield self.handler.on_receive_pdu(
origin, pdu, sent_to_us_directly=True,
thread_id=thread_id, new_thread=new_thread,
span=span,
)
ret = {}
if dont_relay:
ret = {"did_not_relay": list(dont_relay)}
defer.returnValue(ret)
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name

View File

@@ -14,6 +14,10 @@
# limitations under the License.
import datetime
import logging
import random
import json
import opentracing
import string
from six import itervalues
@@ -22,7 +26,9 @@ from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
from synapse.api.constants import EventTypes
from synapse.api.errors import FederationDeniedError, HttpResponseException
from synapse.events import FrozenEvent
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import (
LaterGauge,
@@ -42,6 +48,8 @@ from .units import Edu, Transaction
logger = logging.getLogger(__name__)
pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger")
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count", ""
)
@@ -128,7 +136,7 @@ class TransactionQueue(object):
self.last_device_list_stream_id_by_dest = {}
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
self._next_txn_id = 1
self._order = 1
@@ -137,6 +145,8 @@ class TransactionQueue(object):
self._processing_pending_presence = False
self.tracer = hs.get_tracer()
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
@@ -169,10 +179,9 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
should_relay = yield self._should_relay(event, False)
logger.info("Should relay event %s: %s", event.event_id, should_relay)
if not should_relay:
return
try:
@@ -194,15 +203,9 @@ class TransactionQueue(object):
destinations = set(destinations)
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations)
self._send_pdu(event, destinations)
yield self._send_pdu(event, destinations)
@defer.inlineCallbacks
def handle_room_events(events):
@@ -248,24 +251,54 @@ class TransactionQueue(object):
finally:
self._is_processing = False
def _send_pdu(self, pdu, destinations):
@defer.inlineCallbacks
def received_new_event(self, origin, event, span):
should_relay = yield self._should_relay(event, True)
logger.info("Should relay event %s: %s", event.event_id, should_relay)
if not should_relay:
return
destinations = event.unsigned.get("destinations")
destinations = set(destinations)
logger.debug("Sending %s to %r", event, destinations)
yield self._send_pdu(event, destinations, span)
@defer.inlineCallbacks
def _send_pdu(self, pdu, destinations, span=None):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
order = self._order
self._order += 1
references = []
if span:
references = [opentracing.follows_from(span.context)]
with self.tracer.start_span('_send_pdu', references=references) as span:
span.set_tag("event_id", pdu.event_id)
span.set_tag("room_id", pdu.room_id)
span.set_tag("sender", pdu.sender)
destinations = yield self._compute_relay_destinations(
pdu, joined_hosts=destinations,
)
destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
pdu_logger.info(
"Relaying PDU %s in %s to %s",
pdu.event_id, pdu.room_id, destinations,
)
if not destinations:
return
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
# XXX: Should we decide where to route here.
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
(pdu, order)
@@ -273,6 +306,36 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination)
def _compute_relay_destinations(self, pdu, joined_hosts):
"""Compute where we should send an event. Returning an empty set stops
PDU from being sent anywhere.
"""
# XXX: Hook for routing shenanigans
send_on_behalf_of = pdu.internal_metadata.get_send_on_behalf_of()
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
# send the event to it.
joined_hosts.discard(send_on_behalf_of)
return joined_hosts
def _should_relay(self, event, from_federation):
"""Whether we should consider relaying this event.
"""
# XXX: Hook for routing shenanigans
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
return False
if event.internal_metadata.is_internal_event():
return False
return True
@logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
@@ -412,6 +475,7 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):
pdu_spans = {}
pending_pdus = []
try:
self.pending_transactions[destination] = 1
@@ -423,6 +487,12 @@ class TransactionQueue(object):
pending_pdus = []
while True:
txn_id = _encode_id(self._next_txn_id)
self._next_txn_id += 1
for s in pdu_spans.values():
s.set_tag("txn-id", txn_id)
device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages(destination)
)
@@ -439,16 +509,22 @@ class TransactionQueue(object):
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
# We can only include at most 50 PDUs per transactions
pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
pending_pdus, leftover_pdus = pending_pdus[-5:], pending_pdus[:-5]
if leftover_pdus:
self.pending_pdus_by_dest[destination] = leftover_pdus
# self.pending_pdus_by_dest[destination] = leftover_pdus
for _, _, p_span in leftover_pdus:
p_span.set_tag("success", False)
p_span.log_kv({"result": "dropped"})
p_span.finish()
logger.info("TX [%s] Sending PDUs: %s", destination, pending_pdus)
pending_edus = self.pending_edus_by_dest.pop(destination, [])
# We can only include at most 100 EDUs per transactions
pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
if leftover_edus:
self.pending_edus_by_dest[destination] = leftover_edus
pending_edus, leftover_edus = pending_edus[-5:], pending_edus[:-5]
# if leftover_edus:
# self.pending_edus_by_dest[destination] = leftover_edus
pending_presence = self.pending_presence_by_dest.pop(destination, {})
@@ -485,28 +561,70 @@ class TransactionQueue(object):
)
return
pdu_span_references = []
for pdu, _, p_span in pending_pdus:
pdu_spans[pdu.event_id] = p_span
p_span.set_tag("txn-id", txn_id)
pdu_span_references.append(opentracing.follows_from(p_span.context))
# END CRITICAL SECTION
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus,
span = self.tracer.start_span(
'_send_new_transaction', references=pdu_span_references,
)
if success:
sent_transactions_counter.inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
logger.info("Marking as sent %r %r", destination, dev_list_id)
yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id
)
with span:
span.set_tag("destination", destination)
span.set_tag("txn-id", txn_id)
self.last_device_stream_id_by_dest[destination] = device_stream_id
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
try:
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, span,
pdu_spans, txn_id,
)
except Exception as e:
success = False
span.set_tag("error", True)
span.log_kv({"error": e})
for s in pdu_spans.values():
s.set_tag("error", True)
s.log_kv({"transaction_error": e})
raise
finally:
if not success:
for p, _, _ in pending_pdus:
yield self._pdu_send_txn_failed(
destination, txn_id, p,
span=pdu_spans[p.event_id],
)
# We want to be *very* sure we delete this after we stop
# processing
self.pending_transactions.pop(destination, None)
for s in pdu_spans.values():
s.finish()
span.set_tag("success", success)
if success:
sent_transactions_counter.inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
logger.info(
"Marking as sent %r %r", destination, dev_list_id,
)
yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id
)
self.last_device_stream_id_by_dest[destination] = device_stream_id
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
@@ -519,17 +637,14 @@ class TransactionQueue(object):
except FederationDeniedError as e:
logger.info(e)
except Exception as e:
logger.warn(
logger.exception(
"TX [%s] Failed to send transaction: %s",
destination,
e,
)
for p, _ in pending_pdus:
for p, _, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
destination)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)
@defer.inlineCallbacks
def _get_new_device_messages(self, destination):
@@ -565,7 +680,8 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus):
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
span, pdu_spans, txn_id):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
@@ -574,9 +690,13 @@ class TransactionQueue(object):
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
span.log_kv({
"pdus": len(pdus),
"edus": len(edus),
})
logger.debug(
"TX [%s] {%s} Attempting new transaction"
@@ -597,8 +717,6 @@ class TransactionQueue(object):
edus=edus,
)
self._next_txn_id += 1
yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
@@ -628,13 +746,16 @@ class TransactionQueue(object):
try:
response = yield self.transport_layer.send_transaction(
transaction, json_data_cb
transaction, json_data_cb, span,
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
span.set_tag("error", True)
span.log_kv({"error": e})
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
@@ -654,18 +775,111 @@ class TransactionQueue(object):
logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, e_id, r,
)
else:
logger.info(
"TX [%s] {%s} got response json %s",
destination, txn_id, response
)
pdu_results = response.get("pdus", {})
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, p.event_id,
yield self._pdu_send_result(
destination, txn_id, p,
response=pdu_results.get(p.event_id, {}),
span=pdu_spans[p.event_id],
)
else:
success = False
defer.returnValue(success)
@defer.inlineCallbacks
def _pdu_send_result(self, destination, txn_id, pdu, response, span):
"""Gets called after sending the event in a transaction, with the
result for the event from the remote server.
"""
# XXX: Hook for routing shenanigans
if "error" in response:
span.set_tag("error", True)
span.log_kv({
"error.kind": "pdu",
"response.error": response["error"],
})
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, pdu.event_id, response,
)
pdu_logger.info(
"SendErrorPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destination": destination,
"server": self.server_name,
},
)
new_destinations = set(pdu.unsigned.get("destinations", []))
new_destinations.discard(destination)
yield self._send_pdu(pdu, list(new_destinations), span)
elif "did_not_relay" in response and response["did_not_relay"]:
new_destinations = set(response["did_not_relay"])
new_destinations.discard(destination)
pdu_logger.info(
"DidNotRelayPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destination": destination,
"new_destinations": json.dumps(list(new_destinations)),
"server": self.server_name,
},
)
span.log_kv({
"did_not_relay_to": list(new_destinations),
})
yield self._send_pdu(pdu, list(new_destinations), span)
# @defer.inlineCallbacks
def _pdu_send_txn_failed(self, destination, txn_id, pdu, span):
"""Gets called when sending a transaction failed (after retries)
"""
# XXX: Hook for routing shenanigans
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, pdu.event_id,
)
span.set_tag("error", True)
span.log_kv({
"error.kind": "transaction",
})
pdu_logger.info(
"SendFailPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destination": destination,
"server": self.server_name,
},
)
# new_destinations = set(pdu.unsigned.get("destinations", []))
# new_destinations.discard(destination)
# yield self._send_pdu(pdu, list(new_destinations), span)
def _numberToBase(n, b):
if n == 0:
return [0]
digits = []
while n:
digits.append(int(n % b))
n //= b
return digits[::-1]
def _encode_id(i):
digits = string.digits + string.ascii_letters
val_slice = _numberToBase(i, len(digits))
return "".join(digits[x] for x in val_slice)

View File

@@ -136,7 +136,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_transaction(self, transaction, json_data_callback=None):
def send_transaction(self, transaction, json_data_callback=None, span=None):
""" Sends the given Transaction to its destination
Args:
@@ -174,8 +174,9 @@ class TransportLayerClient(object):
path=path,
data=json_data,
json_data_callback=json_data_callback,
long_retries=True,
long_retries=False,
backoff_on_404=True, # If we get a 404 the other side has gone
span=span,
)
defer.returnValue(response)

View File

@@ -15,8 +15,11 @@
# limitations under the License.
import functools
import inspect
import logging
import re
import opentracing
from opentracing.ext import tags
from twisted.internet import defer
@@ -120,12 +123,12 @@ class Authenticator(object):
):
raise FederationDeniedError(origin)
if not json_request["signatures"]:
raise NoAuthenticationError(
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
)
# if not json_request["signatures"]:
# raise NoAuthenticationError(
# 401, "Missing Authorization headers", Codes.UNAUTHORIZED,
# )
yield self.keyring.verify_json_for_server(origin, json_request)
# yield self.keyring.verify_json_for_server(origin, json_request)
logger.info("Request from %s", origin)
request.authenticated_entity = origin
@@ -227,15 +230,22 @@ class BaseFederationServlet(object):
"""
REQUIRE_AUTH = True
def __init__(self, handler, authenticator, ratelimiter, server_name):
def __init__(self, handler, authenticator, ratelimiter, server_name, hs):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
self.tracer = hs.get_tracer()
def _wrap(self, func):
authenticator = self.authenticator
ratelimiter = self.ratelimiter
arg_spec = inspect.signature(func)
all_args = arg_spec.parameters
include_span = "request_span" in all_args
logger.info("include_span: %s for %s", include_span, self)
@defer.inlineCallbacks
@functools.wraps(func)
def new_func(request, *args, **kwargs):
@@ -251,32 +261,67 @@ class BaseFederationServlet(object):
Deferred[(int, object)|None]: (response code, response object) as returned
by the callback method. None if the request has already been handled.
"""
content = None
if request.method in [b"PUT", b"POST"]:
# TODO: Handle other method types? other content types?
content = parse_json_object_from_request(request)
try:
origin = yield authenticator.authenticate_request(request, content)
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
logger.warn("authenticate_request failed: missing authentication")
raise
except Exception as e:
logger.warn("authenticate_request failed: %s", e)
raise
if origin:
with ratelimiter.ratelimit(origin) as d:
yield d
response = yield func(
origin, content, request.args, *args, **kwargs
)
else:
response = yield func(
origin, content, request.args, *args, **kwargs
carrier = {}
for key, value in request.requestHeaders.getAllRawHeaders():
carrier[key.decode("ascii")] = value[0].decode("ascii")
parent_ctx = self.tracer.extract(
format=opentracing.Format.HTTP_HEADERS, carrier=carrier
)
except Exception:
logger.exception("trace extract failed")
parent_ctx = None
tags_dict = {
tags.HTTP_METHOD: request.method.decode('ascii'),
tags.HTTP_URL: request.uri.decode('ascii'),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
}
span = self.tracer.start_span(
operation_name="federation-server",
child_of=parent_ctx,
tags=tags_dict,
)
with span:
content = None
if request.method in [b"PUT", b"POST"]:
# TODO: Handle other method types? other content types?
content = parse_json_object_from_request(request)
try:
origin = yield authenticator.authenticate_request(request, content)
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
logger.warn("authenticate_request failed: missing authentication")
raise
except Exception as e:
logger.warn("authenticate_request failed: %s", e)
raise
if include_span:
kwargs["request_span"] = span
try:
if origin:
span.set_tag("origin", origin)
with ratelimiter.ratelimit(origin) as d:
yield d
response = yield func(
origin, content, request.args, *args, **kwargs
)
else:
response = yield func(
origin, content, request.args, *args, **kwargs
)
except Exception as e:
span.set_tag("error", True)
span.log_kv({"error": e})
raise
span.set_tag(tags.HTTP_STATUS_CODE, response[0])
defer.returnValue(response)
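For reference, a self-contained sketch of the header-propagation pattern the servlet wrapper above relies on, using only the opentracing API (the helper names here are illustrative, not part of this branch):

import opentracing

def inject_headers(tracer, span, headers):
    # Serialise the span context into HTTP headers on the outbound side.
    carrier = {}
    tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)
    headers.update(carrier)

def extract_context(tracer, headers):
    # Recover the remote span context on the receiving side, if present.
    try:
        return tracer.extract(opentracing.Format.HTTP_HEADERS, dict(headers))
    except Exception:
        return None  # no (or malformed) trace headers: start a fresh trace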
@@ -307,7 +352,7 @@ class FederationSendServlet(BaseFederationServlet):
# This is when someone is trying to send us a bunch of data.
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, transaction_id):
def on_PUT(self, origin, content, query, transaction_id, request_span):
""" Called on PUT /send/<transaction_id>/
Args:
@@ -353,7 +398,7 @@ class FederationSendServlet(BaseFederationServlet):
try:
code, response = yield self.handler.on_incoming_transaction(
origin, transaction_data,
origin, transaction_data, request_span,
)
except Exception:
logger.exception("on_incoming_transaction failed")
@@ -1322,6 +1367,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in ROOM_LIST_CLASSES:
@@ -1330,6 +1376,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in GROUP_SERVER_SERVLET_CLASSES:
@@ -1338,6 +1385,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in GROUP_LOCAL_SERVLET_CLASSES:
@@ -1346,6 +1394,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES:
@@ -1354,4 +1403,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)

View File

@@ -17,13 +17,18 @@
server protocol.
"""
import itertools
import logging
from synapse.types import get_localpart_from_id, get_domain_from_id
from synapse.util.jsonobject import JsonEncodedObject
logger = logging.getLogger(__name__)
BUCKETS = [0, 50, 100, 200, 350, 500, 750, 1000, 2000, 5000, 10000, 100000]
class Edu(JsonEncodedObject):
""" An Edu represents a piece of data sent from one homeserver to another.
@@ -76,15 +81,14 @@ class Transaction(JsonEncodedObject):
]
internal_keys = [
"transaction_id",
"destination",
]
required_keys = [
"transaction_id",
"origin",
"destination",
"origin_server_ts",
"previous_ids",
]
required_keys = [
"pdus",
]
@@ -108,15 +112,54 @@ class Transaction(JsonEncodedObject):
""" Used to create a new transaction. Will auto fill out
transaction_id and origin_server_ts keys.
"""
if "origin_server_ts" not in kwargs:
raise KeyError(
"Require 'origin_server_ts' to construct a Transaction"
)
if "transaction_id" not in kwargs:
raise KeyError(
"Require 'transaction_id' to construct a Transaction"
)
kwargs["pdus"] = [p.get_pdu_json() for p in pdus]
kwargs["pdus"] = [
_mangle_pdu(p.get_pdu_json())
for p in pdus
]
return Transaction(**kwargs)
def _mangle_pdu(pdu_json):
pdu_json.pop("origin", None)
pdu_json.pop("hashes", None)
pdu_json.pop("signatures", None)
pdu_json.get("unsigned", {}).pop("age_ts", None)
pdu_json.get("unsigned", {}).pop("age", None)
pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"]))
pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"]))
if get_domain_from_id(pdu_json["event_id"]) == get_domain_from_id(pdu_json["sender"]):
pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
destinations = pdu_json["unsigned"].pop("destinations", None)
if destinations:
new_destinations = {}
for dest, cost in destinations.items():
for first, second in pairwise(BUCKETS):
if first <= cost <= second:
b = first if cost - first < second - cost else second
new_destinations.setdefault(b, []).append(dest)
break
else:
new_destinations.setdefault(BUCKETS[-1], []).append(dest)
pdu_json["unsigned"]["dtab"] = list(new_destinations.items())
logger.info("Mangled PDU: %s", pdu_json)
return pdu_json
def _strip_hashes(iterable):
return (
e for e, hashes in iterable
)
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
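A worked example of the destination-cost bucketing performed by _mangle_pdu above: each cost is snapped to the nearer edge of the BUCKETS interval it falls in (a standalone sketch; bucket_for is an illustrative name):

import itertools

BUCKETS = [0, 50, 100, 200, 350, 500, 750, 1000, 2000, 5000, 10000, 100000]

def pairwise(iterable):
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)

def bucket_for(cost):
    for first, second in pairwise(BUCKETS):
        if first <= cost <= second:
            # Ties go to the upper edge, matching the strict < above.
            return first if cost - first < second - cost else second
    return BUCKETS[-1]  # out-of-range costs land in the largest bucket

assert bucket_for(20) == 0     # 20 is nearer to 0 than to 50
assert bucket_for(30) == 50    # 30 is nearer to 50
assert bucket_for(120) == 100  # 120 is nearer to 100 than to 200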

View File

@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import secrets
import logging
import unicodedata
@@ -748,7 +750,9 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def issue_access_token(self, user_id, device_id=None):
access_token = self.macaroon_gen.generate_access_token(user_id)
# access_token = self.macaroon_gen.generate_access_token(user_id)
access_token = base64.b64encode(secrets.token_bytes(8)).decode("ascii")  # b64encode returns bytes on Python 3
yield self.store.add_access_token_to_user(user_id, access_token,
device_id)
defer.returnValue(access_token)
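For illustration, the shortened tokens produced above are 8 random bytes base64-encoded into 12 ASCII characters (padding included):

import base64
import secrets

token = base64.b64encode(secrets.token_bytes(8)).decode("ascii")
assert len(token) == 12  # e.g. "qh3Zl0aF2bE="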

View File

@@ -278,6 +278,7 @@ class DeviceHandler(BaseHandler):
"device_list_key", position, rooms=room_ids,
)
return
if hosts:
logger.info("Sending device list update notif to: %r", hosts)
for host in hosts:

View File

@@ -263,9 +263,8 @@ class DirectoryHandler(BaseHandler):
"Room alias %s not found" % (room_alias.to_string(),),
)
users = yield self.state.get_current_user_in_room(room_id)
extra_servers = set(get_domain_from_id(u) for u in users)
servers = set(extra_servers) | set(servers)
hosts = yield self.state.get_current_hosts_in_room(room_id)
servers = set(hosts) | set(servers)
# If this server is in the list of servers, return it first.
if self.server_name in servers:
@@ -276,6 +275,8 @@ class DirectoryHandler(BaseHandler):
else:
servers = list(servers)
logger.info("Returning servers %s", servers)
defer.returnValue({
"room_id": room_id,
"servers": servers,
@@ -290,14 +291,14 @@ class DirectoryHandler(BaseHandler):
400, "Room Alias is not hosted on this Home Server"
)
result = yield self.get_association_from_room_alias(
result = yield self.get_association(
room_alias
)
if result is not None:
defer.returnValue({
"room_id": result.room_id,
"servers": result.servers,
"room_id": result["room_id"],
"servers": result["servers"],
})
else:
raise NotFoundError(

View File

@@ -18,6 +18,7 @@
import itertools
import logging
import random
import six
from six import iteritems, itervalues
@@ -47,6 +48,7 @@ from synapse.crypto.event_signing import (
add_hashes_and_signatures,
compute_event_signature,
)
from synapse.events import FrozenEvent
from synapse.events.validator import EventValidator
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.federation import (
@@ -62,12 +64,15 @@ from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_server
from ._base import BaseHandler
logger = logging.getLogger(__name__)
pdu_logger = logging.getLogger("synapse.federation.pdu_destination_logger")
def shortstr(iterable, maxitems=5):
"""If iterable has maxitems or fewer, return the stringification of a list
@@ -109,6 +114,7 @@ class FederationHandler(BaseHandler):
self.clock = hs.get_clock()
self.store = hs.get_datastore() # type: synapse.storage.DataStore
self.federation_client = hs.get_federation_client()
self.federation_sender = hs.get_federation_sender()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
@@ -135,9 +141,14 @@ class FederationHandler(BaseHandler):
self.room_queues = {}
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
# Always start a new thread for events that have an origin_server_ts
# from before this
self.force_thread_ts = 0
@defer.inlineCallbacks
def on_receive_pdu(
self, origin, pdu, sent_to_us_directly=False,
self, origin, pdu, sent_to_us_directly=False, thread_id=None,
new_thread=False, span=None,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -178,8 +189,17 @@ class FederationHandler(BaseHandler):
)
if already_seen:
logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
pdu_logger.info(
"Received already seen event %s in room %s from %s",
pdu.event_id, pdu.room_id, origin,
)
return
pdu_logger.info(
"Received unseen event %s in room %s from %s",
pdu.event_id, pdu.room_id, origin,
)
# do some initial sanity-checking of the event. In particular, make
# sure it doesn't have hundreds of prev_events or auth_events, which
# could cause a huge state resolution or cascade of event fetches.
@@ -261,7 +281,8 @@ class FederationHandler(BaseHandler):
)
yield self._get_missing_events_for_pdu(
origin, pdu, prevs, min_depth
origin, pdu, prevs, min_depth,
thread_id=thread_id,
)
# Update the set of things we've seen after trying to
@@ -304,20 +325,20 @@ class FederationHandler(BaseHandler):
# but there is an interaction with min_depth that I'm not really
# following.
if sent_to_us_directly:
logger.warn(
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
raise FederationError(
"ERROR",
403,
(
"Your server isn't divulging details about prev_events "
"referenced in this event."
),
affected=pdu.event_id,
)
# if sent_to_us_directly:
# logger.warn(
# "[%s %s] Rejecting: failed to fetch %d prev events: %s",
# room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
# )
# raise FederationError(
# "ERROR",
# 403,
# (
# "Your server isn't divulging details about prev_events "
# "referenced in this event."
# ),
# affected=pdu.event_id,
# )
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
@@ -416,15 +437,51 @@ class FederationHandler(BaseHandler):
affected=event_id,
)
logger.info("Thread ID %r", thread_id)
# Remove destinations field before persisting
event_copy = FrozenEvent.from_event(pdu)
pdu.unsigned.pop("destinations", None)
yield self._process_received_pdu(
origin,
pdu,
state=state,
auth_chain=auth_chain,
thread_id=thread_id,
)
if sent_to_us_directly:
yield self.federation_sender.received_new_event(origin, event_copy, span)
if new_thread:
builder = self.event_builder_factory.new({
"type": "org.matrix.new_thread",
"content": {
"thread_id": thread_id,
"latest_event": pdu.event_id,
},
"event_id": random_string(24),
"origin_server_ts": self.clock.time_msec(),
"sender": "@server:server",
"room_id": pdu.room_id,
})
event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
event.internal_metadata.internal_event = True
yield self.event_creation_handler.handle_new_client_event(
create_requester(UserID("server", "server")),
event,
context,
ratelimit=False,
extra_users=[],
do_auth=False,
)
@defer.inlineCallbacks
def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth, thread_id):
"""
Args:
origin (str): Origin of the pdu. Will be called to get the missing events
@@ -506,9 +563,9 @@ class FederationHandler(BaseHandler):
room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
limit=5,
min_depth=min_depth,
timeout=60000,
timeout=15000,
)
logger.info(
@@ -520,6 +577,20 @@ class FederationHandler(BaseHandler):
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
pdu_to_thread = {}
if not thread_id:
thread_id = random.randint(1, 999999999)
first_in_thread = True
for pdu in reversed(missing_events):
if self.should_start_thread(pdu):
pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
first_in_thread = False
else:
pdu_to_thread[pdu.event_id] = (0, False)
else:
for pdu in reversed(missing_events):
pdu_to_thread[pdu.event_id] = (thread_id, False)
for ev in missing_events:
logger.info(
"[%s %s] Handling received prev_event %s",
@@ -531,6 +602,8 @@ class FederationHandler(BaseHandler):
origin,
ev,
sent_to_us_directly=False,
thread_id=pdu_to_thread[ev.event_id][0],
new_thread=pdu_to_thread[ev.event_id][1],
)
except FederationError as e:
if e.code == 403:
@@ -542,7 +615,7 @@ class FederationHandler(BaseHandler):
raise
@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
def _process_received_pdu(self, origin, event, state, auth_chain, thread_id):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
"""
@@ -594,6 +667,7 @@ class FederationHandler(BaseHandler):
origin,
event,
state=state,
thread_id=thread_id,
)
except AuthError as e:
raise FederationError(
@@ -676,7 +750,12 @@ class FederationHandler(BaseHandler):
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
set(
itertools.chain.from_iterable(
itertools.chain([e.event_id], e.prev_event_ids(),)
for e in events
)
)
)
events = [e for e in events if e.event_id not in seen_events]
@@ -691,7 +770,7 @@ class FederationHandler(BaseHandler):
edges = [
ev.event_id
for ev in events
if set(ev.prev_event_ids()) - event_ids
if set(ev.prev_event_ids()) - event_ids - seen_events
]
logger.info(
@@ -725,18 +804,26 @@ class FederationHandler(BaseHandler):
})
missing_auth = required_auth - set(auth_events)
failed_to_fetch = set()
not_in_db = set()
# Try and fetch any missing auth events from both DB and remote servers.
# We repeatedly do this until we stop finding new auth events.
while missing_auth - failed_to_fetch:
logger.info("Missing auth for backfill: %r", missing_auth)
ret_events = yield self.store.get_events(missing_auth - failed_to_fetch)
auth_events.update(ret_events)
required_auth.update(
a_id for event in ret_events.values() for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
to_fetch_from_db = missing_auth - failed_to_fetch
while to_fetch_from_db - not_in_db:
ret_events = yield self.store.get_events(missing_auth - failed_to_fetch)
auth_events.update(ret_events)
required_auth.update(
a_id
for event in ret_events.values()
for a_id in event.auth_event_ids()
)
missing_auth = required_auth - set(auth_events)
to_fetch_from_db = required_auth - set(auth_events) - not_in_db
if missing_auth - failed_to_fetch:
logger.info(
@@ -805,6 +892,25 @@ class FederationHandler(BaseHandler):
events.sort(key=lambda e: e.depth)
event_id_to_thread = {}
event_to_parents = {}
for event in reversed(events):
threads = yield self.store.get_threads_for_backfill_event(event.event_id)
parents = event_to_parents.get(event.event_id, [])
for p in parents:
t = event_id_to_thread.get(p)
if t is not None:
threads.append(t)
if threads:
thread_id = min(threads)
else:
thread_id = 0
event_id_to_thread[event.event_id] = thread_id
for c in event.prev_event_ids():
event_to_parents.setdefault(c, set()).add(event.event_id)
for event in events:
if event in events_to_state:
continue
@@ -814,6 +920,7 @@ class FederationHandler(BaseHandler):
# TODO: We can probably do something more clever here.
yield self._handle_new_event(
dest, event, backfilled=True,
thread_id=event_id_to_thread[event.event_id],
)
defer.returnValue(events)
@@ -823,12 +930,13 @@ class FederationHandler(BaseHandler):
"""Checks the database to see if we should backfill before paginating,
and if so, does so.
"""
logger.info("Backfilling")
extremities = yield self.store.get_oldest_events_with_depth_in_room(
room_id
)
if not extremities:
logger.debug("Not backfilling as no extremeties found.")
logger.info("Not backfilling as no extremeties found.")
return
# Check if we reached a point where we should start backfilling.
@@ -843,7 +951,7 @@ class FederationHandler(BaseHandler):
extremities = dict(sorted_extremeties_tuple[:5])
if current_depth > max_depth:
logger.debug(
logger.info(
"Not backfilling as we don't need to. %d < %d",
max_depth, current_depth,
)
@@ -1061,6 +1169,7 @@ class FederationHandler(BaseHandler):
have finished processing the join.
"""
logger.debug("Joining %s to %s", joinee, room_id)
logger.info("Target hosts %s", target_hosts)
origin, event = yield self._make_and_verify_event(
target_hosts,
@@ -1311,6 +1420,8 @@ class FederationHandler(BaseHandler):
sender, target, event.room_id,
)
FrozenEvent.from_event(event)
event.unsigned.pop("invite_room_state", None)
defer.returnValue(event)
@defer.inlineCallbacks
@@ -1588,11 +1699,12 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _handle_new_event(self, origin, event, state=None, auth_events=None,
backfilled=False):
backfilled=False, thread_id=0):
context = yield self._prep_event(
origin, event,
state=state,
auth_events=auth_events,
thread_id=thread_id,
)
# reraise does not allow inlineCallbacks to preserve the stacktrace, so we
@@ -1751,7 +1863,7 @@ class FederationHandler(BaseHandler):
)
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
def _prep_event(self, origin, event, state=None, auth_events=None, thread_id=0):
"""
Args:
@@ -1764,7 +1876,7 @@ class FederationHandler(BaseHandler):
Deferred, which resolves to synapse.events.snapshot.EventContext
"""
context = yield self.state_handler.compute_event_context(
event, old_state=state,
event, old_state=state, thread_id=thread_id,
)
if not auth_events:
@@ -2574,3 +2686,10 @@ class FederationHandler(BaseHandler):
)
else:
return user_joined_room(self.distributor, user, room_id)
def should_start_thread(self, event):
now = self.clock.time_msec()
# Start a thread if an admin has forced threading for events up to
# this timestamp (via /admin/force_thread), or if the event is more
# than a minute old.
forced = event.origin_server_ts <= self.force_thread_ts
old = now - event.origin_server_ts > 1 * 60 * 1000
return forced or old

View File

@@ -588,6 +588,7 @@ class EventCreationHandler(object):
context,
ratelimit=True,
extra_users=[],
do_auth=True,
):
"""Processes a new event. This includes checking auth, persisting it,
notifying users, sending to remote servers, etc.
@@ -604,7 +605,8 @@ class EventCreationHandler(object):
"""
try:
yield self.auth.check_from_context(event, context)
if do_auth:
yield self.auth.check_from_context(event, context)
except AuthError as err:
logger.warn("Denying new event %r because %s", event, err)
raise err

View File

@@ -347,7 +347,7 @@ class PresenceHandler(object):
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
logger.info("Handling presence timeouts")
# logger.info("Handling presence timeouts")
now = self.clock.time_msec()
try:
@@ -626,6 +626,7 @@ class PresenceHandler(object):
Args:
states (list(UserPresenceState))
"""
return
self.federation.send_presence(states)
@defer.inlineCallbacks
@@ -816,6 +817,7 @@ class PresenceHandler(object):
if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
return
yield self.federation.send_edu(
destination=observed_user.domain,
edu_type="m.presence_invite",
@@ -836,6 +838,7 @@ class PresenceHandler(object):
if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
return
self.federation.send_edu(
destination=observer_user.domain,
edu_type="m.presence_accept",

View File

@@ -147,6 +147,8 @@ class ReceiptsHandler(BaseHandler):
logger.debug("Sending receipt to: %r", remotedomains)
return
for domain in remotedomains:
self.federation.send_edu(
destination=domain,

View File

@@ -79,6 +79,8 @@ class RoomCreationHandler(BaseHandler):
# linearizer to stop two upgrades happening at once
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
self._next_room_id = 0
@defer.inlineCallbacks
def upgrade_room(self, requester, old_room_id, new_version):
"""Replace a room with a new room with a different version
@@ -741,7 +743,9 @@ class RoomCreationHandler(BaseHandler):
attempts = 0
while attempts < 5:
try:
random_string = stringutils.random_string(18)
i = self._next_room_id
self._next_room_id += 1
random_string = stringutils.random_string(3) + str(i)
gen_room_id = RoomID(
random_string,
self.hs.hostname,

View File

@@ -35,6 +35,7 @@ from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
@@ -211,6 +212,7 @@ class SyncHandler(object):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.builder_factory = hs.get_event_builder_factory()
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
@@ -709,7 +711,6 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
members_to_fetch = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
@@ -858,6 +859,28 @@ class SyncHandler(object):
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
hosts_in_room = yield self.store.get_hosts_in_room(room_id)
destination_states = yield self.store.get_destination_states()
for host in hosts_in_room:
if host not in destination_states:
continue
if ("org.matrix.server_presence", host) in timeline_state:
continue
state[("org.matrix.server_presence", host)] = self.builder_factory.new({
"type": "org.matrix.server_presence",
"content": {
"state": "connected" if destination_states[host] else "disconnected",
},
"state_key": host,
"event_id": random_string(24),
"origin_server_ts": self.clock.time_msec(),
"sender": "@server:server",
"room_id": room_id,
})
defer.returnValue({
(e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(list(state.values()))
@@ -931,10 +954,7 @@ class SyncHandler(object):
newly_joined_rooms, newly_joined_users, _, _ = res
_, _, newly_left_rooms, newly_left_users = res
block_all_presence_data = (
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
block_all_presence_data = True
if self.hs_config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
@@ -1231,10 +1251,7 @@ class SyncHandler(object):
`(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
"""
user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = (
sync_result_builder.since_token is None and
sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
)
block_all_room_ephemeral = True
if block_all_room_ephemeral:
ephemeral_by_room = {}

View File

@@ -86,7 +86,7 @@ class TypingHandler(object):
self._room_typing = {}
def _handle_timeouts(self):
logger.info("Checking for typing timeouts")
# logger.info("Checking for typing timeouts")
now = self.clock.time_msec()
@@ -231,6 +231,7 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
return
self.federation.send_edu(
destination=domain,
edu_type="m.typing",

View File

@@ -194,8 +194,11 @@ class _WrappedConnection(object):
# In Twisted >18.4, the TLS connection will be None if it has closed,
# which will make abortConnection() throw. Check that the TLS connection
# is not None before trying to close it.
if self.transport.getHandle() is not None:
self.transport.abortConnection()
try:
if self.transport.getHandle() is not None:
self.transport.abortConnection()
except Exception:
logger.warning("Failed to abort connection")
def request(self, request):
self.last_request = time.time()

View File

@@ -18,8 +18,12 @@ import logging
import random
import sys
from io import BytesIO
import os
from six import PY3, string_types
import opentracing
from opentracing.ext import tags
from six import PY3, string_types, iteritems
from six.moves import urllib
import attr
@@ -55,6 +59,7 @@ outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_request
incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
"", ["method", "code"])
USE_PROXY = "SYNAPSE_USE_PROXY" in os.environ
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
@@ -65,6 +70,18 @@ else:
MAXINT = sys.maxint
class ProxyMatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.reactor = hs.get_reactor()
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
return matrix_federation_endpoint(
self.reactor, "localhost:8888", timeout=10,
tls_client_options_factory=None
)
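With the factory above, all outbound federation is routed via a local proxy on localhost:8888 whenever the environment variable is set; only its presence matters, not its value. A hypothetical invocation:

SYNAPSE_USE_PROXY=1 python -m synapse.app.homeserver -c homeserver.yaml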
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.reactor = hs.get_reactor()
@@ -190,19 +207,36 @@ class MatrixFederationHttpClient(object):
pool.retryAutomatically = False
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
if USE_PROXY:
self.agent = Agent.usingEndpointFactory(
reactor, ProxyMatrixFederationEndpointFactory(hs), pool=pool
)
else:
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
)
file_pool = HTTPConnectionPool(reactor)
file_pool.retryAutomatically = False
file_pool.maxPersistentPerHost = 5
file_pool.cachedConnectionTimeout = 10
self.file_agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=file_pool
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string_bytes = hs.version_string.encode('ascii')
self.default_timeout = 60
self.default_timeout = 20
def schedule(x):
reactor.callLater(_EPSILON, x)
self._cooperator = Cooperator(scheduler=schedule)
self.tracer = hs.get_tracer()
@defer.inlineCallbacks
def _send_request(
self,
@@ -211,7 +245,9 @@ class MatrixFederationHttpClient(object):
timeout=None,
long_retries=False,
ignore_backoff=False,
backoff_on_404=False
backoff_on_404=False,
span=None,
agent=None,
):
"""
Sends a request to the given server.
@@ -253,13 +289,13 @@ class MatrixFederationHttpClient(object):
):
raise FederationDeniedError(request.destination)
limiter = yield synapse.util.retryutils.get_retry_limiter(
request.destination,
self.clock,
self._store,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
)
# limiter = yield synapse.util.retryutils.get_retry_limiter(
# request.destination,
# self.clock,
# self._store,
# backoff_on_404=backoff_on_404,
# ignore_backoff=ignore_backoff,
# )
method_bytes = request.method.encode("ascii")
destination_bytes = request.destination.encode("ascii")
@@ -274,7 +310,8 @@ class MatrixFederationHttpClient(object):
b"Host": [destination_bytes],
}
with limiter:
# with limiter:
if True:
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
@@ -320,25 +357,52 @@ class MatrixFederationHttpClient(object):
url_str,
)
# we don't want all the fancy cookie and redirect handling that
# treq.request gives: just use the raw Agent.
request_deferred = self.agent.request(
method_bytes,
url_bytes,
headers=Headers(headers_dict),
bodyProducer=producer,
)
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
with self.tracer.start_span('request', child_of=span) as child_span:
carrier = {}
opentracing.tracer.inject(
span_context=child_span.context,
format=opentracing.Format.HTTP_HEADERS,
carrier=carrier,
)
for key, value in iteritems(carrier):
headers_dict[key.encode("ascii")] = [value.encode("ascii")]
if not agent:
agent = self.agent
# we don't want all the fancy cookie and redirect handling that
# treq.request gives: just use the raw Agent.
request_deferred = agent.request(
method_bytes,
url_bytes,
headers=Headers(headers_dict),
bodyProducer=producer,
)
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
child_span.set_tag(tags.HTTP_METHOD, request.method)
child_span.set_tag(tags.HTTP_URL, url_str)
child_span.set_tag(
tags.SPAN_KIND,
tags.SPAN_KIND_RPC_CLIENT,
)
try:
with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
)
except Exception as e:
child_span.set_tag("error", True)
child_span.log_kv({"error": e})
raise
child_span.set_tag(tags.HTTP_STATUS_CODE, response.code)
break
except Exception as e:
@@ -454,7 +518,8 @@ class MatrixFederationHttpClient(object):
json_data_callback=None,
long_retries=False, timeout=None,
ignore_backoff=False,
backoff_on_404=False):
backoff_on_404=False,
span=None):
""" Sends the specifed json data using PUT
Args:
@@ -505,6 +570,7 @@ class MatrixFederationHttpClient(object):
timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
span=span,
)
body = yield _handle_json_response(
@@ -704,6 +770,7 @@ class MatrixFederationHttpClient(object):
request,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
agent=self.file_agent,
)
headers = dict(response.headers.getAllRawHeaders())

View File

@@ -66,6 +66,9 @@ REQUIREMENTS = {
# we use attr.s(slots), which arrived in 16.0.0
"attrs>=16.0.0": ["attr>=16.0.0"],
"netaddr>=0.7.18": ["netaddr"],
"jaeger_client": ["jaeger_client"],
"opentracing<2,>=1.2.2": ["opentracing"],
}
CONDITIONAL_REQUIREMENTS = {

View File

@@ -32,6 +32,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.types import UserID, create_requester
from synapse.util.stringutils import random_string
from .base import ClientV1RestServlet, client_path_patterns
@@ -740,6 +741,114 @@ class SearchUsersRestServlet(ClientV1RestServlet):
defer.returnValue((200, ret))
class ServerHealth(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/server_health")
def __init__(self, hs):
super(ServerHealth, self).__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
self.builder_factory = hs.get_event_builder_factory()
self.clock = hs.get_clock()
def on_GET(self, request):
return self.do_update()
def on_POST(self, request):
return self.do_update()
@defer.inlineCallbacks
def do_update(self):
hosts = yield self.store.get_all_destination_healths()
up_servers = set(h for h, c in hosts.items() if c is not None)
down_servers = set(h for h, c in hosts.items() if c is None)
rooms_to_hosts = yield self.store.get_all_hosts_and_room()
requester = create_requester(UserID("server", "server"))
state = yield self.store.get_destination_states()
new_up = set()
new_down = set()
for host in up_servers:
if state.get(host, True):
continue
new_up.add(host)
yield self.store.store_destination_state(host, True)
for host in down_servers:
if not state.get(host, True):
continue
new_down.add(host)
yield self.store.store_destination_state(host, False)
for room_id, hosts in rooms_to_hosts.items():
for host in hosts:
if host in new_up:
new_state = "connected"
elif host in new_down:
new_state = "disconnected"
else:
continue
logger.info("Marking %s as %r", host, new_state)
builder = self.builder_factory.new({
"type": "org.matrix.server_presence",
"content": {
"state": new_state,
},
"state_key": host,
"event_id": random_string(24),
"origin_server_ts": self.clock.time_msec(),
"sender": "@server:server",
"room_id": room_id,
})
event, context = yield (
self.event_creation_handler.create_new_client_event(
builder=builder,
)
)
event.internal_metadata.internal_event = True
yield self.event_creation_handler.handle_new_client_event(
requester,
event,
context,
ratelimit=False,
extra_users=[],
do_auth=False,
)
defer.returnValue((200, {}))
class ForceThreadServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/force_thread")
def __init__(self, hs):
super(ForceThreadServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.clock = hs.get_clock()
def on_GET(self, request):
return self.do_force_thread()
def on_POST(self, request):
return self.do_force_thread()
@defer.inlineCallbacks
def do_force_thread(self):
yield self.clock.sleep(0)
self.federation_handler.force_thread_ts = self.clock.time_msec()
defer.returnValue((200, {}))
def register_servlets(hs, http_server):
WhoisRestServlet(hs).register(http_server)
PurgeMediaCacheRestServlet(hs).register(http_server)
@@ -754,3 +863,5 @@ def register_servlets(hs, http_server):
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
UserRegisterServlet(hs).register(http_server)
ServerHealth(hs).register(http_server)
ForceThreadServlet(hs).register(http_server)

View File

@@ -471,6 +471,11 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
logger.info("filter_bytes: %s", filter_bytes)
logger.info("Event filter: %s", event_filter)
if event_filter:
logger.info("Event filter: %s", event_filter.thread_id)
msgs = yield self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
@@ -825,6 +830,30 @@ class JoinedRoomsRestServlet(ClientV1RestServlet):
defer.returnValue((200, {"joined_rooms": list(room_ids)}))
class TimestampLookupRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")
def __init__(self, hs):
super(TimestampLookupRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
yield self.auth.check_joined_room(room_id, requester.user.to_string())
timestamp = parse_integer(request, "ts")
thread_id = parse_integer(request, "thread_id", 0)
event_id = yield self.store.get_event_for_timestamp(
room_id, thread_id, timestamp,
)
defer.returnValue((200, {
"event_id": event_id,
}))
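A hedged usage sketch of the lookup endpoint above (the room id, timestamp, and URL prefix are illustrative; client_path_patterns registers the path under the standard client API prefixes):

GET /_matrix/client/api/v1/rooms/!abc:example.com/timestamp_to_event?ts=1545000000000&thread_id=0

200 {"event_id": "$closest_event_in_that_thread_by_origin_server_ts"}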
def register_txn_path(servlet, regex_string, http_server, with_get=False):
"""Registers a transaction-based path.
@@ -874,6 +903,7 @@ def register_servlets(hs, http_server):
JoinedRoomsRestServlet(hs).register(http_server)
RoomEventServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
TimestampLookupRestServlet(hs).register(http_server)
def register_deprecated_servlets(hs, http_server):

View File

@@ -118,6 +118,8 @@ class SyncRestServlet(RestServlet):
)
)
exclude_threaded = b"exclude_threaded" in request.args
request_key = (user, timeout, since, filter_id, full_state, device_id)
if filter_id:
@@ -169,13 +171,14 @@ class SyncRestServlet(RestServlet):
time_now = self.clock.time_msec()
response_content = self.encode_response(
time_now, sync_result, requester.access_token_id, filter
time_now, sync_result, requester.access_token_id, filter,
exclude_threaded=exclude_threaded,
)
defer.returnValue((200, response_content))
@staticmethod
def encode_response(time_now, sync_result, access_token_id, filter):
def encode_response(time_now, sync_result, access_token_id, filter, exclude_threaded):
if filter.event_format == 'client':
event_formatter = format_event_for_client_v2_without_room_id
elif filter.event_format == 'federation':
@@ -187,6 +190,7 @@ class SyncRestServlet(RestServlet):
sync_result.joined, time_now, access_token_id,
filter.event_fields,
event_formatter,
exclude_threaded=exclude_threaded,
)
invited = SyncRestServlet.encode_invited(
@@ -240,7 +244,8 @@ class SyncRestServlet(RestServlet):
}
@staticmethod
def encode_joined(rooms, time_now, token_id, event_fields, event_formatter):
def encode_joined(rooms, time_now, token_id, event_fields, event_formatter,
exclude_threaded):
"""
Encode the joined rooms in a sync result
@@ -263,7 +268,7 @@ class SyncRestServlet(RestServlet):
for room in rooms:
joined[room.room_id] = SyncRestServlet.encode_room(
room, time_now, token_id, joined=True, only_fields=event_fields,
event_formatter=event_formatter,
event_formatter=event_formatter, exclude_threaded=exclude_threaded,
)
return joined
@@ -330,6 +335,7 @@ class SyncRestServlet(RestServlet):
room, time_now, token_id, joined=False,
only_fields=event_fields,
event_formatter=event_formatter,
exclude_threaded=False,
)
return joined
@@ -337,7 +343,7 @@ class SyncRestServlet(RestServlet):
@staticmethod
def encode_room(
room, time_now, token_id, joined,
only_fields, event_formatter,
only_fields, event_formatter, exclude_threaded,
):
"""
Args:
@@ -377,7 +383,19 @@ class SyncRestServlet(RestServlet):
)
serialized_state = [serialize(e) for e in state_events]
serialized_timeline = [serialize(e) for e in timeline_events]
if exclude_threaded:
serialized_timeline = []
for e in reversed(timeline_events):
thread_id = e.internal_metadata.thread_id
if thread_id == 0:
serialized_timeline.append(serialize(e))
serialized_timeline.reverse()
else:
serialized_timeline = [serialize(e) for e in timeline_events]
account_data = room.account_data

View File

@@ -21,6 +21,9 @@
# Imports required for the default HomeServer() implementation
import abc
import logging
import os
from jaeger_client import Config
from twisted.enterprise import adbapi
from twisted.mail.smtp import sendmail
@@ -176,6 +179,7 @@ class HomeServer(object):
'pagination_handler',
'room_context_handler',
'sendmail',
'tracer',
]
# This is overridden in derived application classes
@@ -472,6 +476,39 @@ class HomeServer(object):
def build_room_context_handler(self):
return RoomContextHandler(self)
def build_tracer(self):
# TODO: Make optional
jaeger_host = os.environ.get("SYNAPSE_JAEGER_HOST", None)
if jaeger_host:
config_dict = {
'sampler': {
'type': 'const',
'param': 1,
},
'logging': True,
'local_agent': {
'reporting_host': jaeger_host,
},
}
else:
config_dict = {
'sampler': {
'type': 'const',
'param': 0,
},
'logging': True,
}
config = Config(
config=config_dict,
service_name="synapse-" + self.config.server_name,
validate=True,
)
# this call also sets opentracing.tracer
tracer = config.initialize_tracer()
return tracer
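A minimal sketch of consuming the tracer built above, using the opentracing 1.x calls seen elsewhere in this branch (the span name and tags here are illustrative):

tracer = hs.get_tracer()
with tracer.start_span("federation-send") as span:
    span.set_tag("destination", "remote.example.com")
    span.log_kv({"pdus": 3})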
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

View File

@@ -178,7 +178,7 @@ class StateHandler(object):
defer.returnValue(joined_hosts)
@defer.inlineCallbacks
def compute_event_context(self, event, old_state=None):
def compute_event_context(self, event, old_state=None, thread_id=0):
"""Build an EventContext structure for the event.
This works out what the current state should be for the event, and
@@ -194,6 +194,8 @@ class StateHandler(object):
synapse.events.snapshot.EventContext:
"""
event.internal_metadata.thread_id = thread_id
if event.internal_metadata.is_outlier():
# If this is an outlier, then we know it shouldn't have any current
# state. Certainly store.get_current_state won't return any, and
@@ -215,6 +217,7 @@ class StateHandler(object):
# We don't store state for outliers, so we don't generate a state
# group for it.
context = EventContext.with_state(
thread_id=0, # outlier, don't care
state_group=None,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
@@ -251,6 +254,7 @@ class StateHandler(object):
)
context = EventContext.with_state(
thread_id=thread_id,
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
@@ -319,6 +323,7 @@ class StateHandler(object):
state_group = entry.state_group
context = EventContext.with_state(
thread_id=thread_id,
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,

View File

@@ -34,6 +34,24 @@ logger = logging.getLogger(__name__)
class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
SQLBaseStore):
def get_threads_for_backfill_event(self, event_id):
def _get_thread_for_backfill_event_txn(txn):
sql = """
SELECT thread_id
FROM event_edges
INNER JOIN events USING (event_id)
WHERE prev_event_id = ?
"""
txn.execute(sql, (event_id,))
return [thread_id for thread_id, in txn]
return self.runInteraction(
"get_thread_for_backfill_event",
_get_thread_for_backfill_event_txn,
)
def get_auth_chain(self, event_ids, include_given=False):
"""Get auth events for given event_ids. The events *must* be state events.

View File

@@ -402,6 +402,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# No change in extremities, so no change in state
continue
logger.info(
"Forward extremities for %s: %s -> %s",
room_id, latest_event_ids, new_latest_event_ids,
)
logger.info("Events: %s", [e.event_id for e, _ in ev_ctx_rm])
# there should always be at least one forward extremity.
# (except during the initial persistence of the send_join
# results, in which case there will be no existing
@@ -537,6 +543,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
new_events = [
event for event, ctx in event_contexts
if not event.internal_metadata.is_outlier() and not ctx.rejected
and not event.internal_metadata.is_internal_event()
]
# start with the existing forward extremities
@@ -583,6 +590,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
prev_event_id IN (%s)
AND NOT events.outlier
AND rejections.event_id IS NULL
AND NOT events.internal_event
""" % (
",".join("?" for _ in batch),
)
@@ -1282,8 +1290,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
"url" in event.content
and isinstance(event.content["url"], text_type)
),
"thread_id": ctx.thread_id,
"internal_event": event.internal_metadata.is_internal_event(),
}
for event, _ in events_and_contexts
for event, ctx in events_and_contexts
],
)

View File

@@ -352,7 +352,7 @@ class EventsWorkerStore(SQLBaseStore):
run_in_background(
self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"],
rejected_reason=row["rejects"], thread_id=row["thread_id"],
)
for row in rows
],
@@ -378,8 +378,10 @@ class EventsWorkerStore(SQLBaseStore):
" e.internal_metadata,"
" e.json,"
" r.redacts as redacts,"
" rej.event_id as rejects "
" rej.event_id as rejects, "
" ev.thread_id as thread_id"
" FROM event_json as e"
" INNER JOIN events as ev USING (event_id)"
" LEFT JOIN rejections as rej USING (event_id)"
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
" WHERE e.event_id IN (%s)"
@@ -392,7 +394,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
rejected_reason=None):
thread_id, rejected_reason=None):
with Measure(self._clock, "_get_event_from_row"):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -526,3 +528,43 @@ class EventsWorkerStore(SQLBaseStore):
return res
return self.runInteraction("get_rejection_reasons", f)
def get_event_for_timestamp(self, room_id, thread_id, timestamp):
sql_template = """
SELECT event_id, origin_server_ts FROM events
WHERE
origin_server_ts %s ?
AND room_id = ?
AND thread_id = ?
ORDER BY origin_server_ts %s
LIMIT 1;
"""
def f(txn):
txn.execute(sql_template % ("<=",), (timestamp, room_id, thread_id))
row = txn.fetchone()
if row:
event_id_before, ts_before = row
else:
event_id_before, ts_before = None, None
txn.execute(sql_template % (">=",), (timestamp, room_id, thread_id))
row = txn.fetchone()
if row:
event_id_after, ts_after = row
else:
event_id_after, ts_after = None, None
if event_id_before and event_id_after:
# Return the closest one
if (timestamp - ts_before) < (ts_after - timestamp):
return event_id_before
else:
return event_id_after
if event_id_before:
return event_id_before
return event_id_after
return self.runInteraction("get_event_for_timestamp", f)

View File

@@ -406,11 +406,6 @@ class RegistrationStore(RegistrationWorkerStore,
)
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
for token, _, _ in tokens_and_devices:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
txn.execute(
"DELETE FROM access_tokens WHERE %s" % where_clause,
values
@@ -432,10 +427,6 @@ class RegistrationStore(RegistrationWorkerStore,
},
)
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (access_token,)
)
return self.runInteraction("delete_access_token", f)
@cachedInlineCallbacks()

View File

@@ -88,6 +88,23 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return [to_ascii(r[0]) for r in txn]
return self.runInteraction("get_users_in_room", f)
def get_all_hosts_and_room(self):
def f(txn):
sql = """
SELECT DISTINCT room_id, regexp_replace(state_key, '^[^:]*:', '') AS host
FROM current_state_events
INNER JOIN room_memberships USING (event_id, room_id)
WHERE
type = 'm.room.member' AND membership = 'join'
"""
txn.execute(sql)
results = {}
for r in txn:
results.setdefault(to_ascii(r[0]), set()).add(to_ascii(r[1]))
return results
return self.runInteraction("get_users_in_room", f)
@cached(max_entries=100000)
def get_room_summary(self, room_id):
""" Get the details of a room roughly suitable for use by the room

View File

@@ -0,0 +1,16 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE events ADD COLUMN IF NOT EXISTS internal_event BOOLEAN NOT NULL DEFAULT false;

View File

@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS destination_health(
destination TEXT NOT NULL PRIMARY KEY,
cost DOUBLE PRECISION
);

View File

@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS destination_state(
destination TEXT NOT NULL PRIMARY KEY,
connected BOOLEAN NOT NULL
);

View File

@@ -0,0 +1,23 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE events ADD COLUMN IF NOT EXISTS thread_id BIGINT NOT NULL DEFAULT 0;
CREATE INDEX IF NOT EXISTS events_room_idx ON events (room_id, thread_id);
-- CREATE SEQUENCE thread_id_seq;
CREATE INDEX IF NOT EXISTS event_room_thread_ts ON events (room_id, thread_id, origin_server_ts);

View File

@@ -149,7 +149,13 @@ def filter_to_clause(event_filter):
clauses.append("contains_url = ?")
args.append(event_filter.contains_url)
return " AND ".join(clauses), args
if event_filter.thread_id is not None:
clauses.append("thread_id = ?")
args.append(event_filter.thread_id)
filter = " AND ".join(clauses), args
return filter
class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@@ -808,7 +814,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows, token = yield self.runInteraction(
"paginate_room_events", self._paginate_room_events_txn,
room_id, from_key, to_key, direction, limit, event_filter,
room_id, from_key, to_key, direction, limit,
event_filter=event_filter,
)
events = yield self._get_events(