Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 918a5055ff | |||
| 2d97fb6740 | |||
| fbafc86aca |
@@ -16,9 +16,11 @@
|
||||
import math
|
||||
import logging
|
||||
|
||||
from fractions import Fraction
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.util.katriel_bodlaender import OrderedListStore
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
import synapse.metrics
|
||||
|
||||
@@ -59,11 +61,12 @@ class ChunkDBOrderedListStore(OrderedListStore):
|
||||
re-instantiated in each transaction, so all state needs to be stored
|
||||
in the database.
|
||||
|
||||
Internally the ordering is implemented using floats, and the average is
|
||||
taken when a node is inserted between other nodes. To avoid precision
|
||||
errors a minimum difference between sucessive orderings is attempted to be
|
||||
kept; whenever the difference is too small we attempt to rebalance. See
|
||||
the `_rebalance` function for implementation details.
|
||||
Internally the ordering is implemented using a linked list and assigning
|
||||
each chunk a fraction. `get_next` and `get_prev` are implemented via linked
|
||||
lists, and comparisons implemented using the fractions. When inserting
|
||||
chunks fractions are picked such that their denominator is the smallest
|
||||
possible. However, if the denominators grow too big then a rebalancing has
|
||||
to take place to reduce the denominators; see `_rebalance` for details.
|
||||
|
||||
Note that OrderedListStore orders nodes such that source of an edge
|
||||
comes before the target. This is counter intuitive when edges represent
|
||||
@@ -80,23 +83,24 @@ class ChunkDBOrderedListStore(OrderedListStore):
|
||||
txn
|
||||
room_id (str)
|
||||
clock
|
||||
rebalance_digits (int): When a rebalance is triggered we rebalance
|
||||
in a range around the node, where the bounds are rounded to this
|
||||
number of digits.
|
||||
min_difference (int): A rebalance is triggered when the difference
|
||||
between two successive orderings is less than the reciprocal of
|
||||
this.
|
||||
database_engine
|
||||
rebalance_max_denominator (int): When a rebalance is triggered we
|
||||
replace existing orders with those that have a denominator smaller
|
||||
or equal to this
|
||||
max_denominator (int): A rebalance is triggered when a node has an
|
||||
ordering with a denominator greater than this
|
||||
"""
|
||||
def __init__(self,
|
||||
txn, room_id, clock,
|
||||
rebalance_digits=3,
|
||||
min_difference=1000000):
|
||||
txn, room_id, clock, database_engine,
|
||||
rebalance_max_denominator=100,
|
||||
max_denominator=100000):
|
||||
self.txn = txn
|
||||
self.room_id = room_id
|
||||
self.clock = clock
|
||||
self.database_engine = database_engine
|
||||
|
||||
self.rebalance_digits = rebalance_digits
|
||||
self.min_difference = 1. / min_difference
|
||||
self.rebalance_md = rebalance_max_denominator
|
||||
self.max_denominator = max_denominator
|
||||
|
||||
def is_before(self, a, b):
|
||||
"""Implements OrderedListStore"""
|
||||
@@ -104,16 +108,13 @@ class ChunkDBOrderedListStore(OrderedListStore):
|
||||
|
||||
def get_prev(self, node_id):
|
||||
"""Implements OrderedListStore"""
|
||||
order = self._get_order(node_id)
|
||||
|
||||
sql = """
|
||||
SELECT chunk_id FROM chunk_linearized
|
||||
WHERE ordering < ? AND room_id = ?
|
||||
ORDER BY ordering DESC
|
||||
LIMIT 1
|
||||
WHERE next_chunk_id = ?
|
||||
"""
|
||||
|
||||
self.txn.execute(sql, (order, self.room_id,))
|
||||
self.txn.execute(sql, (node_id,))
|
||||
|
||||
row = self.txn.fetchone()
|
||||
if row:
|
||||
@@ -122,16 +123,13 @@ class ChunkDBOrderedListStore(OrderedListStore):
|
||||
|
||||
def get_next(self, node_id):
|
||||
"""Implements OrderedListStore"""
|
||||
order = self._get_order(node_id)
|
||||
|
||||
sql = """
|
||||
SELECT chunk_id FROM chunk_linearized
|
||||
WHERE ordering > ? AND room_id = ?
|
||||
ORDER BY ordering ASC
|
||||
LIMIT 1
|
||||
SELECT next_chunk_id FROM chunk_linearized
|
||||
WHERE chunk_id = ?
|
||||
"""
|
||||
|
||||
self.txn.execute(sql, (order, self.room_id,))
|
||||
self.txn.execute(sql, (node_id,))
|
||||
|
||||
row = self.txn.fetchone()
|
||||
if row:
|
||||
@@ -144,27 +142,26 @@ class ChunkDBOrderedListStore(OrderedListStore):
|
||||
rebalance = False # Set to true if we need to trigger a rebalance
|
||||
|
||||
if target_id:
|
||||
target_order = self._get_order(target_id)
|
||||
before_id = self.get_prev(target_id)
|
||||
|
||||
if before_id:
|
||||
before_order = self._get_order(before_id)
|
||||
new_order = (target_order + before_order) / 2.
|
||||
|
||||
rebalance = math.fabs(target_order - before_order) < self.min_difference
|
||||
new_order = self._insert_between(node_id, before_id, target_id)
|
||||
else:
|
||||
new_order = math.floor(target_order) - 1
|
||||
new_order = self._insert_at_start(node_id, target_id)
|
||||
else:
|
||||
# If target_id is None then we insert at the end.
|
||||
self.txn.execute("""
|
||||
SELECT COALESCE(MAX(ordering), 0) + 1
|
||||
SELECT chunk_id
|
||||
FROM chunk_linearized
|
||||
WHERE room_id = ?
|
||||
WHERE room_id = ? AND next_chunk_id is NULL
|
||||
""", (self.room_id,))
|
||||
|
||||
new_order, = self.txn.fetchone()
|
||||
row = self.txn.fetchone()
|
||||
if row:
|
||||
new_order = self._insert_at_end(node_id, row[0])
|
||||
else:
|
||||
new_order = self._insert_first(node_id)
|
||||
|
||||
self._insert(node_id, new_order)
|
||||
rebalance = new_order.denominator > self.max_denominator
|
||||
|
||||
if rebalance:
|
||||
self._rebalance(node_id)
|
||||
@@ -174,64 +171,204 @@ class ChunkDBOrderedListStore(OrderedListStore):
|
||||
|
||||
rebalance = False # Set to true if we need to trigger a rebalance
|
||||
|
||||
next_chunk_id = None
|
||||
if target_id:
|
||||
target_order = self._get_order(target_id)
|
||||
after_id = self.get_next(target_id)
|
||||
if after_id:
|
||||
after_order = self._get_order(after_id)
|
||||
new_order = (target_order + after_order) / 2.
|
||||
|
||||
rebalance = math.fabs(target_order - after_order) < self.min_difference
|
||||
next_chunk_id = self.get_next(target_id)
|
||||
if next_chunk_id:
|
||||
new_order = self._insert_between(node_id, target_id, next_chunk_id)
|
||||
else:
|
||||
new_order = math.ceil(target_order) + 1
|
||||
new_order = self._insert_at_end(node_id, target_id)
|
||||
else:
|
||||
# If target_id is None then we insert at the start.
|
||||
self.txn.execute("""
|
||||
SELECT COALESCE(MIN(ordering), 0) - 1
|
||||
SELECT chunk_id
|
||||
FROM chunk_linearized
|
||||
NATURAL JOIN chunk_linearized_first
|
||||
WHERE room_id = ?
|
||||
""", (self.room_id,))
|
||||
|
||||
new_order, = self.txn.fetchone()
|
||||
row = self.txn.fetchone()
|
||||
if row:
|
||||
new_order = self._insert_at_start(node_id, row[0])
|
||||
else:
|
||||
new_order = self._insert_first(node_id)
|
||||
|
||||
self._insert(node_id, new_order)
|
||||
rebalance = new_order.denominator > self.max_denominator
|
||||
|
||||
if rebalance:
|
||||
self._rebalance(node_id)
|
||||
|
||||
def _insert_between(self, node_id, left_id, right_id):
|
||||
"""Inserts node between given existing nodes.
|
||||
"""
|
||||
|
||||
left_order = self._get_order(left_id)
|
||||
right_order = self._get_order(right_id)
|
||||
|
||||
assert left_order < right_order
|
||||
|
||||
new_order = get_fraction_in_range(left_order, right_order)
|
||||
|
||||
SQLBaseStore._simple_update_one_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
keyvalues={"chunk_id": left_id},
|
||||
updatevalues={"next_chunk_id": node_id},
|
||||
)
|
||||
|
||||
SQLBaseStore._simple_insert_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
values={
|
||||
"chunk_id": node_id,
|
||||
"room_id": self.room_id,
|
||||
"next_chunk_id": right_id,
|
||||
"numerator": int(new_order.numerator),
|
||||
"denominator": int(new_order.denominator),
|
||||
}
|
||||
)
|
||||
|
||||
return new_order
|
||||
|
||||
def _insert_at_end(self, node_id, last_id):
|
||||
"""Inserts node at the end using existing last node.
|
||||
"""
|
||||
|
||||
last_order = self._get_order(last_id)
|
||||
new_order = Fraction(int(math.ceil(last_order)) + 1, 1)
|
||||
|
||||
SQLBaseStore._simple_update_one_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
keyvalues={"chunk_id": last_id},
|
||||
updatevalues={"next_chunk_id": node_id},
|
||||
)
|
||||
|
||||
SQLBaseStore._simple_insert_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
values={
|
||||
"chunk_id": node_id,
|
||||
"room_id": self.room_id,
|
||||
"next_chunk_id": None,
|
||||
"numerator": int(new_order.numerator),
|
||||
"denominator": int(new_order.denominator),
|
||||
}
|
||||
)
|
||||
|
||||
return new_order
|
||||
|
||||
def _insert_at_start(self, node_id, first_id):
|
||||
"""Inserts node at the start using existing first node.
|
||||
"""
|
||||
|
||||
first_order = self._get_order(first_id)
|
||||
new_order = get_fraction_in_range(0, first_order)
|
||||
|
||||
SQLBaseStore._simple_update_one_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized_first",
|
||||
keyvalues={"room_id": self.room_id},
|
||||
updatevalues={"chunk_id": node_id},
|
||||
)
|
||||
|
||||
SQLBaseStore._simple_insert_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
values={
|
||||
"chunk_id": node_id,
|
||||
"room_id": self.room_id,
|
||||
"next_chunk_id": first_id,
|
||||
"numerator": int(new_order.numerator),
|
||||
"denominator": int(new_order.denominator),
|
||||
}
|
||||
)
|
||||
|
||||
return new_order
|
||||
|
||||
def _insert_first(self, node_id):
|
||||
"""Inserts the first node for this room.
|
||||
"""
|
||||
|
||||
SQLBaseStore._simple_insert_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized_first",
|
||||
values={
|
||||
"room_id": self.room_id,
|
||||
"chunk_id": node_id,
|
||||
},
|
||||
)
|
||||
|
||||
SQLBaseStore._simple_insert_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
values={
|
||||
"chunk_id": node_id,
|
||||
"room_id": self.room_id,
|
||||
"next_chunk_id": None,
|
||||
"numerator": 1,
|
||||
"denominator": 1,
|
||||
}
|
||||
)
|
||||
|
||||
return Fraction(1, 1)
|
||||
|
||||
def get_nodes_with_edges_to(self, node_id):
|
||||
"""Implements OrderedListStore"""
|
||||
|
||||
# Note that we use the inverse relation here
|
||||
sql = """
|
||||
SELECT l.ordering, l.chunk_id FROM chunk_graph AS g
|
||||
SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
|
||||
INNER JOIN chunk_linearized AS l ON g.prev_id = l.chunk_id
|
||||
WHERE g.chunk_id = ?
|
||||
"""
|
||||
self.txn.execute(sql, (node_id,))
|
||||
return self.txn.fetchall()
|
||||
return [(Fraction(n, d), c) for c, n, d in self.txn]
|
||||
|
||||
def get_nodes_with_edges_from(self, node_id):
|
||||
"""Implements OrderedListStore"""
|
||||
|
||||
# Note that we use the inverse relation here
|
||||
sql = """
|
||||
SELECT l.ordering, l.chunk_id FROM chunk_graph AS g
|
||||
SELECT l.chunk_id, l.numerator, l.denominator FROM chunk_graph AS g
|
||||
INNER JOIN chunk_linearized AS l ON g.chunk_id = l.chunk_id
|
||||
WHERE g.prev_id = ?
|
||||
"""
|
||||
self.txn.execute(sql, (node_id,))
|
||||
return self.txn.fetchall()
|
||||
return [(Fraction(n, d), c) for c, n, d in self.txn]
|
||||
|
||||
def _delete_ordering(self, node_id):
|
||||
"""Implements OrderedListStore"""
|
||||
|
||||
next_chunk_id = SQLBaseStore._simple_select_one_onecol_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
keyvalues={
|
||||
"chunk_id": node_id,
|
||||
},
|
||||
retcol="next_chunk_id",
|
||||
)
|
||||
|
||||
SQLBaseStore._simple_delete_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
keyvalues={"chunk_id": node_id},
|
||||
)
|
||||
|
||||
sql = """
|
||||
UPDATE chunk_linearized SET next_chunk_id = ?
|
||||
WHERE next_chunk_id = ?
|
||||
"""
|
||||
|
||||
self.txn.execute(sql, (next_chunk_id, node_id,))
|
||||
|
||||
sql = """
|
||||
UPDATE chunk_linearized_first SET chunk_id = ?
|
||||
WHERE chunk_id = ?
|
||||
"""
|
||||
|
||||
self.txn.execute(sql, (next_chunk_id, node_id,))
|
||||
|
||||
def _add_edge_to_graph(self, source_id, target_id):
|
||||
"""Implements OrderedListStore"""
|
||||
|
||||
@@ -242,78 +379,199 @@ class ChunkDBOrderedListStore(OrderedListStore):
|
||||
values={"chunk_id": target_id, "prev_id": source_id}
|
||||
)
|
||||
|
||||
def _insert(self, node_id, order):
|
||||
"""Inserts the node with the given ordering.
|
||||
"""
|
||||
SQLBaseStore._simple_insert_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
values={
|
||||
"chunk_id": node_id,
|
||||
"room_id": self.room_id,
|
||||
"ordering": order,
|
||||
}
|
||||
)
|
||||
|
||||
def _get_order(self, node_id):
|
||||
"""Get the ordering of the given node.
|
||||
"""
|
||||
|
||||
return SQLBaseStore._simple_select_one_onecol_txn(
|
||||
row = SQLBaseStore._simple_select_one_txn(
|
||||
self.txn,
|
||||
table="chunk_linearized",
|
||||
keyvalues={"chunk_id": node_id},
|
||||
retcol="ordering"
|
||||
retcols=("numerator", "denominator",),
|
||||
)
|
||||
return Fraction(row["numerator"], row["denominator"])
|
||||
|
||||
def _rebalance(self, node_id):
|
||||
"""Rebalances the list around the given node to ensure that the
|
||||
ordering floats don't get too small.
|
||||
ordering denominators aren't too big.
|
||||
|
||||
This works by finding a range that includes the given node, and
|
||||
recalculating the ordering floats such that they're equidistant in
|
||||
that range.
|
||||
This is done by starting at the given chunk and generating new orders
|
||||
based on a Farey sequence of order `self.rebalance_md` for all
|
||||
subsequent chunks that have an order less than that of the ordering
|
||||
generated by the Farey sequence.
|
||||
|
||||
For example say we have chunks (and orders): A (23/90), B (24/91) and
|
||||
C (2/3), and we have rebalance_md set to 5, a rebalancing would produce:
|
||||
|
||||
A: 23/90 -> 1/3
|
||||
B: 24/91 -> 2/5
|
||||
C: 2/3 (no change)
|
||||
|
||||
Since the farey sequence is 1/5, 1/4, 1/3, 2/5, 1/2, ... and 1/3 is the
|
||||
smallest term greater than 23/90.
|
||||
|
||||
Note that we've extended Farey Sequence to be infinite by repeating the
|
||||
sequence with an added integer. For example sequence with order 3:
|
||||
|
||||
0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...
|
||||
"""
|
||||
|
||||
logger.info("Rebalancing room %s, chunk %s", self.room_id, node_id)
|
||||
|
||||
with Measure(self.clock, "chunk_rebalance"):
|
||||
# We pick the interval to try and minimise the number of decimal
|
||||
# places, i.e. we round to nearest float with `rebalance_digits` and
|
||||
# use that as one side of the interval
|
||||
order = self._get_order(node_id)
|
||||
a = round(order, self.rebalance_digits)
|
||||
min_order = a - 10 ** -self.rebalance_digits
|
||||
max_order = a + 10 ** -self.rebalance_digits
|
||||
old_order = self._get_order(node_id)
|
||||
|
||||
# Now we get all the nodes in the range. We add the minimum difference
|
||||
# to the bounds to ensure that we don't accidentally move a node to be
|
||||
# within the minimum difference of a node outside the range.
|
||||
sql = """
|
||||
SELECT chunk_id FROM chunk_linearized
|
||||
WHERE ordering >= ? AND ordering <= ? AND room_id = ?
|
||||
"""
|
||||
self.txn.execute(sql, (
|
||||
min_order - self.min_difference,
|
||||
max_order + self.min_difference,
|
||||
self.room_id,
|
||||
))
|
||||
a, b, c, d = find_farey_terms(old_order, self.rebalance_md)
|
||||
assert old_order < Fraction(a, b)
|
||||
assert b + d > self.rebalance_md
|
||||
|
||||
chunk_ids = [c for c, in self.txn]
|
||||
|
||||
sql = """
|
||||
UPDATE chunk_linearized
|
||||
SET ordering = ?
|
||||
WHERE chunk_id = ?
|
||||
"""
|
||||
|
||||
step = (max_order - min_order) / len(chunk_ids)
|
||||
self.txn.executemany(
|
||||
sql,
|
||||
(
|
||||
((idx * step + min_order), chunk_id)
|
||||
for idx, chunk_id in enumerate(chunk_ids)
|
||||
)
|
||||
# Since we can easily produce farey sequence terms with an iterative
|
||||
# algorithm, we can use WITH RECURSIVE to do so. This is less clear
|
||||
# than doing it in python, but saves us being killed by the RTT to the
|
||||
# DB if we need to rebalance a large number of nodes.
|
||||
with_sql = """
|
||||
WITH RECURSIVE chunks (chunk_id, next, n, a, b, c, d) AS (
|
||||
SELECT chunk_id, next_chunk_id, ?, ?, ?, ?, ?
|
||||
FROM chunk_linearized WHERE chunk_id = ?
|
||||
UNION ALL
|
||||
SELECT n.chunk_id, n.next_chunk_id, n,
|
||||
c, d, ((n + b) / d) * c - a, ((n + b) / d) * d - b
|
||||
FROM chunks AS c
|
||||
INNER JOIN chunk_linearized AS l ON l.chunk_id = c.chunk_id
|
||||
INNER JOIN chunk_linearized AS n ON n.chunk_id = l.next_chunk_id
|
||||
WHERE c * 1.0 / d > n.numerator * 1.0 / n.denominator
|
||||
)
|
||||
"""
|
||||
|
||||
rebalance_counter.inc()
|
||||
# Annoyingly, postgres 9.4 doesn't support the standard SQL subquery
|
||||
# syntax for updates.
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
sql = with_sql + """
|
||||
UPDATE chunk_linearized AS l
|
||||
SET numerator = a, denominator = b
|
||||
FROM chunks AS c
|
||||
WHERE c.chunk_id = l.chunk_id
|
||||
"""
|
||||
else:
|
||||
sql = with_sql + """
|
||||
UPDATE chunk_linearized
|
||||
SET (numerator, denominator) = (
|
||||
SELECT a, b FROM chunks
|
||||
WHERE chunks.chunk_id = chunk_linearized.chunk_id
|
||||
)
|
||||
WHERE chunk_id in (SELECT chunk_id FROM chunks)
|
||||
"""
|
||||
|
||||
self.txn.execute(sql, (
|
||||
self.rebalance_md, a, b, c, d, node_id
|
||||
))
|
||||
|
||||
logger.info("Rebalanced %d chunks in room %s", self.txn.rowcount, self.room_id)
|
||||
|
||||
rebalance_counter.inc()
|
||||
|
||||
|
||||
def get_fraction_in_range(min_frac, max_frac):
|
||||
"""Gets a fraction in between the given numbers.
|
||||
|
||||
Uses Stern-Brocot tree to generate the fraction with the smallest
|
||||
denominator.
|
||||
|
||||
See https://en.wikipedia.org/wiki/Stern%E2%80%93Brocot_tree
|
||||
|
||||
Args:
|
||||
min_frac (numbers.Rational)
|
||||
max_frac (numbers.Rational)
|
||||
|
||||
Returns:
|
||||
numbers.Rational
|
||||
"""
|
||||
|
||||
assert 0 <= min_frac < max_frac
|
||||
|
||||
# If the determinant is 1 then the fraction with smallest numerator and
|
||||
# denominator in the range is the mediant, so we don't have to use the
|
||||
# stern brocot tree to search for it.
|
||||
determinant = (
|
||||
min_frac.denominator * max_frac.numerator
|
||||
- min_frac.numerator * max_frac.denominator
|
||||
)
|
||||
|
||||
if determinant == 1:
|
||||
return Fraction(
|
||||
min_frac.numerator + max_frac.numerator,
|
||||
min_frac.denominator + max_frac.denominator,
|
||||
)
|
||||
|
||||
# This works by tracking two fractions a/b and c/d and repeatedly replacing
|
||||
# one of them with their mediant, depending on if the mediant is smaller
|
||||
# or greater than the specified range.
|
||||
a, b, c, d = 0, 1, 1, 0
|
||||
|
||||
while True:
|
||||
f = Fraction(a + c, b + d)
|
||||
|
||||
if f <= min_frac:
|
||||
a, b, c, d = a + c, b + d, c, d
|
||||
elif min_frac < f < max_frac:
|
||||
return f
|
||||
else:
|
||||
a, b, c, d = a, b, a + c, b + d
|
||||
|
||||
|
||||
def find_farey_terms(min_frac, max_denom):
|
||||
"""Find the smallest pair of fractions that are part of the Farey sequence
|
||||
of order `max_denom` (the ordered sequence of all fraction with denominator
|
||||
less than or equal to max_denom).
|
||||
|
||||
This is useful as it can be fed into a simple iterative algorithm to
|
||||
generate subsequent entries in the sequence.
|
||||
|
||||
A pair of fractions a/b, c/d are neighbours in the sequence of order
|
||||
max(b, d) if and only if their determinant is one, i.e. bc - ad = 1. Note
|
||||
that the next order sequence is generate by taking the mediants of the
|
||||
previous order, so a/b and c/d are neighbours in all sequences with orders
|
||||
between max(b, d) and b + d.
|
||||
|
||||
We can therefore use the Stern-Brocot tree to find the closest pair of
|
||||
fractions to min_frac such that b + d is strictly greater than max_denom,
|
||||
since all neighbouring fractions in Stern-Brocot satisfy the necessary
|
||||
determinant property.
|
||||
|
||||
Note that we've extended Farey Sequence to be infinite by repeating the
|
||||
sequence with an added integer. For example sequence with order 3:
|
||||
|
||||
0/1, 1/3, 2/3, 1/1, 4/3, 5/3, 2/1, 7/3, ...
|
||||
|
||||
See https://en.wikipedia.org/wiki/Farey_sequence
|
||||
|
||||
Args:
|
||||
min_frac (numbers.Rational)
|
||||
max_frac (int)
|
||||
|
||||
Returns:
|
||||
tuple[int, int, int, int]
|
||||
"""
|
||||
|
||||
a, b, c, d = 0, 1, 1, 0
|
||||
|
||||
while True:
|
||||
cur_frac = Fraction(a + c, b + d)
|
||||
|
||||
if b + d > max_denom:
|
||||
break
|
||||
|
||||
if cur_frac <= min_frac:
|
||||
a, b, c, d = a + c, b + d, c, d
|
||||
elif min_frac < cur_frac:
|
||||
a, b, c, d = a, b, a + c, b + d
|
||||
|
||||
# a/b may be smaller than min_frac, so we run the algorithm to generate
|
||||
# next Farey sequence terms until a/b is strictly greater than min_frac
|
||||
while Fraction(a, b) <= min_frac:
|
||||
k = int((max_denom + b) / d)
|
||||
a, b, c, d = c, d, k * c - a, k * d - b
|
||||
|
||||
assert min_frac < Fraction(a, b) < Fraction(c, d)
|
||||
assert b * c - a * d == 1
|
||||
|
||||
return a, b, c, d
|
||||
|
||||
@@ -202,6 +202,7 @@ def _retry_on_integrity_error(func):
|
||||
class EventsStore(EventsWorkerStore):
|
||||
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
|
||||
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
|
||||
EVENT_FIELDS_CHUNK = "event_fields_chunk_id"
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(EventsStore, self).__init__(db_conn, hs)
|
||||
@@ -242,6 +243,11 @@ class EventsStore(EventsWorkerStore):
|
||||
psql_only=True,
|
||||
)
|
||||
|
||||
self.register_background_update_handler(
|
||||
self.EVENT_FIELDS_CHUNK,
|
||||
self._background_compute_chunks,
|
||||
)
|
||||
|
||||
self._event_persist_queue = _EventPeristenceQueue()
|
||||
|
||||
self._state_resolution_handler = hs.get_state_resolution_handler()
|
||||
@@ -1440,7 +1446,7 @@ class EventsStore(EventsWorkerStore):
|
||||
sibling_events.update(pes)
|
||||
|
||||
table = ChunkDBOrderedListStore(
|
||||
txn, room_id, self.clock,
|
||||
txn, room_id, self.clock, self.database_engine,
|
||||
)
|
||||
|
||||
# If there is only one previous chunk (and that isn't None), then this
|
||||
@@ -1823,6 +1829,72 @@ class EventsStore(EventsWorkerStore):
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_compute_chunks(self, progress, batch_size):
|
||||
"""Iterates over events and assigns them chunk IDs
|
||||
"""
|
||||
|
||||
up_to_stream_id = progress.get("up_to_stream_id")
|
||||
if up_to_stream_id is None:
|
||||
up_to_stream_id = self.get_current_events_token() + 1
|
||||
|
||||
rows_inserted = progress.get("rows_inserted", 0)
|
||||
|
||||
def reindex_chunks_txn(txn):
|
||||
txn.execute("""
|
||||
SELECT stream_ordering, room_id, event_id FROM events
|
||||
WHERE stream_ordering < ? AND outlier = ? AND chunk_id IS NULL
|
||||
ORDER BY stream_ordering DESC
|
||||
LIMIT ?
|
||||
""", (up_to_stream_id, False, batch_size))
|
||||
|
||||
rows = txn.fetchall()
|
||||
|
||||
stream_ordering = up_to_stream_id
|
||||
for stream_ordering, room_id, event_id in rows:
|
||||
prev_events = self._simple_select_onecol_txn(
|
||||
txn,
|
||||
table="event_edges",
|
||||
keyvalues={
|
||||
"event_id": event_id,
|
||||
},
|
||||
retcol="prev_event_id",
|
||||
)
|
||||
|
||||
chunk_id, topo = self._insert_into_chunk_txn(
|
||||
txn, room_id, event_id, prev_events,
|
||||
)
|
||||
|
||||
self._simple_update_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={"event_id": event_id},
|
||||
updatevalues={
|
||||
"chunk_id": chunk_id,
|
||||
"topological_ordering": topo,
|
||||
},
|
||||
)
|
||||
|
||||
progress = {
|
||||
"up_to_stream_id": stream_ordering,
|
||||
"rows_inserted": rows_inserted + len(rows)
|
||||
}
|
||||
|
||||
self._background_update_progress_txn(
|
||||
txn, self.EVENT_FIELDS_CHUNK, progress
|
||||
)
|
||||
|
||||
return len(rows)
|
||||
|
||||
result = yield self.runInteraction(
|
||||
self.EVENT_FIELDS_CHUNK, reindex_chunks_txn
|
||||
)
|
||||
|
||||
if not result:
|
||||
yield self._end_background_update(self.EVENT_FIELDS_CHUNK)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
def get_current_backfill_token(self):
|
||||
"""The current minimum token that backfilled events have reached"""
|
||||
return -self._backfill_id_gen.get_current_token()
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.storage._base import SQLBaseStore, LoggingTransaction
|
||||
from synapse.storage.prepare_database import get_statements
|
||||
|
||||
SQL = """
|
||||
|
||||
ALTER TABLE events ADD COLUMN chunk_id BIGINT;
|
||||
|
||||
-- FIXME: Add index on contains_url
|
||||
|
||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||
('events_chunk_index', '{}');
|
||||
|
||||
-- Stores how chunks of graph relate to each other
|
||||
CREATE TABLE chunk_graph (
|
||||
chunk_id BIGINT NOT NULL,
|
||||
prev_id BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX chunk_graph_id ON chunk_graph (chunk_id, prev_id);
|
||||
CREATE INDEX chunk_graph_prev_id ON chunk_graph (prev_id);
|
||||
|
||||
-- The extremities in each chunk. Note that these are pointing to events that
|
||||
-- we don't have, rather than boundary between chunks.
|
||||
CREATE TABLE chunk_backwards_extremities (
|
||||
chunk_id BIGINT NOT NULL,
|
||||
event_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX chunk_backwards_extremities_id ON chunk_backwards_extremities(
|
||||
chunk_id, event_id
|
||||
);
|
||||
CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities(
|
||||
event_id
|
||||
);
|
||||
|
||||
-- Maintains an absolute ordering of chunks. Gets updated when we see new
|
||||
-- edges between chunks.
|
||||
CREATE TABLE chunk_linearized (
|
||||
chunk_id BIGINT NOT NULL,
|
||||
room_id TEXT NOT NULL,
|
||||
next_chunk_id BIGINT, -- The chunk directly after this chunk, or NULL if last chunk
|
||||
numerator BIGINT NOT NULL,
|
||||
denominator BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
|
||||
CREATE UNIQUE INDEX chunk_linearized_next_id ON chunk_linearized (
|
||||
next_chunk_id, room_id
|
||||
);
|
||||
|
||||
-- Records the first chunk in a room.
|
||||
CREATE TABLE chunk_linearized_first (
|
||||
chunk_id BIGINT NOT NULL,
|
||||
room_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX chunk_linearized_first_id ON chunk_linearized_first (room_id);
|
||||
|
||||
INSERT into background_updates (update_name, progress_json)
|
||||
VALUES ('event_fields_chunk_id', '{}');
|
||||
|
||||
"""
|
||||
|
||||
|
||||
def run_create(cur, database_engine, *args, **kwargs):
|
||||
for statement in get_statements(SQL.splitlines()):
|
||||
cur.execute(statement)
|
||||
|
||||
txn = LoggingTransaction(
|
||||
cur, "schema_update", database_engine, [], [],
|
||||
)
|
||||
|
||||
rows = SQLBaseStore._simple_select_list_txn(
|
||||
txn,
|
||||
table="event_forward_extremities",
|
||||
keyvalues={},
|
||||
retcols=("event_id", "room_id",),
|
||||
)
|
||||
|
||||
next_chunk_id = 1
|
||||
room_to_next_order = {}
|
||||
prev_chunks_by_room = {}
|
||||
|
||||
for row in rows:
|
||||
chunk_id = next_chunk_id
|
||||
next_chunk_id += 1
|
||||
|
||||
room_id = row["room_id"]
|
||||
event_id = row["event_id"]
|
||||
|
||||
SQLBaseStore._simple_update_txn(
|
||||
txn,
|
||||
table="events",
|
||||
keyvalues={"room_id": room_id, "event_id": event_id},
|
||||
updatevalues={"chunk_id": chunk_id},
|
||||
)
|
||||
|
||||
ordering = room_to_next_order.get(room_id, 1)
|
||||
room_to_next_order[room_id] = ordering + 1
|
||||
|
||||
prev_chunks = prev_chunks_by_room.setdefault(room_id, [])
|
||||
|
||||
SQLBaseStore._simple_insert_txn(
|
||||
txn,
|
||||
table="chunk_linearized",
|
||||
values={
|
||||
"chunk_id": chunk_id,
|
||||
"room_id": row["room_id"],
|
||||
"numerator": ordering,
|
||||
"denominator": 1,
|
||||
},
|
||||
)
|
||||
|
||||
if prev_chunks:
|
||||
SQLBaseStore._simple_update_one_txn(
|
||||
txn,
|
||||
table="chunk_linearized",
|
||||
keyvalues={"chunk_id": prev_chunks[-1]},
|
||||
updatevalues={"next_chunk_id": chunk_id},
|
||||
)
|
||||
else:
|
||||
SQLBaseStore._simple_insert_txn(
|
||||
txn,
|
||||
table="chunk_linearized_first",
|
||||
values={
|
||||
"chunk_id": chunk_id,
|
||||
"room_id": row["room_id"],
|
||||
},
|
||||
)
|
||||
|
||||
prev_chunks.append(chunk_id)
|
||||
|
||||
|
||||
def run_upgrade(*args, **kwargs):
|
||||
pass
|
||||
@@ -1,49 +0,0 @@
|
||||
/* Copyright 2018 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
ALTER TABLE events ADD COLUMN chunk_id BIGINT;
|
||||
|
||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||
('events_chunk_index', '{}');
|
||||
|
||||
-- Stores how chunks of graph relate to each other
|
||||
CREATE TABLE chunk_graph (
|
||||
chunk_id BIGINT NOT NULL,
|
||||
prev_id BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX chunk_graph_id ON chunk_graph (chunk_id, prev_id);
|
||||
CREATE INDEX chunk_graph_prev_id ON chunk_graph (prev_id);
|
||||
|
||||
-- The extremities in each chunk. Note that these are pointing to events that
|
||||
-- we don't have, rather than boundary between chunks.
|
||||
CREATE TABLE chunk_backwards_extremities (
|
||||
chunk_id BIGINT NOT NULL,
|
||||
event_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX chunk_backwards_extremities_id ON chunk_backwards_extremities(chunk_id, event_id);
|
||||
CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities(event_id);
|
||||
|
||||
-- Maintains an absolute ordering of chunks. Gets updated when we see new
|
||||
-- edges between chunks.
|
||||
CREATE TABLE chunk_linearized (
|
||||
chunk_id BIGINT NOT NULL,
|
||||
room_id TEXT NOT NULL,
|
||||
ordering DOUBLE PRECISION NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
|
||||
CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering);
|
||||
@@ -792,7 +792,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
iterated_chunks = [chunk_id]
|
||||
|
||||
table = ChunkDBOrderedListStore(
|
||||
txn, room_id, self.clock,
|
||||
txn, room_id, self.clock, self.database_engine,
|
||||
)
|
||||
|
||||
if filter_clause:
|
||||
|
||||
@@ -15,11 +15,16 @@
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import itertools
|
||||
import random
|
||||
import tests.unittest
|
||||
import tests.utils
|
||||
|
||||
from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
|
||||
from fractions import Fraction
|
||||
|
||||
from synapse.storage.chunk_ordered_table import (
|
||||
ChunkDBOrderedListStore, find_farey_terms, get_fraction_in_range,
|
||||
)
|
||||
|
||||
|
||||
class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
|
||||
@@ -42,23 +47,26 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
|
||||
|
||||
def test_txn(txn):
|
||||
table = ChunkDBOrderedListStore(
|
||||
txn, room_id, self.clock, 1, 100,
|
||||
txn, room_id, self.clock,
|
||||
self.store.database_engine,
|
||||
5, 100,
|
||||
)
|
||||
|
||||
table.add_node("A")
|
||||
table._insert_after("B", "A")
|
||||
table._insert_before("C", "A")
|
||||
table._insert_after("D", "A")
|
||||
|
||||
sql = """
|
||||
SELECT chunk_id FROM chunk_linearized
|
||||
SELECT chunk_id, numerator, denominator FROM chunk_linearized
|
||||
WHERE room_id = ?
|
||||
ORDER BY ordering ASC
|
||||
"""
|
||||
txn.execute(sql, (room_id,))
|
||||
|
||||
ordered = [r for r, in txn]
|
||||
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
|
||||
ordered = [c for _, c in ordered]
|
||||
|
||||
self.assertEqual(["C", "A", "B"], ordered)
|
||||
self.assertEqual(["C", "A", "D", "B"], ordered)
|
||||
|
||||
yield self.store.runInteraction("test", test_txn)
|
||||
|
||||
@@ -68,7 +76,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
|
||||
|
||||
def test_txn(txn):
|
||||
table = ChunkDBOrderedListStore(
|
||||
txn, room_id, self.clock, 1, 20,
|
||||
txn, room_id, self.clock,
|
||||
self.store.database_engine,
|
||||
5, 100,
|
||||
)
|
||||
|
||||
nodes = [(i, "node_%d" % (i,)) for i in xrange(1, 1000)]
|
||||
@@ -95,13 +105,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
|
||||
already_inserted.sort()
|
||||
|
||||
sql = """
|
||||
SELECT chunk_id FROM chunk_linearized
|
||||
SELECT chunk_id, numerator, denominator FROM chunk_linearized
|
||||
WHERE room_id = ?
|
||||
ORDER BY ordering ASC
|
||||
"""
|
||||
txn.execute(sql, (room_id,))
|
||||
|
||||
ordered = [r for r, in txn]
|
||||
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
|
||||
ordered = [c for _, c in ordered]
|
||||
|
||||
self.assertEqual(expected, ordered)
|
||||
|
||||
@@ -113,7 +123,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
|
||||
|
||||
def test_txn(txn):
|
||||
table = ChunkDBOrderedListStore(
|
||||
txn, room_id, self.clock, 1, 20,
|
||||
txn, room_id, self.clock,
|
||||
self.store.database_engine,
|
||||
5, 1000,
|
||||
)
|
||||
|
||||
table.add_node("a")
|
||||
@@ -131,13 +143,13 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
|
||||
expected.append(node_id)
|
||||
|
||||
sql = """
|
||||
SELECT chunk_id FROM chunk_linearized
|
||||
SELECT chunk_id, numerator, denominator FROM chunk_linearized
|
||||
WHERE room_id = ?
|
||||
ORDER BY ordering ASC
|
||||
"""
|
||||
txn.execute(sql, (room_id,))
|
||||
|
||||
ordered = [r for r, in txn]
|
||||
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
|
||||
ordered = [c for _, c in ordered]
|
||||
|
||||
self.assertEqual(expected, ordered)
|
||||
|
||||
@@ -149,7 +161,9 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
|
||||
|
||||
def test_txn(txn):
|
||||
table = ChunkDBOrderedListStore(
|
||||
txn, room_id, self.clock, 1, 100,
|
||||
txn, room_id, self.clock,
|
||||
self.store.database_engine,
|
||||
5, 100,
|
||||
)
|
||||
|
||||
table.add_node("a")
|
||||
@@ -170,16 +184,119 @@ class ChunkLinearizerStoreTestCase(tests.unittest.TestCase):
|
||||
prev_node = node_id
|
||||
|
||||
sql = """
|
||||
SELECT chunk_id FROM chunk_linearized
|
||||
SELECT chunk_id, numerator, denominator FROM chunk_linearized
|
||||
WHERE room_id = ?
|
||||
ORDER BY ordering ASC
|
||||
"""
|
||||
txn.execute(sql, (room_id,))
|
||||
|
||||
ordered = [r for r, in txn]
|
||||
ordered = sorted([(Fraction(n, d), r) for r, n, d in txn])
|
||||
ordered = [c for _, c in ordered]
|
||||
|
||||
expected = expected_prefix + list(reversed(expected_suffix))
|
||||
|
||||
self.assertEqual(expected, ordered)
|
||||
|
||||
yield self.store.runInteraction("test", test_txn)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_get_edges_to(self):
|
||||
room_id = "foo_room4"
|
||||
|
||||
def test_txn(txn):
|
||||
table = ChunkDBOrderedListStore(
|
||||
txn, room_id, self.clock,
|
||||
self.store.database_engine,
|
||||
5, 100,
|
||||
)
|
||||
|
||||
table.add_node("A")
|
||||
table._insert_after("B", "A")
|
||||
table._add_edge_to_graph("A", "B")
|
||||
table._insert_before("C", "A")
|
||||
table._add_edge_to_graph("C", "A")
|
||||
|
||||
nodes = table.get_nodes_with_edges_from("A")
|
||||
self.assertEqual([n for _, n in nodes], ["B"])
|
||||
|
||||
nodes = table.get_nodes_with_edges_to("A")
|
||||
self.assertEqual([n for _, n in nodes], ["C"])
|
||||
|
||||
yield self.store.runInteraction("test", test_txn)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_get_next_and_prev(self):
|
||||
room_id = "foo_room5"
|
||||
|
||||
def test_txn(txn):
|
||||
table = ChunkDBOrderedListStore(
|
||||
txn, room_id, self.clock,
|
||||
self.store.database_engine,
|
||||
5, 100,
|
||||
)
|
||||
|
||||
table.add_node("A")
|
||||
table._insert_after("B", "A")
|
||||
table._insert_before("C", "A")
|
||||
|
||||
self.assertEqual(table.get_next("A"), "B")
|
||||
self.assertEqual(table.get_prev("A"), "C")
|
||||
|
||||
yield self.store.runInteraction("test", test_txn)
|
||||
|
||||
def test_find_farey_terms(self):
|
||||
def _test(min_frac, max_denom):
|
||||
""""Calls `find_farey_terms` with given values and checks they
|
||||
are neighbours in the Farey Sequence.
|
||||
"""
|
||||
|
||||
a, b, c, d = find_farey_terms(min_frac, max_denom)
|
||||
|
||||
p = Fraction(a, b)
|
||||
q = Fraction(c, d)
|
||||
|
||||
assert min_frac < p < q
|
||||
|
||||
for x, y in _pairwise(_farey_generator(max_denom)):
|
||||
if min_frac < x < y:
|
||||
self.assertEqual(x, p)
|
||||
self.assertEqual(y, q)
|
||||
break
|
||||
|
||||
_test(Fraction(5, 3), 12)
|
||||
_test(Fraction(1, 3), 12)
|
||||
_test(Fraction(1, 2), 9)
|
||||
_test(Fraction(1, 2), 10)
|
||||
_test(Fraction(1, 2), 15)
|
||||
|
||||
def test_get_fraction_in_range(self):
|
||||
def _test(x, y):
|
||||
assert x < get_fraction_in_range(x, y) < y
|
||||
|
||||
_test(Fraction(1, 2), Fraction(2, 3))
|
||||
_test(Fraction(1, 2), Fraction(3, 2))
|
||||
_test(Fraction(5, 203), Fraction(6, 204))
|
||||
|
||||
|
||||
def _farey_generator(n):
|
||||
"""Generates Farey sequence of order `n`.
|
||||
|
||||
Note that this doesn't terminate.
|
||||
|
||||
Taken from https://en.wikipedia.org/wiki/Farey_sequence#Next_term
|
||||
"""
|
||||
|
||||
a, b, c, d = 0, 1, 1, n
|
||||
|
||||
yield Fraction(a, b)
|
||||
|
||||
while True:
|
||||
k = int((n + b) / d)
|
||||
a, b, c, d = c, d, (k * c - a), (k * d - b)
|
||||
yield Fraction(a, b)
|
||||
|
||||
|
||||
def _pairwise(iterable):
|
||||
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
|
||||
a, b = itertools.tee(iterable)
|
||||
next(b, None)
|
||||
return itertools.izip(a, b)
|
||||
|
||||
@@ -56,3 +56,29 @@ class KatrielBodlaenderTests(unittest.TestCase):
|
||||
store.add_edge("node_4", "node_3")
|
||||
|
||||
self.assertEqual(list(reversed(nodes)), store.list)
|
||||
|
||||
def test_divergent_graph(self):
|
||||
store = InMemoryOrderedListStore()
|
||||
|
||||
nodes = [
|
||||
"node_1",
|
||||
"node_2",
|
||||
"node_3",
|
||||
"node_4",
|
||||
"node_5",
|
||||
"node_6",
|
||||
]
|
||||
|
||||
for node in reversed(nodes):
|
||||
store.add_node(node)
|
||||
|
||||
store.add_edge("node_2", "node_3")
|
||||
store.add_edge("node_2", "node_5")
|
||||
store.add_edge("node_1", "node_2")
|
||||
store.add_edge("node_3", "node_4")
|
||||
store.add_edge("node_1", "node_3")
|
||||
store.add_edge("node_4", "node_5")
|
||||
store.add_edge("node_5", "node_6")
|
||||
store.add_edge("node_4", "node_6")
|
||||
|
||||
self.assertEqual(nodes, store.list)
|
||||
|
||||
Reference in New Issue
Block a user