Source code for pipeline.src.shared_tasks.dates

from datetime import datetime, timedelta
from enum import Enum
from typing import List

import pytz
from dateutil.relativedelta import relativedelta
from prefect import task

from src.helpers import dates


[docs] class TimeUnit(Enum):
[docs] YEAR = "YEAR"
[docs] MONTH = "MONTH"
[docs] DAY = "DAY"
[docs] HOUR = "HOUR"
[docs] MINUTE = "MINUTE"
[docs] SECOND = "SECOND"
@task
[docs] def get_utcnow(): """Task version of `datetime.utcnow`""" return datetime.utcnow()
@task
[docs] def date_trunc(d: datetime, unit: str): time_unit = TimeUnit(unit) if time_unit == TimeUnit.SECOND: return d.replace(microsecond=0) if time_unit == TimeUnit.MINUTE: return d.replace(second=0, microsecond=0) if time_unit == TimeUnit.HOUR: return d.replace(minute=0, second=0, microsecond=0) if time_unit == TimeUnit.DAY: return d.replace(hour=0, minute=0, second=0, microsecond=0) if time_unit == TimeUnit.MONTH: return d.replace(day=1, hour=0, minute=0, second=0, microsecond=0) if time_unit == TimeUnit.YEAR: return d.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
@task
[docs] def get_current_year() -> int: """Returns current year""" return datetime.utcnow().year
@task
[docs] def get_timezone_aware_utcnow(): return pytz.UTC.localize(datetime.utcnow())
@task
[docs] def make_timedelta(**kwargs) -> timedelta: """Task version of `datetime.timedelta`""" return timedelta(**kwargs)
@task
[docs] def make_relativedelta(**kwargs) -> timedelta: """Task version of `dateutil.relativedelta`""" return relativedelta(**kwargs)
@task
[docs] def make_periods( start_hours_ago: int, end_hours_ago: int, minutes_per_chunk: int, chunk_overlap_minutes: int, ) -> List[dates.Period]: """ `prefect.Task` version of the function `src.helpers.dates.make_periods`, with the difference that start and end dates are to be given as a number of hours from the current date (instead of `datetime` objects), and chunk duration and overlap are to be given as a number of minutes (instead of `timedelta` objects). This is to accomodate for the fact that Prefect flows' parameters must be JSON-serializable, and `datetime` and `timedelta` are not, by default. `src.helpers.dates.make_periods` is recursive, hence the construction as a python function first and not directly as Prefect `Task`. See `src.helpers.dates.make_periods` for help. """ now = datetime.utcnow() return dates.make_periods( start_datetime_utc=now - timedelta(hours=start_hours_ago), end_datetime_utc=now - timedelta(hours=end_hours_ago), period_duration=timedelta(minutes=minutes_per_chunk), overlap=timedelta(minutes=chunk_overlap_minutes), )