Compare commits
693 Commits
matthew/st
...
hhs-2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
897d976c1e | ||
|
|
45fc0ead10 | ||
|
|
cdd24449ee | ||
|
|
14d49c51db | ||
|
|
84b4e76fed | ||
|
|
c780d84d66 | ||
|
|
1d67b13674 | ||
|
|
92d50e3c2a | ||
|
|
e94cdbaecf | ||
|
|
9b1bc593c5 | ||
|
|
7f147d623b | ||
|
|
15e8dd2ccc | ||
|
|
05fe8a6c1c | ||
|
|
f584d6108f | ||
|
|
28d7b546cb | ||
|
|
f2cbbda956 | ||
|
|
cd77270a66 | ||
|
|
242a0483eb | ||
|
|
60cf1c6e16 | ||
|
|
7e6e588e60 | ||
|
|
1ca2744621 | ||
|
|
dddb5aa7bb | ||
|
|
4eb8408ed2 | ||
|
|
c5842dff1a | ||
|
|
7a0da69eee | ||
|
|
a5806aba27 | ||
|
|
fd2dbf1836 | ||
|
|
9643a6f7f2 | ||
|
|
c7181dcc6c | ||
|
|
74854a9719 | ||
|
|
48fec67536 | ||
|
|
3504982cb7 | ||
|
|
4e5a4549b6 | ||
|
|
9b7d9d8ba0 | ||
|
|
db10f553ba | ||
|
|
764030cf63 | ||
|
|
8432e2ebd7 | ||
|
|
a81f140880 | ||
|
|
47b25ba5f3 | ||
|
|
3bf8bab8f9 | ||
|
|
a4cf660a32 | ||
|
|
0d568ff403 | ||
|
|
d7585a4c83 | ||
|
|
365f588d98 | ||
|
|
eb7be75a10 | ||
|
|
dd0ac1614c | ||
|
|
bb81e78ec6 | ||
|
|
afb4b490a4 | ||
|
|
f7bf181a90 | ||
|
|
f7baff6f7b | ||
|
|
a52f276990 | ||
|
|
46c832eaac | ||
|
|
2b1a4b2596 | ||
|
|
cd6937fb26 | ||
|
|
c2c153dd3b | ||
|
|
79d3b4689e | ||
|
|
808d8e06aa | ||
|
|
3f6762f0bb | ||
|
|
3b5b64ac99 | ||
|
|
23d7e63a4a | ||
|
|
8bd585b09b | ||
|
|
012d612f9d | ||
|
|
f89f6b7c09 | ||
|
|
be6527325a | ||
|
|
55e6bdf287 | ||
|
|
e2c0aa2c26 | ||
|
|
b01a755498 | ||
|
|
1058d14127 | ||
|
|
80bf7d3580 | ||
|
|
9a2f960736 | ||
|
|
324525f40c | ||
|
|
4d664278af | ||
|
|
8dee601054 | ||
|
|
e21c368b8b | ||
|
|
cf6f9a8b53 | ||
|
|
48a910e128 | ||
|
|
f2a48d87df | ||
|
|
2aa7cc6a46 | ||
|
|
e07970165f | ||
|
|
c5171bf171 | ||
|
|
ba1fbf7d5b | ||
|
|
3cef867cc1 | ||
|
|
c144252a8c | ||
|
|
c334ca67bb | ||
|
|
04f5d2db62 | ||
|
|
ab822a2d1f | ||
|
|
91cdb6de08 | ||
|
|
d49b77404b | ||
|
|
63260397c6 | ||
|
|
3f8709ffe4 | ||
|
|
3ee57bdcbb | ||
|
|
4c22b4047b | ||
|
|
782689bd40 | ||
|
|
ca87ad1def | ||
|
|
b5f638f1f4 | ||
|
|
9fd161c6fb | ||
|
|
0195dfbf52 | ||
|
|
69c49d3fa3 | ||
|
|
a2d872e7b3 | ||
|
|
fa27073b14 | ||
|
|
38f708a2bb | ||
|
|
73737bd0f9 | ||
|
|
3b2dcfff78 | ||
|
|
521d369e7a | ||
|
|
b99a0f3941 | ||
|
|
a8ffc27db7 | ||
|
|
4c9da1440f | ||
|
|
d9efd87d55 | ||
|
|
0e8d78f6aa | ||
|
|
66f7dc8c87 | ||
|
|
7e51342196 | ||
|
|
bcfeb44afe | ||
|
|
372bf073c1 | ||
|
|
7edd11623d | ||
|
|
13ad9930c8 | ||
|
|
51b17ec566 | ||
|
|
3c1080b6e4 | ||
|
|
eff3ae3b9a | ||
|
|
b4d6db5c4a | ||
|
|
aae86a81ef | ||
|
|
7d5b1a60a3 | ||
|
|
bfb6c58624 | ||
|
|
a675f9c556 | ||
|
|
c151b32b1d | ||
|
|
762a758fea | ||
|
|
25d2b5d55f | ||
|
|
df1e4f259f | ||
|
|
c055c91655 | ||
|
|
3f543dc021 | ||
|
|
859989f958 | ||
|
|
8cfad2e686 | ||
|
|
81d727efa9 | ||
|
|
da7fe43899 | ||
|
|
e2e846628d | ||
|
|
2f78f432c4 | ||
|
|
fc5d937550 | ||
|
|
86a00e05e1 | ||
|
|
d82fa0e9a6 | ||
|
|
a87af25fbb | ||
|
|
eabc5f8271 | ||
|
|
c24fc9797b | ||
|
|
afcd655ab6 | ||
|
|
dc56c47dc0 | ||
|
|
4601129c44 | ||
|
|
ef184caf30 | ||
|
|
488ffe6fdb | ||
|
|
773db62a22 | ||
|
|
39176f27f7 | ||
|
|
2de813a611 | ||
|
|
eaaa2248ff | ||
|
|
87a824bad1 | ||
|
|
c4eb97518f | ||
|
|
55afba0fc5 | ||
|
|
75c663c7b9 | ||
|
|
1c5e690a6b | ||
|
|
fef2e65d12 | ||
|
|
596ce63576 | ||
|
|
b8429c7c81 | ||
|
|
ab035bdeac | ||
|
|
19b433e3f4 | ||
|
|
b586b8b986 | ||
|
|
70e48cbbb1 | ||
|
|
aa3220df6a | ||
|
|
7277216d01 | ||
|
|
cd0c749c4f | ||
|
|
9ecbaf8ba8 | ||
|
|
1522ed9c07 | ||
|
|
629f390035 | ||
|
|
e5962f845c | ||
|
|
e7d091fb86 | ||
|
|
414d54b61a | ||
|
|
2545993ce4 | ||
|
|
06b331ff40 | ||
|
|
8f9a7eb58d | ||
|
|
c74c71128d | ||
|
|
ed4bc3d2fc | ||
|
|
bd92c8eaa7 | ||
|
|
99ebaed8e6 | ||
|
|
9b5bf3d858 | ||
|
|
e25d87d97b | ||
|
|
e2c9fe0a6a | ||
|
|
614e6d517d | ||
|
|
0bdc362598 | ||
|
|
591bf87c6a | ||
|
|
bdfbd934d6 | ||
|
|
9b75c78b4d | ||
|
|
8f0c430ca4 | ||
|
|
63417c31e9 | ||
|
|
1f24d8681b | ||
|
|
f4b49152e2 | ||
|
|
b6d55588e1 | ||
|
|
8ba2dac6ea | ||
|
|
50bcaf1a27 | ||
|
|
cd32c19a60 | ||
|
|
34f51babc5 | ||
|
|
8226e5597a | ||
|
|
ce7de9ae6b | ||
|
|
0d43f991a1 | ||
|
|
99dd975dae | ||
|
|
ac205a54b2 | ||
|
|
31fa743567 | ||
|
|
c79c4c0a7d | ||
|
|
6c6aba76e1 | ||
|
|
a001038b92 | ||
|
|
807449d8f2 | ||
|
|
c31793a784 | ||
|
|
c08f9d95b2 | ||
|
|
dd16e7dfcc | ||
|
|
8b3d9b6b19 | ||
|
|
b37c472419 | ||
|
|
c75b71a397 | ||
|
|
3c0213a217 | ||
|
|
178ab76ac0 | ||
|
|
638d35ef08 | ||
|
|
01021c812f | ||
|
|
2e9c73e8ca | ||
|
|
64899341dc | ||
|
|
885ea9c602 | ||
|
|
c1f9dec92a | ||
|
|
04df714259 | ||
|
|
09cf130898 | ||
|
|
5075e444f4 | ||
|
|
3e19beb941 | ||
|
|
bb99b1f550 | ||
|
|
ce6db0e547 | ||
|
|
152c0aa58e | ||
|
|
119451dcd1 | ||
|
|
c6b28fb479 | ||
|
|
d967653705 | ||
|
|
69ce057ea6 | ||
|
|
3dce9050cf | ||
|
|
0ad98e38d0 | ||
|
|
a5ef110749 | ||
|
|
a6c813761a | ||
|
|
54a9bea88c | ||
|
|
5c6226707d | ||
|
|
8876ce7f77 | ||
|
|
b179537f2a | ||
|
|
72d1902bbe | ||
|
|
984376745b | ||
|
|
67dbe4c899 | ||
|
|
dafe90a6be | ||
|
|
3d6f9fba19 | ||
|
|
484a0ebdfc | ||
|
|
5785b93711 | ||
|
|
bf7598f582 | ||
|
|
2bdafaf3c1 | ||
|
|
62564797f5 | ||
|
|
2511f3f8a0 | ||
|
|
bb89c84614 | ||
|
|
d5c0ce4cad | ||
|
|
e92fb00f32 | ||
|
|
839a317c96 | ||
|
|
5298d79fb5 | ||
|
|
8521ae13e3 | ||
|
|
d2f3ef98ac | ||
|
|
54685d294d | ||
|
|
cc187debf3 | ||
|
|
2b5baebeba | ||
|
|
be59910b93 | ||
|
|
990fe9fc23 | ||
|
|
312ae74746 | ||
|
|
d92675a486 | ||
|
|
360ba89c50 | ||
|
|
7f3d897e7a | ||
|
|
bebe325e6c | ||
|
|
5011417632 | ||
|
|
e6d73b8582 | ||
|
|
bab94da79c | ||
|
|
53bca4690b | ||
|
|
ef3589063a | ||
|
|
e8eba2b4e3 | ||
|
|
e5d2c67844 | ||
|
|
3523f5432a | ||
|
|
9b92720d88 | ||
|
|
23cd86554e | ||
|
|
848431be1d | ||
|
|
cf8ef11f35 | ||
|
|
3b9662339b | ||
|
|
f81f421086 | ||
|
|
cd9765805e | ||
|
|
495cb100d1 | ||
|
|
865d07cd38 | ||
|
|
ca9bc1f4fe | ||
|
|
a74b25faaa | ||
|
|
fbe255f9a4 | ||
|
|
7daa8a78c5 | ||
|
|
89834d9a29 | ||
|
|
e54794f5b6 | ||
|
|
7bcf126b18 | ||
|
|
1911c037cb | ||
|
|
33bd07d062 | ||
|
|
16d78be315 | ||
|
|
7f3f108561 | ||
|
|
19a17068f1 | ||
|
|
96a9a29645 | ||
|
|
62ace05c45 | ||
|
|
1e2bed9656 | ||
|
|
a3f5bf79a0 | ||
|
|
e26dbd82ef | ||
|
|
051a99c400 | ||
|
|
f900d50824 | ||
|
|
42c6823827 | ||
|
|
e40a510fbf | ||
|
|
d08296f9f2 | ||
|
|
886be75ad1 | ||
|
|
16d9701892 | ||
|
|
e10830e976 | ||
|
|
3777fa26aa | ||
|
|
0d63d93ca8 | ||
|
|
0ca459ea33 | ||
|
|
15c1ae45e5 | ||
|
|
5593ff6773 | ||
|
|
b2aab04d2c | ||
|
|
da90337d89 | ||
|
|
950807d93a | ||
|
|
df7f9871c1 | ||
|
|
897c51d274 | ||
|
|
cb298ff623 | ||
|
|
42960aa047 | ||
|
|
c3f596180f | ||
|
|
637b11b9ed | ||
|
|
feacd13932 | ||
|
|
c0affa7b4f | ||
|
|
5e2f7b8084 | ||
|
|
4ecb4bdac9 | ||
|
|
8a86d7ea96 | ||
|
|
70baa61e95 | ||
|
|
4c4a123bd0 | ||
|
|
489949879e | ||
|
|
1758f4e1c7 | ||
|
|
26a37f3d4d | ||
|
|
0d25724419 | ||
|
|
1fa98495d0 | ||
|
|
bdae8f2e68 | ||
|
|
74b1d46ad9 | ||
|
|
9180061b49 | ||
|
|
704c3e6239 | ||
|
|
43ecfe0b10 | ||
|
|
c2a83349f0 | ||
|
|
db1f33fb36 | ||
|
|
14a4e7d5a4 | ||
|
|
50d9d97408 | ||
|
|
8cefc690c9 | ||
|
|
0bf5ec0db7 | ||
|
|
a937497cf5 | ||
|
|
a013404292 | ||
|
|
14fa9d4d92 | ||
|
|
c4ffbecb68 | ||
|
|
0a65450d04 | ||
|
|
00f99f74b1 | ||
|
|
4a6725d9d1 | ||
|
|
165e067033 | ||
|
|
40c1c59cf4 | ||
|
|
08281fe6b7 | ||
|
|
c21d82bab3 | ||
|
|
ec716a35b2 | ||
|
|
d766f26de9 | ||
|
|
085435e13a | ||
|
|
b8d7d3996b | ||
|
|
908be65e64 | ||
|
|
b7f203a566 | ||
|
|
7ff44d9215 | ||
|
|
38b98e5a98 | ||
|
|
01e93f48ed | ||
|
|
018d75a148 | ||
|
|
fa7dc889f1 | ||
|
|
c82ccd3027 | ||
|
|
da7785147d | ||
|
|
c480c4c962 | ||
|
|
6eed16d8a2 | ||
|
|
303f1c851f | ||
|
|
a6d7b74915 | ||
|
|
4b256b9271 | ||
|
|
4e5ac901dd | ||
|
|
f9f5559971 | ||
|
|
4e6e00152c | ||
|
|
0aba3d361a | ||
|
|
2c54f1c225 | ||
|
|
c4842e16cb | ||
|
|
6e63d6868c | ||
|
|
f49147d14f | ||
|
|
cab782c17e | ||
|
|
6023cdd227 | ||
|
|
7931393495 | ||
|
|
c507fa15ce | ||
|
|
37be52ac34 | ||
|
|
70af98e361 | ||
|
|
5e2ee64660 | ||
|
|
1841672c85 | ||
|
|
6ef983ce5c | ||
|
|
bdbdceeafa | ||
|
|
16bd63f32f | ||
|
|
443da003bc | ||
|
|
729b672823 | ||
|
|
d81602b75a | ||
|
|
5de936caa1 | ||
|
|
5bb39b1e0c | ||
|
|
df2235e7fa | ||
|
|
0bc9b9e397 | ||
|
|
7d05406a07 | ||
|
|
82977477e3 | ||
|
|
9c14c2b561 | ||
|
|
6aab397ada | ||
|
|
52384f2ee5 | ||
|
|
e908b86832 | ||
|
|
254e8267e2 | ||
|
|
21276ff846 | ||
|
|
fef7e58ac6 | ||
|
|
cefac79c10 | ||
|
|
9b13817e06 | ||
|
|
251e6c1210 | ||
|
|
143f1a2532 | ||
|
|
2903e65aff | ||
|
|
a8cbce0ced | ||
|
|
e9b2d047f6 | ||
|
|
ad7cd95a78 | ||
|
|
f102c05856 | ||
|
|
8e3f75b39a | ||
|
|
b0b5566f36 | ||
|
|
781c2bd21a | ||
|
|
7041cd872b | ||
|
|
d42455cdc9 | ||
|
|
65c8dee900 | ||
|
|
a75231b507 | ||
|
|
9e68b1bd2d | ||
|
|
49254d43a6 | ||
|
|
7d32f0d745 | ||
|
|
ef9d51b081 | ||
|
|
85531a06a2 | ||
|
|
51d7df1915 | ||
|
|
5c1d301fd9 | ||
|
|
cf78eaebad | ||
|
|
bd4b25f4d0 | ||
|
|
1b4d73fa52 | ||
|
|
a15ed52267 | ||
|
|
21e878ebb6 | ||
|
|
03751a6420 | ||
|
|
1bcd0490c2 | ||
|
|
bc7944e6d2 | ||
|
|
a4fe9d2d36 | ||
|
|
6185650f9c | ||
|
|
d8e65ed7e1 | ||
|
|
2565804030 | ||
|
|
0620d27f4d | ||
|
|
0a7ee0ab8b | ||
|
|
7d9fb88617 | ||
|
|
78a691d005 | ||
|
|
3849f7f69f | ||
|
|
cee1ae1b72 | ||
|
|
32b30e15f2 | ||
|
|
4081bd1560 | ||
|
|
1bfb5bed1d | ||
|
|
7780a7b47c | ||
|
|
1be94440d3 | ||
|
|
07defd5fe6 | ||
|
|
9c237a7ab5 | ||
|
|
55acd6856c | ||
|
|
f59be4eb0e | ||
|
|
a297ff2b16 | ||
|
|
3f11d84534 | ||
|
|
371da42ae4 | ||
|
|
0b300d323a | ||
|
|
ec56121b0d | ||
|
|
cb5c37a57c | ||
|
|
38eaa5280d | ||
|
|
1e5dbdcbb1 | ||
|
|
1674a85238 | ||
|
|
87951d3891 | ||
|
|
f14c866e37 | ||
|
|
8b8c4f34a3 | ||
|
|
3188973857 | ||
|
|
60a1d147a7 | ||
|
|
709c309b0e | ||
|
|
8f65ab98d2 | ||
|
|
ed0dd68731 | ||
|
|
f33c596533 | ||
|
|
811ac73a42 | ||
|
|
a79410e7b8 | ||
|
|
f7b133fc26 | ||
|
|
81946db9cf | ||
|
|
a321f78991 | ||
|
|
93b0722c50 | ||
|
|
454f59b7ad | ||
|
|
1a01a5b964 | ||
|
|
223341205e | ||
|
|
e22700c3dd | ||
|
|
7417951117 | ||
|
|
eb1d911ab7 | ||
|
|
0a8e4f3af9 | ||
|
|
d19fba3655 | ||
|
|
cd241d6bda | ||
|
|
30bfed5aa5 | ||
|
|
97acd385a3 | ||
|
|
0fa73e4a63 | ||
|
|
2581eb3e1d | ||
|
|
69292de6cc | ||
|
|
ff5426f6b8 | ||
|
|
a678145010 | ||
|
|
d436ad332c | ||
|
|
2601ee28bd | ||
|
|
536bc63a4e | ||
|
|
cf2d15c6a9 | ||
|
|
c6e66821a9 | ||
|
|
8dff6e0322 | ||
|
|
30957a941a | ||
|
|
69fb5dbdab | ||
|
|
1938cffaea | ||
|
|
efcdacad7d | ||
|
|
004a83b43a | ||
|
|
d8709df739 | ||
|
|
ce0c18dec5 | ||
|
|
c1f80effbe | ||
|
|
adfe29ec0b | ||
|
|
254fb430d1 | ||
|
|
cc99256e90 | ||
|
|
5c705f70c9 | ||
|
|
f559119de0 | ||
|
|
8b9f164fff | ||
|
|
2d5bba151b | ||
|
|
50c60e5fad | ||
|
|
4f5cc8e4e7 | ||
|
|
dae6dc1e77 | ||
|
|
a646bdc670 | ||
|
|
9f41ad491d | ||
|
|
0faa3223cd | ||
|
|
37e87611bc | ||
|
|
a4d24781bf | ||
|
|
999bcf9d01 | ||
|
|
9c294ea864 | ||
|
|
4797ed000e | ||
|
|
726a0b1e64 | ||
|
|
f0a1b8e4cd | ||
|
|
8fbe418777 | ||
|
|
0b0b24cb82 | ||
|
|
4fc52b1037 | ||
|
|
f3182bb1d0 | ||
|
|
027bc01a1b | ||
|
|
e42510ba63 | ||
|
|
440b8845b5 | ||
|
|
842cdece42 | ||
|
|
959f4b9074 | ||
|
|
acbfdc3442 | ||
|
|
354a99c968 | ||
|
|
9b34f3ea3a | ||
|
|
c1bf2b587e | ||
|
|
3132b89f12 | ||
|
|
5c88bb722f | ||
|
|
0ecf68aedc | ||
|
|
ff48ab8527 | ||
|
|
5c30cb709a | ||
|
|
3d6df84658 | ||
|
|
4f67623674 | ||
|
|
e1a237eaab | ||
|
|
683f4058c1 | ||
|
|
7c712f95bb | ||
|
|
8462c26485 | ||
|
|
d7275eecf3 | ||
|
|
650daf5628 | ||
|
|
1fa4f7e03e | ||
|
|
2f558300cc | ||
|
|
bcaec2915a | ||
|
|
924eb34d94 | ||
|
|
7044af3298 | ||
|
|
5f3658baf5 | ||
|
|
fd9b08873c | ||
|
|
11d592290c | ||
|
|
87086ac69d | ||
|
|
7814e4cc86 | ||
|
|
6284f579bf | ||
|
|
7cf76c9a09 | ||
|
|
37af0d2a13 | ||
|
|
252f80094c | ||
|
|
9a8acdc720 | ||
|
|
d69decd5c7 | ||
|
|
38f53399a2 | ||
|
|
13d501c773 | ||
|
|
ce0545eca1 | ||
|
|
95ccb6e2ec | ||
|
|
ba6477feac | ||
|
|
be3adfc331 | ||
|
|
9e40834f74 | ||
|
|
f1a15ea206 | ||
|
|
1ffb7bec20 | ||
|
|
2de3d994f3 | ||
|
|
c754e006f4 | ||
|
|
a97c845271 | ||
|
|
c0685f67c0 | ||
|
|
18a2b2c0b4 | ||
|
|
00bc979137 | ||
|
|
f1dd89fe86 | ||
|
|
6f62a6ef21 | ||
|
|
77091d7e8e | ||
|
|
eed24893fa | ||
|
|
3f9e649f17 | ||
|
|
8c69b735e3 | ||
|
|
08436c556a | ||
|
|
667fba68f3 | ||
|
|
3a993a660d | ||
|
|
9b596177ae | ||
|
|
bacdf0cbf9 | ||
|
|
8cb8df55e9 | ||
|
|
65d6a0e477 | ||
|
|
5bd0a47fcd | ||
|
|
00845c49d2 | ||
|
|
e45a46b6e4 | ||
|
|
dab00faa83 | ||
|
|
c91a44572e | ||
|
|
92aecd557b | ||
|
|
6e3fc657b4 | ||
|
|
21d3b87943 | ||
|
|
5f3d02f6eb | ||
|
|
0aed3fc346 | ||
|
|
79eb339c66 | ||
|
|
4a11df5b64 | ||
|
|
d897be6a98 | ||
|
|
9c04b4abf9 | ||
|
|
94440ae994 | ||
|
|
bc006b3c9d | ||
|
|
c7320a5564 | ||
|
|
2172a3d8cb | ||
|
|
b2aa05a8d6 | ||
|
|
850238b4ef | ||
|
|
9952d18e4d | ||
|
|
547b1355d3 | ||
|
|
fe089b13cb | ||
|
|
3fe0938b76 | ||
|
|
fe10dd9fb2 | ||
|
|
9677b1d1c0 | ||
|
|
2731bf7ac3 | ||
|
|
09e29fb58b | ||
|
|
15b13b537f | ||
|
|
78a9ddcf9a | ||
|
|
ea69d35651 | ||
|
|
4a27000548 | ||
|
|
505530f36a | ||
|
|
bc832f822f | ||
|
|
5f263b607e | ||
|
|
77b692e65d | ||
|
|
37c4fba0ac | ||
|
|
12ec58301f | ||
|
|
5797f5542b | ||
|
|
1b5425527c | ||
|
|
36f4fd3e1e | ||
|
|
2ef3f84945 | ||
|
|
b5e157d895 | ||
|
|
c4e7ad0e0f | ||
|
|
a4ab491371 | ||
|
|
ff65916108 | ||
|
|
95341a8f6f | ||
|
|
26651b0d6a | ||
|
|
b7f34ee348 | ||
|
|
07b4f88de9 | ||
|
|
3d605853c8 | ||
|
|
94700e55fa | ||
|
|
c96d882a02 | ||
|
|
b800834351 | ||
|
|
8503dd0047 | ||
|
|
28f09fcdd5 | ||
|
|
5f6122fe10 | ||
|
|
9bbb9f5556 | ||
|
|
8df7bad839 | ||
|
|
b69ff33d9e | ||
|
|
5e6b31f0da | ||
|
|
a6c8f7c875 | ||
|
|
7a6df013cc | ||
|
|
b2f2282947 | ||
|
|
478af0f720 | ||
|
|
366f730bf6 | ||
|
|
0b56290f0b | ||
|
|
fc5397fdf5 | ||
|
|
4f0493c850 | ||
|
|
f7dcc404f2 | ||
|
|
5b3b3aada8 | ||
|
|
bf49d2dca8 | ||
|
|
3bc5bd2d22 | ||
|
|
056a6df546 | ||
|
|
9f77001e27 | ||
|
|
4d0cfef6ee | ||
|
|
c9d72e4571 | ||
|
|
ccca02846d | ||
|
|
f0f9a0605b | ||
|
|
12350e3f9a | ||
|
|
14a9d2f73d | ||
|
|
afbf4d3dcc | ||
|
|
865377a70d | ||
|
|
b2aba9e430 | ||
|
|
52f7e23c72 | ||
|
|
1b1c137771 | ||
|
|
fdedcd1f4d | ||
|
|
97c0496cfa | ||
|
|
8713365265 | ||
|
|
9b334b3f97 |
48
.circleci/config.yml
Normal file
48
.circleci/config.yml
Normal file
@@ -0,0 +1,48 @@
|
||||
version: 2
|
||||
jobs:
|
||||
sytestpy2:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: docker pull matrixdotorg/sytest-synapsepy2
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
sytestpy2postgres:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: docker pull matrixdotorg/sytest-synapsepy2
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
sytestpy3:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: docker pull matrixdotorg/sytest-synapsepy3
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
sytestpy3postgres:
|
||||
machine: true
|
||||
steps:
|
||||
- checkout
|
||||
- run: docker pull matrixdotorg/sytest-synapsepy3
|
||||
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
|
||||
- store_artifacts:
|
||||
path: ~/project/logs
|
||||
destination: logs
|
||||
|
||||
workflows:
|
||||
version: 2
|
||||
build:
|
||||
jobs:
|
||||
- sytestpy2
|
||||
- sytestpy2postgres
|
||||
# Currently broken while the Python 3 port is incomplete
|
||||
# - sytestpy3
|
||||
# - sytestpy3postgres
|
||||
@@ -3,3 +3,6 @@ Dockerfile
|
||||
.gitignore
|
||||
demo/etc
|
||||
tox.ini
|
||||
synctl
|
||||
.git/*
|
||||
.tox/*
|
||||
|
||||
5
.github/ISSUE_TEMPLATE.md
vendored
5
.github/ISSUE_TEMPLATE.md
vendored
@@ -27,8 +27,9 @@ Describe here the problem that you are experiencing, or the feature you are requ
|
||||
|
||||
Describe how what happens differs from what you expected.
|
||||
|
||||
If you can identify any relevant log snippets from _homeserver.log_, please include
|
||||
those here (please be careful to remove any personal or private data):
|
||||
<!-- If you can identify any relevant log snippets from _homeserver.log_, please include
|
||||
those (please be careful to remove any personal or private data). Please surround them with
|
||||
``` (three backticks, on a line on their own), so that they are formatted legibly. -->
|
||||
|
||||
### Version information
|
||||
|
||||
|
||||
13
.travis.yml
13
.travis.yml
@@ -8,6 +8,9 @@ before_script:
|
||||
- git remote set-branches --add origin develop
|
||||
- git fetch origin develop
|
||||
|
||||
services:
|
||||
- postgresql
|
||||
|
||||
matrix:
|
||||
fast_finish: true
|
||||
include:
|
||||
@@ -20,12 +23,22 @@ matrix:
|
||||
- python: 2.7
|
||||
env: TOX_ENV=py27
|
||||
|
||||
- python: 2.7
|
||||
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
|
||||
|
||||
- python: 3.6
|
||||
env: TOX_ENV=py36
|
||||
|
||||
- python: 3.6
|
||||
env: TOX_ENV=check_isort
|
||||
|
||||
- python: 3.6
|
||||
env: TOX_ENV=check-newsfragment
|
||||
|
||||
allow_failures:
|
||||
- python: 2.7
|
||||
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
|
||||
|
||||
install:
|
||||
- pip install tox
|
||||
|
||||
|
||||
@@ -62,4 +62,7 @@ Christoph Witzany <christoph at web.crofting.com>
|
||||
* Add LDAP support for authentication
|
||||
|
||||
Pierre Jaury <pierre at jaury.eu>
|
||||
* Docker packaging
|
||||
* Docker packaging
|
||||
|
||||
Serban Constantin <serban.constantin at gmail dot com>
|
||||
* Small bug fix
|
||||
2634
CHANGES.md
Normal file
2634
CHANGES.md
Normal file
File diff suppressed because it is too large
Load Diff
2839
CHANGES.rst
2839
CHANGES.rst
File diff suppressed because it is too large
Load Diff
@@ -30,11 +30,11 @@ use github's pull request workflow to review the contribution, and either ask
|
||||
you to make any refinements needed or merge it and make them ourselves. The
|
||||
changes will then land on master when we next do a release.
|
||||
|
||||
We use `Jenkins <http://matrix.org/jenkins>`_ and
|
||||
We use `Jenkins <http://matrix.org/jenkins>`_ and
|
||||
`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous
|
||||
integration. All pull requests to synapse get automatically tested by Travis;
|
||||
the Jenkins builds require an adminstrator to start them. If your change
|
||||
breaks the build, this will be shown in github, so please keep an eye on the
|
||||
integration. All pull requests to synapse get automatically tested by Travis;
|
||||
the Jenkins builds require an adminstrator to start them. If your change
|
||||
breaks the build, this will be shown in github, so please keep an eye on the
|
||||
pull request for feedback.
|
||||
|
||||
Code style
|
||||
@@ -51,22 +51,22 @@ makes it horribly hard to review otherwise.
|
||||
Changelog
|
||||
~~~~~~~~~
|
||||
|
||||
All changes, even minor ones, need a corresponding changelog
|
||||
All changes, even minor ones, need a corresponding changelog / newsfragment
|
||||
entry. These are managed by Towncrier
|
||||
(https://github.com/hawkowl/towncrier).
|
||||
|
||||
To create a changelog entry, make a new file in the ``changelog.d``
|
||||
file named in the format of ``issuenumberOrPR.type``. The type can be
|
||||
file named in the format of ``PRnumber.type``. The type can be
|
||||
one of ``feature``, ``bugfix``, ``removal`` (also used for
|
||||
deprecations), or ``misc`` (for internal-only changes). The content of
|
||||
the file is your changelog entry, which can contain RestructuredText
|
||||
formatting. A note of contributors is welcomed in changelogs for
|
||||
non-misc changes (the content of misc changes is not displayed).
|
||||
|
||||
For example, a fix for a bug reported in #1234 would have its
|
||||
changelog entry in ``changelog.d/1234.bugfix``, and contain content
|
||||
like "The security levels of Florbs are now validated when
|
||||
recieved over federation. Contributed by Jane Matrix".
|
||||
For example, a fix in PR #1234 would have its changelog entry in
|
||||
``changelog.d/1234.bugfix``, and contain content like "The security levels of
|
||||
Florbs are now validated when recieved over federation. Contributed by Jane
|
||||
Matrix".
|
||||
|
||||
Attribution
|
||||
~~~~~~~~~~~
|
||||
@@ -125,7 +125,7 @@ the contribution or otherwise have the right to contribute it to Matrix::
|
||||
personal information I submit with it, including my sign-off) is
|
||||
maintained indefinitely and may be redistributed consistent with
|
||||
this project or the open source license(s) involved.
|
||||
|
||||
|
||||
If you agree to this for your contribution, then all that's needed is to
|
||||
include the line in your commit or pull request comment::
|
||||
|
||||
|
||||
19
Dockerfile
19
Dockerfile
@@ -1,19 +0,0 @@
|
||||
FROM docker.io/python:2-alpine3.7
|
||||
|
||||
RUN apk add --no-cache --virtual .nacl_deps su-exec build-base libffi-dev zlib-dev libressl-dev libjpeg-turbo-dev linux-headers postgresql-dev libxslt-dev
|
||||
|
||||
COPY . /synapse
|
||||
|
||||
# A wheel cache may be provided in ./cache for faster build
|
||||
RUN cd /synapse \
|
||||
&& pip install --upgrade pip setuptools psycopg2 lxml \
|
||||
&& mkdir -p /synapse/cache \
|
||||
&& pip install -f /synapse/cache --upgrade --process-dependency-links . \
|
||||
&& mv /synapse/contrib/docker/start.py /synapse/contrib/docker/conf / \
|
||||
&& rm -rf setup.py setup.cfg synapse
|
||||
|
||||
VOLUME ["/data"]
|
||||
|
||||
EXPOSE 8008/tcp 8448/tcp
|
||||
|
||||
ENTRYPOINT ["/start.py"]
|
||||
@@ -2,6 +2,7 @@ include synctl
|
||||
include LICENSE
|
||||
include VERSION
|
||||
include *.rst
|
||||
include *.md
|
||||
include demo/README
|
||||
include demo/demo.tls.dh
|
||||
include demo/*.py
|
||||
@@ -34,3 +35,5 @@ recursive-include changelog.d *
|
||||
|
||||
prune .github
|
||||
prune demo/etc
|
||||
prune docker
|
||||
prune .circleci
|
||||
|
||||
42
README.rst
42
README.rst
@@ -71,7 +71,7 @@ We'd like to invite you to join #matrix:matrix.org (via
|
||||
https://matrix.org/docs/projects/try-matrix-now.html), run a homeserver, take a look
|
||||
at the `Matrix spec <https://matrix.org/docs/spec>`_, and experiment with the
|
||||
`APIs <https://matrix.org/docs/api>`_ and `Client SDKs
|
||||
<http://matrix.org/docs/projects/try-matrix-now.html#client-sdks>`_.
|
||||
<https://matrix.org/docs/projects/try-matrix-now.html#client-sdks>`_.
|
||||
|
||||
Thanks for using Matrix!
|
||||
|
||||
@@ -157,12 +157,19 @@ if you prefer.
|
||||
|
||||
In case of problems, please see the _`Troubleshooting` section below.
|
||||
|
||||
There is an offical synapse image available at https://hub.docker.com/r/matrixdotorg/synapse/tags/ which can be used with the docker-compose file available at `contrib/docker`. Further information on this including configuration options is available in `contrib/docker/README.md`.
|
||||
There is an offical synapse image available at
|
||||
https://hub.docker.com/r/matrixdotorg/synapse/tags/ which can be used with
|
||||
the docker-compose file available at `contrib/docker <contrib/docker>`_. Further information on
|
||||
this including configuration options is available in the README on
|
||||
hub.docker.com.
|
||||
|
||||
Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate a synapse server in a single Docker image, at https://hub.docker.com/r/avhost/docker-matrix/tags/
|
||||
Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a
|
||||
Dockerfile to automate a synapse server in a single Docker image, at
|
||||
https://hub.docker.com/r/avhost/docker-matrix/tags/
|
||||
|
||||
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
|
||||
tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy
|
||||
tested with VirtualBox/AWS/DigitalOcean - see
|
||||
https://github.com/EMnify/matrix-synapse-auto-deploy
|
||||
for details.
|
||||
|
||||
Configuring synapse
|
||||
@@ -283,7 +290,7 @@ Connecting to Synapse from a client
|
||||
|
||||
The easiest way to try out your new Synapse installation is by connecting to it
|
||||
from a web client. The easiest option is probably the one at
|
||||
http://riot.im/app. You will need to specify a "Custom server" when you log on
|
||||
https://riot.im/app. You will need to specify a "Custom server" when you log on
|
||||
or register: set this to ``https://domain.tld`` if you setup a reverse proxy
|
||||
following the recommended setup, or ``https://localhost:8448`` - remember to specify the
|
||||
port (``:8448``) if not ``:443`` unless you changed the configuration. (Leave the identity
|
||||
@@ -329,7 +336,7 @@ Security Note
|
||||
=============
|
||||
|
||||
Matrix serves raw user generated data in some APIs - specifically the `content
|
||||
repository endpoints <http://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_.
|
||||
repository endpoints <https://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_.
|
||||
|
||||
Whilst we have tried to mitigate against possible XSS attacks (e.g.
|
||||
https://github.com/matrix-org/synapse/pull/1021) we recommend running
|
||||
@@ -348,7 +355,7 @@ Platform-Specific Instructions
|
||||
Debian
|
||||
------
|
||||
|
||||
Matrix provides official Debian packages via apt from http://matrix.org/packages/debian/.
|
||||
Matrix provides official Debian packages via apt from https://matrix.org/packages/debian/.
|
||||
Note that these packages do not include a client - choose one from
|
||||
https://matrix.org/docs/projects/try-matrix-now.html (or build your own with one of our SDKs :)
|
||||
|
||||
@@ -362,6 +369,19 @@ Synapse is in the Fedora repositories as ``matrix-synapse``::
|
||||
Oleg Girko provides Fedora RPMs at
|
||||
https://obs.infoserver.lv/project/monitor/matrix-synapse
|
||||
|
||||
OpenSUSE
|
||||
--------
|
||||
|
||||
Synapse is in the OpenSUSE repositories as ``matrix-synapse``::
|
||||
|
||||
sudo zypper install matrix-synapse
|
||||
|
||||
SUSE Linux Enterprise Server
|
||||
----------------------------
|
||||
|
||||
Unofficial package are built for SLES 15 in the openSUSE:Backports:SLE-15 repository at
|
||||
https://download.opensuse.org/repositories/openSUSE:/Backports:/SLE-15/standard/
|
||||
|
||||
ArchLinux
|
||||
---------
|
||||
|
||||
@@ -524,7 +544,7 @@ Troubleshooting Running
|
||||
-----------------------
|
||||
|
||||
If synapse fails with ``missing "sodium.h"`` crypto errors, you may need
|
||||
to manually upgrade PyNaCL, as synapse uses NaCl (http://nacl.cr.yp.to/) for
|
||||
to manually upgrade PyNaCL, as synapse uses NaCl (https://nacl.cr.yp.to/) for
|
||||
encryption and digital signatures.
|
||||
Unfortunately PyNACL currently has a few issues
|
||||
(https://github.com/pyca/pynacl/issues/53) and
|
||||
@@ -672,8 +692,8 @@ useful just for development purposes. See `<demo/README>`_.
|
||||
Using PostgreSQL
|
||||
================
|
||||
|
||||
As of Synapse 0.9, `PostgreSQL <http://www.postgresql.org>`_ is supported as an
|
||||
alternative to the `SQLite <http://sqlite.org/>`_ database that Synapse has
|
||||
As of Synapse 0.9, `PostgreSQL <https://www.postgresql.org>`_ is supported as an
|
||||
alternative to the `SQLite <https://sqlite.org/>`_ database that Synapse has
|
||||
traditionally used for convenience and simplicity.
|
||||
|
||||
The advantages of Postgres include:
|
||||
@@ -697,7 +717,7 @@ Using a reverse proxy with Synapse
|
||||
It is recommended to put a reverse proxy such as
|
||||
`nginx <https://nginx.org/en/docs/http/ngx_http_proxy_module.html>`_,
|
||||
`Apache <https://httpd.apache.org/docs/current/mod/mod_proxy_http.html>`_ or
|
||||
`HAProxy <http://www.haproxy.org/>`_ in front of Synapse. One advantage of
|
||||
`HAProxy <https://www.haproxy.org/>`_ in front of Synapse. One advantage of
|
||||
doing so is that it means that you can expose the default https port (443) to
|
||||
Matrix clients without needing to run Synapse with root privileges.
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Enforce the specified API for report_event
|
||||
@@ -1 +0,0 @@
|
||||
Include CPU time from database threads in request/block metrics.
|
||||
@@ -1 +0,0 @@
|
||||
Add CPU metrics for _fetch_event_list
|
||||
@@ -1 +0,0 @@
|
||||
Reduce database consumption when processing large numbers of receipts
|
||||
@@ -1 +0,0 @@
|
||||
Cache optimisation for /sync requests
|
||||
@@ -1 +0,0 @@
|
||||
Fix queued federation requests being processed in the wrong order
|
||||
@@ -1 +0,0 @@
|
||||
refactor: use parse_{string,integer} and assert's from http.servlet for deduplication
|
||||
1
changelog.d/3659.feature
Normal file
1
changelog.d/3659.feature
Normal file
@@ -0,0 +1 @@
|
||||
Support profile API endpoints on workers
|
||||
1
changelog.d/3673.misc
Normal file
1
changelog.d/3673.misc
Normal file
@@ -0,0 +1 @@
|
||||
Refactor state module to support multiple room versions
|
||||
1
changelog.d/3680.feature
Normal file
1
changelog.d/3680.feature
Normal file
@@ -0,0 +1 @@
|
||||
Server notices for resource limit blocking
|
||||
1
changelog.d/3722.bugfix
Normal file
1
changelog.d/3722.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix error collecting prometheus metrics when run on dedicated thread due to threading concurrency issues
|
||||
1
changelog.d/3724.feature
Normal file
1
changelog.d/3724.feature
Normal file
@@ -0,0 +1 @@
|
||||
Allow guests to use /rooms/:roomId/event/:eventId
|
||||
1
changelog.d/3726.misc
Normal file
1
changelog.d/3726.misc
Normal file
@@ -0,0 +1 @@
|
||||
Split the state_group_cache into member and non-member state events (and so speed up LL /sync)
|
||||
1
changelog.d/3727.misc
Normal file
1
changelog.d/3727.misc
Normal file
@@ -0,0 +1 @@
|
||||
Log failure to authenticate remote servers as warnings (without stack traces)
|
||||
1
changelog.d/3734.misc
Normal file
1
changelog.d/3734.misc
Normal file
@@ -0,0 +1 @@
|
||||
Reference the need for an HTTP replication port when using the federation_reader worker
|
||||
1
changelog.d/3735.misc
Normal file
1
changelog.d/3735.misc
Normal file
@@ -0,0 +1 @@
|
||||
Fix minor spelling error in federation client documentation.
|
||||
1
changelog.d/3746.misc
Normal file
1
changelog.d/3746.misc
Normal file
@@ -0,0 +1 @@
|
||||
Fix MAU cache invalidation due to missing yield
|
||||
1
changelog.d/3747.bugfix
Normal file
1
changelog.d/3747.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix bug where we resent "limit exceeded" server notices repeatedly
|
||||
1
changelog.d/3749.feature
Normal file
1
changelog.d/3749.feature
Normal file
@@ -0,0 +1 @@
|
||||
Add mau_trial_days config param, so that users only get counted as MAU after N days.
|
||||
1
changelog.d/3751.feature
Normal file
1
changelog.d/3751.feature
Normal file
@@ -0,0 +1 @@
|
||||
Require twisted 17.1 or later (fixes [#3741](https://github.com/matrix-org/synapse/issues/3741)).
|
||||
1
changelog.d/3753.bugfix
Normal file
1
changelog.d/3753.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix bug where we broke sync when using limit_usage_by_mau but hadn't configured server notices
|
||||
1
changelog.d/3754.bugfix
Normal file
1
changelog.d/3754.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix 'federation_domain_whitelist' such that an empty list correctly blocks all outbound federation traffic
|
||||
1
changelog.d/3755.bugfix
Normal file
1
changelog.d/3755.bugfix
Normal file
@@ -0,0 +1 @@
|
||||
Fix tagging of server notice rooms
|
||||
@@ -1,29 +1,5 @@
|
||||
# Synapse Docker
|
||||
|
||||
The `matrixdotorg/synapse` Docker image will run Synapse as a single process. It does not provide a
|
||||
database server or a TURN server, you should run these separately.
|
||||
|
||||
If you run a Postgres server, you should simply include it in the same Compose
|
||||
project or set the proper environment variables and the image will automatically
|
||||
use that server.
|
||||
|
||||
## Build
|
||||
|
||||
Build the docker image with the `docker build` command from the root of the synapse repository.
|
||||
|
||||
```
|
||||
docker build -t docker.io/matrixdotorg/synapse .
|
||||
```
|
||||
|
||||
The `-t` option sets the image tag. Official images are tagged `matrixdotorg/synapse:<version>` where `<version>` is the same as the release tag in the synapse git repository.
|
||||
|
||||
You may have a local Python wheel cache available, in which case copy the relevant packages in the ``cache/`` directory at the root of the project.
|
||||
|
||||
## Run
|
||||
|
||||
This image is designed to run either with an automatically generated configuration
|
||||
file or with a custom configuration that requires manual edition.
|
||||
|
||||
### Automated configuration
|
||||
|
||||
It is recommended that you use Docker Compose to run your containers, including
|
||||
@@ -60,94 +36,6 @@ Then, customize your configuration and run the server:
|
||||
docker-compose up -d
|
||||
```
|
||||
|
||||
### Without Compose
|
||||
### More information
|
||||
|
||||
If you do not wish to use Compose, you may still run this image using plain
|
||||
Docker commands. Note that the following is just a guideline and you may need
|
||||
to add parameters to the docker run command to account for the network situation
|
||||
with your postgres database.
|
||||
|
||||
```
|
||||
docker run \
|
||||
-d \
|
||||
--name synapse \
|
||||
-v ${DATA_PATH}:/data \
|
||||
-e SYNAPSE_SERVER_NAME=my.matrix.host \
|
||||
-e SYNAPSE_REPORT_STATS=yes \
|
||||
docker.io/matrixdotorg/synapse:latest
|
||||
```
|
||||
|
||||
## Volumes
|
||||
|
||||
The image expects a single volume, located at ``/data``, that will hold:
|
||||
|
||||
* temporary files during uploads;
|
||||
* uploaded media and thumbnails;
|
||||
* the SQLite database if you do not configure postgres;
|
||||
* the appservices configuration.
|
||||
|
||||
You are free to use separate volumes depending on storage endpoints at your
|
||||
disposal. For instance, ``/data/media`` coud be stored on a large but low
|
||||
performance hdd storage while other files could be stored on high performance
|
||||
endpoints.
|
||||
|
||||
In order to setup an application service, simply create an ``appservices``
|
||||
directory in the data volume and write the application service Yaml
|
||||
configuration file there. Multiple application services are supported.
|
||||
|
||||
## Environment
|
||||
|
||||
Unless you specify a custom path for the configuration file, a very generic
|
||||
file will be generated, based on the following environment settings.
|
||||
These are a good starting point for setting up your own deployment.
|
||||
|
||||
Global settings:
|
||||
|
||||
* ``UID``, the user id Synapse will run as [default 991]
|
||||
* ``GID``, the group id Synapse will run as [default 991]
|
||||
* ``SYNAPSE_CONFIG_PATH``, path to a custom config file
|
||||
|
||||
If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file
|
||||
then customize it manually. No other environment variable is required.
|
||||
|
||||
Otherwise, a dynamic configuration file will be used. The following environment
|
||||
variables are available for configuration:
|
||||
|
||||
* ``SYNAPSE_SERVER_NAME`` (mandatory), the current server public hostname.
|
||||
* ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous
|
||||
statistics reporting back to the Matrix project which helps us to get funding.
|
||||
* ``SYNAPSE_NO_TLS``, set this variable to disable TLS in Synapse (use this if
|
||||
you run your own TLS-capable reverse proxy).
|
||||
* ``SYNAPSE_ENABLE_REGISTRATION``, set this variable to enable registration on
|
||||
the Synapse instance.
|
||||
* ``SYNAPSE_ALLOW_GUEST``, set this variable to allow guest joining this server.
|
||||
* ``SYNAPSE_EVENT_CACHE_SIZE``, the event cache size [default `10K`].
|
||||
* ``SYNAPSE_CACHE_FACTOR``, the cache factor [default `0.5`].
|
||||
* ``SYNAPSE_RECAPTCHA_PUBLIC_KEY``, set this variable to the recaptcha public
|
||||
key in order to enable recaptcha upon registration.
|
||||
* ``SYNAPSE_RECAPTCHA_PRIVATE_KEY``, set this variable to the recaptcha private
|
||||
key in order to enable recaptcha upon registration.
|
||||
* ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN
|
||||
uris to enable TURN for this homeserver.
|
||||
* ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
|
||||
|
||||
Shared secrets, that will be initialized to random values if not set:
|
||||
|
||||
* ``SYNAPSE_REGISTRATION_SHARED_SECRET``, secret for registrering users if
|
||||
registration is disable.
|
||||
* ``SYNAPSE_MACAROON_SECRET_KEY`` secret for signing access tokens
|
||||
to the server.
|
||||
|
||||
Database specific values (will use SQLite if not set):
|
||||
|
||||
* `POSTGRES_DB` - The database name for the synapse postgres database. [default: `synapse`]
|
||||
* `POSTGRES_HOST` - The host of the postgres database if you wish to use postgresql instead of sqlite3. [default: `db` which is useful when using a container on the same docker network in a compose file where the postgres service is called `db`]
|
||||
* `POSTGRES_PASSWORD` - The password for the synapse postgres database. **If this is set then postgres will be used instead of sqlite3.** [default: none] **NOTE**: You are highly encouraged to use postgresql! Please use the compose file to make it easier to deploy.
|
||||
* `POSTGRES_USER` - The user for the synapse postgres database. [default: `matrix`]
|
||||
|
||||
Mail server specific values (will not send emails if not set):
|
||||
|
||||
* ``SYNAPSE_SMTP_HOST``, hostname to the mail server.
|
||||
* ``SYNAPSE_SMTP_PORT``, TCP port for accessing the mail server [default ``25``].
|
||||
* ``SYNAPSE_SMTP_USER``, username for authenticating against the mail server if any.
|
||||
* ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail server if any.
|
||||
For more information on required environment variables and mounts, see the main docker documentation at [/docker/README.md](../../docker/README.md)
|
||||
|
||||
@@ -6,6 +6,7 @@ version: '3'
|
||||
services:
|
||||
|
||||
synapse:
|
||||
build: ../..
|
||||
image: docker.io/matrixdotorg/synapse:latest
|
||||
# Since snyapse does not retry to connect to the database, restart upon
|
||||
# failure
|
||||
|
||||
6
contrib/grafana/README.md
Normal file
6
contrib/grafana/README.md
Normal file
@@ -0,0 +1,6 @@
|
||||
# Using the Synapse Grafana dashboard
|
||||
|
||||
0. Set up Prometheus and Grafana. Out of scope for this readme. Useful documentation about using Grafana with Prometheus: http://docs.grafana.org/features/datasources/prometheus/
|
||||
1. Have your Prometheus scrape your Synapse. https://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.rst
|
||||
2. Import dashboard into Grafana. Download `synapse.json`. Import it to Grafana and select the correct Prometheus datasource. http://docs.grafana.org/reference/export_import/
|
||||
3. Set up additional recording rules
|
||||
4969
contrib/grafana/synapse.json
Normal file
4969
contrib/grafana/synapse.json
Normal file
File diff suppressed because it is too large
Load Diff
35
docker/Dockerfile
Normal file
35
docker/Dockerfile
Normal file
@@ -0,0 +1,35 @@
|
||||
FROM docker.io/python:2-alpine3.8
|
||||
|
||||
RUN apk add --no-cache --virtual .nacl_deps \
|
||||
build-base \
|
||||
libffi-dev \
|
||||
libjpeg-turbo-dev \
|
||||
libressl-dev \
|
||||
libxslt-dev \
|
||||
linux-headers \
|
||||
postgresql-dev \
|
||||
su-exec \
|
||||
zlib-dev
|
||||
|
||||
COPY . /synapse
|
||||
|
||||
# A wheel cache may be provided in ./cache for faster build
|
||||
RUN cd /synapse \
|
||||
&& pip install --upgrade \
|
||||
lxml \
|
||||
pip \
|
||||
psycopg2 \
|
||||
setuptools \
|
||||
&& mkdir -p /synapse/cache \
|
||||
&& pip install -f /synapse/cache --upgrade --process-dependency-links . \
|
||||
&& mv /synapse/docker/start.py /synapse/docker/conf / \
|
||||
&& rm -rf \
|
||||
setup.cfg \
|
||||
setup.py \
|
||||
synapse
|
||||
|
||||
VOLUME ["/data"]
|
||||
|
||||
EXPOSE 8008/tcp 8448/tcp
|
||||
|
||||
ENTRYPOINT ["/start.py"]
|
||||
124
docker/README.md
Normal file
124
docker/README.md
Normal file
@@ -0,0 +1,124 @@
|
||||
# Synapse Docker
|
||||
|
||||
This Docker image will run Synapse as a single process. It does not provide a database
|
||||
server or a TURN server, you should run these separately.
|
||||
|
||||
## Run
|
||||
|
||||
We do not currently offer a `latest` image, as this has somewhat undefined semantics.
|
||||
We instead release only tagged versions so upgrading between releases is entirely
|
||||
within your control.
|
||||
|
||||
### Using docker-compose (easier)
|
||||
|
||||
This image is designed to run either with an automatically generated configuration
|
||||
file or with a custom configuration that requires manual editing.
|
||||
|
||||
An easy way to make use of this image is via docker-compose. See the
|
||||
[contrib/docker](../contrib/docker)
|
||||
section of the synapse project for examples.
|
||||
|
||||
### Without Compose (harder)
|
||||
|
||||
If you do not wish to use Compose, you may still run this image using plain
|
||||
Docker commands. Note that the following is just a guideline and you may need
|
||||
to add parameters to the docker run command to account for the network situation
|
||||
with your postgres database.
|
||||
|
||||
```
|
||||
docker run \
|
||||
-d \
|
||||
--name synapse \
|
||||
-v ${DATA_PATH}:/data \
|
||||
-e SYNAPSE_SERVER_NAME=my.matrix.host \
|
||||
-e SYNAPSE_REPORT_STATS=yes \
|
||||
docker.io/matrixdotorg/synapse:latest
|
||||
```
|
||||
|
||||
## Volumes
|
||||
|
||||
The image expects a single volume, located at ``/data``, that will hold:
|
||||
|
||||
* temporary files during uploads;
|
||||
* uploaded media and thumbnails;
|
||||
* the SQLite database if you do not configure postgres;
|
||||
* the appservices configuration.
|
||||
|
||||
You are free to use separate volumes depending on storage endpoints at your
|
||||
disposal. For instance, ``/data/media`` coud be stored on a large but low
|
||||
performance hdd storage while other files could be stored on high performance
|
||||
endpoints.
|
||||
|
||||
In order to setup an application service, simply create an ``appservices``
|
||||
directory in the data volume and write the application service Yaml
|
||||
configuration file there. Multiple application services are supported.
|
||||
|
||||
## Environment
|
||||
|
||||
Unless you specify a custom path for the configuration file, a very generic
|
||||
file will be generated, based on the following environment settings.
|
||||
These are a good starting point for setting up your own deployment.
|
||||
|
||||
Global settings:
|
||||
|
||||
* ``UID``, the user id Synapse will run as [default 991]
|
||||
* ``GID``, the group id Synapse will run as [default 991]
|
||||
* ``SYNAPSE_CONFIG_PATH``, path to a custom config file
|
||||
|
||||
If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file
|
||||
then customize it manually. No other environment variable is required.
|
||||
|
||||
Otherwise, a dynamic configuration file will be used. The following environment
|
||||
variables are available for configuration:
|
||||
|
||||
* ``SYNAPSE_SERVER_NAME`` (mandatory), the current server public hostname.
|
||||
* ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous
|
||||
statistics reporting back to the Matrix project which helps us to get funding.
|
||||
* ``SYNAPSE_NO_TLS``, set this variable to disable TLS in Synapse (use this if
|
||||
you run your own TLS-capable reverse proxy).
|
||||
* ``SYNAPSE_ENABLE_REGISTRATION``, set this variable to enable registration on
|
||||
the Synapse instance.
|
||||
* ``SYNAPSE_ALLOW_GUEST``, set this variable to allow guest joining this server.
|
||||
* ``SYNAPSE_EVENT_CACHE_SIZE``, the event cache size [default `10K`].
|
||||
* ``SYNAPSE_CACHE_FACTOR``, the cache factor [default `0.5`].
|
||||
* ``SYNAPSE_RECAPTCHA_PUBLIC_KEY``, set this variable to the recaptcha public
|
||||
key in order to enable recaptcha upon registration.
|
||||
* ``SYNAPSE_RECAPTCHA_PRIVATE_KEY``, set this variable to the recaptcha private
|
||||
key in order to enable recaptcha upon registration.
|
||||
* ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN
|
||||
uris to enable TURN for this homeserver.
|
||||
* ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
|
||||
|
||||
Shared secrets, that will be initialized to random values if not set:
|
||||
|
||||
* ``SYNAPSE_REGISTRATION_SHARED_SECRET``, secret for registrering users if
|
||||
registration is disable.
|
||||
* ``SYNAPSE_MACAROON_SECRET_KEY`` secret for signing access tokens
|
||||
to the server.
|
||||
|
||||
Database specific values (will use SQLite if not set):
|
||||
|
||||
* `POSTGRES_DB` - The database name for the synapse postgres database. [default: `synapse`]
|
||||
* `POSTGRES_HOST` - The host of the postgres database if you wish to use postgresql instead of sqlite3. [default: `db` which is useful when using a container on the same docker network in a compose file where the postgres service is called `db`]
|
||||
* `POSTGRES_PASSWORD` - The password for the synapse postgres database. **If this is set then postgres will be used instead of sqlite3.** [default: none] **NOTE**: You are highly encouraged to use postgresql! Please use the compose file to make it easier to deploy.
|
||||
* `POSTGRES_USER` - The user for the synapse postgres database. [default: `matrix`]
|
||||
|
||||
Mail server specific values (will not send emails if not set):
|
||||
|
||||
* ``SYNAPSE_SMTP_HOST``, hostname to the mail server.
|
||||
* ``SYNAPSE_SMTP_PORT``, TCP port for accessing the mail server [default ``25``].
|
||||
* ``SYNAPSE_SMTP_USER``, username for authenticating against the mail server if any.
|
||||
* ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail server if any.
|
||||
|
||||
## Build
|
||||
|
||||
Build the docker image with the `docker build` command from the root of the synapse repository.
|
||||
|
||||
```
|
||||
docker build -t docker.io/matrixdotorg/synapse . -f docker/Dockerfile
|
||||
```
|
||||
|
||||
The `-t` option sets the image tag. Official images are tagged `matrixdotorg/synapse:<version>` where `<version>` is the same as the release tag in the synapse git repository.
|
||||
|
||||
You may have a local Python wheel cache available, in which case copy the relevant
|
||||
packages in the ``cache/`` directory at the root of the project.
|
||||
63
docs/admin_api/register_api.rst
Normal file
63
docs/admin_api/register_api.rst
Normal file
@@ -0,0 +1,63 @@
|
||||
Shared-Secret Registration
|
||||
==========================
|
||||
|
||||
This API allows for the creation of users in an administrative and
|
||||
non-interactive way. This is generally used for bootstrapping a Synapse
|
||||
instance with administrator accounts.
|
||||
|
||||
To authenticate yourself to the server, you will need both the shared secret
|
||||
(``registration_shared_secret`` in the homeserver configuration), and a
|
||||
one-time nonce. If the registration shared secret is not configured, this API
|
||||
is not enabled.
|
||||
|
||||
To fetch the nonce, you need to request one from the API::
|
||||
|
||||
> GET /_matrix/client/r0/admin/register
|
||||
|
||||
< {"nonce": "thisisanonce"}
|
||||
|
||||
Once you have the nonce, you can make a ``POST`` to the same URL with a JSON
|
||||
body containing the nonce, username, password, whether they are an admin
|
||||
(optional, False by default), and a HMAC digest of the content.
|
||||
|
||||
As an example::
|
||||
|
||||
> POST /_matrix/client/r0/admin/register
|
||||
> {
|
||||
"nonce": "thisisanonce",
|
||||
"username": "pepper_roni",
|
||||
"password": "pizza",
|
||||
"admin": true,
|
||||
"mac": "mac_digest_here"
|
||||
}
|
||||
|
||||
< {
|
||||
"access_token": "token_here",
|
||||
"user_id": "@pepper_roni:localhost",
|
||||
"home_server": "test",
|
||||
"device_id": "device_id_here"
|
||||
}
|
||||
|
||||
The MAC is the hex digest output of the HMAC-SHA1 algorithm, with the key being
|
||||
the shared secret and the content being the nonce, user, password, and either
|
||||
the string "admin" or "notadmin", each separated by NULs. For an example of
|
||||
generation in Python::
|
||||
|
||||
import hmac, hashlib
|
||||
|
||||
def generate_mac(nonce, user, password, admin=False):
|
||||
|
||||
mac = hmac.new(
|
||||
key=shared_secret,
|
||||
digestmod=hashlib.sha1,
|
||||
)
|
||||
|
||||
mac.update(nonce.encode('utf8'))
|
||||
mac.update(b"\x00")
|
||||
mac.update(user.encode('utf8'))
|
||||
mac.update(b"\x00")
|
||||
mac.update(password.encode('utf8'))
|
||||
mac.update(b"\x00")
|
||||
mac.update(b"admin" if admin else b"notadmin")
|
||||
|
||||
return mac.hexdigest()
|
||||
@@ -74,7 +74,7 @@ replication endpoints that it's talking to on the main synapse process.
|
||||
``worker_replication_port`` should point to the TCP replication listener port and
|
||||
``worker_replication_http_port`` should point to the HTTP replication port.
|
||||
|
||||
Currently, only the ``event_creator`` worker requires specifying
|
||||
Currently, the ``event_creator`` and ``federation_reader`` workers require specifying
|
||||
``worker_replication_http_port``.
|
||||
|
||||
For instance::
|
||||
@@ -173,10 +173,23 @@ endpoints matching the following regular expressions::
|
||||
^/_matrix/federation/v1/backfill/
|
||||
^/_matrix/federation/v1/get_missing_events/
|
||||
^/_matrix/federation/v1/publicRooms
|
||||
^/_matrix/federation/v1/query/
|
||||
^/_matrix/federation/v1/make_join/
|
||||
^/_matrix/federation/v1/make_leave/
|
||||
^/_matrix/federation/v1/send_join/
|
||||
^/_matrix/federation/v1/send_leave/
|
||||
^/_matrix/federation/v1/invite/
|
||||
^/_matrix/federation/v1/query_auth/
|
||||
^/_matrix/federation/v1/event_auth/
|
||||
^/_matrix/federation/v1/exchange_third_party_invite/
|
||||
^/_matrix/federation/v1/send/
|
||||
|
||||
The above endpoints should all be routed to the federation_reader worker by the
|
||||
reverse-proxy configuration.
|
||||
|
||||
The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
|
||||
instance.
|
||||
|
||||
``synapse.app.federation_sender``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
@@ -206,6 +219,10 @@ Handles client API endpoints. It can handle REST endpoints matching the
|
||||
following regular expressions::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
|
||||
|
||||
``synapse.app.user_dir``
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
@@ -224,6 +241,14 @@ regular expressions::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/keys/upload
|
||||
|
||||
If ``use_presence`` is False in the homeserver config, it can also handle REST
|
||||
endpoints matching the following regular expressions::
|
||||
|
||||
^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status
|
||||
|
||||
This "stub" presence handler will pass through ``GET`` request but make the
|
||||
``PUT`` effectively a no-op.
|
||||
|
||||
It will proxy any requests it cannot handle to the main synapse instance. It
|
||||
must therefore be configured with the location of the main instance, via
|
||||
the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
|
||||
@@ -240,6 +265,7 @@ Handles some event creation. It can handle REST endpoints matching::
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
|
||||
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
|
||||
^/_matrix/client/(api/v1|r0|unstable)/join/
|
||||
^/_matrix/client/(api/v1|r0|unstable)/profile/
|
||||
|
||||
It will create events locally and then send them on to the main synapse
|
||||
instance to be persisted and handled.
|
||||
|
||||
@@ -1,5 +1,30 @@
|
||||
[tool.towncrier]
|
||||
package = "synapse"
|
||||
filename = "CHANGES.rst"
|
||||
filename = "CHANGES.md"
|
||||
directory = "changelog.d"
|
||||
issue_format = "`#{issue} <https://github.com/matrix-org/synapse/issues/{issue}>`_"
|
||||
issue_format = "[\\#{issue}](https://github.com/matrix-org/synapse/issues/{issue})"
|
||||
|
||||
[[tool.towncrier.type]]
|
||||
directory = "feature"
|
||||
name = "Features"
|
||||
showcontent = true
|
||||
|
||||
[[tool.towncrier.type]]
|
||||
directory = "bugfix"
|
||||
name = "Bugfixes"
|
||||
showcontent = true
|
||||
|
||||
[[tool.towncrier.type]]
|
||||
directory = "doc"
|
||||
name = "Improved Documentation"
|
||||
showcontent = true
|
||||
|
||||
[[tool.towncrier.type]]
|
||||
directory = "removal"
|
||||
name = "Deprecations and Removals"
|
||||
showcontent = true
|
||||
|
||||
[[tool.towncrier.type]]
|
||||
directory = "misc"
|
||||
name = "Internal Changes"
|
||||
showcontent = true
|
||||
|
||||
@@ -26,11 +26,37 @@ import yaml
|
||||
|
||||
|
||||
def request_registration(user, password, server_location, shared_secret, admin=False):
|
||||
req = urllib2.Request(
|
||||
"%s/_matrix/client/r0/admin/register" % (server_location,),
|
||||
headers={'Content-Type': 'application/json'}
|
||||
)
|
||||
|
||||
try:
|
||||
if sys.version_info[:3] >= (2, 7, 9):
|
||||
# As of version 2.7.9, urllib2 now checks SSL certs
|
||||
import ssl
|
||||
f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23))
|
||||
else:
|
||||
f = urllib2.urlopen(req)
|
||||
body = f.read()
|
||||
f.close()
|
||||
nonce = json.loads(body)["nonce"]
|
||||
except urllib2.HTTPError as e:
|
||||
print "ERROR! Received %d %s" % (e.code, e.reason,)
|
||||
if 400 <= e.code < 500:
|
||||
if e.info().type == "application/json":
|
||||
resp = json.load(e)
|
||||
if "error" in resp:
|
||||
print resp["error"]
|
||||
sys.exit(1)
|
||||
|
||||
mac = hmac.new(
|
||||
key=shared_secret,
|
||||
digestmod=hashlib.sha1,
|
||||
)
|
||||
|
||||
mac.update(nonce)
|
||||
mac.update("\x00")
|
||||
mac.update(user)
|
||||
mac.update("\x00")
|
||||
mac.update(password)
|
||||
@@ -40,10 +66,10 @@ def request_registration(user, password, server_location, shared_secret, admin=F
|
||||
mac = mac.hexdigest()
|
||||
|
||||
data = {
|
||||
"user": user,
|
||||
"nonce": nonce,
|
||||
"username": user,
|
||||
"password": password,
|
||||
"mac": mac,
|
||||
"type": "org.matrix.login.shared_secret",
|
||||
"admin": admin,
|
||||
}
|
||||
|
||||
@@ -52,7 +78,7 @@ def request_registration(user, password, server_location, shared_secret, admin=F
|
||||
print "Sending registration request..."
|
||||
|
||||
req = urllib2.Request(
|
||||
"%s/_matrix/client/api/v1/register" % (server_location,),
|
||||
"%s/_matrix/client/r0/admin/register" % (server_location,),
|
||||
data=json.dumps(data),
|
||||
headers={'Content-Type': 'application/json'}
|
||||
)
|
||||
|
||||
12
setup.cfg
12
setup.cfg
@@ -14,12 +14,17 @@ ignore =
|
||||
pylint.cfg
|
||||
tox.ini
|
||||
|
||||
[flake8]
|
||||
[pep8]
|
||||
max-line-length = 90
|
||||
# W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it.
|
||||
# E203 is contrary to PEP8.
|
||||
# W503 requires that binary operators be at the end, not start, of lines. Erik
|
||||
# doesn't like it. E203 is contrary to PEP8.
|
||||
ignore = W503,E203
|
||||
|
||||
[flake8]
|
||||
# note that flake8 inherits the "ignore" settings from "pep8" (because it uses
|
||||
# pep8 to do those checks), but not the "max-line-length" setting
|
||||
max-line-length = 90
|
||||
|
||||
[isort]
|
||||
line_length = 89
|
||||
not_skip = __init__.py
|
||||
@@ -31,3 +36,4 @@ known_compat = mock,six
|
||||
known_twisted=twisted,OpenSSL
|
||||
multi_line_output=3
|
||||
include_trailing_comma=true
|
||||
combine_as_imports=true
|
||||
|
||||
@@ -17,4 +17,4 @@
|
||||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.32.2"
|
||||
__version__ = "0.33.3"
|
||||
|
||||
@@ -25,7 +25,7 @@ from twisted.internet import defer
|
||||
import synapse.types
|
||||
from synapse import event_auth
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.api.errors import AuthError, Codes
|
||||
from synapse.api.errors import AuthError, Codes, ResourceLimitError
|
||||
from synapse.types import UserID
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
@@ -65,8 +65,9 @@ class Auth(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_from_context(self, event, context, do_sig_check=True):
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
auth_events_ids = yield self.compute_auth_events(
|
||||
event, context.prev_state_ids, for_verification=True,
|
||||
event, prev_state_ids, for_verification=True,
|
||||
)
|
||||
auth_events = yield self.store.get_events(auth_events_ids)
|
||||
auth_events = {
|
||||
@@ -210,9 +211,9 @@ class Auth(object):
|
||||
user_agent = request.requestHeaders.getRawHeaders(
|
||||
b"User-Agent",
|
||||
default=[b""]
|
||||
)[0]
|
||||
)[0].decode('ascii', 'surrogateescape')
|
||||
if user and access_token and ip_addr:
|
||||
self.store.insert_client_ip(
|
||||
yield self.store.insert_client_ip(
|
||||
user_id=user.to_string(),
|
||||
access_token=access_token,
|
||||
ip=ip_addr,
|
||||
@@ -251,10 +252,10 @@ class Auth(object):
|
||||
if ip_address not in app_service.ip_range_whitelist:
|
||||
defer.returnValue((None, None))
|
||||
|
||||
if "user_id" not in request.args:
|
||||
if b"user_id" not in request.args:
|
||||
defer.returnValue((app_service.sender, app_service))
|
||||
|
||||
user_id = request.args["user_id"][0]
|
||||
user_id = request.args[b"user_id"][0].decode('utf8')
|
||||
if app_service.sender == user_id:
|
||||
defer.returnValue((app_service.sender, app_service))
|
||||
|
||||
@@ -544,7 +545,8 @@ class Auth(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_auth_events(self, builder, context):
|
||||
auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids)
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
|
||||
|
||||
auth_events_entries = yield self.store.add_event_hashes(
|
||||
auth_ids
|
||||
@@ -680,7 +682,7 @@ class Auth(object):
|
||||
Returns:
|
||||
bool: False if no access_token was given, True otherwise.
|
||||
"""
|
||||
query_params = request.args.get("access_token")
|
||||
query_params = request.args.get(b"access_token")
|
||||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
|
||||
return bool(query_params) or bool(auth_headers)
|
||||
|
||||
@@ -696,7 +698,7 @@ class Auth(object):
|
||||
401 since some of the old clients depended on auth errors returning
|
||||
403.
|
||||
Returns:
|
||||
str: The access_token
|
||||
unicode: The access_token
|
||||
Raises:
|
||||
AuthError: If there isn't an access_token in the request.
|
||||
"""
|
||||
@@ -718,9 +720,9 @@ class Auth(object):
|
||||
"Too many Authorization headers.",
|
||||
errcode=Codes.MISSING_TOKEN,
|
||||
)
|
||||
parts = auth_headers[0].split(" ")
|
||||
if parts[0] == "Bearer" and len(parts) == 2:
|
||||
return parts[1]
|
||||
parts = auth_headers[0].split(b" ")
|
||||
if parts[0] == b"Bearer" and len(parts) == 2:
|
||||
return parts[1].decode('ascii')
|
||||
else:
|
||||
raise AuthError(
|
||||
token_not_found_http_status,
|
||||
@@ -736,4 +738,81 @@ class Auth(object):
|
||||
errcode=Codes.MISSING_TOKEN
|
||||
)
|
||||
|
||||
return query_params[0]
|
||||
return query_params[0].decode('ascii')
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_in_room_or_world_readable(self, room_id, user_id):
|
||||
"""Checks that the user is or was in the room or the room is world
|
||||
readable. If it isn't then an exception is raised.
|
||||
|
||||
Returns:
|
||||
Deferred[tuple[str, str|None]]: Resolves to the current membership of
|
||||
the user in the room and the membership event ID of the user. If
|
||||
the user is not in the room and never has been, then
|
||||
`(Membership.JOIN, None)` is returned.
|
||||
"""
|
||||
|
||||
try:
|
||||
# check_user_was_in_room will return the most recent membership
|
||||
# event for the user if:
|
||||
# * The user is a non-guest user, and was ever in the room
|
||||
# * The user is a guest user, and has joined the room
|
||||
# else it will throw.
|
||||
member_event = yield self.check_user_was_in_room(room_id, user_id)
|
||||
defer.returnValue((member_event.membership, member_event.event_id))
|
||||
except AuthError:
|
||||
visibility = yield self.state.get_current_state(
|
||||
room_id, EventTypes.RoomHistoryVisibility, ""
|
||||
)
|
||||
if (
|
||||
visibility and
|
||||
visibility.content["history_visibility"] == "world_readable"
|
||||
):
|
||||
defer.returnValue((Membership.JOIN, None))
|
||||
return
|
||||
raise AuthError(
|
||||
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_auth_blocking(self, user_id=None):
|
||||
"""Checks if the user should be rejected for some external reason,
|
||||
such as monthly active user limiting or global disable flag
|
||||
|
||||
Args:
|
||||
user_id(str|None): If present, checks for presence against existing
|
||||
MAU cohort
|
||||
"""
|
||||
|
||||
# Never fail an auth check for the server notices users
|
||||
# This can be a problem where event creation is prohibited due to blocking
|
||||
if user_id == self.hs.config.server_notices_mxid:
|
||||
return
|
||||
|
||||
if self.hs.config.hs_disabled:
|
||||
raise ResourceLimitError(
|
||||
403, self.hs.config.hs_disabled_message,
|
||||
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
|
||||
admin_uri=self.hs.config.admin_uri,
|
||||
limit_type=self.hs.config.hs_disabled_limit_type
|
||||
)
|
||||
if self.hs.config.limit_usage_by_mau is True:
|
||||
# If the user is already part of the MAU cohort or a trial user
|
||||
if user_id:
|
||||
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
|
||||
if timestamp:
|
||||
return
|
||||
|
||||
is_trial = yield self.store.is_trial_user(user_id)
|
||||
if is_trial:
|
||||
return
|
||||
# Else if there is no room in the MAU bucket, bail
|
||||
current_mau = yield self.store.get_monthly_active_count()
|
||||
if current_mau >= self.hs.config.max_mau_value:
|
||||
raise ResourceLimitError(
|
||||
403, "Monthly Active User Limit Exceeded",
|
||||
|
||||
admin_uri=self.hs.config.admin_uri,
|
||||
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
|
||||
limit_type="monthly_active_user"
|
||||
)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
# Copyright 2018 New Vector Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -68,7 +69,6 @@ class EventTypes(object):
|
||||
|
||||
RoomHistoryVisibility = "m.room.history_visibility"
|
||||
CanonicalAlias = "m.room.canonical_alias"
|
||||
Encryption = "m.room.encryption"
|
||||
RoomAvatar = "m.room.avatar"
|
||||
GuestAccess = "m.room.guest_access"
|
||||
|
||||
@@ -78,6 +78,7 @@ class EventTypes(object):
|
||||
Name = "m.room.name"
|
||||
|
||||
ServerACL = "m.room.server_acl"
|
||||
Pinned = "m.room.pinned_events"
|
||||
|
||||
|
||||
class RejectedReason(object):
|
||||
@@ -95,3 +96,19 @@ class RoomCreationPreset(object):
|
||||
class ThirdPartyEntityKind(object):
|
||||
USER = "user"
|
||||
LOCATION = "location"
|
||||
|
||||
|
||||
class RoomVersions(object):
|
||||
V1 = "1"
|
||||
VDH_TEST = "vdh-test-version"
|
||||
|
||||
|
||||
# the version we will give rooms which are created on this server
|
||||
DEFAULT_ROOM_VERSION = RoomVersions.V1
|
||||
|
||||
# vdh-test-version is a placeholder to get room versioning support working and tested
|
||||
# until we have a working v2.
|
||||
KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST}
|
||||
|
||||
ServerNoticeMsgType = "m.server_notice"
|
||||
ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -55,6 +56,9 @@ class Codes(object):
|
||||
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
|
||||
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
|
||||
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
|
||||
RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED"
|
||||
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
|
||||
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
|
||||
|
||||
|
||||
class CodeMessageException(RuntimeError):
|
||||
@@ -69,20 +73,6 @@ class CodeMessageException(RuntimeError):
|
||||
self.code = code
|
||||
self.msg = msg
|
||||
|
||||
def error_dict(self):
|
||||
return cs_error(self.msg)
|
||||
|
||||
|
||||
class MatrixCodeMessageException(CodeMessageException):
|
||||
"""An error from a general matrix endpoint, eg. from a proxied Matrix API call.
|
||||
|
||||
Attributes:
|
||||
errcode (str): Matrix error code e.g 'M_FORBIDDEN'
|
||||
"""
|
||||
def __init__(self, code, msg, errcode=Codes.UNKNOWN):
|
||||
super(MatrixCodeMessageException, self).__init__(code, msg)
|
||||
self.errcode = errcode
|
||||
|
||||
|
||||
class SynapseError(CodeMessageException):
|
||||
"""A base exception type for matrix errors which have an errcode and error
|
||||
@@ -108,38 +98,28 @@ class SynapseError(CodeMessageException):
|
||||
self.errcode,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_http_response_exception(cls, err):
|
||||
"""Make a SynapseError based on an HTTPResponseException
|
||||
|
||||
This is useful when a proxied request has failed, and we need to
|
||||
decide how to map the failure onto a matrix error to send back to the
|
||||
client.
|
||||
class ProxiedRequestError(SynapseError):
|
||||
"""An error from a general matrix endpoint, eg. from a proxied Matrix API call.
|
||||
|
||||
An attempt is made to parse the body of the http response as a matrix
|
||||
error. If that succeeds, the errcode and error message from the body
|
||||
are used as the errcode and error message in the new synapse error.
|
||||
Attributes:
|
||||
errcode (str): Matrix error code e.g 'M_FORBIDDEN'
|
||||
"""
|
||||
def __init__(self, code, msg, errcode=Codes.UNKNOWN, additional_fields=None):
|
||||
super(ProxiedRequestError, self).__init__(
|
||||
code, msg, errcode
|
||||
)
|
||||
if additional_fields is None:
|
||||
self._additional_fields = {}
|
||||
else:
|
||||
self._additional_fields = dict(additional_fields)
|
||||
|
||||
Otherwise, the errcode is set to M_UNKNOWN, and the error message is
|
||||
set to the reason code from the HTTP response.
|
||||
|
||||
Args:
|
||||
err (HttpResponseException):
|
||||
|
||||
Returns:
|
||||
SynapseError:
|
||||
"""
|
||||
# try to parse the body as json, to get better errcode/msg, but
|
||||
# default to M_UNKNOWN with the HTTP status as the error text
|
||||
try:
|
||||
j = json.loads(err.response)
|
||||
except ValueError:
|
||||
j = {}
|
||||
errcode = j.get('errcode', Codes.UNKNOWN)
|
||||
errmsg = j.get('error', err.msg)
|
||||
|
||||
res = SynapseError(err.code, errmsg, errcode)
|
||||
return res
|
||||
def error_dict(self):
|
||||
return cs_error(
|
||||
self.msg,
|
||||
self.errcode,
|
||||
**self._additional_fields
|
||||
)
|
||||
|
||||
|
||||
class ConsentNotGivenError(SynapseError):
|
||||
@@ -251,6 +231,30 @@ class AuthError(SynapseError):
|
||||
super(AuthError, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class ResourceLimitError(SynapseError):
|
||||
"""
|
||||
Any error raised when there is a problem with resource usage.
|
||||
For instance, the monthly active user limit for the server has been exceeded
|
||||
"""
|
||||
def __init__(
|
||||
self, code, msg,
|
||||
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
|
||||
admin_uri=None,
|
||||
limit_type=None,
|
||||
):
|
||||
self.admin_uri = admin_uri
|
||||
self.limit_type = limit_type
|
||||
super(ResourceLimitError, self).__init__(code, msg, errcode=errcode)
|
||||
|
||||
def error_dict(self):
|
||||
return cs_error(
|
||||
self.msg,
|
||||
self.errcode,
|
||||
admin_uri=self.admin_uri,
|
||||
limit_type=self.limit_type
|
||||
)
|
||||
|
||||
|
||||
class EventSizeError(SynapseError):
|
||||
"""An error raised when an event is too big."""
|
||||
|
||||
@@ -308,12 +312,25 @@ class LimitExceededError(SynapseError):
|
||||
)
|
||||
|
||||
|
||||
def cs_exception(exception):
|
||||
if isinstance(exception, CodeMessageException):
|
||||
return exception.error_dict()
|
||||
else:
|
||||
logger.error("Unknown exception type: %s", type(exception))
|
||||
return {}
|
||||
class IncompatibleRoomVersionError(SynapseError):
|
||||
"""A server is trying to join a room whose version it does not support."""
|
||||
|
||||
def __init__(self, room_version):
|
||||
super(IncompatibleRoomVersionError, self).__init__(
|
||||
code=400,
|
||||
msg="Your homeserver does not support the features required to "
|
||||
"join this room",
|
||||
errcode=Codes.INCOMPATIBLE_ROOM_VERSION,
|
||||
)
|
||||
|
||||
self._room_version = room_version
|
||||
|
||||
def error_dict(self):
|
||||
return cs_error(
|
||||
self.msg,
|
||||
self.errcode,
|
||||
room_version=self._room_version,
|
||||
)
|
||||
|
||||
|
||||
def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
|
||||
@@ -372,7 +389,7 @@ class HttpResponseException(CodeMessageException):
|
||||
Represents an HTTP-level failure of an outbound request
|
||||
|
||||
Attributes:
|
||||
response (str): body of response
|
||||
response (bytes): body of response
|
||||
"""
|
||||
def __init__(self, code, msg, response):
|
||||
"""
|
||||
@@ -380,7 +397,39 @@ class HttpResponseException(CodeMessageException):
|
||||
Args:
|
||||
code (int): HTTP status code
|
||||
msg (str): reason phrase from HTTP response status line
|
||||
response (str): body of response
|
||||
response (bytes): body of response
|
||||
"""
|
||||
super(HttpResponseException, self).__init__(code, msg)
|
||||
self.response = response
|
||||
|
||||
def to_synapse_error(self):
|
||||
"""Make a SynapseError based on an HTTPResponseException
|
||||
|
||||
This is useful when a proxied request has failed, and we need to
|
||||
decide how to map the failure onto a matrix error to send back to the
|
||||
client.
|
||||
|
||||
An attempt is made to parse the body of the http response as a matrix
|
||||
error. If that succeeds, the errcode and error message from the body
|
||||
are used as the errcode and error message in the new synapse error.
|
||||
|
||||
Otherwise, the errcode is set to M_UNKNOWN, and the error message is
|
||||
set to the reason code from the HTTP response.
|
||||
|
||||
Returns:
|
||||
SynapseError:
|
||||
"""
|
||||
# try to parse the body as json, to get better errcode/msg, but
|
||||
# default to M_UNKNOWN with the HTTP status as the error text
|
||||
try:
|
||||
j = json.loads(self.response)
|
||||
except ValueError:
|
||||
j = {}
|
||||
|
||||
if not isinstance(j, dict):
|
||||
j = {}
|
||||
|
||||
errcode = j.pop('errcode', Codes.UNKNOWN)
|
||||
errmsg = j.pop('error', self.msg)
|
||||
|
||||
return ProxiedRequestError(self.code, errmsg, errcode, j)
|
||||
|
||||
@@ -113,7 +113,13 @@ ROOM_EVENT_FILTER_SCHEMA = {
|
||||
},
|
||||
"contains_url": {
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"lazy_load_members": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"include_redundant_members": {
|
||||
"type": "boolean"
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -261,6 +267,12 @@ class FilterCollection(object):
|
||||
def ephemeral_limit(self):
|
||||
return self._room_ephemeral_filter.limit()
|
||||
|
||||
def lazy_load_members(self):
|
||||
return self._room_state_filter.lazy_load_members()
|
||||
|
||||
def include_redundant_members(self):
|
||||
return self._room_state_filter.include_redundant_members()
|
||||
|
||||
def filter_presence(self, events):
|
||||
return self._presence_filter.filter(events)
|
||||
|
||||
@@ -417,6 +429,12 @@ class Filter(object):
|
||||
def limit(self):
|
||||
return self.filter_json.get("limit", 10)
|
||||
|
||||
def lazy_load_members(self):
|
||||
return self.filter_json.get("lazy_load_members", False)
|
||||
|
||||
def include_redundant_members(self):
|
||||
return self.filter_json.get("include_redundant_members", False)
|
||||
|
||||
|
||||
def _matches_wildcard(actual_value, filter_value):
|
||||
if filter_value.endswith("*"):
|
||||
|
||||
@@ -72,7 +72,7 @@ class Ratelimiter(object):
|
||||
return allowed, time_allowed
|
||||
|
||||
def prune_message_counts(self, time_now_s):
|
||||
for user_id in self.message_counts.keys():
|
||||
for user_id in list(self.message_counts.keys()):
|
||||
message_count, time_start, msg_rate_hz = (
|
||||
self.message_counts[user_id]
|
||||
)
|
||||
|
||||
@@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port):
|
||||
logger.info("Metrics now reporting on %s:%d", host, port)
|
||||
|
||||
|
||||
def listen_tcp(bind_addresses, port, factory, backlog=50):
|
||||
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
|
||||
"""
|
||||
Create a TCP socket for a port and several addresses
|
||||
"""
|
||||
@@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50):
|
||||
check_bind_error(e, address, bind_addresses)
|
||||
|
||||
|
||||
def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
|
||||
def listen_ssl(
|
||||
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
|
||||
):
|
||||
"""
|
||||
Create an SSL socket for a port and several addresses
|
||||
"""
|
||||
|
||||
@@ -117,8 +117,9 @@ class ASReplicationHandler(ReplicationClientHandler):
|
||||
super(ASReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
|
||||
if stream_name == "events":
|
||||
max_stream_id = self.store.get_room_max_stream_ordering()
|
||||
|
||||
@@ -31,6 +31,7 @@ from synapse.http.site import SynapseSite
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
@@ -38,9 +39,15 @@ from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.room import PublicRoomListRestServlet
|
||||
from synapse.rest.client.v1.room import (
|
||||
JoinedRoomMemberListRestServlet,
|
||||
PublicRoomListRestServlet,
|
||||
RoomEventContextServlet,
|
||||
RoomMemberListRestServlet,
|
||||
RoomStateRestServlet,
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
@@ -52,13 +59,14 @@ logger = logging.getLogger("synapse.app.client_reader")
|
||||
|
||||
|
||||
class ClientReaderSlavedStore(
|
||||
SlavedAccountDataStore,
|
||||
SlavedEventStore,
|
||||
SlavedKeyStore,
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
TransactionStore,
|
||||
SlavedTransactionStore,
|
||||
SlavedClientIpStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
@@ -82,7 +90,13 @@ class ClientReaderServer(HomeServer):
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
|
||||
PublicRoomListRestServlet(self).register(resource)
|
||||
RoomMemberListRestServlet(self).register(resource)
|
||||
JoinedRoomMemberListRestServlet(self).register(resource)
|
||||
RoomStateRestServlet(self).register(resource)
|
||||
RoomEventContextServlet(self).register(resource)
|
||||
|
||||
resources.update({
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
@@ -154,11 +168,13 @@ def start(config_options):
|
||||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = ClientReaderServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
||||
@@ -43,8 +43,13 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.profile import (
|
||||
ProfileAvatarURLRestServlet,
|
||||
ProfileDisplaynameRestServlet,
|
||||
ProfileRestServlet,
|
||||
)
|
||||
from synapse.rest.client.v1.room import (
|
||||
JoinRoomAliasServlet,
|
||||
RoomMembershipRestServlet,
|
||||
@@ -53,6 +58,7 @@ from synapse.rest.client.v1.room import (
|
||||
)
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.user_directory import UserDirectoryStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
@@ -62,8 +68,11 @@ logger = logging.getLogger("synapse.app.event_creator")
|
||||
|
||||
|
||||
class EventCreatorSlavedStore(
|
||||
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
|
||||
# rather than going via the correct worker.
|
||||
UserDirectoryStore,
|
||||
DirectoryStore,
|
||||
TransactionStore,
|
||||
SlavedTransactionStore,
|
||||
SlavedProfileStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedPusherStore,
|
||||
@@ -101,6 +110,9 @@ class EventCreatorServer(HomeServer):
|
||||
RoomMembershipRestServlet(self).register(resource)
|
||||
RoomStateEventRestServlet(self).register(resource)
|
||||
JoinRoomAliasServlet(self).register(resource)
|
||||
ProfileAvatarURLRestServlet(self).register(resource)
|
||||
ProfileDisplaynameRestServlet(self).register(resource)
|
||||
ProfileRestServlet(self).register(resource)
|
||||
resources.update({
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
@@ -174,11 +186,13 @@ def start(config_options):
|
||||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = EventCreatorServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
||||
@@ -32,11 +32,17 @@ from synapse.http.site import SynapseSite
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.keys import SlavedKeyStore
|
||||
from synapse.replication.slave.storage.profile import SlavedProfileStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
from synapse.replication.slave.storage.pushers import SlavedPusherStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.room import RoomStore
|
||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
@@ -49,11 +55,17 @@ logger = logging.getLogger("synapse.app.federation_reader")
|
||||
|
||||
|
||||
class FederationReaderSlavedStore(
|
||||
SlavedAccountDataStore,
|
||||
SlavedProfileStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedPusherStore,
|
||||
SlavedPushRuleStore,
|
||||
SlavedReceiptsStore,
|
||||
SlavedEventStore,
|
||||
SlavedKeyStore,
|
||||
RoomStore,
|
||||
DirectoryStore,
|
||||
TransactionStore,
|
||||
SlavedTransactionStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
pass
|
||||
@@ -143,11 +155,13 @@ def start(config_options):
|
||||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = FederationReaderServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
||||
@@ -36,11 +36,11 @@ from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.presence import SlavedPresenceStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
@@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
|
||||
|
||||
|
||||
class FederationSenderSlaveStore(
|
||||
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
|
||||
SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
|
||||
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
|
||||
):
|
||||
def __init__(self, db_conn, hs):
|
||||
@@ -144,8 +144,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
|
||||
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.send_handler = FederationSenderHandler(hs, self)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(FederationSenderReplicationHandler, self).on_rdata(
|
||||
yield super(FederationSenderReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
self.send_handler.process_replication_rows(stream_name, token, rows)
|
||||
@@ -186,11 +187,13 @@ def start(config_options):
|
||||
config.send_federation = True
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ps = FederationSenderServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
||||
@@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
|
||||
from synapse.rest.client.v2_alpha._base import client_v2_patterns
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
@@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string
|
||||
logger = logging.getLogger("synapse.app.frontend_proxy")
|
||||
|
||||
|
||||
class PresenceStatusStubServlet(ClientV1RestServlet):
|
||||
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(PresenceStatusStubServlet, self).__init__(hs)
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.auth = hs.get_auth()
|
||||
self.main_uri = hs.config.worker_main_http_uri
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, user_id):
|
||||
# Pass through the auth headers, if any, in case the access token
|
||||
# is there.
|
||||
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
|
||||
headers = {
|
||||
"Authorization": auth_headers,
|
||||
}
|
||||
result = yield self.http_client.get_json(
|
||||
self.main_uri + request.uri,
|
||||
headers=headers,
|
||||
)
|
||||
defer.returnValue((200, result))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, user_id):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
class KeyUploadServlet(RestServlet):
|
||||
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
|
||||
|
||||
@@ -135,6 +165,12 @@ class FrontendProxyServer(HomeServer):
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
KeyUploadServlet(self).register(resource)
|
||||
|
||||
# If presence is disabled, use the stub servlet that does
|
||||
# not allow sending presence
|
||||
if not self.config.use_presence:
|
||||
PresenceStatusStubServlet(self).register(resource)
|
||||
|
||||
resources.update({
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
@@ -153,7 +189,8 @@ class FrontendProxyServer(HomeServer):
|
||||
listener_config,
|
||||
root_resource,
|
||||
self.version_string,
|
||||
)
|
||||
),
|
||||
reactor=self.get_reactor()
|
||||
)
|
||||
|
||||
logger.info("Synapse client reader now listening on port %d", port)
|
||||
@@ -208,11 +245,13 @@ def start(config_options):
|
||||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = FrontendProxyServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
||||
@@ -18,6 +18,10 @@ import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from six import iteritems
|
||||
|
||||
from prometheus_client import Gauge
|
||||
|
||||
from twisted.application import service
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import EncodingResourceWrapper, NoResource
|
||||
@@ -47,6 +51,7 @@ from synapse.http.additional_resource import AdditionalResource
|
||||
from synapse.http.server import RootRedirect
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.module_api import ModuleApi
|
||||
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
|
||||
@@ -297,6 +302,11 @@ class SynapseHomeServer(HomeServer):
|
||||
quit_with_error(e.message)
|
||||
|
||||
|
||||
# Gauges to expose monthly active user control metrics
|
||||
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
|
||||
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
|
||||
|
||||
|
||||
def setup(config_options):
|
||||
"""
|
||||
Args:
|
||||
@@ -328,6 +338,7 @@ def setup(config_options):
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
|
||||
@@ -336,6 +347,7 @@ def setup(config_options):
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
@@ -425,6 +437,9 @@ def run(hs):
|
||||
# currently either 0 or 1
|
||||
stats_process = []
|
||||
|
||||
def start_phone_stats_home():
|
||||
return run_as_background_process("phone_stats_home", phone_stats_home)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def phone_stats_home():
|
||||
logger.info("Gathering stats for reporting")
|
||||
@@ -442,7 +457,7 @@ def run(hs):
|
||||
stats["total_nonbridged_users"] = total_nonbridged_users
|
||||
|
||||
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
|
||||
for name, count in daily_user_type_results.iteritems():
|
||||
for name, count in iteritems(daily_user_type_results):
|
||||
stats["daily_user_type_" + name] = count
|
||||
|
||||
room_count = yield hs.get_datastore().get_room_count()
|
||||
@@ -453,7 +468,7 @@ def run(hs):
|
||||
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
|
||||
|
||||
r30_results = yield hs.get_datastore().count_r30_users()
|
||||
for name, count in r30_results.iteritems():
|
||||
for name, count in iteritems(r30_results):
|
||||
stats["r30_users_" + name] = count
|
||||
|
||||
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
|
||||
@@ -496,16 +511,41 @@ def run(hs):
|
||||
)
|
||||
|
||||
def generate_user_daily_visit_stats():
|
||||
hs.get_datastore().generate_user_daily_visits()
|
||||
return run_as_background_process(
|
||||
"generate_user_daily_visits",
|
||||
hs.get_datastore().generate_user_daily_visits,
|
||||
)
|
||||
|
||||
# Rather than update on per session basis, batch up the requests.
|
||||
# If you increase the loop period, the accuracy of user_daily_visits
|
||||
# table will decrease
|
||||
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
|
||||
|
||||
# monthly active user limiting functionality
|
||||
clock.looping_call(
|
||||
hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
|
||||
)
|
||||
hs.get_datastore().reap_monthly_active_users()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def generate_monthly_active_users():
|
||||
count = 0
|
||||
if hs.config.limit_usage_by_mau:
|
||||
count = yield hs.get_datastore().get_monthly_active_count()
|
||||
current_mau_gauge.set(float(count))
|
||||
max_mau_gauge.set(float(hs.config.max_mau_value))
|
||||
|
||||
hs.get_datastore().initialise_reserved_users(
|
||||
hs.config.mau_limits_reserved_threepids
|
||||
)
|
||||
generate_monthly_active_users()
|
||||
if hs.config.limit_usage_by_mau:
|
||||
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
|
||||
# End of monthly active user settings
|
||||
|
||||
if hs.config.report_stats:
|
||||
logger.info("Scheduling stats reporting for 3 hour intervals")
|
||||
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
|
||||
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
|
||||
|
||||
# We need to defer this init for the cases that we daemonize
|
||||
# otherwise the process ID we get is that of the non-daemon process
|
||||
@@ -513,7 +553,7 @@ def run(hs):
|
||||
|
||||
# We wait 5 minutes to send the first set of stats as the server can
|
||||
# be quite busy the first few minutes
|
||||
clock.call_later(5 * 60, phone_stats_home)
|
||||
clock.call_later(5 * 60, start_phone_stats_home)
|
||||
|
||||
if hs.config.daemonize and hs.config.print_pidfile:
|
||||
print (hs.config.pid_file)
|
||||
|
||||
@@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.media.v0.content_repository import ContentRepoResource
|
||||
from synapse.server import HomeServer
|
||||
@@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedClientIpStore,
|
||||
TransactionStore,
|
||||
SlavedTransactionStore,
|
||||
BaseSlavedStore,
|
||||
MediaRepositoryStore,
|
||||
):
|
||||
@@ -155,11 +155,13 @@ def start(config_options):
|
||||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = MediaRepositoryServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
||||
@@ -148,8 +148,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
||||
|
||||
self.pusher_pool = hs.get_pusherpool()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
run_in_background(self.poke_pushers, stream_name, token, rows)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -162,11 +163,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
||||
else:
|
||||
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
|
||||
elif stream_name == "events":
|
||||
yield self.pusher_pool.on_new_notifications(
|
||||
self.pusher_pool.on_new_notifications(
|
||||
token, token,
|
||||
)
|
||||
elif stream_name == "receipts":
|
||||
yield self.pusher_pool.on_new_receipts(
|
||||
self.pusher_pool.on_new_receipts(
|
||||
token, token, set(row.room_id for row in rows)
|
||||
)
|
||||
except Exception:
|
||||
|
||||
@@ -55,7 +55,6 @@ from synapse.rest.client.v2_alpha import sync
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.storage.roommember import RoomMemberStore
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
@@ -81,9 +80,7 @@ class SynchrotronSlavedStore(
|
||||
RoomStore,
|
||||
BaseSlavedStore,
|
||||
):
|
||||
did_forget = (
|
||||
RoomMemberStore.__dict__["did_forget"]
|
||||
)
|
||||
pass
|
||||
|
||||
|
||||
UPDATE_SYNCING_USERS_MS = 10 * 1000
|
||||
@@ -117,7 +114,10 @@ class SynchrotronPresence(object):
|
||||
logger.info("Presence process_id is %r", self.process_id)
|
||||
|
||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
|
||||
if self.hs.config.use_presence:
|
||||
self.hs.get_tcp_replication().send_user_sync(
|
||||
user_id, is_syncing, last_sync_ms
|
||||
)
|
||||
|
||||
def mark_as_coming_online(self, user_id):
|
||||
"""A user has started syncing. Send a UserSync to the master, unless they
|
||||
@@ -214,10 +214,13 @@ class SynchrotronPresence(object):
|
||||
yield self.notify_from_replication(states, stream_id)
|
||||
|
||||
def get_currently_syncing_users(self):
|
||||
return [
|
||||
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
|
||||
if count > 0
|
||||
]
|
||||
if self.hs.config.use_presence:
|
||||
return [
|
||||
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
|
||||
if count > 0
|
||||
]
|
||||
else:
|
||||
return set()
|
||||
|
||||
|
||||
class SynchrotronTyping(object):
|
||||
@@ -335,8 +338,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.notifier = hs.get_notifier()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||
run_in_background(self.process_and_notify, stream_name, token, rows)
|
||||
|
||||
def get_streams_to_replicate(self):
|
||||
|
||||
@@ -25,6 +25,8 @@ import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
from six import iteritems
|
||||
|
||||
import yaml
|
||||
|
||||
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
|
||||
@@ -173,7 +175,7 @@ def main():
|
||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
|
||||
|
||||
cache_factors = config.get("synctl_cache_factors", {})
|
||||
for cache_name, factor in cache_factors.iteritems():
|
||||
for cache_name, factor in iteritems(cache_factors):
|
||||
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
||||
|
||||
worker_configfiles = []
|
||||
|
||||
@@ -169,8 +169,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
|
||||
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
|
||||
self.user_directory = hs.get_user_directory_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_rdata(self, stream_name, token, rows):
|
||||
super(UserDirectoryReplicationHandler, self).on_rdata(
|
||||
yield super(UserDirectoryReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
if stream_name == "current_state_deltas":
|
||||
@@ -214,11 +215,13 @@ def start(config_options):
|
||||
config.update_user_directory = True
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ps = UserDirectoryServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
||||
@@ -168,7 +168,8 @@ def setup_logging(config, use_worker_options=False):
|
||||
if log_file:
|
||||
# TODO: Customisable file size / backup count
|
||||
handler = logging.handlers.RotatingFileHandler(
|
||||
log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
|
||||
log_file, maxBytes=(1000 * 1000 * 100), backupCount=3,
|
||||
encoding='utf8'
|
||||
)
|
||||
|
||||
def sighup(signum, stack):
|
||||
@@ -193,9 +194,8 @@ def setup_logging(config, use_worker_options=False):
|
||||
|
||||
def sighup(signum, stack):
|
||||
# it might be better to use a file watcher or something for this.
|
||||
logging.info("Reloading log config from %s due to SIGHUP",
|
||||
log_config)
|
||||
load_log_config()
|
||||
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
|
||||
|
||||
load_log_config()
|
||||
|
||||
|
||||
@@ -49,6 +49,9 @@ class ServerConfig(Config):
|
||||
# "disable" federation
|
||||
self.send_federation = config.get("send_federation", True)
|
||||
|
||||
# Whether to enable user presence.
|
||||
self.use_presence = config.get("use_presence", True)
|
||||
|
||||
# Whether to update the user directory or not. This should be set to
|
||||
# false only if we are updating the user directory in a worker
|
||||
self.update_user_directory = config.get("update_user_directory", True)
|
||||
@@ -67,6 +70,31 @@ class ServerConfig(Config):
|
||||
"block_non_admin_invites", False,
|
||||
)
|
||||
|
||||
# Options to control access by tracking MAU
|
||||
self.limit_usage_by_mau = config.get("limit_usage_by_mau", False)
|
||||
self.max_mau_value = 0
|
||||
if self.limit_usage_by_mau:
|
||||
self.max_mau_value = config.get(
|
||||
"max_mau_value", 0,
|
||||
)
|
||||
|
||||
self.mau_limits_reserved_threepids = config.get(
|
||||
"mau_limit_reserved_threepids", []
|
||||
)
|
||||
|
||||
self.mau_trial_days = config.get(
|
||||
"mau_trial_days", 0,
|
||||
)
|
||||
|
||||
# Options to disable HS
|
||||
self.hs_disabled = config.get("hs_disabled", False)
|
||||
self.hs_disabled_message = config.get("hs_disabled_message", "")
|
||||
self.hs_disabled_limit_type = config.get("hs_disabled_limit_type", "")
|
||||
|
||||
# Admin uri to direct users at should their instance become blocked
|
||||
# due to resource constraints
|
||||
self.admin_uri = config.get("admin_uri", None)
|
||||
|
||||
# FIXME: federation_domain_whitelist needs sytests
|
||||
self.federation_domain_whitelist = None
|
||||
federation_domain_whitelist = config.get(
|
||||
@@ -209,6 +237,8 @@ class ServerConfig(Config):
|
||||
# different cores. See
|
||||
# https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/.
|
||||
#
|
||||
# This setting requires the affinity package to be installed!
|
||||
#
|
||||
# cpu_affinity: 0xFFFFFFFF
|
||||
|
||||
# Whether to serve a web client from the HTTP/HTTPS root resource.
|
||||
@@ -228,6 +258,9 @@ class ServerConfig(Config):
|
||||
# hard limit.
|
||||
soft_file_limit: 0
|
||||
|
||||
# Set to false to disable presence tracking on this homeserver.
|
||||
use_presence: true
|
||||
|
||||
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
|
||||
# gc_thresholds: [700, 10, 10]
|
||||
|
||||
@@ -319,6 +352,33 @@ class ServerConfig(Config):
|
||||
# - port: 9000
|
||||
# bind_addresses: ['::1', '127.0.0.1']
|
||||
# type: manhole
|
||||
|
||||
|
||||
# Homeserver blocking
|
||||
#
|
||||
# How to reach the server admin, used in ResourceLimitError
|
||||
# admin_uri: 'mailto:admin@server.com'
|
||||
#
|
||||
# Global block config
|
||||
#
|
||||
# hs_disabled: False
|
||||
# hs_disabled_message: 'Human readable reason for why the HS is blocked'
|
||||
# hs_disabled_limit_type: 'error code(str), to help clients decode reason'
|
||||
#
|
||||
# Monthly Active User Blocking
|
||||
#
|
||||
# Enables monthly active user checking
|
||||
# limit_usage_by_mau: False
|
||||
# max_mau_value: 50
|
||||
# mau_trial_days: 2
|
||||
#
|
||||
# Sometimes the server admin will want to ensure certain accounts are
|
||||
# never blocked by mau checking. These accounts are specified here.
|
||||
#
|
||||
# mau_limit_reserved_threepids:
|
||||
# - medium: 'email'
|
||||
# address: 'reserved_user@example.com'
|
||||
|
||||
""" % locals()
|
||||
|
||||
def read_arguments(self, args):
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ._base import Config
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
class StatsConfig(Config):
|
||||
"""Stats Configuration
|
||||
Configuration for the behaviour of synapse's stats engine
|
||||
"""
|
||||
|
||||
def read_config(self, config):
|
||||
self.stats_enable = False
|
||||
self.stats_bucket_size = 86400
|
||||
self.stats_retention = sys.maxint
|
||||
stats_config = config.get("stats", None)
|
||||
if stats_config:
|
||||
self.stats_enable = stats_config.get("enable", self.stats_enable)
|
||||
self.stats_bucket_size = stats_config.get(
|
||||
"bucket_size", self.stats_bucket_size
|
||||
)
|
||||
self.stats_retention = stats_config.get("retention", self.stats_retention)
|
||||
|
||||
def default_config(self, config_dir_path, server_name, **kwargs):
|
||||
return """
|
||||
# Stats configuration
|
||||
#
|
||||
# stats:
|
||||
# enable: false
|
||||
# bucket_size: 86400 # 1 day
|
||||
# retention: 31536000 # 1 year
|
||||
"""
|
||||
@@ -30,10 +30,10 @@ class VoipConfig(Config):
|
||||
## Turn ##
|
||||
|
||||
# The public URIs of the TURN server to give to clients
|
||||
turn_uris: []
|
||||
#turn_uris: []
|
||||
|
||||
# The shared secret used to compute passwords for the TURN server
|
||||
turn_shared_secret: "YOUR_SHARED_SECRET"
|
||||
#turn_shared_secret: "YOUR_SHARED_SECRET"
|
||||
|
||||
# The Username and password if the TURN server needs them and
|
||||
# does not use a token
|
||||
|
||||
@@ -11,19 +11,22 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from zope.interface import implementer
|
||||
|
||||
from OpenSSL import SSL, crypto
|
||||
from twisted.internet import ssl
|
||||
from twisted.internet._sslverify import _defaultCurveName
|
||||
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
|
||||
from twisted.internet.ssl import CertificateOptions, ContextFactory
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ServerContextFactory(ssl.ContextFactory):
|
||||
class ServerContextFactory(ContextFactory):
|
||||
"""Factory for PyOpenSSL SSL contexts that are used to handle incoming
|
||||
connections and to make connections to remote servers."""
|
||||
connections."""
|
||||
|
||||
def __init__(self, config):
|
||||
self._context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
@@ -48,3 +51,78 @@ class ServerContextFactory(ssl.ContextFactory):
|
||||
|
||||
def getContext(self):
|
||||
return self._context
|
||||
|
||||
|
||||
def _idnaBytes(text):
|
||||
"""
|
||||
Convert some text typed by a human into some ASCII bytes. This is a
|
||||
copy of twisted.internet._idna._idnaBytes. For documentation, see the
|
||||
twisted documentation.
|
||||
"""
|
||||
try:
|
||||
import idna
|
||||
except ImportError:
|
||||
return text.encode("idna")
|
||||
else:
|
||||
return idna.encode(text)
|
||||
|
||||
|
||||
def _tolerateErrors(wrapped):
|
||||
"""
|
||||
Wrap up an info_callback for pyOpenSSL so that if something goes wrong
|
||||
the error is immediately logged and the connection is dropped if possible.
|
||||
This is a copy of twisted.internet._sslverify._tolerateErrors. For
|
||||
documentation, see the twisted documentation.
|
||||
"""
|
||||
|
||||
def infoCallback(connection, where, ret):
|
||||
try:
|
||||
return wrapped(connection, where, ret)
|
||||
except: # noqa: E722, taken from the twisted implementation
|
||||
f = Failure()
|
||||
logger.exception("Error during info_callback")
|
||||
connection.get_app_data().failVerification(f)
|
||||
|
||||
return infoCallback
|
||||
|
||||
|
||||
@implementer(IOpenSSLClientConnectionCreator)
|
||||
class ClientTLSOptions(object):
|
||||
"""
|
||||
Client creator for TLS without certificate identity verification. This is a
|
||||
copy of twisted.internet._sslverify.ClientTLSOptions with the identity
|
||||
verification left out. For documentation, see the twisted documentation.
|
||||
"""
|
||||
|
||||
def __init__(self, hostname, ctx):
|
||||
self._ctx = ctx
|
||||
self._hostname = hostname
|
||||
self._hostnameBytes = _idnaBytes(hostname)
|
||||
ctx.set_info_callback(
|
||||
_tolerateErrors(self._identityVerifyingInfoCallback)
|
||||
)
|
||||
|
||||
def clientConnectionForTLS(self, tlsProtocol):
|
||||
context = self._ctx
|
||||
connection = SSL.Connection(context, None)
|
||||
connection.set_app_data(tlsProtocol)
|
||||
return connection
|
||||
|
||||
def _identityVerifyingInfoCallback(self, connection, where, ret):
|
||||
if where & SSL.SSL_CB_HANDSHAKE_START:
|
||||
connection.set_tlsext_host_name(self._hostnameBytes)
|
||||
|
||||
|
||||
class ClientTLSOptionsFactory(object):
|
||||
"""Factory for Twisted ClientTLSOptions that are used to make connections
|
||||
to remote servers for federation."""
|
||||
|
||||
def __init__(self, config):
|
||||
# We don't use config options yet
|
||||
pass
|
||||
|
||||
def get_options(self, host):
|
||||
return ClientTLSOptions(
|
||||
host.decode('utf-8'),
|
||||
CertificateOptions(verify=False).getContext()
|
||||
)
|
||||
|
||||
@@ -18,7 +18,9 @@ import logging
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.error import ConnectError
|
||||
from twisted.internet.protocol import Factory
|
||||
from twisted.names.error import DomainError
|
||||
from twisted.web.http import HTTPClient
|
||||
|
||||
from synapse.http.endpoint import matrix_federation_endpoint
|
||||
@@ -30,14 +32,14 @@ KEY_API_V1 = b"/_matrix/key/v1/"
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1):
|
||||
def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
|
||||
"""Fetch the keys for a remote server."""
|
||||
|
||||
factory = SynapseKeyClientFactory()
|
||||
factory.path = path
|
||||
factory.host = server_name
|
||||
endpoint = matrix_federation_endpoint(
|
||||
reactor, server_name, ssl_context_factory, timeout=30
|
||||
reactor, server_name, tls_client_options_factory, timeout=30
|
||||
)
|
||||
|
||||
for i in range(5):
|
||||
@@ -47,12 +49,14 @@ def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1):
|
||||
server_response, server_certificate = yield protocol.remote_key
|
||||
defer.returnValue((server_response, server_certificate))
|
||||
except SynapseKeyClientError as e:
|
||||
logger.exception("Error getting key for %r" % (server_name,))
|
||||
logger.warn("Error getting key for %r: %s", server_name, e)
|
||||
if e.status.startswith("4"):
|
||||
# Don't retry for 4xx responses.
|
||||
raise IOError("Cannot get key for %r" % server_name)
|
||||
except (ConnectError, DomainError) as e:
|
||||
logger.warn("Error getting key for %r: %s", server_name, e)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.exception("Error getting key for %r", server_name)
|
||||
raise IOError("Cannot get key for %r" % server_name)
|
||||
|
||||
|
||||
|
||||
@@ -512,7 +512,7 @@ class Keyring(object):
|
||||
continue
|
||||
|
||||
(response, tls_certificate) = yield fetch_server_key(
|
||||
server_name, self.hs.tls_server_context_factory,
|
||||
server_name, self.hs.tls_client_options_factory,
|
||||
path=(b"/_matrix/key/v2/server/%s" % (
|
||||
urllib.quote(requested_key_id),
|
||||
)).encode("ascii"),
|
||||
@@ -655,7 +655,7 @@ class Keyring(object):
|
||||
# Try to fetch the key from the remote server.
|
||||
|
||||
(response, tls_certificate) = yield fetch_server_key(
|
||||
server_name, self.hs.tls_server_context_factory
|
||||
server_name, self.hs.tls_client_options_factory
|
||||
)
|
||||
|
||||
# Check the response.
|
||||
|
||||
@@ -20,7 +20,7 @@ from signedjson.key import decode_verify_key_bytes
|
||||
from signedjson.sign import SignatureVerifyException, verify_signed_json
|
||||
from unpaddedbase64 import decode_base64
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules, Membership
|
||||
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, JoinRules, Membership
|
||||
from synapse.api.errors import AuthError, EventSizeError, SynapseError
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
|
||||
@@ -83,6 +83,14 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
|
||||
403,
|
||||
"Creation event's room_id domain does not match sender's"
|
||||
)
|
||||
|
||||
room_version = event.content.get("room_version", "1")
|
||||
if room_version not in KNOWN_ROOM_VERSIONS:
|
||||
raise AuthError(
|
||||
403,
|
||||
"room appears to have unsupported version %s" % (
|
||||
room_version,
|
||||
))
|
||||
# FIXME
|
||||
logger.debug("Allowing! %s", event)
|
||||
return
|
||||
|
||||
@@ -13,22 +13,18 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from six import iteritems
|
||||
|
||||
from frozendict import frozendict
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
|
||||
class EventContext(object):
|
||||
"""
|
||||
Attributes:
|
||||
current_state_ids (dict[(str, str), str]):
|
||||
The current state map including the current event.
|
||||
(type, state_key) -> event_id
|
||||
|
||||
prev_state_ids (dict[(str, str), str]):
|
||||
The current state map excluding the current event.
|
||||
(type, state_key) -> event_id
|
||||
|
||||
state_group (int|None): state group id, if the state has been stored
|
||||
as a state group. This is usually only None if e.g. the event is
|
||||
an outlier.
|
||||
@@ -45,38 +41,77 @@ class EventContext(object):
|
||||
|
||||
prev_state_events (?): XXX: is this ever set to anything other than
|
||||
the empty list?
|
||||
|
||||
_current_state_ids (dict[(str, str), str]|None):
|
||||
The current state map including the current event. None if outlier
|
||||
or we haven't fetched the state from DB yet.
|
||||
(type, state_key) -> event_id
|
||||
|
||||
_prev_state_ids (dict[(str, str), str]|None):
|
||||
The current state map excluding the current event. None if outlier
|
||||
or we haven't fetched the state from DB yet.
|
||||
(type, state_key) -> event_id
|
||||
|
||||
_fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
|
||||
been calculated. None if we haven't started calculating yet
|
||||
|
||||
_event_type (str): The type of the event the context is associated with.
|
||||
Only set when state has not been fetched yet.
|
||||
|
||||
_event_state_key (str|None): The state_key of the event the context is
|
||||
associated with. Only set when state has not been fetched yet.
|
||||
|
||||
_prev_state_id (str|None): If the event associated with the context is
|
||||
a state event, then `_prev_state_id` is the event_id of the state
|
||||
that was replaced.
|
||||
Only set when state has not been fetched yet.
|
||||
"""
|
||||
|
||||
__slots__ = [
|
||||
"current_state_ids",
|
||||
"prev_state_ids",
|
||||
"state_group",
|
||||
"rejected",
|
||||
"prev_group",
|
||||
"delta_ids",
|
||||
"prev_state_events",
|
||||
"app_service",
|
||||
"_current_state_ids",
|
||||
"_prev_state_ids",
|
||||
"_prev_state_id",
|
||||
"_event_type",
|
||||
"_event_state_key",
|
||||
"_fetching_state_deferred",
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
# The current state including the current event
|
||||
self.current_state_ids = None
|
||||
# The current state excluding the current event
|
||||
self.prev_state_ids = None
|
||||
self.state_group = None
|
||||
|
||||
self.prev_state_events = []
|
||||
self.rejected = False
|
||||
self.app_service = None
|
||||
|
||||
@staticmethod
|
||||
def with_state(state_group, current_state_ids, prev_state_ids,
|
||||
prev_group=None, delta_ids=None):
|
||||
context = EventContext()
|
||||
|
||||
# The current state including the current event
|
||||
context._current_state_ids = current_state_ids
|
||||
# The current state excluding the current event
|
||||
context._prev_state_ids = prev_state_ids
|
||||
context.state_group = state_group
|
||||
|
||||
context._prev_state_id = None
|
||||
context._event_type = None
|
||||
context._event_state_key = None
|
||||
context._fetching_state_deferred = defer.succeed(None)
|
||||
|
||||
# A previously persisted state group and a delta between that
|
||||
# and this state.
|
||||
self.prev_group = None
|
||||
self.delta_ids = None
|
||||
context.prev_group = prev_group
|
||||
context.delta_ids = delta_ids
|
||||
|
||||
self.prev_state_events = None
|
||||
return context
|
||||
|
||||
self.app_service = None
|
||||
|
||||
def serialize(self, event):
|
||||
@defer.inlineCallbacks
|
||||
def serialize(self, event, store):
|
||||
"""Converts self to a type that can be serialized as JSON, and then
|
||||
deserialized by `deserialize`
|
||||
|
||||
@@ -92,11 +127,12 @@ class EventContext(object):
|
||||
# the prev_state_ids, so if we're a state event we include the event
|
||||
# id that we replaced in the state.
|
||||
if event.is_state():
|
||||
prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
|
||||
prev_state_ids = yield self.get_prev_state_ids(store)
|
||||
prev_state_id = prev_state_ids.get((event.type, event.state_key))
|
||||
else:
|
||||
prev_state_id = None
|
||||
|
||||
return {
|
||||
defer.returnValue({
|
||||
"prev_state_id": prev_state_id,
|
||||
"event_type": event.type,
|
||||
"event_state_key": event.state_key if event.is_state() else None,
|
||||
@@ -106,10 +142,9 @@ class EventContext(object):
|
||||
"delta_ids": _encode_state_dict(self.delta_ids),
|
||||
"prev_state_events": self.prev_state_events,
|
||||
"app_service_id": self.app_service.id if self.app_service else None
|
||||
}
|
||||
})
|
||||
|
||||
@staticmethod
|
||||
@defer.inlineCallbacks
|
||||
def deserialize(store, input):
|
||||
"""Converts a dict that was produced by `serialize` back into a
|
||||
EventContext.
|
||||
@@ -122,32 +157,115 @@ class EventContext(object):
|
||||
EventContext
|
||||
"""
|
||||
context = EventContext()
|
||||
context.state_group = input["state_group"]
|
||||
context.rejected = input["rejected"]
|
||||
context.prev_group = input["prev_group"]
|
||||
context.delta_ids = _decode_state_dict(input["delta_ids"])
|
||||
context.prev_state_events = input["prev_state_events"]
|
||||
|
||||
# We use the state_group and prev_state_id stuff to pull the
|
||||
# current_state_ids out of the DB and construct prev_state_ids.
|
||||
prev_state_id = input["prev_state_id"]
|
||||
event_type = input["event_type"]
|
||||
event_state_key = input["event_state_key"]
|
||||
context._prev_state_id = input["prev_state_id"]
|
||||
context._event_type = input["event_type"]
|
||||
context._event_state_key = input["event_state_key"]
|
||||
|
||||
context.current_state_ids = yield store.get_state_ids_for_group(
|
||||
context.state_group,
|
||||
)
|
||||
if prev_state_id and event_state_key:
|
||||
context.prev_state_ids = dict(context.current_state_ids)
|
||||
context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
|
||||
else:
|
||||
context.prev_state_ids = context.current_state_ids
|
||||
context._current_state_ids = None
|
||||
context._prev_state_ids = None
|
||||
context._fetching_state_deferred = None
|
||||
|
||||
context.state_group = input["state_group"]
|
||||
context.prev_group = input["prev_group"]
|
||||
context.delta_ids = _decode_state_dict(input["delta_ids"])
|
||||
|
||||
context.rejected = input["rejected"]
|
||||
context.prev_state_events = input["prev_state_events"]
|
||||
|
||||
app_service_id = input["app_service_id"]
|
||||
if app_service_id:
|
||||
context.app_service = store.get_app_service_by_id(app_service_id)
|
||||
|
||||
defer.returnValue(context)
|
||||
return context
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_current_state_ids(self, store):
|
||||
"""Gets the current state IDs
|
||||
|
||||
Returns:
|
||||
Deferred[dict[(str, str), str]|None]: Returns None if state_group
|
||||
is None, which happens when the associated event is an outlier.
|
||||
"""
|
||||
|
||||
if not self._fetching_state_deferred:
|
||||
self._fetching_state_deferred = run_in_background(
|
||||
self._fill_out_state, store,
|
||||
)
|
||||
|
||||
yield make_deferred_yieldable(self._fetching_state_deferred)
|
||||
|
||||
defer.returnValue(self._current_state_ids)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_prev_state_ids(self, store):
|
||||
"""Gets the prev state IDs
|
||||
|
||||
Returns:
|
||||
Deferred[dict[(str, str), str]|None]: Returns None if state_group
|
||||
is None, which happens when the associated event is an outlier.
|
||||
"""
|
||||
|
||||
if not self._fetching_state_deferred:
|
||||
self._fetching_state_deferred = run_in_background(
|
||||
self._fill_out_state, store,
|
||||
)
|
||||
|
||||
yield make_deferred_yieldable(self._fetching_state_deferred)
|
||||
|
||||
defer.returnValue(self._prev_state_ids)
|
||||
|
||||
def get_cached_current_state_ids(self):
|
||||
"""Gets the current state IDs if we have them already cached.
|
||||
|
||||
Returns:
|
||||
dict[(str, str), str]|None: Returns None if we haven't cached the
|
||||
state or if state_group is None, which happens when the associated
|
||||
event is an outlier.
|
||||
"""
|
||||
|
||||
return self._current_state_ids
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _fill_out_state(self, store):
|
||||
"""Called to populate the _current_state_ids and _prev_state_ids
|
||||
attributes by loading from the database.
|
||||
"""
|
||||
if self.state_group is None:
|
||||
return
|
||||
|
||||
self._current_state_ids = yield store.get_state_ids_for_group(
|
||||
self.state_group,
|
||||
)
|
||||
if self._prev_state_id and self._event_state_key is not None:
|
||||
self._prev_state_ids = dict(self._current_state_ids)
|
||||
|
||||
key = (self._event_type, self._event_state_key)
|
||||
self._prev_state_ids[key] = self._prev_state_id
|
||||
else:
|
||||
self._prev_state_ids = self._current_state_ids
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_state(self, state_group, prev_state_ids, current_state_ids,
|
||||
prev_group, delta_ids):
|
||||
"""Replace the state in the context
|
||||
"""
|
||||
|
||||
# We need to make sure we wait for any ongoing fetching of state
|
||||
# to complete so that the updated state doesn't get clobbered
|
||||
if self._fetching_state_deferred:
|
||||
yield make_deferred_yieldable(self._fetching_state_deferred)
|
||||
|
||||
self.state_group = state_group
|
||||
self._prev_state_ids = prev_state_ids
|
||||
self.prev_group = prev_group
|
||||
self._current_state_ids = current_state_ids
|
||||
self.delta_ids = delta_ids
|
||||
|
||||
# We need to ensure that that we've marked as having fetched the state
|
||||
self._fetching_state_deferred = defer.succeed(None)
|
||||
|
||||
|
||||
def _encode_state_dict(state_dict):
|
||||
@@ -159,7 +277,7 @@ def _encode_state_dict(state_dict):
|
||||
|
||||
return [
|
||||
(etype, state_key, v)
|
||||
for (etype, state_key), v in state_dict.iteritems()
|
||||
for (etype, state_key), v in iteritems(state_dict)
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import Membership
|
||||
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
FederationDeniedError,
|
||||
@@ -48,6 +48,13 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t
|
||||
PDU_RETRY_TIME_MS = 1 * 60 * 1000
|
||||
|
||||
|
||||
class InvalidResponseError(RuntimeError):
|
||||
"""Helper for _try_destination_list: indicates that the server returned a response
|
||||
we couldn't parse
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class FederationClient(FederationBase):
|
||||
def __init__(self, hs):
|
||||
super(FederationClient, self).__init__(hs)
|
||||
@@ -458,8 +465,63 @@ class FederationClient(FederationBase):
|
||||
defer.returnValue(signed_auth)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _try_destination_list(self, description, destinations, callback):
|
||||
"""Try an operation on a series of servers, until it succeeds
|
||||
|
||||
Args:
|
||||
description (unicode): description of the operation we're doing, for logging
|
||||
|
||||
destinations (Iterable[unicode]): list of server_names to try
|
||||
|
||||
callback (callable): Function to run for each server. Passed a single
|
||||
argument: the server_name to try. May return a deferred.
|
||||
|
||||
If the callback raises a CodeMessageException with a 300/400 code,
|
||||
attempts to perform the operation stop immediately and the exception is
|
||||
reraised.
|
||||
|
||||
Otherwise, if the callback raises an Exception the error is logged and the
|
||||
next server tried. Normally the stacktrace is logged but this is
|
||||
suppressed if the exception is an InvalidResponseError.
|
||||
|
||||
Returns:
|
||||
The [Deferred] result of callback, if it succeeds
|
||||
|
||||
Raises:
|
||||
SynapseError if the chosen remote server returns a 300/400 code.
|
||||
|
||||
RuntimeError if no servers were reachable.
|
||||
"""
|
||||
for destination in destinations:
|
||||
if destination == self.server_name:
|
||||
continue
|
||||
|
||||
try:
|
||||
res = yield callback(destination)
|
||||
defer.returnValue(res)
|
||||
except InvalidResponseError as e:
|
||||
logger.warn(
|
||||
"Failed to %s via %s: %s",
|
||||
description, destination, e,
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
if not 500 <= e.code < 600:
|
||||
raise e.to_synapse_error()
|
||||
else:
|
||||
logger.warn(
|
||||
"Failed to %s via %s: %i %s",
|
||||
description, destination, e.code, e.message,
|
||||
)
|
||||
except Exception:
|
||||
logger.warn(
|
||||
"Failed to %s via %s",
|
||||
description, destination, exc_info=1,
|
||||
)
|
||||
|
||||
raise RuntimeError("Failed to %s via any server" % (description, ))
|
||||
|
||||
def make_membership_event(self, destinations, room_id, user_id, membership,
|
||||
content={},):
|
||||
content, params):
|
||||
"""
|
||||
Creates an m.room.member event, with context, without participating in the room.
|
||||
|
||||
@@ -475,13 +537,15 @@ class FederationClient(FederationBase):
|
||||
user_id (str): The user whose membership is being evented.
|
||||
membership (str): The "membership" property of the event. Must be
|
||||
one of "join" or "leave".
|
||||
content (object): Any additional data to put into the content field
|
||||
content (dict): Any additional data to put into the content field
|
||||
of the event.
|
||||
params (dict[str, str|Iterable[str]]): Query parameters to include in the
|
||||
request.
|
||||
Return:
|
||||
Deferred: resolves to a tuple of (origin (str), event (object))
|
||||
where origin is the remote homeserver which generated the event.
|
||||
|
||||
Fails with a ``CodeMessageException`` if the chosen remote server
|
||||
Fails with a ``SynapseError`` if the chosen remote server
|
||||
returns a 300/400 code.
|
||||
|
||||
Fails with a ``RuntimeError`` if no servers were reachable.
|
||||
@@ -492,50 +556,37 @@ class FederationClient(FederationBase):
|
||||
"make_membership_event called with membership='%s', must be one of %s" %
|
||||
(membership, ",".join(valid_memberships))
|
||||
)
|
||||
for destination in destinations:
|
||||
if destination == self.server_name:
|
||||
continue
|
||||
|
||||
try:
|
||||
ret = yield self.transport_layer.make_membership_event(
|
||||
destination, room_id, user_id, membership
|
||||
)
|
||||
@defer.inlineCallbacks
|
||||
def send_request(destination):
|
||||
ret = yield self.transport_layer.make_membership_event(
|
||||
destination, room_id, user_id, membership, params,
|
||||
)
|
||||
|
||||
pdu_dict = ret["event"]
|
||||
pdu_dict = ret.get("event", None)
|
||||
if not isinstance(pdu_dict, dict):
|
||||
raise InvalidResponseError("Bad 'event' field in response")
|
||||
|
||||
logger.debug("Got response to make_%s: %s", membership, pdu_dict)
|
||||
logger.debug("Got response to make_%s: %s", membership, pdu_dict)
|
||||
|
||||
pdu_dict["content"].update(content)
|
||||
pdu_dict["content"].update(content)
|
||||
|
||||
# The protoevent received over the JSON wire may not have all
|
||||
# the required fields. Lets just gloss over that because
|
||||
# there's some we never care about
|
||||
if "prev_state" not in pdu_dict:
|
||||
pdu_dict["prev_state"] = []
|
||||
# The protoevent received over the JSON wire may not have all
|
||||
# the required fields. Lets just gloss over that because
|
||||
# there's some we never care about
|
||||
if "prev_state" not in pdu_dict:
|
||||
pdu_dict["prev_state"] = []
|
||||
|
||||
ev = builder.EventBuilder(pdu_dict)
|
||||
ev = builder.EventBuilder(pdu_dict)
|
||||
|
||||
defer.returnValue(
|
||||
(destination, ev)
|
||||
)
|
||||
break
|
||||
except CodeMessageException as e:
|
||||
if not 500 <= e.code < 600:
|
||||
raise
|
||||
else:
|
||||
logger.warn(
|
||||
"Failed to make_%s via %s: %s",
|
||||
membership, destination, e.message
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warn(
|
||||
"Failed to make_%s via %s: %s",
|
||||
membership, destination, e.message
|
||||
)
|
||||
defer.returnValue(
|
||||
(destination, ev)
|
||||
)
|
||||
|
||||
raise RuntimeError("Failed to send to any server.")
|
||||
return self._try_destination_list(
|
||||
"make_" + membership, destinations, send_request,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send_join(self, destinations, pdu):
|
||||
"""Sends a join event to one of a list of homeservers.
|
||||
|
||||
@@ -552,103 +603,111 @@ class FederationClient(FederationBase):
|
||||
giving the serer the event was sent to, ``state`` (?) and
|
||||
``auth_chain``.
|
||||
|
||||
Fails with a ``CodeMessageException`` if the chosen remote server
|
||||
Fails with a ``SynapseError`` if the chosen remote server
|
||||
returns a 300/400 code.
|
||||
|
||||
Fails with a ``RuntimeError`` if no servers were reachable.
|
||||
"""
|
||||
|
||||
for destination in destinations:
|
||||
if destination == self.server_name:
|
||||
continue
|
||||
|
||||
try:
|
||||
time_now = self._clock.time_msec()
|
||||
_, content = yield self.transport_layer.send_join(
|
||||
destination=destination,
|
||||
room_id=pdu.room_id,
|
||||
event_id=pdu.event_id,
|
||||
content=pdu.get_pdu_json(time_now),
|
||||
def check_authchain_validity(signed_auth_chain):
|
||||
for e in signed_auth_chain:
|
||||
if e.type == EventTypes.Create:
|
||||
create_event = e
|
||||
break
|
||||
else:
|
||||
raise InvalidResponseError(
|
||||
"no %s in auth chain" % (EventTypes.Create,),
|
||||
)
|
||||
|
||||
logger.debug("Got content: %s", content)
|
||||
# the room version should be sane.
|
||||
room_version = create_event.content.get("room_version", "1")
|
||||
if room_version not in KNOWN_ROOM_VERSIONS:
|
||||
# This shouldn't be possible, because the remote server should have
|
||||
# rejected the join attempt during make_join.
|
||||
raise InvalidResponseError(
|
||||
"room appears to have unsupported version %s" % (
|
||||
room_version,
|
||||
))
|
||||
|
||||
state = [
|
||||
event_from_pdu_json(p, outlier=True)
|
||||
for p in content.get("state", [])
|
||||
]
|
||||
@defer.inlineCallbacks
|
||||
def send_request(destination):
|
||||
time_now = self._clock.time_msec()
|
||||
_, content = yield self.transport_layer.send_join(
|
||||
destination=destination,
|
||||
room_id=pdu.room_id,
|
||||
event_id=pdu.event_id,
|
||||
content=pdu.get_pdu_json(time_now),
|
||||
)
|
||||
|
||||
auth_chain = [
|
||||
event_from_pdu_json(p, outlier=True)
|
||||
for p in content.get("auth_chain", [])
|
||||
]
|
||||
logger.debug("Got content: %s", content)
|
||||
|
||||
pdus = {
|
||||
p.event_id: p
|
||||
for p in itertools.chain(state, auth_chain)
|
||||
}
|
||||
state = [
|
||||
event_from_pdu_json(p, outlier=True)
|
||||
for p in content.get("state", [])
|
||||
]
|
||||
|
||||
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination, list(pdus.values()),
|
||||
outlier=True,
|
||||
)
|
||||
auth_chain = [
|
||||
event_from_pdu_json(p, outlier=True)
|
||||
for p in content.get("auth_chain", [])
|
||||
]
|
||||
|
||||
valid_pdus_map = {
|
||||
p.event_id: p
|
||||
for p in valid_pdus
|
||||
}
|
||||
pdus = {
|
||||
p.event_id: p
|
||||
for p in itertools.chain(state, auth_chain)
|
||||
}
|
||||
|
||||
# NB: We *need* to copy to ensure that we don't have multiple
|
||||
# references being passed on, as that causes... issues.
|
||||
signed_state = [
|
||||
copy.copy(valid_pdus_map[p.event_id])
|
||||
for p in state
|
||||
if p.event_id in valid_pdus_map
|
||||
]
|
||||
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination, list(pdus.values()),
|
||||
outlier=True,
|
||||
)
|
||||
|
||||
signed_auth = [
|
||||
valid_pdus_map[p.event_id]
|
||||
for p in auth_chain
|
||||
if p.event_id in valid_pdus_map
|
||||
]
|
||||
valid_pdus_map = {
|
||||
p.event_id: p
|
||||
for p in valid_pdus
|
||||
}
|
||||
|
||||
# NB: We *need* to copy to ensure that we don't have multiple
|
||||
# references being passed on, as that causes... issues.
|
||||
for s in signed_state:
|
||||
s.internal_metadata = copy.deepcopy(s.internal_metadata)
|
||||
# NB: We *need* to copy to ensure that we don't have multiple
|
||||
# references being passed on, as that causes... issues.
|
||||
signed_state = [
|
||||
copy.copy(valid_pdus_map[p.event_id])
|
||||
for p in state
|
||||
if p.event_id in valid_pdus_map
|
||||
]
|
||||
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
signed_auth = [
|
||||
valid_pdus_map[p.event_id]
|
||||
for p in auth_chain
|
||||
if p.event_id in valid_pdus_map
|
||||
]
|
||||
|
||||
defer.returnValue({
|
||||
"state": signed_state,
|
||||
"auth_chain": signed_auth,
|
||||
"origin": destination,
|
||||
})
|
||||
except CodeMessageException as e:
|
||||
if not 500 <= e.code < 600:
|
||||
raise
|
||||
else:
|
||||
logger.exception(
|
||||
"Failed to send_join via %s: %s",
|
||||
destination, e.message
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"Failed to send_join via %s: %s",
|
||||
destination, e.message
|
||||
)
|
||||
# NB: We *need* to copy to ensure that we don't have multiple
|
||||
# references being passed on, as that causes... issues.
|
||||
for s in signed_state:
|
||||
s.internal_metadata = copy.deepcopy(s.internal_metadata)
|
||||
|
||||
raise RuntimeError("Failed to send to any server.")
|
||||
check_authchain_validity(signed_auth)
|
||||
|
||||
defer.returnValue({
|
||||
"state": signed_state,
|
||||
"auth_chain": signed_auth,
|
||||
"origin": destination,
|
||||
})
|
||||
return self._try_destination_list("send_join", destinations, send_request)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send_invite(self, destination, room_id, event_id, pdu):
|
||||
time_now = self._clock.time_msec()
|
||||
code, content = yield self.transport_layer.send_invite(
|
||||
destination=destination,
|
||||
room_id=room_id,
|
||||
event_id=event_id,
|
||||
content=pdu.get_pdu_json(time_now),
|
||||
)
|
||||
try:
|
||||
code, content = yield self.transport_layer.send_invite(
|
||||
destination=destination,
|
||||
room_id=room_id,
|
||||
event_id=event_id,
|
||||
content=pdu.get_pdu_json(time_now),
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
if e.code == 403:
|
||||
raise e.to_synapse_error()
|
||||
raise
|
||||
|
||||
pdu_dict = content["event"]
|
||||
|
||||
@@ -663,7 +722,6 @@ class FederationClient(FederationBase):
|
||||
|
||||
defer.returnValue(pdu)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send_leave(self, destinations, pdu):
|
||||
"""Sends a leave event to one of a list of homeservers.
|
||||
|
||||
@@ -680,35 +738,25 @@ class FederationClient(FederationBase):
|
||||
Return:
|
||||
Deferred: resolves to None.
|
||||
|
||||
Fails with a ``CodeMessageException`` if the chosen remote server
|
||||
returns a non-200 code.
|
||||
Fails with a ``SynapseError`` if the chosen remote server
|
||||
returns a 300/400 code.
|
||||
|
||||
Fails with a ``RuntimeError`` if no servers were reachable.
|
||||
"""
|
||||
for destination in destinations:
|
||||
if destination == self.server_name:
|
||||
continue
|
||||
@defer.inlineCallbacks
|
||||
def send_request(destination):
|
||||
time_now = self._clock.time_msec()
|
||||
_, content = yield self.transport_layer.send_leave(
|
||||
destination=destination,
|
||||
room_id=pdu.room_id,
|
||||
event_id=pdu.event_id,
|
||||
content=pdu.get_pdu_json(time_now),
|
||||
)
|
||||
|
||||
try:
|
||||
time_now = self._clock.time_msec()
|
||||
_, content = yield self.transport_layer.send_leave(
|
||||
destination=destination,
|
||||
room_id=pdu.room_id,
|
||||
event_id=pdu.event_id,
|
||||
content=pdu.get_pdu_json(time_now),
|
||||
)
|
||||
logger.debug("Got content: %s", content)
|
||||
defer.returnValue(None)
|
||||
|
||||
logger.debug("Got content: %s", content)
|
||||
defer.returnValue(None)
|
||||
except CodeMessageException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"Failed to send_leave via %s: %s",
|
||||
destination, e.message
|
||||
)
|
||||
|
||||
raise RuntimeError("Failed to send to any server.")
|
||||
return self._try_destination_list("send_leave", destinations, send_request)
|
||||
|
||||
def get_public_rooms(self, destination, limit=None, since_token=None,
|
||||
search_filter=None, include_all_networks=False,
|
||||
|
||||
@@ -24,16 +24,27 @@ from prometheus_client import Counter
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.abstract import isIPAddress
|
||||
from twisted.python import failure
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
FederationError,
|
||||
IncompatibleRoomVersionError,
|
||||
NotFoundError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.crypto.event_signing import compute_event_signature
|
||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||
from synapse.federation.persistence import TransactionActions
|
||||
from synapse.federation.units import Edu, Transaction
|
||||
from synapse.http.endpoint import parse_server_name
|
||||
from synapse.replication.http.federation import (
|
||||
ReplicationFederationSendEduRestServlet,
|
||||
ReplicationGetQueryRestServlet,
|
||||
)
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util import async
|
||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.logutils import log_function
|
||||
|
||||
@@ -60,8 +71,8 @@ class FederationServer(FederationBase):
|
||||
self.auth = hs.get_auth()
|
||||
self.handler = hs.get_handlers().federation_handler
|
||||
|
||||
self._server_linearizer = async.Linearizer("fed_server")
|
||||
self._transaction_linearizer = async.Linearizer("fed_txn_handler")
|
||||
self._server_linearizer = Linearizer("fed_server")
|
||||
self._transaction_linearizer = Linearizer("fed_txn_handler")
|
||||
|
||||
self.transaction_actions = TransactionActions(self.store)
|
||||
|
||||
@@ -186,10 +197,14 @@ class FederationServer(FederationBase):
|
||||
logger.warn("Error handling PDU %s: %s", event_id, e)
|
||||
pdu_results[event_id] = {"error": str(e)}
|
||||
except Exception as e:
|
||||
f = failure.Failure()
|
||||
pdu_results[event_id] = {"error": str(e)}
|
||||
logger.exception("Failed to handle PDU %s", event_id)
|
||||
logger.error(
|
||||
"Failed to handle PDU %s: %s",
|
||||
event_id, f.getTraceback().rstrip(),
|
||||
)
|
||||
|
||||
yield async.concurrently_execute(
|
||||
yield concurrently_execute(
|
||||
process_pdus_for_room, pdus_by_room.keys(),
|
||||
TRANSACTION_CONCURRENCY_LIMIT,
|
||||
)
|
||||
@@ -202,10 +217,6 @@ class FederationServer(FederationBase):
|
||||
edu.content
|
||||
)
|
||||
|
||||
pdu_failures = getattr(transaction, "pdu_failures", [])
|
||||
for failure in pdu_failures:
|
||||
logger.info("Got failure %r", failure)
|
||||
|
||||
response = {
|
||||
"pdus": pdu_results,
|
||||
}
|
||||
@@ -322,12 +333,21 @@ class FederationServer(FederationBase):
|
||||
defer.returnValue((200, resp))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_make_join_request(self, origin, room_id, user_id):
|
||||
def on_make_join_request(self, origin, room_id, user_id, supported_versions):
|
||||
origin_host, _ = parse_server_name(origin)
|
||||
yield self.check_server_matches_acl(origin_host, room_id)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
if room_version not in supported_versions:
|
||||
logger.warn("Room version %s not in %s", room_version, supported_versions)
|
||||
raise IncompatibleRoomVersionError(room_version=room_version)
|
||||
|
||||
pdu = yield self.handler.on_make_join_request(room_id, user_id)
|
||||
time_now = self._clock.time_msec()
|
||||
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
|
||||
defer.returnValue({
|
||||
"event": pdu.get_pdu_json(time_now),
|
||||
"room_version": room_version,
|
||||
})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_invite_request(self, origin, content):
|
||||
@@ -425,6 +445,7 @@ class FederationServer(FederationBase):
|
||||
ret = yield self.handler.on_query_auth(
|
||||
origin,
|
||||
event_id,
|
||||
room_id,
|
||||
signed_auth,
|
||||
content.get("rejects", []),
|
||||
content.get("missing", []),
|
||||
@@ -743,6 +764,8 @@ class FederationHandlerRegistry(object):
|
||||
if edu_type in self.edu_handlers:
|
||||
raise KeyError("Already have an EDU handler for %s" % (edu_type,))
|
||||
|
||||
logger.info("Registering federation EDU handler for %r", edu_type)
|
||||
|
||||
self.edu_handlers[edu_type] = handler
|
||||
|
||||
def register_query_handler(self, query_type, handler):
|
||||
@@ -761,6 +784,8 @@ class FederationHandlerRegistry(object):
|
||||
"Already have a Query handler for %s" % (query_type,)
|
||||
)
|
||||
|
||||
logger.info("Registering federation query handler for %r", query_type)
|
||||
|
||||
self.query_handlers[query_type] = handler
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -783,3 +808,49 @@ class FederationHandlerRegistry(object):
|
||||
raise NotFoundError("No handler for Query type '%s'" % (query_type,))
|
||||
|
||||
return handler(args)
|
||||
|
||||
|
||||
class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
|
||||
"""A FederationHandlerRegistry for worker processes.
|
||||
|
||||
When receiving EDU or queries it will check if an appropriate handler has
|
||||
been registered on the worker, if there isn't one then it calls off to the
|
||||
master process.
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
self.config = hs.config
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
|
||||
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
|
||||
|
||||
super(ReplicationFederationHandlerRegistry, self).__init__()
|
||||
|
||||
def on_edu(self, edu_type, origin, content):
|
||||
"""Overrides FederationHandlerRegistry
|
||||
"""
|
||||
handler = self.edu_handlers.get(edu_type)
|
||||
if handler:
|
||||
return super(ReplicationFederationHandlerRegistry, self).on_edu(
|
||||
edu_type, origin, content,
|
||||
)
|
||||
|
||||
return self._send_edu(
|
||||
edu_type=edu_type,
|
||||
origin=origin,
|
||||
content=content,
|
||||
)
|
||||
|
||||
def on_query(self, query_type, args):
|
||||
"""Overrides FederationHandlerRegistry
|
||||
"""
|
||||
handler = self.query_handlers.get(query_type)
|
||||
if handler:
|
||||
return handler(args)
|
||||
|
||||
return self._get_query_client(
|
||||
query_type=query_type,
|
||||
args=args,
|
||||
)
|
||||
|
||||
@@ -62,8 +62,6 @@ class FederationRemoteSendQueue(object):
|
||||
|
||||
self.edus = SortedDict() # stream position -> Edu
|
||||
|
||||
self.failures = SortedDict() # stream position -> (destination, Failure)
|
||||
|
||||
self.device_messages = SortedDict() # stream position -> destination
|
||||
|
||||
self.pos = 1
|
||||
@@ -79,7 +77,7 @@ class FederationRemoteSendQueue(object):
|
||||
|
||||
for queue_name in [
|
||||
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
|
||||
"edus", "failures", "device_messages", "pos_time",
|
||||
"edus", "device_messages", "pos_time",
|
||||
]:
|
||||
register(queue_name, getattr(self, queue_name))
|
||||
|
||||
@@ -149,12 +147,6 @@ class FederationRemoteSendQueue(object):
|
||||
for key in keys[:i]:
|
||||
del self.edus[key]
|
||||
|
||||
# Delete things out of failure map
|
||||
keys = self.failures.keys()
|
||||
i = self.failures.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.failures[key]
|
||||
|
||||
# Delete things out of device map
|
||||
keys = self.device_messages.keys()
|
||||
i = self.device_messages.bisect_left(position_to_delete)
|
||||
@@ -204,13 +196,6 @@ class FederationRemoteSendQueue(object):
|
||||
|
||||
self.notifier.on_new_replication_data()
|
||||
|
||||
def send_failure(self, failure, destination):
|
||||
"""As per TransactionQueue"""
|
||||
pos = self._next_pos()
|
||||
|
||||
self.failures[pos] = (destination, str(failure))
|
||||
self.notifier.on_new_replication_data()
|
||||
|
||||
def send_device_messages(self, destination):
|
||||
"""As per TransactionQueue"""
|
||||
pos = self._next_pos()
|
||||
@@ -285,17 +270,6 @@ class FederationRemoteSendQueue(object):
|
||||
for (pos, edu) in edus:
|
||||
rows.append((pos, EduRow(edu)))
|
||||
|
||||
# Fetch changed failures
|
||||
i = self.failures.bisect_right(from_token)
|
||||
j = self.failures.bisect_right(to_token) + 1
|
||||
failures = self.failures.items()[i:j]
|
||||
|
||||
for (pos, (destination, failure)) in failures:
|
||||
rows.append((pos, FailureRow(
|
||||
destination=destination,
|
||||
failure=failure,
|
||||
)))
|
||||
|
||||
# Fetch changed device messages
|
||||
i = self.device_messages.bisect_right(from_token)
|
||||
j = self.device_messages.bisect_right(to_token) + 1
|
||||
@@ -417,34 +391,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", (
|
||||
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
|
||||
|
||||
|
||||
class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
|
||||
"destination", # str
|
||||
"failure",
|
||||
))):
|
||||
"""Streams failures to a remote server. Failures are issued when there was
|
||||
something wrong with a transaction the remote sent us, e.g. it included
|
||||
an event that was invalid.
|
||||
"""
|
||||
|
||||
TypeId = "f"
|
||||
|
||||
@staticmethod
|
||||
def from_data(data):
|
||||
return FailureRow(
|
||||
destination=data["destination"],
|
||||
failure=data["failure"],
|
||||
)
|
||||
|
||||
def to_data(self):
|
||||
return {
|
||||
"destination": self.destination,
|
||||
"failure": self.failure,
|
||||
}
|
||||
|
||||
def add_to_buffer(self, buff):
|
||||
buff.failures.setdefault(self.destination, []).append(self.failure)
|
||||
|
||||
|
||||
class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
|
||||
"destination", # str
|
||||
))):
|
||||
@@ -471,7 +417,6 @@ TypeToRow = {
|
||||
PresenceRow,
|
||||
KeyedEduRow,
|
||||
EduRow,
|
||||
FailureRow,
|
||||
DeviceRow,
|
||||
)
|
||||
}
|
||||
@@ -481,7 +426,6 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
|
||||
"presence", # list(UserPresenceState)
|
||||
"keyed_edus", # dict of destination -> { key -> Edu }
|
||||
"edus", # dict of destination -> [Edu]
|
||||
"failures", # dict of destination -> [failures]
|
||||
"device_destinations", # set of destinations
|
||||
))
|
||||
|
||||
@@ -503,7 +447,6 @@ def process_rows_for_federation(transaction_queue, rows):
|
||||
presence=[],
|
||||
keyed_edus={},
|
||||
edus={},
|
||||
failures={},
|
||||
device_destinations=set(),
|
||||
)
|
||||
|
||||
@@ -532,9 +475,5 @@ def process_rows_for_federation(transaction_queue, rows):
|
||||
edu.destination, edu.edu_type, edu.content, key=None,
|
||||
)
|
||||
|
||||
for destination, failure_list in iteritems(buff.failures):
|
||||
for failure in failure_list:
|
||||
transaction_queue.send_failure(destination, failure)
|
||||
|
||||
for destination in buff.device_destinations:
|
||||
transaction_queue.send_device_messages(destination)
|
||||
|
||||
@@ -26,11 +26,14 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException
|
||||
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
||||
from synapse.metrics import (
|
||||
LaterGauge,
|
||||
event_processing_loop_counter,
|
||||
event_processing_loop_room_count,
|
||||
events_processed_counter,
|
||||
sent_edus_counter,
|
||||
sent_transactions_counter,
|
||||
)
|
||||
from synapse.util import PreserveLoggingContext, logcontext
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||
|
||||
@@ -55,6 +58,7 @@ class TransactionQueue(object):
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.server_name = hs.hostname
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
@@ -115,9 +119,6 @@ class TransactionQueue(object):
|
||||
),
|
||||
)
|
||||
|
||||
# destination -> list of tuple(failure, deferred)
|
||||
self.pending_failures_by_dest = {}
|
||||
|
||||
# destination -> stream_id of last successfully sent to-device message.
|
||||
# NB: may be a long or an int.
|
||||
self.last_device_stream_id_by_dest = {}
|
||||
@@ -165,10 +166,11 @@ class TransactionQueue(object):
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
# fire off a processing loop in the background. It's likely it will
|
||||
# outlast the current request, so run it in the sentinel logcontext.
|
||||
with PreserveLoggingContext():
|
||||
self._process_event_queue_loop()
|
||||
# fire off a processing loop in the background
|
||||
run_as_background_process(
|
||||
"process_event_queue_for_federation",
|
||||
self._process_event_queue_loop,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _process_event_queue_loop(self):
|
||||
@@ -254,7 +256,13 @@ class TransactionQueue(object):
|
||||
synapse.metrics.event_processing_last_ts.labels(
|
||||
"federation_sender").set(ts)
|
||||
|
||||
events_processed_counter.inc(len(events))
|
||||
events_processed_counter.inc(len(events))
|
||||
|
||||
event_processing_loop_room_count.labels(
|
||||
"federation_sender"
|
||||
).inc(len(events_by_room))
|
||||
|
||||
event_processing_loop_counter.labels("federation_sender").inc()
|
||||
|
||||
synapse.metrics.event_processing_positions.labels(
|
||||
"federation_sender").set(next_token)
|
||||
@@ -301,6 +309,9 @@ class TransactionQueue(object):
|
||||
Args:
|
||||
states (list(UserPresenceState))
|
||||
"""
|
||||
if not self.hs.config.use_presence:
|
||||
# No-op if presence is disabled.
|
||||
return
|
||||
|
||||
# First we queue up the new presence by user ID, so multiple presence
|
||||
# updates in quick successtion are correctly handled
|
||||
@@ -380,19 +391,6 @@ class TransactionQueue(object):
|
||||
|
||||
self._attempt_new_transaction(destination)
|
||||
|
||||
def send_failure(self, failure, destination):
|
||||
if destination == self.server_name or destination == "localhost":
|
||||
return
|
||||
|
||||
if not self.can_send_to(destination):
|
||||
return
|
||||
|
||||
self.pending_failures_by_dest.setdefault(
|
||||
destination, []
|
||||
).append(failure)
|
||||
|
||||
self._attempt_new_transaction(destination)
|
||||
|
||||
def send_device_messages(self, destination):
|
||||
if destination == self.server_name or destination == "localhost":
|
||||
return
|
||||
@@ -432,14 +430,11 @@ class TransactionQueue(object):
|
||||
|
||||
logger.debug("TX [%s] Starting transaction loop", destination)
|
||||
|
||||
# Drop the logcontext before starting the transaction. It doesn't
|
||||
# really make sense to log all the outbound transactions against
|
||||
# whatever path led us to this point: that's pretty arbitrary really.
|
||||
#
|
||||
# (this also means we can fire off _perform_transaction without
|
||||
# yielding)
|
||||
with logcontext.PreserveLoggingContext():
|
||||
self._transaction_transmission_loop(destination)
|
||||
run_as_background_process(
|
||||
"federation_transaction_transmission_loop",
|
||||
self._transaction_transmission_loop,
|
||||
destination,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _transaction_transmission_loop(self, destination):
|
||||
@@ -470,7 +465,6 @@ class TransactionQueue(object):
|
||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||
pending_presence = self.pending_presence_by_dest.pop(destination, {})
|
||||
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
||||
|
||||
pending_edus.extend(
|
||||
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
|
||||
@@ -498,7 +492,7 @@ class TransactionQueue(object):
|
||||
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|
||||
destination, len(pending_pdus))
|
||||
|
||||
if not pending_pdus and not pending_edus and not pending_failures:
|
||||
if not pending_pdus and not pending_edus:
|
||||
logger.debug("TX [%s] Nothing to send", destination)
|
||||
self.last_device_stream_id_by_dest[destination] = (
|
||||
device_stream_id
|
||||
@@ -508,7 +502,7 @@ class TransactionQueue(object):
|
||||
# END CRITICAL SECTION
|
||||
|
||||
success = yield self._send_new_transaction(
|
||||
destination, pending_pdus, pending_edus, pending_failures,
|
||||
destination, pending_pdus, pending_edus,
|
||||
)
|
||||
if success:
|
||||
sent_transactions_counter.inc()
|
||||
@@ -585,14 +579,12 @@ class TransactionQueue(object):
|
||||
|
||||
@measure_func("_send_new_transaction")
|
||||
@defer.inlineCallbacks
|
||||
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
|
||||
pending_failures):
|
||||
def _send_new_transaction(self, destination, pending_pdus, pending_edus):
|
||||
|
||||
# Sort based on the order field
|
||||
pending_pdus.sort(key=lambda t: t[1])
|
||||
pdus = [x[0] for x in pending_pdus]
|
||||
edus = pending_edus
|
||||
failures = [x.get_dict() for x in pending_failures]
|
||||
|
||||
success = True
|
||||
|
||||
@@ -602,11 +594,10 @@ class TransactionQueue(object):
|
||||
|
||||
logger.debug(
|
||||
"TX [%s] {%s} Attempting new transaction"
|
||||
" (pdus: %d, edus: %d, failures: %d)",
|
||||
" (pdus: %d, edus: %d)",
|
||||
destination, txn_id,
|
||||
len(pdus),
|
||||
len(edus),
|
||||
len(failures)
|
||||
)
|
||||
|
||||
logger.debug("TX [%s] Persisting transaction...", destination)
|
||||
@@ -618,7 +609,6 @@ class TransactionQueue(object):
|
||||
destination=destination,
|
||||
pdus=pdus,
|
||||
edus=edus,
|
||||
pdu_failures=failures,
|
||||
)
|
||||
|
||||
self._next_txn_id += 1
|
||||
@@ -628,12 +618,11 @@ class TransactionQueue(object):
|
||||
logger.debug("TX [%s] Persisted transaction", destination)
|
||||
logger.info(
|
||||
"TX [%s] {%s} Sending transaction [%s],"
|
||||
" (PDUs: %d, EDUs: %d, failures: %d)",
|
||||
" (PDUs: %d, EDUs: %d)",
|
||||
destination, txn_id,
|
||||
transaction.transaction_id,
|
||||
len(pdus),
|
||||
len(edus),
|
||||
len(failures),
|
||||
)
|
||||
|
||||
# Actually send the transaction
|
||||
|
||||
@@ -106,7 +106,7 @@ class TransportLayerClient(object):
|
||||
dest (str)
|
||||
room_id (str)
|
||||
event_tuples (list)
|
||||
limt (int)
|
||||
limit (int)
|
||||
|
||||
Returns:
|
||||
Deferred: Results in a dict received from the remote homeserver.
|
||||
@@ -195,7 +195,7 @@ class TransportLayerClient(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def make_membership_event(self, destination, room_id, user_id, membership):
|
||||
def make_membership_event(self, destination, room_id, user_id, membership, params):
|
||||
"""Asks a remote server to build and sign us a membership event
|
||||
|
||||
Note that this does not append any events to any graphs.
|
||||
@@ -205,6 +205,8 @@ class TransportLayerClient(object):
|
||||
room_id (str): room to join/leave
|
||||
user_id (str): user to be joined/left
|
||||
membership (str): one of join/leave
|
||||
params (dict[str, str|Iterable[str]]): Query parameters to include in the
|
||||
request.
|
||||
|
||||
Returns:
|
||||
Deferred: Succeeds when we get a 2xx HTTP response. The result
|
||||
@@ -241,6 +243,7 @@ class TransportLayerClient(object):
|
||||
content = yield self.client.get_json(
|
||||
destination=destination,
|
||||
path=path,
|
||||
args=params,
|
||||
retry_on_dns_fail=retry_on_dns_fail,
|
||||
timeout=20000,
|
||||
ignore_backoff=ignore_backoff,
|
||||
|
||||
@@ -165,7 +165,7 @@ def _parse_auth_header(header_bytes):
|
||||
param_dict = dict(kv.split("=") for kv in params)
|
||||
|
||||
def strip_quotes(value):
|
||||
if value.startswith(b"\""):
|
||||
if value.startswith("\""):
|
||||
return value[1:-1]
|
||||
else:
|
||||
return value
|
||||
@@ -190,6 +190,41 @@ def _parse_auth_header(header_bytes):
|
||||
|
||||
|
||||
class BaseFederationServlet(object):
|
||||
"""Abstract base class for federation servlet classes.
|
||||
|
||||
The servlet object should have a PATH attribute which takes the form of a regexp to
|
||||
match against the request path (excluding the /federation/v1 prefix).
|
||||
|
||||
The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match
|
||||
the appropriate HTTP method. These methods have the signature:
|
||||
|
||||
on_<METHOD>(self, origin, content, query, **kwargs)
|
||||
|
||||
With arguments:
|
||||
|
||||
origin (unicode|None): The authenticated server_name of the calling server,
|
||||
unless REQUIRE_AUTH is set to False and authentication failed.
|
||||
|
||||
content (unicode|None): decoded json body of the request. None if the
|
||||
request was a GET.
|
||||
|
||||
query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
|
||||
(ie, '+' and '%xx' are decoded) but note that it is *not* utf8-decoded
|
||||
yet.
|
||||
|
||||
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
|
||||
components as specified in the path match regexp.
|
||||
|
||||
Returns:
|
||||
Deferred[(int, object)|None]: either (response code, response object) to
|
||||
return a JSON response, or None if the request has already been handled.
|
||||
|
||||
Raises:
|
||||
SynapseError: to return an error code
|
||||
|
||||
Exception: other exceptions will be caught, logged, and a 500 will be
|
||||
returned.
|
||||
"""
|
||||
REQUIRE_AUTH = True
|
||||
|
||||
def __init__(self, handler, authenticator, ratelimiter, server_name):
|
||||
@@ -204,6 +239,18 @@ class BaseFederationServlet(object):
|
||||
@defer.inlineCallbacks
|
||||
@functools.wraps(func)
|
||||
def new_func(request, *args, **kwargs):
|
||||
""" A callback which can be passed to HttpServer.RegisterPaths
|
||||
|
||||
Args:
|
||||
request (twisted.web.http.Request):
|
||||
*args: unused?
|
||||
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
|
||||
components as specified in the path match regexp.
|
||||
|
||||
Returns:
|
||||
Deferred[(int, object)|None]: (response code, response object) as returned
|
||||
by the callback method. None if the request has already been handled.
|
||||
"""
|
||||
content = None
|
||||
if request.method in ["PUT", "POST"]:
|
||||
# TODO: Handle other method types? other content types?
|
||||
@@ -214,10 +261,10 @@ class BaseFederationServlet(object):
|
||||
except NoAuthenticationError:
|
||||
origin = None
|
||||
if self.REQUIRE_AUTH:
|
||||
logger.exception("authenticate_request failed")
|
||||
logger.warn("authenticate_request failed: missing authentication")
|
||||
raise
|
||||
except Exception:
|
||||
logger.exception("authenticate_request failed")
|
||||
except Exception as e:
|
||||
logger.warn("authenticate_request failed: %s", e)
|
||||
raise
|
||||
|
||||
if origin:
|
||||
@@ -283,11 +330,10 @@ class FederationSendServlet(BaseFederationServlet):
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)",
|
||||
"Received txn %s from %s. (PDUs: %d, EDUs: %d)",
|
||||
transaction_id, origin,
|
||||
len(transaction_data.get("pdus", [])),
|
||||
len(transaction_data.get("edus", [])),
|
||||
len(transaction_data.get("failures", [])),
|
||||
)
|
||||
|
||||
# We should ideally be getting this from the security layer.
|
||||
@@ -385,9 +431,31 @@ class FederationMakeJoinServlet(BaseFederationServlet):
|
||||
PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, origin, content, query, context, user_id):
|
||||
def on_GET(self, origin, _content, query, context, user_id):
|
||||
"""
|
||||
Args:
|
||||
origin (unicode): The authenticated server_name of the calling server
|
||||
|
||||
_content (None): (GETs don't have bodies)
|
||||
|
||||
query (dict[bytes, list[bytes]]): Query params from the request.
|
||||
|
||||
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
|
||||
components as specified in the path match regexp.
|
||||
|
||||
Returns:
|
||||
Deferred[(int, object)|None]: either (response code, response object) to
|
||||
return a JSON response, or None if the request has already been handled.
|
||||
"""
|
||||
versions = query.get(b'ver')
|
||||
if versions is not None:
|
||||
supported_versions = [v.decode("utf-8") for v in versions]
|
||||
else:
|
||||
supported_versions = ["1"]
|
||||
|
||||
content = yield self.handler.on_make_join_request(
|
||||
origin, context, user_id,
|
||||
supported_versions=supported_versions,
|
||||
)
|
||||
defer.returnValue((200, content))
|
||||
|
||||
@@ -404,10 +472,10 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
|
||||
|
||||
|
||||
class FederationSendLeaveServlet(BaseFederationServlet):
|
||||
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
|
||||
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, origin, content, query, room_id, txid):
|
||||
def on_PUT(self, origin, content, query, room_id, event_id):
|
||||
content = yield self.handler.on_send_leave_request(origin, content)
|
||||
defer.returnValue((200, content))
|
||||
|
||||
|
||||
@@ -73,7 +73,6 @@ class Transaction(JsonEncodedObject):
|
||||
"previous_ids",
|
||||
"pdus",
|
||||
"edus",
|
||||
"pdu_failures",
|
||||
]
|
||||
|
||||
internal_keys = [
|
||||
|
||||
@@ -43,6 +43,7 @@ from signedjson.sign import sign_json
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util.logcontext import run_in_background
|
||||
|
||||
@@ -129,7 +130,7 @@ class GroupAttestionRenewer(object):
|
||||
self.attestations = hs.get_groups_attestation_signing()
|
||||
|
||||
self._renew_attestations_loop = self.clock.looping_call(
|
||||
self._renew_attestations, 30 * 60 * 1000,
|
||||
self._start_renew_attestations, 30 * 60 * 1000,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -151,6 +152,9 @@ class GroupAttestionRenewer(object):
|
||||
|
||||
defer.returnValue({})
|
||||
|
||||
def _start_renew_attestations(self):
|
||||
return run_as_background_process("renew_attestations", self._renew_attestations)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _renew_attestations(self):
|
||||
"""Called periodically to check if we need to update any of our attestations
|
||||
|
||||
@@ -17,9 +17,7 @@ from .admin import AdminHandler
|
||||
from .directory import DirectoryHandler
|
||||
from .federation import FederationHandler
|
||||
from .identity import IdentityHandler
|
||||
from .message import MessageHandler
|
||||
from .register import RegistrationHandler
|
||||
from .room import RoomContextHandler
|
||||
from .search import SearchHandler
|
||||
|
||||
|
||||
@@ -44,10 +42,8 @@ class Handlers(object):
|
||||
|
||||
def __init__(self, hs):
|
||||
self.registration_handler = RegistrationHandler(hs)
|
||||
self.message_handler = MessageHandler(hs)
|
||||
self.federation_handler = FederationHandler(hs)
|
||||
self.directory_handler = DirectoryHandler(hs)
|
||||
self.admin_handler = AdminHandler(hs)
|
||||
self.identity_handler = IdentityHandler(hs)
|
||||
self.search_handler = SearchHandler(hs)
|
||||
self.room_context_handler = RoomContextHandler(hs)
|
||||
|
||||
@@ -112,8 +112,9 @@ class BaseHandler(object):
|
||||
guest_access = event.content.get("guest_access", "forbidden")
|
||||
if guest_access != "can_join":
|
||||
if context:
|
||||
current_state_ids = yield context.get_current_state_ids(self.store)
|
||||
current_state = yield self.store.get_events(
|
||||
list(context.current_state_ids.values())
|
||||
list(current_state_ids.values())
|
||||
)
|
||||
else:
|
||||
current_state = yield self.state_handler.get_current_state(
|
||||
|
||||
@@ -23,6 +23,11 @@ from twisted.internet import defer
|
||||
|
||||
import synapse
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.metrics import (
|
||||
event_processing_loop_counter,
|
||||
event_processing_loop_room_count,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
@@ -106,7 +111,9 @@ class ApplicationServicesHandler(object):
|
||||
yield self._check_user_exists(event.state_key)
|
||||
|
||||
if not self.started_scheduler:
|
||||
self.scheduler.start().addErrback(log_failure)
|
||||
def start_scheduler():
|
||||
return self.scheduler.start().addErrback(log_failure)
|
||||
run_as_background_process("as_scheduler", start_scheduler)
|
||||
self.started_scheduler = True
|
||||
|
||||
# Fork off pushes to these services
|
||||
@@ -133,6 +140,12 @@ class ApplicationServicesHandler(object):
|
||||
|
||||
events_processed_counter.inc(len(events))
|
||||
|
||||
event_processing_loop_room_count.labels(
|
||||
"appservice_sender"
|
||||
).inc(len(events_by_room))
|
||||
|
||||
event_processing_loop_counter.labels("appservice_sender").inc()
|
||||
|
||||
synapse.metrics.event_processing_lag.labels(
|
||||
"appservice_sender").set(now - ts)
|
||||
synapse.metrics.event_processing_last_ts.labels(
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import unicodedata
|
||||
|
||||
import attr
|
||||
import bcrypt
|
||||
@@ -519,6 +520,7 @@ class AuthHandler(BaseHandler):
|
||||
"""
|
||||
logger.info("Logging in user %s on device %s", user_id, device_id)
|
||||
access_token = yield self.issue_access_token(user_id, device_id)
|
||||
yield self.auth.check_auth_blocking(user_id)
|
||||
|
||||
# the device *should* have been registered before we got here; however,
|
||||
# it's possible we raced against a DELETE operation. The thing we
|
||||
@@ -626,6 +628,7 @@ class AuthHandler(BaseHandler):
|
||||
# special case to check for "password" for the check_password interface
|
||||
# for the auth providers
|
||||
password = login_submission.get("password")
|
||||
|
||||
if login_type == LoginType.PASSWORD:
|
||||
if not self._password_enabled:
|
||||
raise SynapseError(400, "Password login has been disabled.")
|
||||
@@ -707,9 +710,10 @@ class AuthHandler(BaseHandler):
|
||||
multiple inexact matches.
|
||||
|
||||
Args:
|
||||
user_id (str): complete @user:id
|
||||
user_id (unicode): complete @user:id
|
||||
password (unicode): the provided password
|
||||
Returns:
|
||||
(str) the canonical_user_id, or None if unknown user / bad password
|
||||
(unicode) the canonical_user_id, or None if unknown user / bad password
|
||||
"""
|
||||
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
|
||||
if not lookupres:
|
||||
@@ -728,15 +732,18 @@ class AuthHandler(BaseHandler):
|
||||
device_id)
|
||||
defer.returnValue(access_token)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def validate_short_term_login_token_and_get_user_id(self, login_token):
|
||||
auth_api = self.hs.get_auth()
|
||||
user_id = None
|
||||
try:
|
||||
macaroon = pymacaroons.Macaroon.deserialize(login_token)
|
||||
user_id = auth_api.get_user_id_from_macaroon(macaroon)
|
||||
auth_api.validate_macaroon(macaroon, "login", True, user_id)
|
||||
return user_id
|
||||
except Exception:
|
||||
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
|
||||
yield self.auth.check_auth_blocking(user_id)
|
||||
defer.returnValue(user_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_access_token(self, access_token):
|
||||
@@ -821,12 +828,26 @@ class AuthHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_threepid(self, user_id, medium, address):
|
||||
"""Attempts to unbind the 3pid on the identity servers and deletes it
|
||||
from the local database.
|
||||
|
||||
Args:
|
||||
user_id (str)
|
||||
medium (str)
|
||||
address (str)
|
||||
|
||||
Returns:
|
||||
Deferred[bool]: Returns True if successfully unbound the 3pid on
|
||||
the identity server, False if identity server doesn't support the
|
||||
unbind API.
|
||||
"""
|
||||
|
||||
# 'Canonicalise' email addresses as per above
|
||||
if medium == 'email':
|
||||
address = address.lower()
|
||||
|
||||
identity_handler = self.hs.get_handlers().identity_handler
|
||||
yield identity_handler.unbind_threepid(
|
||||
result = yield identity_handler.try_unbind_threepid(
|
||||
user_id,
|
||||
{
|
||||
'medium': medium,
|
||||
@@ -834,10 +855,10 @@ class AuthHandler(BaseHandler):
|
||||
},
|
||||
)
|
||||
|
||||
ret = yield self.store.user_delete_threepid(
|
||||
yield self.store.user_delete_threepid(
|
||||
user_id, medium, address,
|
||||
)
|
||||
defer.returnValue(ret)
|
||||
defer.returnValue(result)
|
||||
|
||||
def _save_session(self, session):
|
||||
# TODO: Persistent storage
|
||||
@@ -849,14 +870,19 @@ class AuthHandler(BaseHandler):
|
||||
"""Computes a secure hash of password.
|
||||
|
||||
Args:
|
||||
password (str): Password to hash.
|
||||
password (unicode): Password to hash.
|
||||
|
||||
Returns:
|
||||
Deferred(str): Hashed password.
|
||||
Deferred(unicode): Hashed password.
|
||||
"""
|
||||
def _do_hash():
|
||||
return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
|
||||
bcrypt.gensalt(self.bcrypt_rounds))
|
||||
# Normalise the Unicode in the password
|
||||
pw = unicodedata.normalize("NFKC", password)
|
||||
|
||||
return bcrypt.hashpw(
|
||||
pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
|
||||
bcrypt.gensalt(self.bcrypt_rounds),
|
||||
).decode('ascii')
|
||||
|
||||
return make_deferred_yieldable(
|
||||
threads.deferToThreadPool(
|
||||
@@ -868,16 +894,19 @@ class AuthHandler(BaseHandler):
|
||||
"""Validates that self.hash(password) == stored_hash.
|
||||
|
||||
Args:
|
||||
password (str): Password to hash.
|
||||
stored_hash (str): Expected hash value.
|
||||
password (unicode): Password to hash.
|
||||
stored_hash (unicode): Expected hash value.
|
||||
|
||||
Returns:
|
||||
Deferred(bool): Whether self.hash(password) == stored_hash.
|
||||
"""
|
||||
|
||||
def _do_validate_hash():
|
||||
# Normalise the Unicode in the password
|
||||
pw = unicodedata.normalize("NFKC", password)
|
||||
|
||||
return bcrypt.checkpw(
|
||||
password.encode('utf8') + self.hs.config.password_pepper,
|
||||
pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
|
||||
stored_hash.encode('utf8')
|
||||
)
|
||||
|
||||
|
||||
@@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler):
|
||||
erase_data (bool): whether to GDPR-erase the user's data
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
Deferred[bool]: True if identity server supports removing
|
||||
threepids, otherwise False.
|
||||
"""
|
||||
# FIXME: Theoretically there is a race here wherein user resets
|
||||
# password using threepid.
|
||||
@@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler):
|
||||
# leave the user still active so they can try again.
|
||||
# Ideally we would prevent password resets and then do this in the
|
||||
# background thread.
|
||||
|
||||
# This will be set to false if the identity server doesn't support
|
||||
# unbinding
|
||||
identity_server_supports_unbinding = True
|
||||
|
||||
threepids = yield self.store.user_get_threepids(user_id)
|
||||
for threepid in threepids:
|
||||
try:
|
||||
yield self._identity_handler.unbind_threepid(
|
||||
result = yield self._identity_handler.try_unbind_threepid(
|
||||
user_id,
|
||||
{
|
||||
'medium': threepid['medium'],
|
||||
'address': threepid['address'],
|
||||
},
|
||||
)
|
||||
identity_server_supports_unbinding &= result
|
||||
except Exception:
|
||||
# Do we want this to be a fatal error or should we carry on?
|
||||
logger.exception("Failed to remove threepid from ID server")
|
||||
@@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler):
|
||||
# parts users from rooms (if it isn't already running)
|
||||
self._start_user_parting()
|
||||
|
||||
defer.returnValue(identity_server_supports_unbinding)
|
||||
|
||||
def _start_user_parting(self):
|
||||
"""
|
||||
Start the process that goes through the table of users
|
||||
|
||||
@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import FederationDeniedError
|
||||
from synapse.types import RoomStreamToken, get_domain_from_id
|
||||
from synapse.util import stringutils
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
@@ -19,10 +19,12 @@ import random
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.types import UserID
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
@@ -129,11 +131,13 @@ class EventStreamHandler(BaseHandler):
|
||||
class EventHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_event(self, user, event_id):
|
||||
def get_event(self, user, room_id, event_id):
|
||||
"""Retrieve a single specified event.
|
||||
|
||||
Args:
|
||||
user (synapse.types.UserID): The user requesting the event
|
||||
room_id (str|None): The expected room id. We'll return None if the
|
||||
event's room does not match.
|
||||
event_id (str): The event ID to obtain.
|
||||
Returns:
|
||||
dict: An event, or None if there is no event matching this ID.
|
||||
@@ -142,13 +146,26 @@ class EventHandler(BaseHandler):
|
||||
AuthError if the user does not have the rights to inspect this
|
||||
event.
|
||||
"""
|
||||
event = yield self.store.get_event(event_id)
|
||||
event = yield self.store.get_event(event_id, check_room_id=room_id)
|
||||
|
||||
if not event:
|
||||
defer.returnValue(None)
|
||||
return
|
||||
|
||||
if hasattr(event, "room_id"):
|
||||
yield self.auth.check_joined_room(event.room_id, user.to_string())
|
||||
users = yield self.store.get_users_in_room(event.room_id)
|
||||
is_peeking = user.to_string() not in users
|
||||
|
||||
filtered = yield filter_events_for_client(
|
||||
self.store,
|
||||
user.to_string(),
|
||||
[event],
|
||||
is_peeking=is_peeking
|
||||
)
|
||||
|
||||
if not filtered:
|
||||
raise AuthError(
|
||||
403,
|
||||
"You don't have permission to access that event."
|
||||
)
|
||||
|
||||
defer.returnValue(event)
|
||||
|
||||
@@ -21,8 +21,8 @@ import logging
|
||||
import sys
|
||||
|
||||
import six
|
||||
from six import iteritems
|
||||
from six.moves import http_client
|
||||
from six import iteritems, itervalues
|
||||
from six.moves import http_client, zip
|
||||
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
from signedjson.sign import verify_signed_json
|
||||
@@ -30,7 +30,12 @@ from unpaddedbase64 import decode_base64
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership, RejectedReason
|
||||
from synapse.api.constants import (
|
||||
KNOWN_ROOM_VERSIONS,
|
||||
EventTypes,
|
||||
Membership,
|
||||
RejectedReason,
|
||||
)
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
CodeMessageException,
|
||||
@@ -43,17 +48,21 @@ from synapse.crypto.event_signing import (
|
||||
add_hashes_and_signatures,
|
||||
compute_event_signature,
|
||||
)
|
||||
from synapse.events.utils import prune_event
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.replication.http.federation import (
|
||||
ReplicationCleanRoomRestServlet,
|
||||
ReplicationFederationSendEventsRestServlet,
|
||||
)
|
||||
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
|
||||
from synapse.state import resolve_events_with_factory
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_joined_room
|
||||
from synapse.util.frozenutils import unfreeze
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.visibility import filter_events_for_server
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
@@ -77,7 +86,7 @@ class FederationHandler(BaseHandler):
|
||||
self.hs = hs
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.replication_layer = hs.get_federation_client()
|
||||
self.federation_client = hs.get_federation_client()
|
||||
self.state_handler = hs.get_state_handler()
|
||||
self.server_name = hs.hostname
|
||||
self.keyring = hs.get_keyring()
|
||||
@@ -87,6 +96,18 @@ class FederationHandler(BaseHandler):
|
||||
self.spam_checker = hs.get_spam_checker()
|
||||
self.event_creation_handler = hs.get_event_creation_handler()
|
||||
self._server_notices_mxid = hs.config.server_notices_mxid
|
||||
self.config = hs.config
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
|
||||
self._send_events_to_master = (
|
||||
ReplicationFederationSendEventsRestServlet.make_client(hs)
|
||||
)
|
||||
self._notify_user_membership_change = (
|
||||
ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
|
||||
)
|
||||
self._clean_room_for_join_client = (
|
||||
ReplicationCleanRoomRestServlet.make_client(hs)
|
||||
)
|
||||
|
||||
# When joining a room we need to queue any events for that room up
|
||||
self.room_queues = {}
|
||||
@@ -256,7 +277,7 @@ class FederationHandler(BaseHandler):
|
||||
# know about
|
||||
for p in prevs - seen:
|
||||
state, got_auth_chain = (
|
||||
yield self.replication_layer.get_state_for_room(
|
||||
yield self.federation_client.get_state_for_room(
|
||||
origin, pdu.room_id, p
|
||||
)
|
||||
)
|
||||
@@ -270,8 +291,9 @@ class FederationHandler(BaseHandler):
|
||||
ev_ids, get_prev_content=False, check_redacted=False
|
||||
)
|
||||
|
||||
room_version = yield self.store.get_room_version(pdu.room_id)
|
||||
state_map = yield resolve_events_with_factory(
|
||||
state_groups, {pdu.event_id: pdu}, fetch
|
||||
room_version, state_groups, {pdu.event_id: pdu}, fetch
|
||||
)
|
||||
|
||||
state = (yield self.store.get_events(state_map.values())).values()
|
||||
@@ -339,7 +361,7 @@ class FederationHandler(BaseHandler):
|
||||
#
|
||||
# see https://github.com/matrix-org/synapse/pull/1744
|
||||
|
||||
missing_events = yield self.replication_layer.get_missing_events(
|
||||
missing_events = yield self.federation_client.get_missing_events(
|
||||
origin,
|
||||
pdu.room_id,
|
||||
earliest_events_ids=list(latest),
|
||||
@@ -401,7 +423,7 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
|
||||
try:
|
||||
event_stream_id, max_stream_id = yield self._persist_auth_tree(
|
||||
yield self._persist_auth_tree(
|
||||
origin, auth_chain, state, event
|
||||
)
|
||||
except AuthError as e:
|
||||
@@ -445,7 +467,7 @@ class FederationHandler(BaseHandler):
|
||||
yield self._handle_new_events(origin, event_infos)
|
||||
|
||||
try:
|
||||
context, event_stream_id, max_stream_id = yield self._handle_new_event(
|
||||
context = yield self._handle_new_event(
|
||||
origin,
|
||||
event,
|
||||
state=state,
|
||||
@@ -470,24 +492,16 @@ class FederationHandler(BaseHandler):
|
||||
except StoreError:
|
||||
logger.exception("Failed to store room.")
|
||||
|
||||
extra_users = []
|
||||
if event.type == EventTypes.Member:
|
||||
target_user_id = event.state_key
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=extra_users
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member:
|
||||
if event.membership == Membership.JOIN:
|
||||
# Only fire user_joined_room if the user has acutally
|
||||
# joined the room. Don't bother if the user is just
|
||||
# changing their profile info.
|
||||
newly_joined = True
|
||||
prev_state_id = context.prev_state_ids.get(
|
||||
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
|
||||
prev_state_id = prev_state_ids.get(
|
||||
(event.type, event.state_key)
|
||||
)
|
||||
if prev_state_id:
|
||||
@@ -499,138 +513,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
if newly_joined:
|
||||
user = UserID.from_string(event.state_key)
|
||||
yield user_joined_room(self.distributor, user, event.room_id)
|
||||
|
||||
@measure_func("_filter_events_for_server")
|
||||
@defer.inlineCallbacks
|
||||
def _filter_events_for_server(self, server_name, room_id, events):
|
||||
"""Filter the given events for the given server, redacting those the
|
||||
server can't see.
|
||||
|
||||
Assumes the server is currently in the room.
|
||||
|
||||
Returns
|
||||
list[FrozenEvent]
|
||||
"""
|
||||
# First lets check to see if all the events have a history visibility
|
||||
# of "shared" or "world_readable". If thats the case then we don't
|
||||
# need to check membership (as we know the server is in the room).
|
||||
event_to_state_ids = yield self.store.get_state_ids_for_events(
|
||||
frozenset(e.event_id for e in events),
|
||||
types=(
|
||||
(EventTypes.RoomHistoryVisibility, ""),
|
||||
)
|
||||
)
|
||||
|
||||
visibility_ids = set()
|
||||
for sids in event_to_state_ids.itervalues():
|
||||
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
|
||||
if hist:
|
||||
visibility_ids.add(hist)
|
||||
|
||||
# If we failed to find any history visibility events then the default
|
||||
# is "shared" visiblity.
|
||||
if not visibility_ids:
|
||||
defer.returnValue(events)
|
||||
|
||||
event_map = yield self.store.get_events(visibility_ids)
|
||||
all_open = all(
|
||||
e.content.get("history_visibility") in (None, "shared", "world_readable")
|
||||
for e in event_map.itervalues()
|
||||
)
|
||||
|
||||
if all_open:
|
||||
defer.returnValue(events)
|
||||
|
||||
# Ok, so we're dealing with events that have non-trivial visibility
|
||||
# rules, so we need to also get the memberships of the room.
|
||||
|
||||
event_to_state_ids = yield self.store.get_state_ids_for_events(
|
||||
frozenset(e.event_id for e in events),
|
||||
types=(
|
||||
(EventTypes.RoomHistoryVisibility, ""),
|
||||
(EventTypes.Member, None),
|
||||
)
|
||||
)
|
||||
|
||||
# We only want to pull out member events that correspond to the
|
||||
# server's domain.
|
||||
|
||||
def check_match(id):
|
||||
try:
|
||||
return server_name == get_domain_from_id(id)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
# Parses mapping `event_id -> (type, state_key) -> state event_id`
|
||||
# to get all state ids that we're interested in.
|
||||
event_map = yield self.store.get_events([
|
||||
e_id
|
||||
for key_to_eid in list(event_to_state_ids.values())
|
||||
for key, e_id in key_to_eid.items()
|
||||
if key[0] != EventTypes.Member or check_match(key[1])
|
||||
])
|
||||
|
||||
event_to_state = {
|
||||
e_id: {
|
||||
key: event_map[inner_e_id]
|
||||
for key, inner_e_id in key_to_eid.iteritems()
|
||||
if inner_e_id in event_map
|
||||
}
|
||||
for e_id, key_to_eid in event_to_state_ids.iteritems()
|
||||
}
|
||||
|
||||
erased_senders = yield self.store.are_users_erased(
|
||||
e.sender for e in events,
|
||||
)
|
||||
|
||||
def redact_disallowed(event, state):
|
||||
# if the sender has been gdpr17ed, always return a redacted
|
||||
# copy of the event.
|
||||
if erased_senders[event.sender]:
|
||||
logger.info(
|
||||
"Sender of %s has been erased, redacting",
|
||||
event.event_id,
|
||||
)
|
||||
return prune_event(event)
|
||||
|
||||
if not state:
|
||||
return event
|
||||
|
||||
history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
|
||||
if history:
|
||||
visibility = history.content.get("history_visibility", "shared")
|
||||
if visibility in ["invited", "joined"]:
|
||||
# We now loop through all state events looking for
|
||||
# membership states for the requesting server to determine
|
||||
# if the server is either in the room or has been invited
|
||||
# into the room.
|
||||
for ev in state.itervalues():
|
||||
if ev.type != EventTypes.Member:
|
||||
continue
|
||||
try:
|
||||
domain = get_domain_from_id(ev.state_key)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if domain != server_name:
|
||||
continue
|
||||
|
||||
memtype = ev.membership
|
||||
if memtype == Membership.JOIN:
|
||||
return event
|
||||
elif memtype == Membership.INVITE:
|
||||
if visibility == "invited":
|
||||
return event
|
||||
else:
|
||||
return prune_event(event)
|
||||
|
||||
return event
|
||||
|
||||
defer.returnValue([
|
||||
redact_disallowed(e, event_to_state[e.event_id])
|
||||
for e in events
|
||||
])
|
||||
yield self.user_joined_room(user, event.room_id)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
@@ -651,7 +534,7 @@ class FederationHandler(BaseHandler):
|
||||
if dest == self.server_name:
|
||||
raise SynapseError(400, "Can't backfill from self.")
|
||||
|
||||
events = yield self.replication_layer.backfill(
|
||||
events = yield self.federation_client.backfill(
|
||||
dest,
|
||||
room_id,
|
||||
limit=limit,
|
||||
@@ -699,7 +582,7 @@ class FederationHandler(BaseHandler):
|
||||
state_events = {}
|
||||
events_to_state = {}
|
||||
for e_id in edges:
|
||||
state, auth = yield self.replication_layer.get_state_for_room(
|
||||
state, auth = yield self.federation_client.get_state_for_room(
|
||||
destination=dest,
|
||||
room_id=room_id,
|
||||
event_id=e_id
|
||||
@@ -741,7 +624,7 @@ class FederationHandler(BaseHandler):
|
||||
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||
[
|
||||
logcontext.run_in_background(
|
||||
self.replication_layer.get_pdu,
|
||||
self.federation_client.get_pdu,
|
||||
[dest],
|
||||
event_id,
|
||||
outlier=True,
|
||||
@@ -863,7 +746,7 @@ class FederationHandler(BaseHandler):
|
||||
"""
|
||||
joined_users = [
|
||||
(state_key, int(event.depth))
|
||||
for (e_type, state_key), event in state.iteritems()
|
||||
for (e_type, state_key), event in iteritems(state)
|
||||
if e_type == EventTypes.Member
|
||||
and event.membership == Membership.JOIN
|
||||
]
|
||||
@@ -880,7 +763,7 @@ class FederationHandler(BaseHandler):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return sorted(joined_domains.iteritems(), key=lambda d: d[1])
|
||||
return sorted(joined_domains.items(), key=lambda d: d[1])
|
||||
|
||||
curr_domains = get_domains_from_state(curr_state)
|
||||
|
||||
@@ -943,7 +826,7 @@ class FederationHandler(BaseHandler):
|
||||
tried_domains = set(likely_domains)
|
||||
tried_domains.add(self.server_name)
|
||||
|
||||
event_ids = list(extremities.iterkeys())
|
||||
event_ids = list(extremities.keys())
|
||||
|
||||
logger.debug("calling resolve_state_groups in _maybe_backfill")
|
||||
resolve = logcontext.preserve_fn(
|
||||
@@ -959,15 +842,15 @@ class FederationHandler(BaseHandler):
|
||||
states = dict(zip(event_ids, [s.state for s in states]))
|
||||
|
||||
state_map = yield self.store.get_events(
|
||||
[e_id for ids in states.itervalues() for e_id in ids.itervalues()],
|
||||
[e_id for ids in itervalues(states) for e_id in itervalues(ids)],
|
||||
get_prev_content=False
|
||||
)
|
||||
states = {
|
||||
key: {
|
||||
k: state_map[e_id]
|
||||
for k, e_id in state_dict.iteritems()
|
||||
for k, e_id in iteritems(state_dict)
|
||||
if e_id in state_map
|
||||
} for key, state_dict in states.iteritems()
|
||||
} for key, state_dict in iteritems(states)
|
||||
}
|
||||
|
||||
for e_id, _ in sorted_extremeties_tuple:
|
||||
@@ -1022,7 +905,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
Invites must be signed by the invitee's server before distribution.
|
||||
"""
|
||||
pdu = yield self.replication_layer.send_invite(
|
||||
pdu = yield self.federation_client.send_invite(
|
||||
destination=target_host,
|
||||
room_id=event.room_id,
|
||||
event_id=event.event_id,
|
||||
@@ -1038,16 +921,6 @@ class FederationHandler(BaseHandler):
|
||||
[auth_id for auth_id, _ in event.auth_events],
|
||||
include_given=True
|
||||
)
|
||||
|
||||
for event in auth:
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
event,
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0]
|
||||
)
|
||||
)
|
||||
|
||||
defer.returnValue([e for e in auth])
|
||||
|
||||
@log_function
|
||||
@@ -1072,6 +945,9 @@ class FederationHandler(BaseHandler):
|
||||
joinee,
|
||||
"join",
|
||||
content,
|
||||
params={
|
||||
"ver": KNOWN_ROOM_VERSIONS,
|
||||
},
|
||||
)
|
||||
|
||||
# This shouldn't happen, because the RoomMemberHandler has a
|
||||
@@ -1081,7 +957,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
self.room_queues[room_id] = []
|
||||
|
||||
yield self.store.clean_room_for_join(room_id)
|
||||
yield self._clean_room_for_join(room_id)
|
||||
|
||||
handled_events = set()
|
||||
|
||||
@@ -1094,7 +970,7 @@ class FederationHandler(BaseHandler):
|
||||
target_hosts.insert(0, origin)
|
||||
except ValueError:
|
||||
pass
|
||||
ret = yield self.replication_layer.send_join(target_hosts, event)
|
||||
ret = yield self.federation_client.send_join(target_hosts, event)
|
||||
|
||||
origin = ret["origin"]
|
||||
state = ret["state"]
|
||||
@@ -1120,15 +996,10 @@ class FederationHandler(BaseHandler):
|
||||
# FIXME
|
||||
pass
|
||||
|
||||
event_stream_id, max_stream_id = yield self._persist_auth_tree(
|
||||
yield self._persist_auth_tree(
|
||||
origin, auth_chain, state, event
|
||||
)
|
||||
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[joinee]
|
||||
)
|
||||
|
||||
logger.debug("Finished joining %s to %s", joinee, room_id)
|
||||
finally:
|
||||
room_queue = self.room_queues[room_id]
|
||||
@@ -1223,7 +1094,7 @@ class FederationHandler(BaseHandler):
|
||||
# would introduce the danger of backwards-compatibility problems.
|
||||
event.internal_metadata.send_on_behalf_of = origin
|
||||
|
||||
context, event_stream_id, max_stream_id = yield self._handle_new_event(
|
||||
context = yield self._handle_new_event(
|
||||
origin, event
|
||||
)
|
||||
|
||||
@@ -1233,25 +1104,17 @@ class FederationHandler(BaseHandler):
|
||||
event.signatures,
|
||||
)
|
||||
|
||||
extra_users = []
|
||||
if event.type == EventTypes.Member:
|
||||
target_user_id = event.state_key
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member:
|
||||
if event.content["membership"] == Membership.JOIN:
|
||||
user = UserID.from_string(event.state_key)
|
||||
yield user_joined_room(self.distributor, user, event.room_id)
|
||||
yield self.user_joined_room(user, event.room_id)
|
||||
|
||||
state_ids = list(context.prev_state_ids.values())
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
|
||||
state_ids = list(prev_state_ids.values())
|
||||
auth_chain = yield self.store.get_auth_chain(state_ids)
|
||||
|
||||
state = yield self.store.get_events(list(context.prev_state_ids.values()))
|
||||
state = yield self.store.get_events(list(prev_state_ids.values()))
|
||||
|
||||
defer.returnValue({
|
||||
"state": list(state.values()),
|
||||
@@ -1313,17 +1176,7 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
|
||||
context = yield self.state_handler.compute_event_context(event)
|
||||
|
||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event,
|
||||
context=context,
|
||||
)
|
||||
|
||||
target_user = UserID.from_string(event.state_key)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[target_user],
|
||||
)
|
||||
yield self.persist_events_and_notify([(event, context)])
|
||||
|
||||
defer.returnValue(event)
|
||||
|
||||
@@ -1348,35 +1201,26 @@ class FederationHandler(BaseHandler):
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
yield self.replication_layer.send_leave(
|
||||
yield self.federation_client.send_leave(
|
||||
target_hosts,
|
||||
event
|
||||
)
|
||||
|
||||
context = yield self.state_handler.compute_event_context(event)
|
||||
|
||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event,
|
||||
context=context,
|
||||
)
|
||||
|
||||
target_user = UserID.from_string(event.state_key)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[target_user],
|
||||
)
|
||||
yield self.persist_events_and_notify([(event, context)])
|
||||
|
||||
defer.returnValue(event)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
|
||||
content={},):
|
||||
origin, pdu = yield self.replication_layer.make_membership_event(
|
||||
content={}, params=None):
|
||||
origin, pdu = yield self.federation_client.make_membership_event(
|
||||
target_hosts,
|
||||
room_id,
|
||||
user_id,
|
||||
membership,
|
||||
content,
|
||||
params=params,
|
||||
)
|
||||
|
||||
logger.debug("Got response to make_%s: %s", membership, pdu)
|
||||
@@ -1416,7 +1260,7 @@ class FederationHandler(BaseHandler):
|
||||
@log_function
|
||||
def on_make_leave_request(self, room_id, user_id):
|
||||
""" We've received a /make_leave/ request, so we create a partial
|
||||
join event for the room and return that. We do *not* persist or
|
||||
leave event for the room and return that. We do *not* persist or
|
||||
process it until the other server has signed it and sent it back.
|
||||
"""
|
||||
builder = self.event_builder_factory.new({
|
||||
@@ -1455,7 +1299,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
event.internal_metadata.outlier = False
|
||||
|
||||
context, event_stream_id, max_stream_id = yield self._handle_new_event(
|
||||
yield self._handle_new_event(
|
||||
origin, event
|
||||
)
|
||||
|
||||
@@ -1465,22 +1309,17 @@ class FederationHandler(BaseHandler):
|
||||
event.signatures,
|
||||
)
|
||||
|
||||
extra_users = []
|
||||
if event.type == EventTypes.Member:
|
||||
target_user_id = event.state_key
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
)
|
||||
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_for_pdu(self, room_id, event_id):
|
||||
"""Returns the state at the event. i.e. not including said event.
|
||||
"""
|
||||
|
||||
event = yield self.store.get_event(
|
||||
event_id, allow_none=False, check_room_id=room_id,
|
||||
)
|
||||
|
||||
state_groups = yield self.store.get_state_groups(
|
||||
room_id, [event_id]
|
||||
)
|
||||
@@ -1491,8 +1330,7 @@ class FederationHandler(BaseHandler):
|
||||
(e.type, e.state_key): e for e in state
|
||||
}
|
||||
|
||||
event = yield self.store.get_event(event_id)
|
||||
if event and event.is_state():
|
||||
if event.is_state():
|
||||
# Get previous state
|
||||
if "replaces_state" in event.unsigned:
|
||||
prev_id = event.unsigned["replaces_state"]
|
||||
@@ -1503,18 +1341,6 @@ class FederationHandler(BaseHandler):
|
||||
del results[(event.type, event.state_key)]
|
||||
|
||||
res = list(results.values())
|
||||
for event in res:
|
||||
# We sign these again because there was a bug where we
|
||||
# incorrectly signed things the first time round
|
||||
if self.is_mine_id(event.event_id):
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
event,
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0]
|
||||
)
|
||||
)
|
||||
|
||||
defer.returnValue(res)
|
||||
else:
|
||||
defer.returnValue([])
|
||||
@@ -1523,6 +1349,10 @@ class FederationHandler(BaseHandler):
|
||||
def get_state_ids_for_pdu(self, room_id, event_id):
|
||||
"""Returns the state at the event. i.e. not including said event.
|
||||
"""
|
||||
event = yield self.store.get_event(
|
||||
event_id, allow_none=False, check_room_id=room_id,
|
||||
)
|
||||
|
||||
state_groups = yield self.store.get_state_groups_ids(
|
||||
room_id, [event_id]
|
||||
)
|
||||
@@ -1531,8 +1361,7 @@ class FederationHandler(BaseHandler):
|
||||
_, state = state_groups.items().pop()
|
||||
results = state
|
||||
|
||||
event = yield self.store.get_event(event_id)
|
||||
if event and event.is_state():
|
||||
if event.is_state():
|
||||
# Get previous state
|
||||
if "replaces_state" in event.unsigned:
|
||||
prev_id = event.unsigned["replaces_state"]
|
||||
@@ -1558,7 +1387,7 @@ class FederationHandler(BaseHandler):
|
||||
limit
|
||||
)
|
||||
|
||||
events = yield self._filter_events_for_server(origin, room_id, events)
|
||||
events = yield filter_events_for_server(self.store, origin, events)
|
||||
|
||||
defer.returnValue(events)
|
||||
|
||||
@@ -1586,18 +1415,6 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
|
||||
if event:
|
||||
if self.is_mine_id(event.event_id):
|
||||
# FIXME: This is a temporary work around where we occasionally
|
||||
# return events slightly differently than when they were
|
||||
# originally signed
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
event,
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0]
|
||||
)
|
||||
)
|
||||
|
||||
in_room = yield self.auth.check_host_in_room(
|
||||
event.room_id,
|
||||
origin
|
||||
@@ -1605,8 +1422,8 @@ class FederationHandler(BaseHandler):
|
||||
if not in_room:
|
||||
raise AuthError(403, "Host not in room.")
|
||||
|
||||
events = yield self._filter_events_for_server(
|
||||
origin, event.room_id, [event]
|
||||
events = yield filter_events_for_server(
|
||||
self.store, origin, [event],
|
||||
)
|
||||
event = events[0]
|
||||
defer.returnValue(event)
|
||||
@@ -1633,9 +1450,8 @@ class FederationHandler(BaseHandler):
|
||||
event, context
|
||||
)
|
||||
|
||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event,
|
||||
context=context,
|
||||
yield self.persist_events_and_notify(
|
||||
[(event, context)],
|
||||
backfilled=backfilled,
|
||||
)
|
||||
except: # noqa: E722, as we reraise the exception this is fine.
|
||||
@@ -1648,15 +1464,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
six.reraise(tp, value, tb)
|
||||
|
||||
if not backfilled:
|
||||
# this intentionally does not yield: we don't care about the result
|
||||
# and don't need to wait for it.
|
||||
logcontext.run_in_background(
|
||||
self.pusher_pool.on_new_notifications,
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
defer.returnValue((context, event_stream_id, max_stream_id))
|
||||
defer.returnValue(context)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_new_events(self, origin, event_infos, backfilled=False):
|
||||
@@ -1664,6 +1472,8 @@ class FederationHandler(BaseHandler):
|
||||
should not depend on one another, e.g. this should be used to persist
|
||||
a bunch of outliers, but not a chunk of individual events that depend
|
||||
on each other for state calculations.
|
||||
|
||||
Notifies about the events where appropriate.
|
||||
"""
|
||||
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||
[
|
||||
@@ -1678,10 +1488,10 @@ class FederationHandler(BaseHandler):
|
||||
], consumeErrors=True,
|
||||
))
|
||||
|
||||
yield self.store.persist_events(
|
||||
yield self.persist_events_and_notify(
|
||||
[
|
||||
(ev_info["event"], context)
|
||||
for ev_info, context in itertools.izip(event_infos, contexts)
|
||||
for ev_info, context in zip(event_infos, contexts)
|
||||
],
|
||||
backfilled=backfilled,
|
||||
)
|
||||
@@ -1690,7 +1500,8 @@ class FederationHandler(BaseHandler):
|
||||
def _persist_auth_tree(self, origin, auth_events, state, event):
|
||||
"""Checks the auth chain is valid (and passes auth checks) for the
|
||||
state and event. Then persists the auth chain and state atomically.
|
||||
Persists the event seperately.
|
||||
Persists the event separately. Notifies about the persisted events
|
||||
where appropriate.
|
||||
|
||||
Will attempt to fetch missing auth events.
|
||||
|
||||
@@ -1701,8 +1512,7 @@ class FederationHandler(BaseHandler):
|
||||
event (Event)
|
||||
|
||||
Returns:
|
||||
2-tuple of (event_stream_id, max_stream_id) from the persist_event
|
||||
call for `event`
|
||||
Deferred
|
||||
"""
|
||||
events_to_context = {}
|
||||
for e in itertools.chain(auth_events, state):
|
||||
@@ -1728,7 +1538,7 @@ class FederationHandler(BaseHandler):
|
||||
missing_auth_events.add(e_id)
|
||||
|
||||
for e_id in missing_auth_events:
|
||||
m_ev = yield self.replication_layer.get_pdu(
|
||||
m_ev = yield self.federation_client.get_pdu(
|
||||
[origin],
|
||||
e_id,
|
||||
outlier=True,
|
||||
@@ -1766,7 +1576,7 @@ class FederationHandler(BaseHandler):
|
||||
raise
|
||||
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
||||
|
||||
yield self.store.persist_events(
|
||||
yield self.persist_events_and_notify(
|
||||
[
|
||||
(e, events_to_context[e.event_id])
|
||||
for e in itertools.chain(auth_events, state)
|
||||
@@ -1777,12 +1587,10 @@ class FederationHandler(BaseHandler):
|
||||
event, old_state=state
|
||||
)
|
||||
|
||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event, new_event_context,
|
||||
yield self.persist_events_and_notify(
|
||||
[(event, new_event_context)],
|
||||
)
|
||||
|
||||
defer.returnValue((event_stream_id, max_stream_id))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _prep_event(self, origin, event, state=None, auth_events=None):
|
||||
"""
|
||||
@@ -1801,8 +1609,9 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
|
||||
if not auth_events:
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
auth_events_ids = yield self.auth.compute_auth_events(
|
||||
event, context.prev_state_ids, for_verification=True,
|
||||
event, prev_state_ids, for_verification=True,
|
||||
)
|
||||
auth_events = yield self.store.get_events(auth_events_ids)
|
||||
auth_events = {
|
||||
@@ -1838,8 +1647,19 @@ class FederationHandler(BaseHandler):
|
||||
defer.returnValue(context)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
|
||||
def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
|
||||
missing):
|
||||
in_room = yield self.auth.check_host_in_room(
|
||||
room_id,
|
||||
origin
|
||||
)
|
||||
if not in_room:
|
||||
raise AuthError(403, "Host not in room.")
|
||||
|
||||
event = yield self.store.get_event(
|
||||
event_id, allow_none=False, check_room_id=room_id
|
||||
)
|
||||
|
||||
# Just go through and process each event in `remote_auth_chain`. We
|
||||
# don't want to fall into the trap of `missing` being wrong.
|
||||
for e in remote_auth_chain:
|
||||
@@ -1849,7 +1669,6 @@ class FederationHandler(BaseHandler):
|
||||
pass
|
||||
|
||||
# Now get the current auth_chain for the event.
|
||||
event = yield self.store.get_event(event_id)
|
||||
local_auth_chain = yield self.store.get_auth_chain(
|
||||
[auth_id for auth_id, _ in event.auth_events],
|
||||
include_given=True
|
||||
@@ -1862,15 +1681,6 @@ class FederationHandler(BaseHandler):
|
||||
local_auth_chain, remote_auth_chain
|
||||
)
|
||||
|
||||
for event in ret["auth_chain"]:
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
event,
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0]
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug("on_query_auth returning: %s", ret)
|
||||
|
||||
defer.returnValue(ret)
|
||||
@@ -1896,8 +1706,8 @@ class FederationHandler(BaseHandler):
|
||||
min_depth=min_depth,
|
||||
)
|
||||
|
||||
missing_events = yield self._filter_events_for_server(
|
||||
origin, room_id, missing_events,
|
||||
missing_events = yield filter_events_for_server(
|
||||
self.store, origin, missing_events,
|
||||
)
|
||||
|
||||
defer.returnValue(missing_events)
|
||||
@@ -1946,7 +1756,7 @@ class FederationHandler(BaseHandler):
|
||||
logger.info("Missing auth: %s", missing_auth)
|
||||
# If we don't have all the auth events, we need to get them.
|
||||
try:
|
||||
remote_auth_chain = yield self.replication_layer.get_event_auth(
|
||||
remote_auth_chain = yield self.federation_client.get_event_auth(
|
||||
origin, event.room_id, event.event_id
|
||||
)
|
||||
|
||||
@@ -2019,7 +1829,10 @@ class FederationHandler(BaseHandler):
|
||||
(d.type, d.state_key): d for d in different_events if d
|
||||
})
|
||||
|
||||
room_version = yield self.store.get_room_version(event.room_id)
|
||||
|
||||
new_state = self.state_handler.resolve_events(
|
||||
room_version,
|
||||
[list(local_view.values()), list(remote_view.values())],
|
||||
event
|
||||
)
|
||||
@@ -2051,9 +1864,10 @@ class FederationHandler(BaseHandler):
|
||||
break
|
||||
|
||||
if do_resolution:
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
# 1. Get what we think is the auth chain.
|
||||
auth_ids = yield self.auth.compute_auth_events(
|
||||
event, context.prev_state_ids
|
||||
event, prev_state_ids
|
||||
)
|
||||
local_auth_chain = yield self.store.get_auth_chain(
|
||||
auth_ids, include_given=True
|
||||
@@ -2061,7 +1875,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
try:
|
||||
# 2. Get remote difference.
|
||||
result = yield self.replication_layer.query_auth(
|
||||
result = yield self.federation_client.query_auth(
|
||||
origin,
|
||||
event.room_id,
|
||||
event.event_id,
|
||||
@@ -2143,21 +1957,34 @@ class FederationHandler(BaseHandler):
|
||||
k: a.event_id for k, a in iteritems(auth_events)
|
||||
if k != event_key
|
||||
}
|
||||
context.current_state_ids = dict(context.current_state_ids)
|
||||
context.current_state_ids.update(state_updates)
|
||||
if context.delta_ids is not None:
|
||||
context.delta_ids = dict(context.delta_ids)
|
||||
context.delta_ids.update(state_updates)
|
||||
context.prev_state_ids = dict(context.prev_state_ids)
|
||||
context.prev_state_ids.update({
|
||||
current_state_ids = yield context.get_current_state_ids(self.store)
|
||||
current_state_ids = dict(current_state_ids)
|
||||
|
||||
current_state_ids.update(state_updates)
|
||||
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
prev_state_ids = dict(prev_state_ids)
|
||||
|
||||
prev_state_ids.update({
|
||||
k: a.event_id for k, a in iteritems(auth_events)
|
||||
})
|
||||
context.state_group = yield self.store.store_state_group(
|
||||
|
||||
# create a new state group as a delta from the existing one.
|
||||
prev_group = context.state_group
|
||||
state_group = yield self.store.store_state_group(
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
prev_group=context.prev_group,
|
||||
delta_ids=context.delta_ids,
|
||||
current_state_ids=context.current_state_ids,
|
||||
prev_group=prev_group,
|
||||
delta_ids=state_updates,
|
||||
current_state_ids=current_state_ids,
|
||||
)
|
||||
|
||||
yield context.update_state(
|
||||
state_group=state_group,
|
||||
current_state_ids=current_state_ids,
|
||||
prev_state_ids=prev_state_ids,
|
||||
prev_group=prev_group,
|
||||
delta_ids=state_updates,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -2347,7 +2174,7 @@ class FederationHandler(BaseHandler):
|
||||
yield member_handler.send_membership_event(None, event, context)
|
||||
else:
|
||||
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
|
||||
yield self.replication_layer.forward_third_party_invite(
|
||||
yield self.federation_client.forward_third_party_invite(
|
||||
destinations,
|
||||
room_id,
|
||||
event_dict,
|
||||
@@ -2397,7 +2224,8 @@ class FederationHandler(BaseHandler):
|
||||
event.content["third_party_invite"]["signed"]["token"]
|
||||
)
|
||||
original_invite = None
|
||||
original_invite_id = context.prev_state_ids.get(key)
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
original_invite_id = prev_state_ids.get(key)
|
||||
if original_invite_id:
|
||||
original_invite = yield self.store.get_event(
|
||||
original_invite_id, allow_none=True
|
||||
@@ -2439,7 +2267,8 @@ class FederationHandler(BaseHandler):
|
||||
signed = event.content["third_party_invite"]["signed"]
|
||||
token = signed["token"]
|
||||
|
||||
invite_event_id = context.prev_state_ids.get(
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
invite_event_id = prev_state_ids.get(
|
||||
(EventTypes.ThirdPartyInvite, token,)
|
||||
)
|
||||
|
||||
@@ -2489,7 +2318,7 @@ class FederationHandler(BaseHandler):
|
||||
for revocation.
|
||||
"""
|
||||
try:
|
||||
response = yield self.hs.get_simple_http_client().get_json(
|
||||
response = yield self.http_client.get_json(
|
||||
url,
|
||||
{"public_key": public_key}
|
||||
)
|
||||
@@ -2500,3 +2329,91 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
if "valid" not in response or not response["valid"]:
|
||||
raise AuthError(403, "Third party certificate was invalid")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def persist_events_and_notify(self, event_and_contexts, backfilled=False):
|
||||
"""Persists events and tells the notifier/pushers about them, if
|
||||
necessary.
|
||||
|
||||
Args:
|
||||
event_and_contexts(list[tuple[FrozenEvent, EventContext]])
|
||||
backfilled (bool): Whether these events are a result of
|
||||
backfilling or not
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
if self.config.worker_app:
|
||||
yield self._send_events_to_master(
|
||||
store=self.store,
|
||||
event_and_contexts=event_and_contexts,
|
||||
backfilled=backfilled
|
||||
)
|
||||
else:
|
||||
max_stream_id = yield self.store.persist_events(
|
||||
event_and_contexts,
|
||||
backfilled=backfilled,
|
||||
)
|
||||
|
||||
if not backfilled: # Never notify for backfilled events
|
||||
for event, _ in event_and_contexts:
|
||||
self._notify_persisted_event(event, max_stream_id)
|
||||
|
||||
def _notify_persisted_event(self, event, max_stream_id):
|
||||
"""Checks to see if notifier/pushers should be notified about the
|
||||
event or not.
|
||||
|
||||
Args:
|
||||
event (FrozenEvent)
|
||||
max_stream_id (int): The max_stream_id returned by persist_events
|
||||
"""
|
||||
|
||||
extra_users = []
|
||||
if event.type == EventTypes.Member:
|
||||
target_user_id = event.state_key
|
||||
|
||||
# We notify for memberships if its an invite for one of our
|
||||
# users
|
||||
if event.internal_metadata.is_outlier():
|
||||
if event.membership != Membership.INVITE:
|
||||
if not self.is_mine_id(target_user_id):
|
||||
return
|
||||
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
elif event.internal_metadata.is_outlier():
|
||||
return
|
||||
|
||||
event_stream_id = event.internal_metadata.stream_ordering
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=extra_users
|
||||
)
|
||||
|
||||
self.pusher_pool.on_new_notifications(
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
def _clean_room_for_join(self, room_id):
|
||||
"""Called to clean up any data in DB for a given room, ready for the
|
||||
server to join the room.
|
||||
|
||||
Args:
|
||||
room_id (str)
|
||||
"""
|
||||
if self.config.worker_app:
|
||||
return self._clean_room_for_join_client(room_id)
|
||||
else:
|
||||
return self.store.clean_room_for_join(room_id)
|
||||
|
||||
def user_joined_room(self, user, room_id):
|
||||
"""Called when a new user has joined the room
|
||||
"""
|
||||
if self.config.worker_app:
|
||||
return self._notify_user_membership_change(
|
||||
room_id=room_id,
|
||||
user_id=user.to_string(),
|
||||
change="joined",
|
||||
)
|
||||
else:
|
||||
return user_joined_room(self.distributor, user, room_id)
|
||||
|
||||
@@ -26,7 +26,7 @@ from twisted.internet import defer
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
Codes,
|
||||
MatrixCodeMessageException,
|
||||
HttpResponseException,
|
||||
SynapseError,
|
||||
)
|
||||
|
||||
@@ -85,7 +85,6 @@ class IdentityHandler(BaseHandler):
|
||||
)
|
||||
defer.returnValue(None)
|
||||
|
||||
data = {}
|
||||
try:
|
||||
data = yield self.http_client.get_json(
|
||||
"https://%s%s" % (
|
||||
@@ -94,11 +93,9 @@ class IdentityHandler(BaseHandler):
|
||||
),
|
||||
{'sid': creds['sid'], 'client_secret': client_secret}
|
||||
)
|
||||
except MatrixCodeMessageException as e:
|
||||
except HttpResponseException as e:
|
||||
logger.info("getValidated3pid failed with Matrix error: %r", e)
|
||||
raise SynapseError(e.code, e.msg, e.errcode)
|
||||
except CodeMessageException as e:
|
||||
data = json.loads(e.msg)
|
||||
raise e.to_synapse_error()
|
||||
|
||||
if 'medium' in data:
|
||||
defer.returnValue(data)
|
||||
@@ -136,19 +133,23 @@ class IdentityHandler(BaseHandler):
|
||||
)
|
||||
logger.debug("bound threepid %r to %s", creds, mxid)
|
||||
except CodeMessageException as e:
|
||||
data = json.loads(e.msg)
|
||||
data = json.loads(e.msg) # XXX WAT?
|
||||
defer.returnValue(data)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def unbind_threepid(self, mxid, threepid):
|
||||
"""
|
||||
Removes a binding from an identity server
|
||||
def try_unbind_threepid(self, mxid, threepid):
|
||||
"""Removes a binding from an identity server
|
||||
|
||||
Args:
|
||||
mxid (str): Matrix user ID of binding to be removed
|
||||
threepid (dict): Dict with medium & address of binding to be removed
|
||||
|
||||
Raises:
|
||||
SynapseError: If we failed to contact the identity server
|
||||
|
||||
Returns:
|
||||
Deferred[bool]: True on success, otherwise False
|
||||
Deferred[bool]: True on success, otherwise False if the identity
|
||||
server doesn't support unbinding
|
||||
"""
|
||||
logger.debug("unbinding threepid %r from %s", threepid, mxid)
|
||||
if not self.trusted_id_servers:
|
||||
@@ -178,11 +179,21 @@ class IdentityHandler(BaseHandler):
|
||||
content=content,
|
||||
destination_is=id_server,
|
||||
)
|
||||
yield self.http_client.post_json_get_json(
|
||||
url,
|
||||
content,
|
||||
headers,
|
||||
)
|
||||
try:
|
||||
yield self.http_client.post_json_get_json(
|
||||
url,
|
||||
content,
|
||||
headers,
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
if e.code in (400, 404, 501,):
|
||||
# The remote server probably doesn't support unbinding (yet)
|
||||
logger.warn("Received %d response while unbinding threepid", e.code)
|
||||
defer.returnValue(False)
|
||||
else:
|
||||
logger.error("Failed to unbind threepid on identity server: %s", e)
|
||||
raise SynapseError(502, "Failed to contact identity server")
|
||||
|
||||
defer.returnValue(True)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -209,12 +220,9 @@ class IdentityHandler(BaseHandler):
|
||||
params
|
||||
)
|
||||
defer.returnValue(data)
|
||||
except MatrixCodeMessageException as e:
|
||||
logger.info("Proxied requestToken failed with Matrix error: %r", e)
|
||||
raise SynapseError(e.code, e.msg, e.errcode)
|
||||
except CodeMessageException as e:
|
||||
except HttpResponseException as e:
|
||||
logger.info("Proxied requestToken failed: %r", e)
|
||||
raise e
|
||||
raise e.to_synapse_error()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def requestMsisdnToken(
|
||||
@@ -244,9 +252,6 @@ class IdentityHandler(BaseHandler):
|
||||
params
|
||||
)
|
||||
defer.returnValue(data)
|
||||
except MatrixCodeMessageException as e:
|
||||
logger.info("Proxied requestToken failed with Matrix error: %r", e)
|
||||
raise SynapseError(e.code, e.msg, e.errcode)
|
||||
except CodeMessageException as e:
|
||||
except HttpResponseException as e:
|
||||
logger.info("Proxied requestToken failed: %r", e)
|
||||
raise e
|
||||
raise e.to_synapse_error()
|
||||
|
||||
@@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import StreamToken, UserID
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async import concurrently_execute
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.snapshot_cache import SnapshotCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.visibility import filter_events_for_client
|
||||
@@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
|
||||
try:
|
||||
if event.membership == Membership.JOIN:
|
||||
room_end_token = now_token.room_key
|
||||
deferred_room_state = self.state_handler.get_current_state(
|
||||
event.room_id
|
||||
deferred_room_state = run_in_background(
|
||||
self.state_handler.get_current_state,
|
||||
event.room_id,
|
||||
)
|
||||
elif event.membership == Membership.LEAVE:
|
||||
room_end_token = "s%d" % (event.stream_ordering,)
|
||||
deferred_room_state = self.store.get_state_for_events(
|
||||
[event.event_id], None
|
||||
deferred_room_state = run_in_background(
|
||||
self.store.get_state_for_events,
|
||||
[event.event_id], None,
|
||||
)
|
||||
deferred_room_state.addCallback(
|
||||
lambda states: states[event.event_id]
|
||||
@@ -370,6 +372,10 @@ class InitialSyncHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_presence():
|
||||
# If presence is disabled, return an empty list
|
||||
if not self.hs.config.use_presence:
|
||||
defer.returnValue([])
|
||||
|
||||
states = yield presence_handler.get_states(
|
||||
[m.user_id for m in room_members],
|
||||
as_event=True,
|
||||
@@ -387,19 +393,21 @@ class InitialSyncHandler(BaseHandler):
|
||||
receipts = []
|
||||
defer.returnValue(receipts)
|
||||
|
||||
presence, receipts, (messages, token) = yield defer.gatherResults(
|
||||
[
|
||||
run_in_background(get_presence),
|
||||
run_in_background(get_receipts),
|
||||
run_in_background(
|
||||
self.store.get_recent_events_for_room,
|
||||
room_id,
|
||||
limit=limit,
|
||||
end_token=now_token.room_key,
|
||||
)
|
||||
],
|
||||
consumeErrors=True,
|
||||
).addErrback(unwrapFirstError)
|
||||
presence, receipts, (messages, token) = yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
[
|
||||
run_in_background(get_presence),
|
||||
run_in_background(get_receipts),
|
||||
run_in_background(
|
||||
self.store.get_recent_events_for_room,
|
||||
room_id,
|
||||
limit=limit,
|
||||
end_token=now_token.room_key,
|
||||
)
|
||||
],
|
||||
consumeErrors=True,
|
||||
).addErrback(unwrapFirstError),
|
||||
)
|
||||
|
||||
messages = yield filter_events_for_client(
|
||||
self.store, user_id, messages, is_peeking=is_peeking,
|
||||
|
||||
@@ -23,21 +23,25 @@ from canonicaljson import encode_canonical_json, json
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import succeed
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
ConsentNotGivenError,
|
||||
NotFoundError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.api.urls import ConsentURIBuilder
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.replication.http.send_event import send_event_to_master
|
||||
from synapse.types import RoomAlias, RoomStreamToken, UserID
|
||||
from synapse.util.async import Limiter, ReadWriteLock
|
||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||
from synapse.types import RoomAlias, UserID
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
from ._base import BaseHandler
|
||||
@@ -45,234 +49,15 @@ from ._base import BaseHandler
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PurgeStatus(object):
|
||||
"""Object tracking the status of a purge request
|
||||
|
||||
This class contains information on the progress of a purge request, for
|
||||
return by get_purge_status.
|
||||
|
||||
Attributes:
|
||||
status (int): Tracks whether this request has completed. One of
|
||||
STATUS_{ACTIVE,COMPLETE,FAILED}
|
||||
class MessageHandler(object):
|
||||
"""Contains some read only APIs to get state about a room
|
||||
"""
|
||||
|
||||
STATUS_ACTIVE = 0
|
||||
STATUS_COMPLETE = 1
|
||||
STATUS_FAILED = 2
|
||||
|
||||
STATUS_TEXT = {
|
||||
STATUS_ACTIVE: "active",
|
||||
STATUS_COMPLETE: "complete",
|
||||
STATUS_FAILED: "failed",
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.status = PurgeStatus.STATUS_ACTIVE
|
||||
|
||||
def asdict(self):
|
||||
return {
|
||||
"status": PurgeStatus.STATUS_TEXT[self.status]
|
||||
}
|
||||
|
||||
|
||||
class MessageHandler(BaseHandler):
|
||||
|
||||
def __init__(self, hs):
|
||||
super(MessageHandler, self).__init__(hs)
|
||||
self.hs = hs
|
||||
self.state = hs.get_state_handler()
|
||||
self.auth = hs.get_auth()
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
self.pagination_lock = ReadWriteLock()
|
||||
self._purges_in_progress_by_room = set()
|
||||
# map from purge id to PurgeStatus
|
||||
self._purges_by_id = {}
|
||||
|
||||
def start_purge_history(self, room_id, token,
|
||||
delete_local_events=False):
|
||||
"""Start off a history purge on a room.
|
||||
|
||||
Args:
|
||||
room_id (str): The room to purge from
|
||||
|
||||
token (str): topological token to delete events before
|
||||
delete_local_events (bool): True to delete local events as well as
|
||||
remote ones
|
||||
|
||||
Returns:
|
||||
str: unique ID for this purge transaction.
|
||||
"""
|
||||
if room_id in self._purges_in_progress_by_room:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"History purge already in progress for %s" % (room_id, ),
|
||||
)
|
||||
|
||||
purge_id = random_string(16)
|
||||
|
||||
# we log the purge_id here so that it can be tied back to the
|
||||
# request id in the log lines.
|
||||
logger.info("[purge] starting purge_id %s", purge_id)
|
||||
|
||||
self._purges_by_id[purge_id] = PurgeStatus()
|
||||
run_in_background(
|
||||
self._purge_history,
|
||||
purge_id, room_id, token, delete_local_events,
|
||||
)
|
||||
return purge_id
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _purge_history(self, purge_id, room_id, token,
|
||||
delete_local_events):
|
||||
"""Carry out a history purge on a room.
|
||||
|
||||
Args:
|
||||
purge_id (str): The id for this purge
|
||||
room_id (str): The room to purge from
|
||||
token (str): topological token to delete events before
|
||||
delete_local_events (bool): True to delete local events as well as
|
||||
remote ones
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
self._purges_in_progress_by_room.add(room_id)
|
||||
try:
|
||||
with (yield self.pagination_lock.write(room_id)):
|
||||
yield self.store.purge_history(
|
||||
room_id, token, delete_local_events,
|
||||
)
|
||||
logger.info("[purge] complete")
|
||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
||||
except Exception:
|
||||
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
|
||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
||||
finally:
|
||||
self._purges_in_progress_by_room.discard(room_id)
|
||||
|
||||
# remove the purge from the list 24 hours after it completes
|
||||
def clear_purge():
|
||||
del self._purges_by_id[purge_id]
|
||||
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
|
||||
|
||||
def get_purge_status(self, purge_id):
|
||||
"""Get the current status of an active purge
|
||||
|
||||
Args:
|
||||
purge_id (str): purge_id returned by start_purge_history
|
||||
|
||||
Returns:
|
||||
PurgeStatus|None
|
||||
"""
|
||||
return self._purges_by_id.get(purge_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_messages(self, requester, room_id=None, pagin_config=None,
|
||||
as_client_event=True, event_filter=None):
|
||||
"""Get messages in a room.
|
||||
|
||||
Args:
|
||||
requester (Requester): The user requesting messages.
|
||||
room_id (str): The room they want messages from.
|
||||
pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
||||
config rules to apply, if any.
|
||||
as_client_event (bool): True to get events in client-server format.
|
||||
event_filter (Filter): Filter to apply to results or None
|
||||
Returns:
|
||||
dict: Pagination API results
|
||||
"""
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
if pagin_config.from_token:
|
||||
room_token = pagin_config.from_token.room_key
|
||||
else:
|
||||
pagin_config.from_token = (
|
||||
yield self.hs.get_event_sources().get_current_token_for_room(
|
||||
room_id=room_id
|
||||
)
|
||||
)
|
||||
room_token = pagin_config.from_token.room_key
|
||||
|
||||
room_token = RoomStreamToken.parse(room_token)
|
||||
|
||||
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
|
||||
"room_key", str(room_token)
|
||||
)
|
||||
|
||||
source_config = pagin_config.get_source_config("room")
|
||||
|
||||
with (yield self.pagination_lock.read(room_id)):
|
||||
membership, member_event_id = yield self._check_in_room_or_world_readable(
|
||||
room_id, user_id
|
||||
)
|
||||
|
||||
if source_config.direction == 'b':
|
||||
# if we're going backwards, we might need to backfill. This
|
||||
# requires that we have a topo token.
|
||||
if room_token.topological:
|
||||
max_topo = room_token.topological
|
||||
else:
|
||||
max_topo = yield self.store.get_max_topological_token(
|
||||
room_id, room_token.stream
|
||||
)
|
||||
|
||||
if membership == Membership.LEAVE:
|
||||
# If they have left the room then clamp the token to be before
|
||||
# they left the room, to save the effort of loading from the
|
||||
# database.
|
||||
leave_token = yield self.store.get_topological_token_for_event(
|
||||
member_event_id
|
||||
)
|
||||
leave_token = RoomStreamToken.parse(leave_token)
|
||||
if leave_token.topological < max_topo:
|
||||
source_config.from_key = str(leave_token)
|
||||
|
||||
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
||||
room_id, max_topo
|
||||
)
|
||||
|
||||
events, next_key = yield self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=source_config.from_key,
|
||||
to_key=source_config.to_key,
|
||||
direction=source_config.direction,
|
||||
limit=source_config.limit,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
|
||||
next_token = pagin_config.from_token.copy_and_replace(
|
||||
"room_key", next_key
|
||||
)
|
||||
|
||||
if not events:
|
||||
defer.returnValue({
|
||||
"chunk": [],
|
||||
"start": pagin_config.from_token.to_string(),
|
||||
"end": next_token.to_string(),
|
||||
})
|
||||
|
||||
if event_filter:
|
||||
events = event_filter.filter(events)
|
||||
|
||||
events = yield filter_events_for_client(
|
||||
self.store,
|
||||
user_id,
|
||||
events,
|
||||
is_peeking=(member_event_id is None),
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
chunk = {
|
||||
"chunk": [
|
||||
serialize_event(e, time_now, as_client_event)
|
||||
for e in events
|
||||
],
|
||||
"start": pagin_config.from_token.to_string(),
|
||||
"end": next_token.to_string(),
|
||||
}
|
||||
|
||||
defer.returnValue(chunk)
|
||||
self.state = hs.get_state_handler()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_room_data(self, user_id=None, room_id=None,
|
||||
@@ -286,12 +71,12 @@ class MessageHandler(BaseHandler):
|
||||
Raises:
|
||||
SynapseError if something went wrong.
|
||||
"""
|
||||
membership, membership_event_id = yield self._check_in_room_or_world_readable(
|
||||
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
|
||||
room_id, user_id
|
||||
)
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
data = yield self.state_handler.get_current_state(
|
||||
data = yield self.state.get_current_state(
|
||||
room_id, event_type, state_key
|
||||
)
|
||||
elif membership == Membership.LEAVE:
|
||||
@@ -304,53 +89,85 @@ class MessageHandler(BaseHandler):
|
||||
defer.returnValue(data)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_in_room_or_world_readable(self, room_id, user_id):
|
||||
try:
|
||||
# check_user_was_in_room will return the most recent membership
|
||||
# event for the user if:
|
||||
# * The user is a non-guest user, and was ever in the room
|
||||
# * The user is a guest user, and has joined the room
|
||||
# else it will throw.
|
||||
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
|
||||
defer.returnValue((member_event.membership, member_event.event_id))
|
||||
return
|
||||
except AuthError:
|
||||
visibility = yield self.state_handler.get_current_state(
|
||||
room_id, EventTypes.RoomHistoryVisibility, ""
|
||||
)
|
||||
if (
|
||||
visibility and
|
||||
visibility.content["history_visibility"] == "world_readable"
|
||||
):
|
||||
defer.returnValue((Membership.JOIN, None))
|
||||
return
|
||||
raise AuthError(
|
||||
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_events(self, user_id, room_id, is_guest=False):
|
||||
def get_state_events(
|
||||
self, user_id, room_id, types=None, filtered_types=None,
|
||||
at_token=None, is_guest=False,
|
||||
):
|
||||
"""Retrieve all state events for a given room. If the user is
|
||||
joined to the room then return the current state. If the user has
|
||||
left the room return the state events from when they left.
|
||||
left the room return the state events from when they left. If an explicit
|
||||
'at' parameter is passed, return the state events as of that event, if
|
||||
visible.
|
||||
|
||||
Args:
|
||||
user_id(str): The user requesting state events.
|
||||
room_id(str): The room ID to get all state events from.
|
||||
types(list[(str, str|None)]|None): List of (type, state_key) tuples
|
||||
which are used to filter the state fetched. If `state_key` is None,
|
||||
all events are returned of the given type.
|
||||
May be None, which matches any key.
|
||||
filtered_types(list[str]|None): Only apply filtering via `types` to this
|
||||
list of event types. Other types of events are returned unfiltered.
|
||||
If None, `types` filtering is applied to all events.
|
||||
at_token(StreamToken|None): the stream token of the at which we are requesting
|
||||
the stats. If the user is not allowed to view the state as of that
|
||||
stream token, we raise a 403 SynapseError. If None, returns the current
|
||||
state based on the current_state_events table.
|
||||
is_guest(bool): whether this user is a guest
|
||||
Returns:
|
||||
A list of dicts representing state events. [{}, {}, {}]
|
||||
"""
|
||||
membership, membership_event_id = yield self._check_in_room_or_world_readable(
|
||||
room_id, user_id
|
||||
)
|
||||
Raises:
|
||||
NotFoundError (404) if the at token does not yield an event
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
room_state = yield self.state_handler.get_current_state(room_id)
|
||||
elif membership == Membership.LEAVE:
|
||||
room_state = yield self.store.get_state_for_events(
|
||||
[membership_event_id], None
|
||||
AuthError (403) if the user doesn't have permission to view
|
||||
members of this room.
|
||||
"""
|
||||
if at_token:
|
||||
# FIXME this claims to get the state at a stream position, but
|
||||
# get_recent_events_for_room operates by topo ordering. This therefore
|
||||
# does not reliably give you the state at the given stream position.
|
||||
# (https://github.com/matrix-org/synapse/issues/3305)
|
||||
last_events, _ = yield self.store.get_recent_events_for_room(
|
||||
room_id, end_token=at_token.room_key, limit=1,
|
||||
)
|
||||
room_state = room_state[membership_event_id]
|
||||
|
||||
if not last_events:
|
||||
raise NotFoundError("Can't find event for token %s" % (at_token, ))
|
||||
|
||||
visible_events = yield filter_events_for_client(
|
||||
self.store, user_id, last_events,
|
||||
)
|
||||
|
||||
event = last_events[0]
|
||||
if visible_events:
|
||||
room_state = yield self.store.get_state_for_events(
|
||||
[event.event_id], types, filtered_types=filtered_types,
|
||||
)
|
||||
room_state = room_state[event.event_id]
|
||||
else:
|
||||
raise AuthError(
|
||||
403,
|
||||
"User %s not allowed to view events in room %s at token %s" % (
|
||||
user_id, room_id, at_token,
|
||||
)
|
||||
)
|
||||
else:
|
||||
membership, membership_event_id = (
|
||||
yield self.auth.check_in_room_or_world_readable(
|
||||
room_id, user_id,
|
||||
)
|
||||
)
|
||||
|
||||
if membership == Membership.JOIN:
|
||||
state_ids = yield self.store.get_filtered_current_state_ids(
|
||||
room_id, types, filtered_types=filtered_types,
|
||||
)
|
||||
room_state = yield self.store.get_events(state_ids.values())
|
||||
elif membership == Membership.LEAVE:
|
||||
room_state = yield self.store.get_state_for_events(
|
||||
[membership_event_id], types, filtered_types=filtered_types,
|
||||
)
|
||||
room_state = room_state[membership_event_id]
|
||||
|
||||
now = self.clock.time_msec()
|
||||
defer.returnValue(
|
||||
@@ -373,7 +190,7 @@ class MessageHandler(BaseHandler):
|
||||
if not requester.app_service:
|
||||
# We check AS auth after fetching the room membership, as it
|
||||
# requires us to pull out all joined members anyway.
|
||||
membership, _ = yield self._check_in_room_or_world_readable(
|
||||
membership, _ = yield self.auth.check_in_room_or_world_readable(
|
||||
room_id, user_id
|
||||
)
|
||||
if membership != Membership.JOIN:
|
||||
@@ -418,7 +235,7 @@ class EventCreationHandler(object):
|
||||
self.notifier = hs.get_notifier()
|
||||
self.config = hs.config
|
||||
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
|
||||
|
||||
# This is only used to get at ratelimit function, and maybe_kick_guest_users
|
||||
self.base_handler = BaseHandler(hs)
|
||||
@@ -427,7 +244,7 @@ class EventCreationHandler(object):
|
||||
|
||||
# We arbitrarily limit concurrent event creation for a room to 5.
|
||||
# This is to stop us from diverging history *too* much.
|
||||
self.limiter = Limiter(max_count=5)
|
||||
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
|
||||
|
||||
self.action_generator = hs.get_action_generator()
|
||||
|
||||
@@ -459,10 +276,14 @@ class EventCreationHandler(object):
|
||||
where *hashes* is a map from algorithm to hash.
|
||||
|
||||
If None, they will be requested from the database.
|
||||
|
||||
Raises:
|
||||
ResourceLimitError if server is blocked to some resource being
|
||||
exceeded
|
||||
Returns:
|
||||
Tuple of created event (FrozenEvent), Context
|
||||
"""
|
||||
yield self.auth.check_auth_blocking(requester.user.to_string())
|
||||
|
||||
builder = self.event_builder_factory.new(event_dict)
|
||||
|
||||
self.validator.validate_new(builder)
|
||||
@@ -630,7 +451,8 @@ class EventCreationHandler(object):
|
||||
If so, returns the version of the event in context.
|
||||
Otherwise, returns None.
|
||||
"""
|
||||
prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
prev_event_id = prev_state_ids.get((event.type, event.state_key))
|
||||
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
|
||||
if not prev_event:
|
||||
return
|
||||
@@ -752,8 +574,8 @@ class EventCreationHandler(object):
|
||||
event = builder.build()
|
||||
|
||||
logger.debug(
|
||||
"Created event %s with state: %s",
|
||||
event.event_id, context.prev_state_ids,
|
||||
"Created event %s",
|
||||
event.event_id,
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
@@ -805,11 +627,9 @@ class EventCreationHandler(object):
|
||||
try:
|
||||
# If we're a worker we need to hit out to the master.
|
||||
if self.config.worker_app:
|
||||
yield send_event_to_master(
|
||||
self.hs.get_clock(),
|
||||
self.http_client,
|
||||
host=self.config.worker_replication_host,
|
||||
port=self.config.worker_replication_http_port,
|
||||
yield self.send_event_to_master(
|
||||
event_id=event.event_id,
|
||||
store=self.store,
|
||||
requester=requester,
|
||||
event=event,
|
||||
context=context,
|
||||
@@ -884,9 +704,11 @@ class EventCreationHandler(object):
|
||||
e.sender == event.sender
|
||||
)
|
||||
|
||||
current_state_ids = yield context.get_current_state_ids(self.store)
|
||||
|
||||
state_to_include_ids = [
|
||||
e_id
|
||||
for k, e_id in iteritems(context.current_state_ids)
|
||||
for k, e_id in iteritems(current_state_ids)
|
||||
if k[0] in self.hs.config.room_invite_state_types
|
||||
or k == (EventTypes.Member, event.sender)
|
||||
]
|
||||
@@ -922,8 +744,9 @@ class EventCreationHandler(object):
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Redaction:
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
auth_events_ids = yield self.auth.compute_auth_events(
|
||||
event, context.prev_state_ids, for_verification=True,
|
||||
event, prev_state_ids, for_verification=True,
|
||||
)
|
||||
auth_events = yield self.store.get_events(auth_events_ids)
|
||||
auth_events = {
|
||||
@@ -943,21 +766,20 @@ class EventCreationHandler(object):
|
||||
"You don't have permission to redact events"
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Create and context.prev_state_ids:
|
||||
raise AuthError(
|
||||
403,
|
||||
"Changing the room create event is forbidden",
|
||||
)
|
||||
if event.type == EventTypes.Create:
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
if prev_state_ids:
|
||||
raise AuthError(
|
||||
403,
|
||||
"Changing the room create event is forbidden",
|
||||
)
|
||||
|
||||
(event_stream_id, max_stream_id) = yield self.store.persist_event(
|
||||
event, context=context
|
||||
)
|
||||
|
||||
# this intentionally does not yield: we don't care about the result
|
||||
# and don't need to wait for it.
|
||||
run_in_background(
|
||||
self.pusher_pool.on_new_notifications,
|
||||
event_stream_id, max_stream_id
|
||||
self.pusher_pool.on_new_notifications(
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
def _notify():
|
||||
|
||||
298
synapse/handlers/pagination.py
Normal file
298
synapse/handlers/pagination.py
Normal file
@@ -0,0 +1,298 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 - 2016 OpenMarket Ltd
|
||||
# Copyright 2017 - 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.util.async_helpers import ReadWriteLock
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PurgeStatus(object):
|
||||
"""Object tracking the status of a purge request
|
||||
|
||||
This class contains information on the progress of a purge request, for
|
||||
return by get_purge_status.
|
||||
|
||||
Attributes:
|
||||
status (int): Tracks whether this request has completed. One of
|
||||
STATUS_{ACTIVE,COMPLETE,FAILED}
|
||||
"""
|
||||
|
||||
STATUS_ACTIVE = 0
|
||||
STATUS_COMPLETE = 1
|
||||
STATUS_FAILED = 2
|
||||
|
||||
STATUS_TEXT = {
|
||||
STATUS_ACTIVE: "active",
|
||||
STATUS_COMPLETE: "complete",
|
||||
STATUS_FAILED: "failed",
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.status = PurgeStatus.STATUS_ACTIVE
|
||||
|
||||
def asdict(self):
|
||||
return {
|
||||
"status": PurgeStatus.STATUS_TEXT[self.status]
|
||||
}
|
||||
|
||||
|
||||
class PaginationHandler(object):
|
||||
"""Handles pagination and purge history requests.
|
||||
|
||||
These are in the same handler due to the fact we need to block clients
|
||||
paginating during a purge.
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
self.pagination_lock = ReadWriteLock()
|
||||
self._purges_in_progress_by_room = set()
|
||||
# map from purge id to PurgeStatus
|
||||
self._purges_by_id = {}
|
||||
|
||||
def start_purge_history(self, room_id, token,
|
||||
delete_local_events=False):
|
||||
"""Start off a history purge on a room.
|
||||
|
||||
Args:
|
||||
room_id (str): The room to purge from
|
||||
|
||||
token (str): topological token to delete events before
|
||||
delete_local_events (bool): True to delete local events as well as
|
||||
remote ones
|
||||
|
||||
Returns:
|
||||
str: unique ID for this purge transaction.
|
||||
"""
|
||||
if room_id in self._purges_in_progress_by_room:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"History purge already in progress for %s" % (room_id, ),
|
||||
)
|
||||
|
||||
purge_id = random_string(16)
|
||||
|
||||
# we log the purge_id here so that it can be tied back to the
|
||||
# request id in the log lines.
|
||||
logger.info("[purge] starting purge_id %s", purge_id)
|
||||
|
||||
self._purges_by_id[purge_id] = PurgeStatus()
|
||||
run_in_background(
|
||||
self._purge_history,
|
||||
purge_id, room_id, token, delete_local_events,
|
||||
)
|
||||
return purge_id
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _purge_history(self, purge_id, room_id, token,
|
||||
delete_local_events):
|
||||
"""Carry out a history purge on a room.
|
||||
|
||||
Args:
|
||||
purge_id (str): The id for this purge
|
||||
room_id (str): The room to purge from
|
||||
token (str): topological token to delete events before
|
||||
delete_local_events (bool): True to delete local events as well as
|
||||
remote ones
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
self._purges_in_progress_by_room.add(room_id)
|
||||
try:
|
||||
with (yield self.pagination_lock.write(room_id)):
|
||||
yield self.store.purge_history(
|
||||
room_id, token, delete_local_events,
|
||||
)
|
||||
logger.info("[purge] complete")
|
||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
||||
except Exception:
|
||||
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
|
||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
||||
finally:
|
||||
self._purges_in_progress_by_room.discard(room_id)
|
||||
|
||||
# remove the purge from the list 24 hours after it completes
|
||||
def clear_purge():
|
||||
del self._purges_by_id[purge_id]
|
||||
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
|
||||
|
||||
def get_purge_status(self, purge_id):
|
||||
"""Get the current status of an active purge
|
||||
|
||||
Args:
|
||||
purge_id (str): purge_id returned by start_purge_history
|
||||
|
||||
Returns:
|
||||
PurgeStatus|None
|
||||
"""
|
||||
return self._purges_by_id.get(purge_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_messages(self, requester, room_id=None, pagin_config=None,
|
||||
as_client_event=True, event_filter=None):
|
||||
"""Get messages in a room.
|
||||
|
||||
Args:
|
||||
requester (Requester): The user requesting messages.
|
||||
room_id (str): The room they want messages from.
|
||||
pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
||||
config rules to apply, if any.
|
||||
as_client_event (bool): True to get events in client-server format.
|
||||
event_filter (Filter): Filter to apply to results or None
|
||||
Returns:
|
||||
dict: Pagination API results
|
||||
"""
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
if pagin_config.from_token:
|
||||
room_token = pagin_config.from_token.room_key
|
||||
else:
|
||||
pagin_config.from_token = (
|
||||
yield self.hs.get_event_sources().get_current_token_for_room(
|
||||
room_id=room_id
|
||||
)
|
||||
)
|
||||
room_token = pagin_config.from_token.room_key
|
||||
|
||||
room_token = RoomStreamToken.parse(room_token)
|
||||
|
||||
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
|
||||
"room_key", str(room_token)
|
||||
)
|
||||
|
||||
source_config = pagin_config.get_source_config("room")
|
||||
|
||||
with (yield self.pagination_lock.read(room_id)):
|
||||
membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
|
||||
room_id, user_id
|
||||
)
|
||||
|
||||
if source_config.direction == 'b':
|
||||
# if we're going backwards, we might need to backfill. This
|
||||
# requires that we have a topo token.
|
||||
if room_token.topological:
|
||||
max_topo = room_token.topological
|
||||
else:
|
||||
max_topo = yield self.store.get_max_topological_token(
|
||||
room_id, room_token.stream
|
||||
)
|
||||
|
||||
if membership == Membership.LEAVE:
|
||||
# If they have left the room then clamp the token to be before
|
||||
# they left the room, to save the effort of loading from the
|
||||
# database.
|
||||
leave_token = yield self.store.get_topological_token_for_event(
|
||||
member_event_id
|
||||
)
|
||||
leave_token = RoomStreamToken.parse(leave_token)
|
||||
if leave_token.topological < max_topo:
|
||||
source_config.from_key = str(leave_token)
|
||||
|
||||
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
||||
room_id, max_topo
|
||||
)
|
||||
|
||||
events, next_key = yield self.store.paginate_room_events(
|
||||
room_id=room_id,
|
||||
from_key=source_config.from_key,
|
||||
to_key=source_config.to_key,
|
||||
direction=source_config.direction,
|
||||
limit=source_config.limit,
|
||||
event_filter=event_filter,
|
||||
)
|
||||
|
||||
next_token = pagin_config.from_token.copy_and_replace(
|
||||
"room_key", next_key
|
||||
)
|
||||
|
||||
if not events:
|
||||
defer.returnValue({
|
||||
"chunk": [],
|
||||
"start": pagin_config.from_token.to_string(),
|
||||
"end": next_token.to_string(),
|
||||
})
|
||||
|
||||
if event_filter:
|
||||
events = event_filter.filter(events)
|
||||
|
||||
events = yield filter_events_for_client(
|
||||
self.store,
|
||||
user_id,
|
||||
events,
|
||||
is_peeking=(member_event_id is None),
|
||||
)
|
||||
|
||||
state = None
|
||||
if event_filter and event_filter.lazy_load_members():
|
||||
# TODO: remove redundant members
|
||||
|
||||
types = [
|
||||
(EventTypes.Member, state_key)
|
||||
for state_key in set(
|
||||
event.sender # FIXME: we also care about invite targets etc.
|
||||
for event in events
|
||||
)
|
||||
]
|
||||
|
||||
state_ids = yield self.store.get_state_ids_for_event(
|
||||
events[0].event_id, types=types,
|
||||
)
|
||||
|
||||
if state_ids:
|
||||
state = yield self.store.get_events(list(state_ids.values()))
|
||||
|
||||
if state:
|
||||
state = yield filter_events_for_client(
|
||||
self.store,
|
||||
user_id,
|
||||
state.values(),
|
||||
is_peeking=(member_event_id is None),
|
||||
)
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
||||
chunk = {
|
||||
"chunk": [
|
||||
serialize_event(e, time_now, as_client_event)
|
||||
for e in events
|
||||
],
|
||||
"start": pagin_config.from_token.to_string(),
|
||||
"end": next_token.to_string(),
|
||||
}
|
||||
|
||||
if state:
|
||||
chunk["state"] = [
|
||||
serialize_event(e, time_now, as_client_event)
|
||||
for e in state
|
||||
]
|
||||
|
||||
defer.returnValue(chunk)
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user