Skip to content
Snippets Groups Projects
Commit 53e015d6 authored by Dariusz Kędzierski's avatar Dariusz Kędzierski
Browse files

[WIP] Works on block streaming

parent fab7d62a
No related branches found
No related tags found
2 merge requests!456Release candidate v1 24,!336Mock data providers
...@@ -6,3 +6,6 @@ log = logging.getLogger(__name__) ...@@ -6,3 +6,6 @@ log = logging.getLogger(__name__)
class MockBlockProvider(MockDataProvider): class MockBlockProvider(MockDataProvider):
""" Data provider for test ops """ """ Data provider for test ops """
@classmethod
def get_blocks_greater_than(cls, block_num):
return [int(block) for block in cls.block_data if int(block) >= block_num]
...@@ -10,7 +10,9 @@ class MockDataProvider(): ...@@ -10,7 +10,9 @@ class MockDataProvider():
@classmethod @classmethod
def get_max_block_number(cls): def get_max_block_number(cls):
return max([int(block) for block in cls.block_data.keys()]) block_numbers = [int(block) for block in cls.block_data]
block_numbers.append(0)
return max(block_numbers)
@classmethod @classmethod
def load_block_data(cls, data_path): def load_block_data(cls, data_path):
......
"""Streams incoming blocks from the Steem blockchain.""" """Streams incoming blocks from the Steem blockchain."""
from hive.indexer.mock_data_provider import MockDataProvider
import logging import logging
from time import sleep from time import sleep
from hive.steem.block.schedule import BlockSchedule from hive.steem.block.schedule import BlockSchedule
...@@ -66,6 +67,7 @@ class BlockStream: ...@@ -66,6 +67,7 @@ class BlockStream:
self._client = client self._client = client
self._min_gap = min_gap self._min_gap = min_gap
self._max_gap = max_gap self._max_gap = max_gap
self._last_irreversible = self._client.last_irreversible()
def _gap_ok(self, curr, head): def _gap_ok(self, curr, head):
"""Ensures gap between curr and head is within limits (max_gap).""" """Ensures gap between curr and head is within limits (max_gap)."""
...@@ -83,6 +85,13 @@ class BlockStream: ...@@ -83,6 +85,13 @@ class BlockStream:
queue = BlockQueue(self._min_gap, prev) queue = BlockQueue(self._min_gap, prev)
schedule = BlockSchedule(head) schedule = BlockSchedule(head)
mock_max_block_number = MockBlockProvider.get_max_block_number()
if curr > self._last_irreversible and curr <= mock_max_block_number:
for block_num in MockBlockProvider.get_blocks_greater_than(curr):
popped = queue.push(MockBlockProvider.get_block_data(block_num, True))
if popped:
yield popped
while self._gap_ok(curr, head): while self._gap_ok(curr, head):
head = schedule.wait_for_block(curr) head = schedule.wait_for_block(curr)
block = self._client.get_block(curr, strict=False) block = self._client.get_block(curr, strict=False)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment