hive_fork_app.py 3.82 KB
Newer Older
Marcin's avatar
Marcin committed
1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env python3

import sqlalchemy

APPLICATION_CONTEXT = "trx_histogram"
SQL_CREATE_AND_REGISTER_HISTOGRAM_TABLE = """
    CREATE TABLE IF NOT EXISTS public.trx_histogram(
          day DATE
        , trx INT
        , CONSTRAINT pk_trx_histogram PRIMARY KEY( day ) )
Marcin's avatar
Marcin committed
11
12
    INHERITS( hive.{} )
    """.format( APPLICATION_CONTEXT )
Marcin's avatar
Marcin committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
SQL_CREATE_UPDATE_HISTOGRAM_FUNCTION = """
    CREATE OR REPLACE FUNCTION public.update_histogram( _first_block INT, _last_block INT )
    RETURNS void
    LANGUAGE plpgsql
    VOLATILE
    AS
     $function$
     BEGIN
        INSERT INTO public.trx_histogram as th( day, trx )
        SELECT
              DATE(hb.created_at) as date
            , COUNT(1) as trx
        FROM hive.trx_histogram_blocks_view hb
        JOIN hive.trx_histogram_transactions_view ht ON ht.block_num = hb.num
        WHERE hb.num >= _first_block AND hb.num <= _last_block
        GROUP BY DATE(hb.created_at)
        ON CONFLICT ON CONSTRAINT pk_trx_histogram DO UPDATE
        SET
            trx = EXCLUDED.trx + th.trx
        WHERE th.day = EXCLUDED.day;
     END;
     $function$
    """

def create_db_engine():
    return sqlalchemy.create_engine(
Marcin's avatar
Marcin committed
39
                "postgresql://alice:test@localhost:5432/psql_tools_test_db", # this is only example of db
Marcin's avatar
Marcin committed
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
                isolation_level="READ COMMITTED",
                pool_size=1,
                pool_recycle=3600,
                echo=False)

def prepare_application_data( db_connection ):
        # create a new context only if it not already exists
        exist = db_connection.execute( "SELECT hive.app_context_exists( '{}' )".format( APPLICATION_CONTEXT ) ).fetchone();
        if exist[ 0 ] == False:
            db_connection.execute( "SELECT hive.app_create_context( '{}' )".format( APPLICATION_CONTEXT ) )

        # create and register a table
        db_connection.execute( SQL_CREATE_AND_REGISTER_HISTOGRAM_TABLE )

        # create SQL function to do the application's task
        db_connection.execute( SQL_CREATE_UPDATE_HISTOGRAM_FUNCTION )

def main_loop( db_connection ):
    # forever loop
    while True:
        # start a new transaction
        with db_connection.begin():
            # get blocks range
            blocks_range = db_connection.execute( "SELECT * FROM hive.app_next_block( '{}' )".format( APPLICATION_CONTEXT ) ).fetchone()

            print( "Blocks range {}".format( blocks_range ) )
            (first_block, last_block) = blocks_range;
            # if no blocks are fetched then ask for new blocks again
            if not first_block:
                continue;

            (first_block, last_block) = blocks_range;

            # check if massive sync is required
            if ( last_block - first_block ) > 100:
                # Yes, massive sync is required
                # detach context
                db_connection.execute( "SELECT hive.app_context_detach( '{}' )".format( APPLICATION_CONTEXT ) )

                # update massivly the application's table - one commit transaction for whole massive edition
80
                db_connection.execute( "SELECT public.update_histogram( {}, {} )".format( first_block, last_block ) )
Marcin's avatar
Marcin committed
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

                # attach context and moves it to last synced block
                db_connection.execute( "SELECT hive.app_context_attach( '{}', {} )".format( APPLICATION_CONTEXT, last_block ) )
                continue

            # process the first block in range - one commit after each block
            db_connection.execute( "SELECT public.update_histogram( {}, {} )".format( first_block, last_block ) )

def start_application():
    engine = create_db_engine()
    with engine.connect() as db_connection:
        prepare_application_data( db_connection )
        main_loop( db_connection )

if __name__ == '__main__':
    try:
        start_application()
    except KeyboardInterrupt:
        print( "Break by the user request" )
        pass