This commit is contained in:
reivilibre
2025-11-10 13:10:14 +00:00
parent e8899146c8
commit 5e7d531463
4 changed files with 132 additions and 16 deletions

View File

@@ -159,7 +159,7 @@
<nav class="pagetoc"></nav> <nav class="pagetoc"></nav>
</div> </div>
<h2 id="streams"><a class="header" href="#streams">Streams</a></h2> <h1 id="streams"><a class="header" href="#streams">Streams</a></h1>
<p>Synapse has a concept of &quot;streams&quot;, which are roughly described in <a href="https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py"><code>id_generators.py</code></a>. <p>Synapse has a concept of &quot;streams&quot;, which are roughly described in <a href="https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py"><code>id_generators.py</code></a>.
Generally speaking, streams are a series of notifications that something in Synapse's database has changed that the application might need to respond to. Generally speaking, streams are a series of notifications that something in Synapse's database has changed that the application might need to respond to.
For example:</p> For example:</p>
@@ -171,7 +171,7 @@ For example:</p>
<p>See <a href="https://github.com/element-hq/synapse/blob/develop/synapse/replication/tcp/streams/__init__.py"><code>synapse.replication.tcp.streams</code></a> for the full list of streams.</p> <p>See <a href="https://github.com/element-hq/synapse/blob/develop/synapse/replication/tcp/streams/__init__.py"><code>synapse.replication.tcp.streams</code></a> for the full list of streams.</p>
<p>It is very helpful to understand the streams mechanism when working on any part of Synapse that needs to respond to changes—especially if those changes are made by different workers. <p>It is very helpful to understand the streams mechanism when working on any part of Synapse that needs to respond to changes—especially if those changes are made by different workers.
To that end, let's describe streams formally, paraphrasing from the docstring of <a href="https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96"><code>AbstractStreamIdGenerator</code></a>.</p> To that end, let's describe streams formally, paraphrasing from the docstring of <a href="https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96"><code>AbstractStreamIdGenerator</code></a>.</p>
<h3 id="definition"><a class="header" href="#definition">Definition</a></h3> <h2 id="definition"><a class="header" href="#definition">Definition</a></h2>
<p>A stream is an append-only log <code>T1, T2, ..., Tn, ...</code> of facts<sup class="footnote-reference"><a href="#1">1</a></sup> which grows over time. <p>A stream is an append-only log <code>T1, T2, ..., Tn, ...</code> of facts<sup class="footnote-reference"><a href="#1">1</a></sup> which grows over time.
Only &quot;writers&quot; can add facts to a stream, and there may be multiple writers.</p> Only &quot;writers&quot; can add facts to a stream, and there may be multiple writers.</p>
<p>Each fact has an ID, called its &quot;stream ID&quot;. <p>Each fact has an ID, called its &quot;stream ID&quot;.
@@ -196,7 +196,7 @@ In the happy case, completion means a fact has been written to the stream table.
But unhappy cases (e.g. transaction rollback due to an error) also count as completion. But unhappy cases (e.g. transaction rollback due to an error) also count as completion.
Once completed, the rows written with that stream ID are fixed, and no new rows Once completed, the rows written with that stream ID are fixed, and no new rows
will be inserted with that ID.</p> will be inserted with that ID.</p>
<h3 id="current-stream-id"><a class="header" href="#current-stream-id">Current stream ID</a></h3> <h2 id="current-stream-id"><a class="header" href="#current-stream-id">Current stream ID</a></h2>
<p>For any given stream reader (including writers themselves), we may define a per-writer current stream ID:</p> <p>For any given stream reader (including writers themselves), we may define a per-writer current stream ID:</p>
<blockquote> <blockquote>
<p>A current stream ID <em>for a writer W</em> is the largest stream ID such that <p>A current stream ID <em>for a writer W</em> is the largest stream ID such that
@@ -233,7 +233,7 @@ Consider a single-writer stream which is initially at ID 1.</p>
<tr><td>Complete 4</td><td>5</td><td>current ID jumps 3-&gt;5, even though 6 is pending</td></tr> <tr><td>Complete 4</td><td>5</td><td>current ID jumps 3-&gt;5, even though 6 is pending</td></tr>
<tr><td>Complete 6</td><td>6</td><td></td></tr> <tr><td>Complete 6</td><td>6</td><td></td></tr>
</tbody></table> </tbody></table>
<h3 id="multi-writer-streams"><a class="header" href="#multi-writer-streams">Multi-writer streams</a></h3> <h2 id="multi-writer-streams"><a class="header" href="#multi-writer-streams">Multi-writer streams</a></h2>
<p>There are two ways to view a multi-writer stream.</p> <p>There are two ways to view a multi-writer stream.</p>
<ol> <ol>
<li>Treat it as a collection of distinct single-writer streams, one <li>Treat it as a collection of distinct single-writer streams, one
@@ -251,7 +251,7 @@ But the background process that works through events treats them as a single lin
The facts this stream holds are instructions to &quot;you should now invalidate these cache entries&quot;. The facts this stream holds are instructions to &quot;you should now invalidate these cache entries&quot;.
We only ever treat this as a multiple single-writer streams as there is no important ordering between cache invalidations. We only ever treat this as a multiple single-writer streams as there is no important ordering between cache invalidations.
(Invalidations are self-contained facts; and the invalidations commute/are idempotent).</p> (Invalidations are self-contained facts; and the invalidations commute/are idempotent).</p>
<h3 id="writing-to-streams"><a class="header" href="#writing-to-streams">Writing to streams</a></h3> <h2 id="writing-to-streams"><a class="header" href="#writing-to-streams">Writing to streams</a></h2>
<p>Writers need to track:</p> <p>Writers need to track:</p>
<ul> <ul>
<li>track their current position (i.e. its own per-writer stream ID).</li> <li>track their current position (i.e. its own per-writer stream ID).</li>
@@ -267,7 +267,7 @@ We only ever treat this as a multiple single-writer streams as there is no impor
<p>To complete a fact, first remove it from your map of facts currently awaiting completion. <p>To complete a fact, first remove it from your map of facts currently awaiting completion.
Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream. Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream.
Upon doing so it should emit an <code>RDATA</code> message<sup class="footnote-reference"><a href="#3">3</a></sup>, once for every fact between the old and the new stream ID.</p> Upon doing so it should emit an <code>RDATA</code> message<sup class="footnote-reference"><a href="#3">3</a></sup>, once for every fact between the old and the new stream ID.</p>
<h3 id="subscribing-to-streams"><a class="header" href="#subscribing-to-streams">Subscribing to streams</a></h3> <h2 id="subscribing-to-streams"><a class="header" href="#subscribing-to-streams">Subscribing to streams</a></h2>
<p>Readers need to track the current position of every writer.</p> <p>Readers need to track the current position of every writer.</p>
<p>At startup, they can find this by contacting each writer with a <code>REPLICATE</code> message, <p>At startup, they can find this by contacting each writer with a <code>REPLICATE</code> message,
requesting that all writers reply describing their current position in their streams. requesting that all writers reply describing their current position in their streams.
@@ -276,9 +276,67 @@ Writers reply with a <code>POSITION</code> message.</p>
The <code>RDATA</code> itself is not a self-contained representation of the fact; The <code>RDATA</code> itself is not a self-contained representation of the fact;
readers will have to query the stream tables for the full details. readers will have to query the stream tables for the full details.
Readers must also advance their record of the writer's current position for that stream.</p> Readers must also advance their record of the writer's current position for that stream.</p>
<h1 id="summary"><a class="header" href="#summary">Summary</a></h1> <h2 id="summary"><a class="header" href="#summary">Summary</a></h2>
<p>In a nutshell: we have an append-only log with a &quot;buffer/scratchpad&quot; at the end where we have to wait for the sequence to be linear and contiguous.</p> <p>In a nutshell: we have an append-only log with a &quot;buffer/scratchpad&quot; at the end where we have to wait for the sequence to be linear and contiguous.</p>
<hr /> <hr />
<h2 id="cheatsheet-for-creating-a-new-stream"><a class="header" href="#cheatsheet-for-creating-a-new-stream">Cheatsheet for creating a new stream</a></h2>
<p>These rough notes and links may help you to create a new stream and add all the
necessary registration and event handling.</p>
<p><strong>Create your stream:</strong></p>
<ul>
<li><a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/_base.py#L728">create a stream class and stream row class</a>
<ul>
<li>will need an <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L75">ID generator</a>
<ul>
<li>may need <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/config/workers.py#L177">writer configuration</a>, if there isn't already an obvious source of configuration for which workers should be designated as writers to your new stream.
<ul>
<li>if adding new writer configuration, add Docker-worker configuration, which lets us configure the writer worker in Complement tests: <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L331">[1]</a>, <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L440">[2]</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</li>
<li>most of the time, you will likely introduce a new datastore class for the concept represented by the new stream, unless there is already an obvious datastore that covers it.</li>
<li>consider whether it may make sense to introduce a handler</li>
</ul>
<p><strong>Register your stream in:</strong></p>
<ul>
<li><a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/__init__.py#L71"><code>STREAMS_MAP</code></a></li>
</ul>
<p><strong>Advance your stream in:</strong></p>
<ul>
<li><a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L111"><code>process_replication_position</code> of your appropriate datastore</a>
<ul>
<li>don't forget the super call</li>
</ul>
</li>
</ul>
<p><strong>If you're going to do any caching that needs invalidation from new rows:</strong></p>
<ul>
<li>add invalidations to <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L91"><code>process_replication_rows</code> of your appropriate datastore</a>
<ul>
<li>don't forget the super call</li>
</ul>
</li>
<li>add local-only <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L201">invalidations to your writer transactions</a></li>
</ul>
<p><strong>For streams to be used in sync:</strong></p>
<ul>
<li>add a new field to <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L1003"><code>StreamToken</code></a>
<ul>
<li>add a new <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L999"><code>StreamKeyType</code></a></li>
</ul>
</li>
<li>add appropriate wake-up rules
<ul>
<li>in <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/client.py#L260"><code>on_rdata</code></a></li>
<li>locally on the same worker when completing a write, <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/handlers/thread_subscriptions.py#L139">e.g. in your handler</a></li>
</ul>
</li>
<li>add the stream in <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/streams/events.py#L127"><code>bound_future_token</code></a></li>
</ul>
<hr />
<div class="footnote-definition" id="1"><sup class="footnote-definition-label">1</sup> <div class="footnote-definition" id="1"><sup class="footnote-definition-label">1</sup>
<p>we use the word <em>fact</em> here for two reasons. <p>we use the word <em>fact</em> here for two reasons.
Firstly, the word &quot;event&quot; is already heavily overloaded (PDUs, EDUs, account data, ...) and we don't need to make that worse. Firstly, the word &quot;event&quot; is already heavily overloaded (PDUs, EDUs, account data, ...) and we don't need to make that worse.

View File

@@ -19896,7 +19896,7 @@ minimal.</p>
<p>Information about how the tcp replication module is structured, including how <p>Information about how the tcp replication module is structured, including how
the classes interact, can be found in the classes interact, can be found in
<code>synapse/replication/tcp/__init__.py</code></p> <code>synapse/replication/tcp/__init__.py</code></p>
<div style="break-before: page; page-break-before: always;"></div><h2 id="streams"><a class="header" href="#streams">Streams</a></h2> <div style="break-before: page; page-break-before: always;"></div><h1 id="streams"><a class="header" href="#streams">Streams</a></h1>
<p>Synapse has a concept of &quot;streams&quot;, which are roughly described in <a href="https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py"><code>id_generators.py</code></a>. <p>Synapse has a concept of &quot;streams&quot;, which are roughly described in <a href="https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py"><code>id_generators.py</code></a>.
Generally speaking, streams are a series of notifications that something in Synapse's database has changed that the application might need to respond to. Generally speaking, streams are a series of notifications that something in Synapse's database has changed that the application might need to respond to.
For example:</p> For example:</p>
@@ -19908,7 +19908,7 @@ For example:</p>
<p>See <a href="https://github.com/element-hq/synapse/blob/develop/synapse/replication/tcp/streams/__init__.py"><code>synapse.replication.tcp.streams</code></a> for the full list of streams.</p> <p>See <a href="https://github.com/element-hq/synapse/blob/develop/synapse/replication/tcp/streams/__init__.py"><code>synapse.replication.tcp.streams</code></a> for the full list of streams.</p>
<p>It is very helpful to understand the streams mechanism when working on any part of Synapse that needs to respond to changes—especially if those changes are made by different workers. <p>It is very helpful to understand the streams mechanism when working on any part of Synapse that needs to respond to changes—especially if those changes are made by different workers.
To that end, let's describe streams formally, paraphrasing from the docstring of <a href="https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96"><code>AbstractStreamIdGenerator</code></a>.</p> To that end, let's describe streams formally, paraphrasing from the docstring of <a href="https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96"><code>AbstractStreamIdGenerator</code></a>.</p>
<h3 id="definition"><a class="header" href="#definition">Definition</a></h3> <h2 id="definition"><a class="header" href="#definition">Definition</a></h2>
<p>A stream is an append-only log <code>T1, T2, ..., Tn, ...</code> of facts<sup class="footnote-reference"><a href="#1">1</a></sup> which grows over time. <p>A stream is an append-only log <code>T1, T2, ..., Tn, ...</code> of facts<sup class="footnote-reference"><a href="#1">1</a></sup> which grows over time.
Only &quot;writers&quot; can add facts to a stream, and there may be multiple writers.</p> Only &quot;writers&quot; can add facts to a stream, and there may be multiple writers.</p>
<p>Each fact has an ID, called its &quot;stream ID&quot;. <p>Each fact has an ID, called its &quot;stream ID&quot;.
@@ -19933,7 +19933,7 @@ In the happy case, completion means a fact has been written to the stream table.
But unhappy cases (e.g. transaction rollback due to an error) also count as completion. But unhappy cases (e.g. transaction rollback due to an error) also count as completion.
Once completed, the rows written with that stream ID are fixed, and no new rows Once completed, the rows written with that stream ID are fixed, and no new rows
will be inserted with that ID.</p> will be inserted with that ID.</p>
<h3 id="current-stream-id"><a class="header" href="#current-stream-id">Current stream ID</a></h3> <h2 id="current-stream-id"><a class="header" href="#current-stream-id">Current stream ID</a></h2>
<p>For any given stream reader (including writers themselves), we may define a per-writer current stream ID:</p> <p>For any given stream reader (including writers themselves), we may define a per-writer current stream ID:</p>
<blockquote> <blockquote>
<p>A current stream ID <em>for a writer W</em> is the largest stream ID such that <p>A current stream ID <em>for a writer W</em> is the largest stream ID such that
@@ -19970,7 +19970,7 @@ Consider a single-writer stream which is initially at ID 1.</p>
<tr><td>Complete 4</td><td>5</td><td>current ID jumps 3-&gt;5, even though 6 is pending</td></tr> <tr><td>Complete 4</td><td>5</td><td>current ID jumps 3-&gt;5, even though 6 is pending</td></tr>
<tr><td>Complete 6</td><td>6</td><td></td></tr> <tr><td>Complete 6</td><td>6</td><td></td></tr>
</tbody></table> </tbody></table>
<h3 id="multi-writer-streams"><a class="header" href="#multi-writer-streams">Multi-writer streams</a></h3> <h2 id="multi-writer-streams"><a class="header" href="#multi-writer-streams">Multi-writer streams</a></h2>
<p>There are two ways to view a multi-writer stream.</p> <p>There are two ways to view a multi-writer stream.</p>
<ol> <ol>
<li>Treat it as a collection of distinct single-writer streams, one <li>Treat it as a collection of distinct single-writer streams, one
@@ -19988,7 +19988,7 @@ But the background process that works through events treats them as a single lin
The facts this stream holds are instructions to &quot;you should now invalidate these cache entries&quot;. The facts this stream holds are instructions to &quot;you should now invalidate these cache entries&quot;.
We only ever treat this as a multiple single-writer streams as there is no important ordering between cache invalidations. We only ever treat this as a multiple single-writer streams as there is no important ordering between cache invalidations.
(Invalidations are self-contained facts; and the invalidations commute/are idempotent).</p> (Invalidations are self-contained facts; and the invalidations commute/are idempotent).</p>
<h3 id="writing-to-streams"><a class="header" href="#writing-to-streams">Writing to streams</a></h3> <h2 id="writing-to-streams"><a class="header" href="#writing-to-streams">Writing to streams</a></h2>
<p>Writers need to track:</p> <p>Writers need to track:</p>
<ul> <ul>
<li>track their current position (i.e. its own per-writer stream ID).</li> <li>track their current position (i.e. its own per-writer stream ID).</li>
@@ -20004,7 +20004,7 @@ We only ever treat this as a multiple single-writer streams as there is no impor
<p>To complete a fact, first remove it from your map of facts currently awaiting completion. <p>To complete a fact, first remove it from your map of facts currently awaiting completion.
Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream. Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream.
Upon doing so it should emit an <code>RDATA</code> message<sup class="footnote-reference"><a href="#3">3</a></sup>, once for every fact between the old and the new stream ID.</p> Upon doing so it should emit an <code>RDATA</code> message<sup class="footnote-reference"><a href="#3">3</a></sup>, once for every fact between the old and the new stream ID.</p>
<h3 id="subscribing-to-streams"><a class="header" href="#subscribing-to-streams">Subscribing to streams</a></h3> <h2 id="subscribing-to-streams"><a class="header" href="#subscribing-to-streams">Subscribing to streams</a></h2>
<p>Readers need to track the current position of every writer.</p> <p>Readers need to track the current position of every writer.</p>
<p>At startup, they can find this by contacting each writer with a <code>REPLICATE</code> message, <p>At startup, they can find this by contacting each writer with a <code>REPLICATE</code> message,
requesting that all writers reply describing their current position in their streams. requesting that all writers reply describing their current position in their streams.
@@ -20013,9 +20013,67 @@ Writers reply with a <code>POSITION</code> message.</p>
The <code>RDATA</code> itself is not a self-contained representation of the fact; The <code>RDATA</code> itself is not a self-contained representation of the fact;
readers will have to query the stream tables for the full details. readers will have to query the stream tables for the full details.
Readers must also advance their record of the writer's current position for that stream.</p> Readers must also advance their record of the writer's current position for that stream.</p>
<h1 id="summary"><a class="header" href="#summary">Summary</a></h1> <h2 id="summary"><a class="header" href="#summary">Summary</a></h2>
<p>In a nutshell: we have an append-only log with a &quot;buffer/scratchpad&quot; at the end where we have to wait for the sequence to be linear and contiguous.</p> <p>In a nutshell: we have an append-only log with a &quot;buffer/scratchpad&quot; at the end where we have to wait for the sequence to be linear and contiguous.</p>
<hr /> <hr />
<h2 id="cheatsheet-for-creating-a-new-stream"><a class="header" href="#cheatsheet-for-creating-a-new-stream">Cheatsheet for creating a new stream</a></h2>
<p>These rough notes and links may help you to create a new stream and add all the
necessary registration and event handling.</p>
<p><strong>Create your stream:</strong></p>
<ul>
<li><a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/_base.py#L728">create a stream class and stream row class</a>
<ul>
<li>will need an <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L75">ID generator</a>
<ul>
<li>may need <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/config/workers.py#L177">writer configuration</a>, if there isn't already an obvious source of configuration for which workers should be designated as writers to your new stream.
<ul>
<li>if adding new writer configuration, add Docker-worker configuration, which lets us configure the writer worker in Complement tests: <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L331">[1]</a>, <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L440">[2]</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</li>
<li>most of the time, you will likely introduce a new datastore class for the concept represented by the new stream, unless there is already an obvious datastore that covers it.</li>
<li>consider whether it may make sense to introduce a handler</li>
</ul>
<p><strong>Register your stream in:</strong></p>
<ul>
<li><a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/__init__.py#L71"><code>STREAMS_MAP</code></a></li>
</ul>
<p><strong>Advance your stream in:</strong></p>
<ul>
<li><a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L111"><code>process_replication_position</code> of your appropriate datastore</a>
<ul>
<li>don't forget the super call</li>
</ul>
</li>
</ul>
<p><strong>If you're going to do any caching that needs invalidation from new rows:</strong></p>
<ul>
<li>add invalidations to <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L91"><code>process_replication_rows</code> of your appropriate datastore</a>
<ul>
<li>don't forget the super call</li>
</ul>
</li>
<li>add local-only <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L201">invalidations to your writer transactions</a></li>
</ul>
<p><strong>For streams to be used in sync:</strong></p>
<ul>
<li>add a new field to <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L1003"><code>StreamToken</code></a>
<ul>
<li>add a new <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L999"><code>StreamKeyType</code></a></li>
</ul>
</li>
<li>add appropriate wake-up rules
<ul>
<li>in <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/client.py#L260"><code>on_rdata</code></a></li>
<li>locally on the same worker when completing a write, <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/handlers/thread_subscriptions.py#L139">e.g. in your handler</a></li>
</ul>
</li>
<li>add the stream in <a href="https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/streams/events.py#L127"><code>bound_future_token</code></a></li>
</ul>
<hr />
<div class="footnote-definition" id="1"><sup class="footnote-definition-label">1</sup> <div class="footnote-definition" id="1"><sup class="footnote-definition-label">1</sup>
<p>we use the word <em>fact</em> here for two reasons. <p>we use the word <em>fact</em> here for two reasons.
Firstly, the word &quot;event&quot; is already heavily overloaded (PDUs, EDUs, account data, ...) and we don't need to make that worse. Firstly, the word &quot;event&quot; is already heavily overloaded (PDUs, EDUs, account data, ...) and we don't need to make that worse.

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long