Skip to content
Snippets Groups Projects
Commit 58ce55b7 authored by roadscape's avatar roadscape
Browse files

factor out fifo queue, add tests

parent ac21da34
No related branches found
No related tags found
No related merge requests found
"""Accounts indexer."""
import math
import logging
from datetime import datetime
......@@ -13,6 +12,7 @@ from hive.steem.client import SteemClient
from hive.utils.normalize import rep_log10, vests_amount
from hive.utils.timer import Timer
from hive.utils.account import safe_profile_metadata
from hive.utils.unique_fifo import UniqueFIFO
log = logging.getLogger(__name__)
......@@ -25,8 +25,7 @@ class Accounts:
_ids = {}
# fifo queue
_dirty = []
_dirty_set = set()
_dirty = UniqueFIFO()
# account core methods
# --------------------
......@@ -79,15 +78,7 @@ class Accounts:
@classmethod
def dirty(cls, accounts):
"""Marks given accounts as needing an update."""
if not accounts:
return 0
assert isinstance(accounts, set)
accounts = accounts - cls._dirty_set
if not accounts:
return 0
cls._dirty.extend(accounts)
cls._dirty_set |= set(accounts)
return len(accounts)
return cls._dirty.extend(accounts)
@classmethod
def dirty_all(cls):
......@@ -108,22 +99,15 @@ class Accounts:
- trx: bool - wrap the update in a transaction
- spread: int - spread writes over a period of `n` calls
"""
assert spread >= 1
if not cls._dirty:
accounts = cls._dirty.shift_portion(spread)
count = len(accounts)
if not count:
return 0
count = len(cls._dirty)
if spread > 1:
count = math.ceil(count / spread)
if trx:
log.info("[SYNC] update %d accounts", count)
# shift _dirty by `count` items
accounts = cls._dirty[0:count]
cls._dirty = cls._dirty[count:None]
for name in accounts:
cls._dirty_set.remove(name)
cls._cache_accounts(accounts, trx=trx)
return count
......
"""Performant FIFO queue which ignores duplicates."""
from math import ceil
class UniqueFIFO:
"""FIFO queue which ignores duplicates and shifts efficiently."""
def __init__(self):
self._queue = []
self._set = set()
def extend(self, items):
"""Push multiple items onto the queue.
Returns number of accepted items."""
if not items:
return 0
assert isinstance(items, set)
items = items - self._set
if not items:
return 0
self._queue.extend(items)
self._set |= set(items)
return len(items)
def shift_count(self, count=1):
"""Shift a number of items from the queue."""
items = len(self._queue)
if not items:
return []
if count >= items:
return self._take_all()
return self._shift(count)
def shift_portion(self, total_portions):
"""Shift a fraction of items from the queue.
Returned item count is `ceil(count / total_portions)`.
"""
count = len(self._queue)
if not count:
return []
if total_portions == 1 or count == 1:
return self._take_all()
count = ceil(count / total_portions)
return self._shift(count)
def _take_all(self):
ret = self._queue
self._queue = []
self._set = set()
return ret
def _shift(self, count):
# select relevant portion
ret = self._queue[0:count]
# prune queue and remove from set
self._queue = self._queue[count:None]
for item in ret:
self._set.remove(item)
return ret
def __len__(self):
return len(self._queue)
#pylint: disable=missing-docstring
from hive.utils.unique_fifo import UniqueFIFO
def test_unique_queue():
q = UniqueFIFO()
assert q.extend(set(['tim', 'bob'])) == 2
assert len(q) == 2
assert q.extend(set(['tim', 'foo'])) == 1
assert len(q) == 3
pop1 = q.shift_portion(3)
assert pop1 == ['tim'] or pop1 == ['bob']
assert len(q) == 2
assert q.extend(set()) == 0
assert len(q) == 2
assert q.extend(set(['foo'])) == 0
assert len(q) == 2
pop2 = q.shift_portion(1)
assert pop2 == ['bob', 'foo'] or pop2 == ['tim', 'foo']
assert len(q) == 0
assert q.shift_portion(500) == []
assert q.extend(set(['tim', 'bob'])) == 2
assert q.extend(set(['tim', 'tom'])) == 1
assert q.extend(set(['tim', 'foo'])) == 1
assert set(q.shift_count(2)) == set(['tim', 'bob'])
assert q.shift_count(1) == ['tom']
assert q.shift_count(400) == ['foo']
assert q.shift_count(400) == []
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment