Skip to content
Snippets Groups Projects
Verified Commit 905b99c6 authored by Brian of London's avatar Brian of London
Browse files

fix: update datetime usage to ensure UTC consistency in blockchainobject.py...

fix: update datetime usage to ensure UTC consistency in blockchainobject.py and add test for derive_permlink with UTC date

partial

Signed-off-by: default avatarBrian of London (home imac) <brian@v4v.app>
parent c47de580
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from beemgraphenebase.py23 import bytes_types, integer_types, string_types, text_type from beemgraphenebase.py23 import bytes_types, integer_types, string_types, text_type
from beem.instance import shared_blockchain_instance from beem.instance import shared_blockchain_instance
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
import json import json
import threading import threading
...@@ -16,7 +16,7 @@ class ObjectCache(dict): ...@@ -16,7 +16,7 @@ class ObjectCache(dict):
def __setitem__(self, key, value): def __setitem__(self, key, value):
data = { data = {
"expires": datetime.utcnow() + timedelta( "expires": datetime.now(tz=timezone.utc) + timedelta(
seconds=self.default_expiration), seconds=self.default_expiration),
"data": value "data": value
} }
...@@ -47,7 +47,7 @@ class ObjectCache(dict): ...@@ -47,7 +47,7 @@ class ObjectCache(dict):
def clear_expired_items(self): def clear_expired_items(self):
with self.lock: with self.lock:
del_list = [] del_list = []
utc_now = datetime.utcnow() utc_now = datetime.now(tz=timezone.utc)
for key in self: for key in self:
value = dict.__getitem__(self, key) value = dict.__getitem__(self, key)
if value is None: if value is None:
...@@ -64,7 +64,7 @@ class ObjectCache(dict): ...@@ -64,7 +64,7 @@ class ObjectCache(dict):
value = dict.__getitem__(self, key) value = dict.__getitem__(self, key)
if value is None: if value is None:
return False return False
if datetime.utcnow() < value["expires"]: if datetime.now(tz=timezone.utc) < value["expires"]:
return True return True
else: else:
value["data"] = None value["data"] = None
...@@ -110,7 +110,7 @@ class BlockchainObject(dict): ...@@ -110,7 +110,7 @@ class BlockchainObject(dict):
if kwargs.get("steem_instance"): if kwargs.get("steem_instance"):
blockchain_instance = kwargs["steem_instance"] blockchain_instance = kwargs["steem_instance"]
elif kwargs.get("hive_instance"): elif kwargs.get("hive_instance"):
blockchain_instance = kwargs["hive_instance"] blockchain_instance = kwargs["hive_instance"]
self.blockchain = blockchain_instance or shared_blockchain_instance() self.blockchain = blockchain_instance or shared_blockchain_instance()
self.cached = False self.cached = False
self.identifier = None self.identifier = None
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import re import ast
import difflib
import json import json
import time as timenow
import math import math
from datetime import datetime, tzinfo, timedelta, date, time import os
import pytz import re
import difflib
from ruamel.yaml import YAML
import difflib
import secrets import secrets
import string import string
import time as timenow
from datetime import date, datetime, time, timedelta, timezone
from ruamel.yaml import YAML
from beemgraphenebase.account import PasswordKey from beemgraphenebase.account import PasswordKey
import ast
import os
timeFormat = "%Y-%m-%dT%H:%M:%S" timeFormat = "%Y-%m-%dT%H:%M:%S"
# https://github.com/matiasb/python-unidiff/blob/master/unidiff/constants.py#L37 # https://github.com/matiasb/python-unidiff/blob/master/unidiff/constants.py#L37
...@@ -23,35 +23,44 @@ RE_HUNK_HEADER = re.compile( ...@@ -23,35 +23,44 @@ RE_HUNK_HEADER = re.compile(
def formatTime(t): def formatTime(t):
""" Properly Format Time for permlinks """Properly Format Time for permalinks, ensuring UTC timezone."""
"""
if isinstance(t, float): if isinstance(t, float):
return datetime.utcfromtimestamp(t).strftime("%Y%m%dt%H%M%S%Z") # Convert timestamp (assumed UTC) to UTC datetime
if isinstance(t, (datetime, date, time)): dt = datetime.fromtimestamp(t, tz=timezone.utc)
return t.strftime("%Y%m%dt%H%M%S%Z") elif isinstance(t, datetime):
# Ensure UTC if naive, leave as-is if aware
dt = t.replace(tzinfo=timezone.utc) if t.tzinfo is None else t
elif isinstance(t, date):
# Convert date to datetime at midnight UTC
dt = datetime.combine(t, time(0, 0), tzinfo=timezone.utc)
elif isinstance(t, time):
# Combine with current date in UTC
dt = datetime.combine(datetime.now(timezone.utc).date(), t, tzinfo=timezone.utc)
else:
raise ValueError(f"Unsupported type for formatTime: {type(t)}")
return dt.strftime("%Y%m%dt%H%M%S%Z")
def addTzInfo(t, timezone="UTC"):
def addTzInfo(t, new_timezone=timezone.utc):
"""Returns a datetime object with tzinfo added""" """Returns a datetime object with tzinfo added"""
if t and isinstance(t, (datetime, date, time)) and t.tzinfo is None: if t and isinstance(t, (datetime, date, time)) and t.tzinfo is None:
utc = pytz.timezone(timezone) t = t.replace(tzinfo=new_timezone)
t = utc.localize(t)
return t return t
def formatTimeString(t): def formatTimeString(t):
""" Properly Format Time for permlinks """Properly Format Time for permlinks"""
"""
if isinstance(t, (datetime, date, time)): if isinstance(t, (datetime, date, time)):
return t.strftime(timeFormat) return t.strftime(timeFormat)
return addTzInfo(datetime.strptime(t, timeFormat)) return addTzInfo(datetime.strptime(t, timeFormat))
def formatToTimeStamp(t): def formatToTimeStamp(t):
""" Returns a timestamp integer """Returns a timestamp integer
:param datetime t: datetime object :param datetime t: datetime object
:return: Timestamp as integer :return: Timestamp as integer
""" """
if isinstance(t, (datetime, date, time)): if isinstance(t, (datetime, date, time)):
t = addTzInfo(t) t = addTzInfo(t)
...@@ -62,20 +71,22 @@ def formatToTimeStamp(t): ...@@ -62,20 +71,22 @@ def formatToTimeStamp(t):
def formatTimeFromNow(secs=0): def formatTimeFromNow(secs=0):
""" Properly Format Time that is `x` seconds in the future """Properly Format Time that is `x` seconds in the future or past.
:param int secs: Seconds to go in the future (`x>0`) or the
past (`x<0`)
:return: Properly formated time for Graphene (`%Y-%m-%dT%H:%M:%S`)
:rtype: str
:param int secs: Seconds to go in the future (secs > 0) or the past (secs < 0).
Defaults to 0 (current time).
:return: Properly formatted time for Graphene (e.g., '2025-03-12T10:00:00').
:rtype: str
""" """
return datetime.utcfromtimestamp(timenow.time() + int(secs)).strftime(timeFormat) current_time = time.time() # Current time in seconds since epoch
target_time = current_time + int(secs) # Add/subtract seconds
return datetime.fromtimestamp(target_time, tz=timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%S"
)
def formatTimedelta(td): def formatTimedelta(td):
"""Format timedelta to String """Format timedelta to String"""
"""
if not isinstance(td, timedelta): if not isinstance(td, timedelta):
return "" return ""
days, seconds = td.days, td.seconds days, seconds = td.days, td.seconds
...@@ -86,11 +97,15 @@ def formatTimedelta(td): ...@@ -86,11 +97,15 @@ def formatTimedelta(td):
def parse_time(block_time): def parse_time(block_time):
"""Take a string representation of time from the blockchain, and parse it """Take a string representation of time from the blockchain and parse it
into datetime object. into a UTC-aware datetime object.
:param str block_time: Time string in format 'YYYY-MM-DDTHH:MM:SS' (e.g., '2025-03-12T10:00:00')
:return: UTC-aware datetime object
:rtype: datetime
""" """
utc = pytz.timezone("UTC") time_format = "%Y-%m-%dT%H:%M:%S"
return utc.localize(datetime.strptime(block_time, timeFormat)) return datetime.strptime(block_time, time_format).replace(tzinfo=timezone.utc)
def assets_from_string(text): def assets_from_string(text):
...@@ -111,8 +126,13 @@ def sanitize_permlink(permlink): ...@@ -111,8 +126,13 @@ def sanitize_permlink(permlink):
return permlink return permlink
def derive_permlink(title, parent_permlink=None, parent_author=None, def derive_permlink(
max_permlink_length=256, with_suffix=True): title,
parent_permlink=None,
parent_author=None,
max_permlink_length=256,
with_suffix=True,
):
"""Derive a permlink from a comment title (for root level """Derive a permlink from a comment title (for root level
comments) or the parent permlink and optionally the parent comments) or the parent permlink and optionally the parent
author (for replies). author (for replies).
...@@ -188,7 +208,7 @@ def resolve_authorperm(identifier): ...@@ -188,7 +208,7 @@ def resolve_authorperm(identifier):
def construct_authorperm(*args): def construct_authorperm(*args):
""" Create a post identifier from comment/post object or arguments. """Create a post identifier from comment/post object or arguments.
Examples: Examples:
.. code-block:: python .. code-block:: python
...@@ -238,7 +258,7 @@ def resolve_authorpermvoter(identifier): ...@@ -238,7 +258,7 @@ def resolve_authorpermvoter(identifier):
def construct_authorpermvoter(*args): def construct_authorpermvoter(*args):
""" Create a vote identifier from vote object or arguments. """Create a vote identifier from vote object or arguments.
Examples: Examples:
.. code-block:: python .. code-block:: python
...@@ -284,8 +304,8 @@ def reputation_to_score(rep): ...@@ -284,8 +304,8 @@ def reputation_to_score(rep):
def remove_from_dict(obj, keys=list(), keep_keys=True): def remove_from_dict(obj, keys=list(), keep_keys=True):
""" Prune a class or dictionary of all but keys (keep_keys=True). """Prune a class or dictionary of all but keys (keep_keys=True).
Prune a class or dictionary of specified keys.(keep_keys=False). Prune a class or dictionary of specified keys.(keep_keys=False).
""" """
if type(obj) == dict: if type(obj) == dict:
items = list(obj.items()) items = list(obj.items())
...@@ -301,9 +321,10 @@ def remove_from_dict(obj, keys=list(), keep_keys=True): ...@@ -301,9 +321,10 @@ def remove_from_dict(obj, keys=list(), keep_keys=True):
def make_patch(a, b): def make_patch(a, b):
import diff_match_patch as dmp_module import diff_match_patch as dmp_module
dmp = dmp_module.diff_match_patch() dmp = dmp_module.diff_match_patch()
patch = dmp.patch_make(a, b) patch = dmp.patch_make(a, b)
patch_text = dmp.patch_toText(patch) patch_text = dmp.patch_toText(patch)
return patch_text return patch_text
...@@ -373,7 +394,7 @@ def seperate_yaml_dict_from_body(content): ...@@ -373,7 +394,7 @@ def seperate_yaml_dict_from_body(content):
if len(content.split("---\n")) > 1: if len(content.split("---\n")) > 1:
body = content[content.find("---\n", 1) + 4 :] body = content[content.find("---\n", 1) + 4 :]
yaml_content = content[content.find("---\n") + 4 : content.find("---\n", 1)] yaml_content = content[content.find("---\n") + 4 : content.find("---\n", 1)]
yaml=YAML(typ="safe") yaml = YAML(typ="safe")
parameter = yaml.load(yaml_content) parameter = yaml.load(yaml_content)
if not isinstance(parameter, dict): if not isinstance(parameter, dict):
parameter = yaml.load(yaml_content.replace(":", ": ").replace(" ", " ")) parameter = yaml.load(yaml_content.replace(":", ": ").replace(" ", " "))
...@@ -383,59 +404,75 @@ def seperate_yaml_dict_from_body(content): ...@@ -383,59 +404,75 @@ def seperate_yaml_dict_from_body(content):
def create_yaml_header(comment, json_metadata={}, reply_identifier=None): def create_yaml_header(comment, json_metadata={}, reply_identifier=None):
yaml_prefix = '---\n' yaml_prefix = "---\n"
if comment["title"] != "": if comment["title"] != "":
yaml_prefix += 'title: "%s"\n' % comment["title"] yaml_prefix += 'title: "%s"\n' % comment["title"]
if "permlink" in comment: if "permlink" in comment:
yaml_prefix += 'permlink: %s\n' % comment["permlink"] yaml_prefix += "permlink: %s\n" % comment["permlink"]
yaml_prefix += 'author: %s\n' % comment["author"] yaml_prefix += "author: %s\n" % comment["author"]
if "author" in json_metadata: if "author" in json_metadata:
yaml_prefix += 'authored by: %s\n' % json_metadata["author"] yaml_prefix += "authored by: %s\n" % json_metadata["author"]
if "description" in json_metadata: if "description" in json_metadata:
yaml_prefix += 'description: "%s"\n' % json_metadata["description"] yaml_prefix += 'description: "%s"\n' % json_metadata["description"]
if "canonical_url" in json_metadata: if "canonical_url" in json_metadata:
yaml_prefix += 'canonical_url: %s\n' % json_metadata["canonical_url"] yaml_prefix += "canonical_url: %s\n" % json_metadata["canonical_url"]
if "app" in json_metadata: if "app" in json_metadata:
yaml_prefix += 'app: %s\n' % json_metadata["app"] yaml_prefix += "app: %s\n" % json_metadata["app"]
if "last_update" in comment: if "last_update" in comment:
yaml_prefix += 'last_update: %s\n' % comment["last_update"] yaml_prefix += "last_update: %s\n" % comment["last_update"]
elif "updated" in comment: elif "updated" in comment:
yaml_prefix += 'last_update: %s\n' % comment["updated"] yaml_prefix += "last_update: %s\n" % comment["updated"]
yaml_prefix += 'max_accepted_payout: %s\n' % str(comment["max_accepted_payout"]) yaml_prefix += "max_accepted_payout: %s\n" % str(comment["max_accepted_payout"])
if "percent_steem_dollars" in comment: if "percent_steem_dollars" in comment:
yaml_prefix += 'percent_steem_dollars: %s\n' % str(comment["percent_steem_dollars"]) yaml_prefix += "percent_steem_dollars: %s\n" % str(
comment["percent_steem_dollars"]
)
elif "percent_hbd" in comment: elif "percent_hbd" in comment:
yaml_prefix += 'percent_hbd: %s\n' % str(comment["percent_hbd"]) yaml_prefix += "percent_hbd: %s\n" % str(comment["percent_hbd"])
if "tags" in json_metadata: if "tags" in json_metadata:
if len(json_metadata["tags"]) > 0 and comment["category"] != json_metadata["tags"][0] and len(comment["category"]) > 0: if (
yaml_prefix += 'community: %s\n' % comment["category"] len(json_metadata["tags"]) > 0
yaml_prefix += 'tags: %s\n' % ",".join(json_metadata["tags"]) and comment["category"] != json_metadata["tags"][0]
and len(comment["category"]) > 0
):
yaml_prefix += "community: %s\n" % comment["category"]
yaml_prefix += "tags: %s\n" % ",".join(json_metadata["tags"])
if "beneficiaries" in comment: if "beneficiaries" in comment:
beneficiaries = [] beneficiaries = []
for b in comment["beneficiaries"]: for b in comment["beneficiaries"]:
beneficiaries.append("%s:%.2f%%" % (b["account"], b["weight"] / 10000 * 100)) beneficiaries.append(
"%s:%.2f%%" % (b["account"], b["weight"] / 10000 * 100)
)
if len(beneficiaries) > 0: if len(beneficiaries) > 0:
yaml_prefix += 'beneficiaries: %s\n' % ",".join(beneficiaries) yaml_prefix += "beneficiaries: %s\n" % ",".join(beneficiaries)
if reply_identifier is not None: if reply_identifier is not None:
yaml_prefix += 'reply_identifier: %s\n' % reply_identifier yaml_prefix += "reply_identifier: %s\n" % reply_identifier
yaml_prefix += '---\n' yaml_prefix += "---\n"
return yaml_prefix return yaml_prefix
def load_dirty_json(dirty_json): def load_dirty_json(dirty_json):
regex_replace = [(r"([ \{,:\[])(u)?'([^']+)'", r'\1"\3"'), (r" False([, \}\]])", r' false\1'), (r" True([, \}\]])", r' true\1')] regex_replace = [
(r"([ \{,:\[])(u)?'([^']+)'", r'\1"\3"'),
(r" False([, \}\]])", r" false\1"),
(r" True([, \}\]])", r" true\1"),
]
for r, s in regex_replace: for r, s in regex_replace:
dirty_json = re.sub(r, s, dirty_json) dirty_json = re.sub(r, s, dirty_json)
clean_json = json.loads(dirty_json) clean_json = json.loads(dirty_json)
return clean_json return clean_json
def create_new_password(length=32): def create_new_password(length=32):
"""Creates a random password containing alphanumeric chars with at least 1 number and 1 upper and lower char""" """Creates a random password containing alphanumeric chars with at least 1 number and 1 upper and lower char"""
alphabet = string.ascii_letters + string.digits alphabet = string.ascii_letters + string.digits
while True: while True:
import_password = ''.join(secrets.choice(alphabet) for i in range(length)) import_password = "".join(secrets.choice(alphabet) for i in range(length))
if (any(c.islower() for c in import_password) and any(c.isupper() for c in import_password) and any(c.isdigit() for c in import_password)): if (
any(c.islower() for c in import_password)
and any(c.isupper() for c in import_password)
and any(c.isdigit() for c in import_password)
):
break break
return import_password return import_password
...@@ -445,7 +482,7 @@ def import_coldcard_wif(filename): ...@@ -445,7 +482,7 @@ def import_coldcard_wif(filename):
next_var = "" next_var = ""
import_password = "" import_password = ""
path = "" path = ""
with open(filename) as fp: with open(filename) as fp:
for line in fp: for line in fp:
if line.strip() == "": if line.strip() == "":
continue continue
...@@ -469,7 +506,7 @@ def generate_password(import_password, wif=1): ...@@ -469,7 +506,7 @@ def generate_password(import_password, wif=1):
for _ in range(wif): for _ in range(wif):
pk = PasswordKey("", password, role="") pk = PasswordKey("", password, role="")
password = str(pk.get_private()) password = str(pk.get_private())
password = 'P' + password password = "P" + password
else: else:
password = import_password password = import_password
return password return password
...@@ -480,8 +517,8 @@ def import_pubkeys(import_pub): ...@@ -480,8 +517,8 @@ def import_pubkeys(import_pub):
raise Exception("File %s does not exist!" % import_pub) raise Exception("File %s does not exist!" % import_pub)
with open(import_pub) as fp: with open(import_pub) as fp:
pubkeys = fp.read() pubkeys = fp.read()
if pubkeys.find('\0') > 0: if pubkeys.find("\0") > 0:
with open(import_pub, encoding='utf-16') as fp: with open(import_pub, encoding="utf-16") as fp:
pubkeys = fp.read() pubkeys = fp.read()
pubkeys = ast.literal_eval(pubkeys) pubkeys = ast.literal_eval(pubkeys)
owner = pubkeys["owner"] owner = pubkeys["owner"]
...@@ -506,7 +543,7 @@ def import_custom_json(jsonid, json_data): ...@@ -506,7 +543,7 @@ def import_custom_json(jsonid, json_data):
return None return None
else: else:
try: try:
with open(json_data[0], 'r') as f: with open(json_data[0], "r") as f:
data = json.load(f) data = json.load(f)
except: except:
print("%s is not a valid file or json field" % json_data) print("%s is not a valid file or json field" % json_data)
......
...@@ -80,6 +80,19 @@ class Testcases(unittest.TestCase): ...@@ -80,6 +80,19 @@ class Testcases(unittest.TestCase):
self.assertEqual(len(derive_permlink("", parent_permlink=256 * "a", parent_author="test")), 256) self.assertEqual(len(derive_permlink("", parent_permlink=256 * "a", parent_author="test")), 256)
self.assertEqual(len(derive_permlink("a" * 1024)), 256) self.assertEqual(len(derive_permlink("a" * 1024)), 256)
def test_derivePermlinkUTCDate(self):
def old_code_suffix()
suffix = "-" + formatTime(datetime.utcnow()) + "z"
return suffix
def old_formatTime(dt):
return dt.strftime("%Y%m%dt%H%M%S")
perm_link_old = derive_permlink("Hello World", parent_permlink="test", parent_author="test")
perm_link_new = derive_permlink("Hello World", parent_permlink="test", parent_author="test", utc_date=date(2018, 1, 1))
self.assertTrue(perm_link_old != perm_link_new)
def test_patch(self): def test_patch(self):
self.assertEqual(make_patch("aa", "ab"), '@@ -1,2 +1,2 @@\n a\n-a\n+b\n') self.assertEqual(make_patch("aa", "ab"), '@@ -1,2 +1,2 @@\n a\n-a\n+b\n')
self.assertEqual(make_patch("aa\n", "ab\n"), '@@ -1,3 +1,3 @@\n a\n-a\n+b\n %0A\n') self.assertEqual(make_patch("aa\n", "ab\n"), '@@ -1,3 +1,3 @@\n a\n-a\n+b\n %0A\n')
...@@ -151,10 +164,10 @@ class Testcases(unittest.TestCase): ...@@ -151,10 +164,10 @@ class Testcases(unittest.TestCase):
self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}]) self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}])
t = "holger80:30.00%,beembot:40.00%" t = "holger80:30.00%,beembot:40.00%"
b = derive_beneficiaries(t) b = derive_beneficiaries(t)
self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}]) self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}])
t = "holger80:30%, beembot:40%" t = "holger80:30%, beembot:40%"
b = derive_beneficiaries(t) b = derive_beneficiaries(t)
self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}]) self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}])
t = "holger80:30,beembot" t = "holger80:30,beembot"
b = derive_beneficiaries(t) b = derive_beneficiaries(t)
self.assertEqual(b, [{"account": "beembot", "weight": 7000}, {"account": "holger80", "weight": 3000}]) self.assertEqual(b, [{"account": "beembot", "weight": 7000}, {"account": "holger80", "weight": 3000}])
...@@ -190,7 +203,7 @@ class Testcases(unittest.TestCase): ...@@ -190,7 +203,7 @@ class Testcases(unittest.TestCase):
parameter = yaml_safe.load(yaml_content) parameter = yaml_safe.load(yaml_content)
self.assertEqual(parameter["title"], "test") self.assertEqual(parameter["title"], "test")
self.assertEqual(parameter["author"], "holger80") self.assertEqual(parameter["author"], "holger80")
self.assertEqual(parameter["max_accepted_payout"], "100") self.assertEqual(parameter["max_accepted_payout"], "100")
def test_create_new_password(self): def test_create_new_password(self):
new_password = create_new_password() new_password = create_new_password()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment