diff --git a/beem/blockchainobject.py b/beem/blockchainobject.py index 82315a49e1e9575f77cc30fbe7cadc08db6d5901..1e4423b4466a4525e4dd01946c821b4a549d3084 100644 --- a/beem/blockchainobject.py +++ b/beem/blockchainobject.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- from beemgraphenebase.py23 import bytes_types, integer_types, string_types, text_type from beem.instance import shared_blockchain_instance -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import json import threading @@ -16,7 +16,7 @@ class ObjectCache(dict): def __setitem__(self, key, value): data = { - "expires": datetime.utcnow() + timedelta( + "expires": datetime.now(tz=timezone.utc) + timedelta( seconds=self.default_expiration), "data": value } @@ -47,7 +47,7 @@ class ObjectCache(dict): def clear_expired_items(self): with self.lock: del_list = [] - utc_now = datetime.utcnow() + utc_now = datetime.now(tz=timezone.utc) for key in self: value = dict.__getitem__(self, key) if value is None: @@ -64,7 +64,7 @@ class ObjectCache(dict): value = dict.__getitem__(self, key) if value is None: return False - if datetime.utcnow() < value["expires"]: + if datetime.now(tz=timezone.utc) < value["expires"]: return True else: value["data"] = None @@ -110,7 +110,7 @@ class BlockchainObject(dict): if kwargs.get("steem_instance"): blockchain_instance = kwargs["steem_instance"] elif kwargs.get("hive_instance"): - blockchain_instance = kwargs["hive_instance"] + blockchain_instance = kwargs["hive_instance"] self.blockchain = blockchain_instance or shared_blockchain_instance() self.cached = False self.identifier = None diff --git a/beem/utils.py b/beem/utils.py index 700d5d89abed68d644a3f44be66b4153de934746..3cef7b3f9272431c707e6117b099182cacf3c363 100644 --- a/beem/utils.py +++ b/beem/utils.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- -import re +import ast +import difflib import json -import time as timenow import math -from datetime import datetime, tzinfo, timedelta, date, time -import pytz -import difflib -from ruamel.yaml import YAML -import difflib +import os +import re import secrets import string +import time as timenow +from datetime import date, datetime, time, timedelta, timezone + +from ruamel.yaml import YAML + from beemgraphenebase.account import PasswordKey -import ast -import os timeFormat = "%Y-%m-%dT%H:%M:%S" # https://github.com/matiasb/python-unidiff/blob/master/unidiff/constants.py#L37 @@ -23,35 +23,44 @@ RE_HUNK_HEADER = re.compile( def formatTime(t): - """ Properly Format Time for permlinks - """ + """Properly Format Time for permalinks, ensuring UTC timezone.""" if isinstance(t, float): - return datetime.utcfromtimestamp(t).strftime("%Y%m%dt%H%M%S%Z") - if isinstance(t, (datetime, date, time)): - return t.strftime("%Y%m%dt%H%M%S%Z") + # Convert timestamp (assumed UTC) to UTC datetime + dt = datetime.fromtimestamp(t, tz=timezone.utc) + elif isinstance(t, datetime): + # Ensure UTC if naive, leave as-is if aware + dt = t.replace(tzinfo=timezone.utc) if t.tzinfo is None else t + elif isinstance(t, date): + # Convert date to datetime at midnight UTC + dt = datetime.combine(t, time(0, 0), tzinfo=timezone.utc) + elif isinstance(t, time): + # Combine with current date in UTC + dt = datetime.combine(datetime.now(timezone.utc).date(), t, tzinfo=timezone.utc) + else: + raise ValueError(f"Unsupported type for formatTime: {type(t)}") + return dt.strftime("%Y%m%dt%H%M%S%Z") -def addTzInfo(t, timezone="UTC"): + +def addTzInfo(t, new_timezone=timezone.utc): """Returns a datetime object with tzinfo added""" if t and isinstance(t, (datetime, date, time)) and t.tzinfo is None: - utc = pytz.timezone(timezone) - t = utc.localize(t) + t = t.replace(tzinfo=new_timezone) return t def formatTimeString(t): - """ Properly Format Time for permlinks - """ + """Properly Format Time for permlinks""" if isinstance(t, (datetime, date, time)): return t.strftime(timeFormat) return addTzInfo(datetime.strptime(t, timeFormat)) def formatToTimeStamp(t): - """ Returns a timestamp integer + """Returns a timestamp integer - :param datetime t: datetime object - :return: Timestamp as integer + :param datetime t: datetime object + :return: Timestamp as integer """ if isinstance(t, (datetime, date, time)): t = addTzInfo(t) @@ -62,20 +71,22 @@ def formatToTimeStamp(t): def formatTimeFromNow(secs=0): - """ Properly Format Time that is `x` seconds in the future - - :param int secs: Seconds to go in the future (`x>0`) or the - past (`x<0`) - :return: Properly formated time for Graphene (`%Y-%m-%dT%H:%M:%S`) - :rtype: str + """Properly Format Time that is `x` seconds in the future or past. + :param int secs: Seconds to go in the future (secs > 0) or the past (secs < 0). + Defaults to 0 (current time). + :return: Properly formatted time for Graphene (e.g., '2025-03-12T10:00:00'). + :rtype: str """ - return datetime.utcfromtimestamp(timenow.time() + int(secs)).strftime(timeFormat) + current_time = time.time() # Current time in seconds since epoch + target_time = current_time + int(secs) # Add/subtract seconds + return datetime.fromtimestamp(target_time, tz=timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%S" + ) def formatTimedelta(td): - """Format timedelta to String - """ + """Format timedelta to String""" if not isinstance(td, timedelta): return "" days, seconds = td.days, td.seconds @@ -86,11 +97,15 @@ def formatTimedelta(td): def parse_time(block_time): - """Take a string representation of time from the blockchain, and parse it - into datetime object. + """Take a string representation of time from the blockchain and parse it + into a UTC-aware datetime object. + + :param str block_time: Time string in format 'YYYY-MM-DDTHH:MM:SS' (e.g., '2025-03-12T10:00:00') + :return: UTC-aware datetime object + :rtype: datetime """ - utc = pytz.timezone("UTC") - return utc.localize(datetime.strptime(block_time, timeFormat)) + time_format = "%Y-%m-%dT%H:%M:%S" + return datetime.strptime(block_time, time_format).replace(tzinfo=timezone.utc) def assets_from_string(text): @@ -111,8 +126,13 @@ def sanitize_permlink(permlink): return permlink -def derive_permlink(title, parent_permlink=None, parent_author=None, - max_permlink_length=256, with_suffix=True): +def derive_permlink( + title, + parent_permlink=None, + parent_author=None, + max_permlink_length=256, + with_suffix=True, +): """Derive a permlink from a comment title (for root level comments) or the parent permlink and optionally the parent author (for replies). @@ -188,7 +208,7 @@ def resolve_authorperm(identifier): def construct_authorperm(*args): - """ Create a post identifier from comment/post object or arguments. + """Create a post identifier from comment/post object or arguments. Examples: .. code-block:: python @@ -238,7 +258,7 @@ def resolve_authorpermvoter(identifier): def construct_authorpermvoter(*args): - """ Create a vote identifier from vote object or arguments. + """Create a vote identifier from vote object or arguments. Examples: .. code-block:: python @@ -284,8 +304,8 @@ def reputation_to_score(rep): def remove_from_dict(obj, keys=list(), keep_keys=True): - """ Prune a class or dictionary of all but keys (keep_keys=True). - Prune a class or dictionary of specified keys.(keep_keys=False). + """Prune a class or dictionary of all but keys (keep_keys=True). + Prune a class or dictionary of specified keys.(keep_keys=False). """ if type(obj) == dict: items = list(obj.items()) @@ -301,9 +321,10 @@ def remove_from_dict(obj, keys=list(), keep_keys=True): def make_patch(a, b): import diff_match_patch as dmp_module + dmp = dmp_module.diff_match_patch() patch = dmp.patch_make(a, b) - patch_text = dmp.patch_toText(patch) + patch_text = dmp.patch_toText(patch) return patch_text @@ -373,7 +394,7 @@ def seperate_yaml_dict_from_body(content): if len(content.split("---\n")) > 1: body = content[content.find("---\n", 1) + 4 :] yaml_content = content[content.find("---\n") + 4 : content.find("---\n", 1)] - yaml=YAML(typ="safe") + yaml = YAML(typ="safe") parameter = yaml.load(yaml_content) if not isinstance(parameter, dict): parameter = yaml.load(yaml_content.replace(":", ": ").replace(" ", " ")) @@ -383,59 +404,75 @@ def seperate_yaml_dict_from_body(content): def create_yaml_header(comment, json_metadata={}, reply_identifier=None): - yaml_prefix = '---\n' + yaml_prefix = "---\n" if comment["title"] != "": yaml_prefix += 'title: "%s"\n' % comment["title"] if "permlink" in comment: - yaml_prefix += 'permlink: %s\n' % comment["permlink"] - yaml_prefix += 'author: %s\n' % comment["author"] + yaml_prefix += "permlink: %s\n" % comment["permlink"] + yaml_prefix += "author: %s\n" % comment["author"] if "author" in json_metadata: - yaml_prefix += 'authored by: %s\n' % json_metadata["author"] + yaml_prefix += "authored by: %s\n" % json_metadata["author"] if "description" in json_metadata: yaml_prefix += 'description: "%s"\n' % json_metadata["description"] if "canonical_url" in json_metadata: - yaml_prefix += 'canonical_url: %s\n' % json_metadata["canonical_url"] + yaml_prefix += "canonical_url: %s\n" % json_metadata["canonical_url"] if "app" in json_metadata: - yaml_prefix += 'app: %s\n' % json_metadata["app"] + yaml_prefix += "app: %s\n" % json_metadata["app"] if "last_update" in comment: - yaml_prefix += 'last_update: %s\n' % comment["last_update"] + yaml_prefix += "last_update: %s\n" % comment["last_update"] elif "updated" in comment: - yaml_prefix += 'last_update: %s\n' % comment["updated"] - yaml_prefix += 'max_accepted_payout: %s\n' % str(comment["max_accepted_payout"]) + yaml_prefix += "last_update: %s\n" % comment["updated"] + yaml_prefix += "max_accepted_payout: %s\n" % str(comment["max_accepted_payout"]) if "percent_steem_dollars" in comment: - yaml_prefix += 'percent_steem_dollars: %s\n' % str(comment["percent_steem_dollars"]) + yaml_prefix += "percent_steem_dollars: %s\n" % str( + comment["percent_steem_dollars"] + ) elif "percent_hbd" in comment: - yaml_prefix += 'percent_hbd: %s\n' % str(comment["percent_hbd"]) + yaml_prefix += "percent_hbd: %s\n" % str(comment["percent_hbd"]) if "tags" in json_metadata: - if len(json_metadata["tags"]) > 0 and comment["category"] != json_metadata["tags"][0] and len(comment["category"]) > 0: - yaml_prefix += 'community: %s\n' % comment["category"] - yaml_prefix += 'tags: %s\n' % ",".join(json_metadata["tags"]) + if ( + len(json_metadata["tags"]) > 0 + and comment["category"] != json_metadata["tags"][0] + and len(comment["category"]) > 0 + ): + yaml_prefix += "community: %s\n" % comment["category"] + yaml_prefix += "tags: %s\n" % ",".join(json_metadata["tags"]) if "beneficiaries" in comment: beneficiaries = [] for b in comment["beneficiaries"]: - beneficiaries.append("%s:%.2f%%" % (b["account"], b["weight"] / 10000 * 100)) + beneficiaries.append( + "%s:%.2f%%" % (b["account"], b["weight"] / 10000 * 100) + ) if len(beneficiaries) > 0: - yaml_prefix += 'beneficiaries: %s\n' % ",".join(beneficiaries) + yaml_prefix += "beneficiaries: %s\n" % ",".join(beneficiaries) if reply_identifier is not None: - yaml_prefix += 'reply_identifier: %s\n' % reply_identifier - yaml_prefix += '---\n' + yaml_prefix += "reply_identifier: %s\n" % reply_identifier + yaml_prefix += "---\n" return yaml_prefix - + def load_dirty_json(dirty_json): - regex_replace = [(r"([ \{,:\[])(u)?'([^']+)'", r'\1"\3"'), (r" False([, \}\]])", r' false\1'), (r" True([, \}\]])", r' true\1')] + regex_replace = [ + (r"([ \{,:\[])(u)?'([^']+)'", r'\1"\3"'), + (r" False([, \}\]])", r" false\1"), + (r" True([, \}\]])", r" true\1"), + ] for r, s in regex_replace: dirty_json = re.sub(r, s, dirty_json) clean_json = json.loads(dirty_json) - return clean_json + return clean_json def create_new_password(length=32): """Creates a random password containing alphanumeric chars with at least 1 number and 1 upper and lower char""" alphabet = string.ascii_letters + string.digits while True: - import_password = ''.join(secrets.choice(alphabet) for i in range(length)) - if (any(c.islower() for c in import_password) and any(c.isupper() for c in import_password) and any(c.isdigit() for c in import_password)): + import_password = "".join(secrets.choice(alphabet) for i in range(length)) + if ( + any(c.islower() for c in import_password) + and any(c.isupper() for c in import_password) + and any(c.isdigit() for c in import_password) + ): break return import_password @@ -445,7 +482,7 @@ def import_coldcard_wif(filename): next_var = "" import_password = "" path = "" - with open(filename) as fp: + with open(filename) as fp: for line in fp: if line.strip() == "": continue @@ -469,7 +506,7 @@ def generate_password(import_password, wif=1): for _ in range(wif): pk = PasswordKey("", password, role="") password = str(pk.get_private()) - password = 'P' + password + password = "P" + password else: password = import_password return password @@ -480,8 +517,8 @@ def import_pubkeys(import_pub): raise Exception("File %s does not exist!" % import_pub) with open(import_pub) as fp: pubkeys = fp.read() - if pubkeys.find('\0') > 0: - with open(import_pub, encoding='utf-16') as fp: + if pubkeys.find("\0") > 0: + with open(import_pub, encoding="utf-16") as fp: pubkeys = fp.read() pubkeys = ast.literal_eval(pubkeys) owner = pubkeys["owner"] @@ -506,7 +543,7 @@ def import_custom_json(jsonid, json_data): return None else: try: - with open(json_data[0], 'r') as f: + with open(json_data[0], "r") as f: data = json.load(f) except: print("%s is not a valid file or json field" % json_data) diff --git a/tests/beem/test_utils.py b/tests/beem/test_utils.py index 84719414cf88b6a9bf51e58effa138baead285ba..efd619b413b531ee13c24fa42b78c7b08954ff09 100644 --- a/tests/beem/test_utils.py +++ b/tests/beem/test_utils.py @@ -80,6 +80,19 @@ class Testcases(unittest.TestCase): self.assertEqual(len(derive_permlink("", parent_permlink=256 * "a", parent_author="test")), 256) self.assertEqual(len(derive_permlink("a" * 1024)), 256) + def test_derivePermlinkUTCDate(self): + def old_code_suffix() + suffix = "-" + formatTime(datetime.utcnow()) + "z" + return suffix + + def old_formatTime(dt): + return dt.strftime("%Y%m%dt%H%M%S") + + + perm_link_old = derive_permlink("Hello World", parent_permlink="test", parent_author="test") + perm_link_new = derive_permlink("Hello World", parent_permlink="test", parent_author="test", utc_date=date(2018, 1, 1)) + self.assertTrue(perm_link_old != perm_link_new) + def test_patch(self): self.assertEqual(make_patch("aa", "ab"), '@@ -1,2 +1,2 @@\n a\n-a\n+b\n') self.assertEqual(make_patch("aa\n", "ab\n"), '@@ -1,3 +1,3 @@\n a\n-a\n+b\n %0A\n') @@ -151,10 +164,10 @@ class Testcases(unittest.TestCase): self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}]) t = "holger80:30.00%,beembot:40.00%" b = derive_beneficiaries(t) - self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}]) + self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}]) t = "holger80:30%, beembot:40%" b = derive_beneficiaries(t) - self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}]) + self.assertEqual(b, [{"account": "beembot", "weight": 4000}, {"account": "holger80", "weight": 3000}]) t = "holger80:30,beembot" b = derive_beneficiaries(t) self.assertEqual(b, [{"account": "beembot", "weight": 7000}, {"account": "holger80", "weight": 3000}]) @@ -190,7 +203,7 @@ class Testcases(unittest.TestCase): parameter = yaml_safe.load(yaml_content) self.assertEqual(parameter["title"], "test") self.assertEqual(parameter["author"], "holger80") - self.assertEqual(parameter["max_accepted_payout"], "100") + self.assertEqual(parameter["max_accepted_payout"], "100") def test_create_new_password(self): new_password = create_new_password()