Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • hive/haf
  • dan/haf
2 results
Show changes
Commits on Source (4)
......@@ -21,18 +21,9 @@ variables:
DATA_CACHE_MAINNET: /cache/replay_data_haf_mainnet_${CI_PIPELINE_ID}
BLOCK_LOG_SOURCE_DIR_5M: /blockchain/block_log_5m
# Variables required by Common CI jobs
CI_COMMON_JOB_VERSION: "656cd1670240c01fe05e56ee7158ea04877da555"
DOCKER_BUILDER_TAG: "$CI_COMMON_JOB_VERSION"
DOCKER_DIND_TAG: "$CI_COMMON_JOB_VERSION"
IMAGE_REMOVER_TAG: "$CI_COMMON_JOB_VERSION"
include:
- template: Workflows/Branch-Pipelines.gitlab-ci.yml
- local: '/scripts/ci-helpers/prepare_data_image_job.yml'
- project: 'hive/common-ci-configuration'
ref: 656cd1670240c01fe05e56ee7158ea04877da555
file: '/templates/docker_image_jobs.gitlab-ci.yml'
.hive_fork_manager_build:
stage: build_and_test_phase_1
......
Subproject commit faa8b1d33aead9e555b98adb78a5183634d9f8f5
Subproject commit cf0b147bca3705e9692b2327bf1b93a6c7a021d1
include:
- project: 'hive/hive'
ref: develop
ref: 8937722d925b1b2dfb6f899d294b9e6d6899fe0c #develop
file: '/scripts/ci-helpers/prepare_data_image_job.yml'
.prepare_haf_data_5m_image:
......
from haf_local_tools.db_adapter.db_adapter import DbAdapter
from __future__ import annotations
from typing import Any, TYPE_CHECKING, TypeAlias, Union
if TYPE_CHECKING:
from sqlalchemy.engine.row import Row
from sqlalchemy.orm.session import Session
ScalarType: TypeAlias = Any | None
ColumnType: TypeAlias = list[ScalarType]
class DbAdapter:
@staticmethod
def query_all(session: Session, sql: str, **kwargs) -> list[Row]:
"""Perform a `SELECT n*m`"""
return session.execute(sql, params=kwargs).all()
@staticmethod
def query_col(session: Session, sql: str, **kwargs) -> ColumnType:
"""Perform a `SELECT n*1`"""
return [row[0] for row in session.execute(sql, params=kwargs).all()]
@staticmethod
def query_no_return(session: Session, sql: str, **kwargs) -> None:
"""Perform a query with no return"""
session.execute(sql, params=kwargs).close()
@staticmethod
def query_row(session: Session, sql: str, **kwargs) -> Row:
"""Perform a `SELECT 1*m`"""
return session.execute(sql, params=kwargs).first()
@staticmethod
def query_one(session: Session, sql: str, **kwargs) -> ScalarType:
"""Perform a `SELECT 1*1`"""
return session.execute(sql, params=kwargs).scalar()
from haf_local_tools.haf_node.haf_node_handle import HafNodeHandle as HafNode
from __future__ import annotations
from datetime import timedelta
import math
from typing import Final, TYPE_CHECKING, TypedDict, Union
from uuid import uuid4
import sqlalchemy
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import NullPool
from sqlalchemy_utils import create_database, database_exists, drop_database
from test_tools.__private.preconfigured_node import PreconfiguredNode
from test_tools.__private.time import Time
from haf_local_tools.db_adapter import DbAdapter
if TYPE_CHECKING:
from sqlalchemy.engine.row import Row
from test_tools.__private.user_handles.handles.network_handle import NetworkHandle
from test_tools.__private.user_handles.handles.node_handles.node_handle_base import NodeHandleBase as NodeHandle
from haf_local_tools.db_adapter import ColumnType, ScalarType
class Transaction(TypedDict):
transaction_id: str
class HafNode(PreconfiguredNode):
DEFAULT_DATABASE_URL: Final[str] = "postgresql:///haf_block_log"
def __init__(
self,
*,
name: str = "HafNode",
network: NetworkHandle | None = None,
database_url: str = DEFAULT_DATABASE_URL,
keep_database: bool = False,
handle: NodeHandle | None = None,
) -> None:
super().__init__(name=name, network=network, handle=handle)
self.__database_url: str = self.__create_unique_url(database_url)
self.__session: Session | None = None
self.__keep_database: bool = keep_database
self.config.plugin.append("sql_serializer")
@property
def session(self) -> Session:
assert self.__session, "Session is not available since node was not run yet! Call the 'run()' method first."
return self.__session
@property
def database_url(self) -> str:
return self.__database_url
def _actions_before_run(self) -> None:
self.__make_database()
@staticmethod
def __create_unique_url(database_url):
return database_url + "_" + uuid4().hex
def __make_database(self) -> None:
self.config.psql_url = self.__database_url
self._logger.info(f"Preparing database {self.__database_url}")
if database_exists(self.__database_url):
drop_database(self.__database_url)
create_database(self.__database_url, template="haf_block_log")
engine = sqlalchemy.create_engine(self.__database_url, echo=False, poolclass=NullPool)
session = sessionmaker(bind=engine)
self.__session = session()
def close(self) -> None:
super().close()
self.__close_session()
def __close_session(self) -> None:
if self.__session is not None:
self.__session.close()
def _actions_after_final_cleanup(self) -> None:
if not self.__keep_database:
drop_database(self.__database_url)
def wait_for_transaction_in_database(
self,
transaction: Transaction,
*,
timeout: float | timedelta = math.inf,
poll_time: float = 1.0,
):
transaction_hash = transaction["transaction_id"]
Time.wait_for(
lambda: self.__is_transaction_in_database(transaction_hash),
timeout=timeout,
timeout_error_message=f"Waited too long for transaction {transaction_hash}",
poll_time=poll_time,
)
def __is_transaction_in_database(self, trx_id: str) -> bool:
sql = "SELECT exists(SELECT 1 FROM hive.transactions_view WHERE trx_hash LIKE decode(:hash, 'hex'));"
return self.query_one(sql, hash=trx_id)
def query_all(self, sql: str, **kwargs) -> list[Row]:
return DbAdapter.query_all(self.session, sql, **kwargs)
def query_col(self, sql: str, **kwargs) -> ColumnType:
return DbAdapter.query_col(self.session, sql, **kwargs)
def query_no_return(self, sql: str, **kwargs) -> None:
DbAdapter.query_no_return(self.session, sql, **kwargs)
def query_row(self, sql: str, **kwargs) -> Row:
return DbAdapter.query_row(self.session, sql, **kwargs)
def query_one(self, sql: str, **kwargs) -> ScalarType:
return DbAdapter.query_one(self.session, sql, **kwargs)
from __future__ import annotations
from datetime import timedelta
import math
from typing import cast, Optional, TYPE_CHECKING, Union
from haf_local_tools.haf_node._haf_node import HafNode
from test_tools.__private.user_handles.get_implementation import get_implementation
from test_tools.__private.user_handles.handles.node_handles.node_handle_base import NodeHandleBase
if TYPE_CHECKING:
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
from haf_local_tools.db_adapter.db_adapter import ColumnType, ScalarType
from haf_local_tools.haf_node._haf_node import Transaction
from test_tools.__private.user_handles.handles.network_handle import NetworkHandle as Network
class HafNodeHandle(NodeHandleBase):
def __init__(
self,
network: Optional[Network] = None,
database_url: str = HafNode.DEFAULT_DATABASE_URL,
keep_database: bool = False,
) -> None:
super().__init__(
implementation=HafNode(
network=get_implementation(network),
database_url=database_url,
keep_database=keep_database,
handle=self,
)
)
@property
def __implementation(self) -> HafNode:
return cast(HafNode, get_implementation(self))
@property
def session(self) -> Session:
"""Returns Sqlalchemy database session"""
return self.__implementation.session
@property
def database_url(self) -> str:
"""Returns haf database url"""
return self.__implementation.database_url
def wait_for_transaction_in_database(
self, transaction: Transaction, *, timeout: float | timedelta = math.inf, poll_time: float = 1.0
):
"""Function that blocks program execution until a transaction appears in the database
:param transaction: A transaction that we're waiting for
:param timeout: Timeout in seconds or preferably timedelta (e.g. tt.Time.minutes(1)).
:param poll_time: Time between predicate calls.
"""
return self.__implementation.wait_for_transaction_in_database(transaction, timeout=timeout, poll_time=poll_time)
def query_all(self, sql: str, **kwargs) -> list[Row]:
"""Execute a SQL query and return all results. (`SELECT n*m`)
:param sql: The SQL query to execute.
:param kwargs: Additional parameters to pass to the query.
:return: A list of `Row` objects representing the result set.
"""
return self.__implementation.query_all(sql, **kwargs)
def query_col(self, sql: str, **kwargs) -> ColumnType:
"""Execute a SQL query and return a single column of results. (`SELECT n*1`)
:param sql: The SQL query to execute.
:param kwargs: Additional parameters to pass to the query.
:return: A list of values representing a single column of the result set.
"""
return self.__implementation.query_col(sql, **kwargs)
def query_no_return(self, sql: str, **kwargs) -> None:
"""Execute a SQL query and do not return any results.
:param sql: The SQL query to execute.
:param kwargs: Additional parameters to pass to the query.
"""
self.__implementation.query_no_return(sql, **kwargs)
def query_row(self, sql: str, **kwargs) -> Row:
"""Execute a SQL query and return a single row of results. (`SELECT 1*m`)
:param sql: The SQL query to execute.
:param kwargs: Additional parameters to pass to the query.
:return: A `Row` object representing a single row of the result set.
"""
return self.__implementation.query_row(sql, **kwargs)
def query_one(self, sql: str, **kwargs) -> ScalarType:
"""Execute a SQL query and return a single value. (`SELECT 1*1`)
:param sql: The SQL query to execute.
:param kwargs: Additional parameters to pass to the query.
:return: A single value representing the result of the query.
"""
return self.__implementation.query_one(sql, **kwargs)