Source code for pipeline.src.processing

import datetime
import logging
import re
from collections import ChainMap
from functools import partial
from io import StringIO
from typing import Any, Hashable, List, Union

import numpy as np
import pandas as pd
import pytz
import simplejson
import sqlalchemy
from sqlalchemy import select


[docs] def get_unused_col_name(col_name: str, df: pd.DataFrame) -> str: """ If `col_name` is not already a column name of the DataFrame `df`, returns `col_name`. Otherwise, appends a number to `col_name`, trying 0, 1, 2, ... until a unused column name if found. Args: col_name (str): desired column name df (pd.DataFrame): DataFrame for which we want to ensure the column name is not already used Returns: str: column name Examples: >>> get_unused_col_name("id", pd.DataFrame({"idx": [1, 2, 3]})) "id" >>> get_unused_col_name("id", pd.DataFrame({"id": [1, 2, 3]})) "id_0" >>> get_unused_col_name("id", pd.DataFrame({"id": [1, 2, 3], "id_0": [4, 5, 6]})) "id_1" """ df_columns = list(df) attempt_col_name = col_name while attempt_col_name in df_columns: if "i" not in locals(): i = 0 else: i += 1 attempt_col_name = f"{col_name}_{i}" return attempt_col_name
[docs] def is_a_value(x) -> bool: """Returns False if pd.isna(x), True otherwise. NB : The same result could be obtained simply by checking pd.isna(x), but checking if x is None before checking pd.isna(x) improves performance on DataFrames containing many None values, since checking pd.isna(x) is slower than checking if x is None. Args: x : Anything Returns: bool: `False` if pd.isna(x), `True` otherwise """ if x is not None and not pd.isna(x): return True else: return False
[docs] def concatenate_values(row: pd.Series) -> List: """ Filters the input pandas Series to keep only distinct non null values and returns the result as a python ``list``. Args: row (pd.Series): pandas ``Series`` Returns: List: list of distinct non null values in row """ result_set = set() res = [] for x in row: if is_a_value(x) and x not in result_set: res.append(x) result_set.add(x) return res
[docs] def concatenate_columns(df: pd.DataFrame, input_col_names: List) -> pd.Series: """ For each row in the input DataFrame, the distinct and non null values contained in the columns input_col_names are stored in a list. A pandas Series of the same length as the input DataFrame is then constructed with these lists as values. Args: df (pd.DataFrame): input DataFrame input_col_names (List): the names of the columns to use Returns: pd.Series: resulting Series """ non_null_rows = df[input_col_names].dropna(how="all") res_non_null_rows = non_null_rows.apply(concatenate_values, axis=1) res = pd.Series(index=df.index, data=[[]] * len(df)) res[res_non_null_rows.index] = res_non_null_rows.values return res
[docs] def coalesce(df: pd.DataFrame) -> pd.Series: """ Combines the input DataFrame's columns into one by taking the non null value in each row, in the order of the DataFrame's columns from left to right. Returns a pandas Series with the combined results. Args: df (pd.DataFrame): input pandas DataFrame Returns: pd.Series: Series containing the first non null value in each row of the DataFrame, taken in order of the DataFrame's columns from left to right. """ non_null_rows = df.dropna(how="all") first_non_null_values_idx = np.argmax(non_null_rows.notnull().values, axis=1) res_values = np.choose(first_non_null_values_idx, non_null_rows.values.T) res = pd.Series(index=df.index, data=[None] * len(df), dtype=object) res[non_null_rows.index] = res_values return res
[docs] def get_first_non_null_column_name( df: pd.DataFrame, result_labels: Union[None, dict] = None ) -> pd.Series: """ Returns a Series with the same index as the input DataFrame, whose values are the name of the first column (or the corresponding label, if provided) with a non-null value in each row, from left to right. Rows with all null values return None. Args: df (pd.DataFrame): input pandas DataFrame result_labels (dict): if provided, must be a mapping of column names to the corresponding labels in the result. Returns: pd.Series: Series containing the name of the first column with a non-null value in each row of the DataFrame, from left to right """ non_null_rows = df.dropna(how="all") first_non_null_values_idx = np.argmax(non_null_rows.notnull().values, axis=1) res_values = np.choose(first_non_null_values_idx, list(df)) res = pd.Series(index=df.index, data=[None] * len(df), dtype=object) res[non_null_rows.index] = res_values if result_labels is not None: res = res.map(lambda s: result_labels.get(s)) return res
[docs] def remove_nones_from_dict(d: dict) -> dict: """ Takes a dictionary and removes ``None`` values from it. Args: d (dict): a dictionary Returns: dict: the input dictionary, with all `None` removed. Examples: >>> d = { "a" : 1, "b": [1, 2, None], "c": {"key": "value", "key2": None}, "d": None } >>> remove_nones_from_dict(d) {"a" : 1, "b": [1, 2, None], "c": {"key": "value", "key2": None}} """ return {k: v for k, v in d.items() if v is not None}
[docs] def remove_nones_from_list(li: list) -> list: """ Takes a list and removes ``None`` values from it. Args: li (list): a list Returns: list: the input list, with all `None` removed. Examples: >>> li = [1, 3, None, "a", "b", None] >>> remove_nones_from_dict(li) [1, 3, "a", "b"] """ return [x for x in li if x is not None]
[docs] def df_to_dict_series( df: pd.DataFrame, result_colname: str = "json_col", remove_nulls: bool = False ): """ Converts a pandas DataFrame into a Series with the same index as the input DataFrame and whose values are dictionaries like : .. code-block:: python { "column_1" : value, "column_2": value, } Args: df (pd.DataFrame): input DataFrame result_colname (Union[str, None]): optionnal, name of result Series remove_nulls (bool): if set to ``True``, ``null`` values are recursively removed from the dictionaries Returns: pd.Series: pandas Series """ res = df.copy(deep=True) json_string = res.to_json(orient="index") res = pd.read_json(StringIO(json_string), orient="index", typ="Series") res.name = result_colname if remove_nulls: res = res.map(remove_nones_from_dict) return res
[docs] def explode_dicts(df: pd.DataFrame, column_name: str) -> pd.DataFrame: """ Expands a column of dicts into one column per dict key. The original column is dropped from the result. Args: df (pd.DataFrame): input DataFrame column_name (str): name of the column containing dicts Returns: pd.DataFrame: DataFrame with the dict column replaced by its expanded keys """ return df.join(pd.json_normalize(df[column_name])).drop(columns=column_name)
[docs] def explode_lists_of_dicts(df: pd.DataFrame, column_name: str) -> pd.DataFrame: """ Expands a column of lists of dicts into one row per dict, one column per key. Each dict in each list becomes a row, with dict keys as columns. The index is reset. The original column is dropped from the result. Args: df (pd.DataFrame): input DataFrame column_name (str): name of the column containing lists of dicts Returns: pd.DataFrame: DataFrame with one row per dict and dict keys as columns """ return ( df.explode(column_name) .reset_index(drop=True) .pipe( lambda d: d.join(pd.json_normalize(d[column_name])).drop( columns=column_name ) ) )
[docs] def zeros_ones_to_bools( x: Union[pd.Series, pd.DataFrame] ) -> Union[pd.Series, pd.DataFrame]: """ Converts a pandas DataFrame or Series containing `str`, `int` or `float` values, possibly including null (`None` and `np.nan`) values to a DataFrame with False, True and `np.nan` values respectively. Values 1, 1.0, "1", any non zero number... is converted to `True`. Values 0, 0.0, "0" are converted to `False`. Values `None` and `np.nan` are converted to `np.nan`. Useful to convert boolean data extracted from Oracle databases, since Oracle does not have a boolean data type and boolean data is often stored as "0"s and "1"s, or to handle sitations in which pandas data structures should contain nullable boolean data (in pandas / numpy, the `bool` dtype is not nullable, and this can be tricky to handle). """ tmp = x.astype(float) return tmp.where(~((tmp > 0) | (tmp < 0)), True).replace([0.0], False)
[docs] def to_pgarr( x: Union[list, set, np.ndarray], handle_errors: bool = False, value_on_error: Union[str, None] = None, ) -> Union[str, None]: """ Converts a python `list`, `set` or `numpy.ndarray` to a string with Postgresql array syntax. Elements of the list-like input argument are converted to `string` type, then stripped of leading and trailing blank spaces, and finally filtered to keep only non empty strings. This transformation is required on the elements of a DataFrame's columns that contain collections before bulk inserting the DataFrame into Postgresql with the psql_insert_copy method. Args: x (list, set or numpy.ndarray) : iterable to serialize as Postgres array handle_errors (bool): if ``True``, returns ``value_on_error`` instead of raising ``ValueError`` when the input is of an unexpected type value_on_error (str or None): value to return on errors, if ``handle_errors`` is ``True`` Returns: str: string with Postgresql Array compatible syntax Raises: ValueError : when ``handle_errors`` is False and ``x`` is not list-like. Examples: >>> to_pgarr([1, 2, "a ", "b", "", " "]) "{1,2,a,b}" >>> to_pgarr(["a,b", "c"]) '{"a,b",c}' >>> to_pgarr(None) ValueError >>> to_pgarr(None, handle_errors=True, value_on_error="{}") "{}" >>> to_pgarr(np.nan, handle_errors=True, value_on_error=None) """ try: assert isinstance(x, (list, set, np.ndarray)) except AssertionError: if handle_errors: return value_on_error else: raise ValueError(f"Unexpected type for x: {type(x)}.") def _quote(s: str) -> str: # Elements containing PostgreSQL array special characters must be double-quoted. if any(c in s for c in (',', '"', '\\', '{', '}', ' ', '\t', '\n', '\r', '\x00')): return '"' + s.replace('\\', '\\\\').replace('"', '\\"') + '"' return s elements = [ _quote(e) for e in filter(lambda e: len(e) > 0, map(str.strip, map(str, x))) ] return "{" + ",".join(elements) + "}"
[docs] def df_values_to_psql_arrays( df: pd.DataFrame, handle_errors: bool = False, value_on_error: Union[str, None] = None, ) -> pd.DataFrame: """ Returns a `pandas.DataFrame` with all values serialized as strings with Postgresql array syntax. All values must be of type list, set or numpy array. Other values raise errors, which may be handled if handle_errors is set to True. See `to_pgarr` for details on error handling. This is required before bulk loading a pandas.DataFrame into a Postgresql table with the psql_insert_copy method. Args: df (pd.DataFrame): pandas DataFrame Returns: pd.DataFrame: pandas DataFrame with the same shape and index, all values serialized as strings with Postgresql array syntax. Examples : >>> df_to_psql_arrays(pd.DataFrame({'a': [[1, 2], ['a', 'b']]})) a 0 {1,2,3} 1 {a,b} """ serialize = partial( to_pgarr, handle_errors=handle_errors, value_on_error=value_on_error ) return df.map(serialize, na_action="ignore").fillna("{}")
[docs] def json_converter(x): """Converter for types not natively handled by json.dumps""" if isinstance(x, np.ndarray): return x.tolist() if isinstance(x, pd._libs.tslibs.nattype.NaTType): return None if isinstance(x, datetime.datetime): if x.tzinfo: x = x.replace(tzinfo=pytz.timezone("UTC")) - x.utcoffset() return x.isoformat().replace("+00:00", "Z") # UTC, ISO format else: return x.isoformat() + "Z" elif isinstance(x, datetime.date): return x.isoformat()
[docs] def to_json(x: Any) -> str: """Converts python object to json string.""" res = simplejson.dumps( x, ensure_ascii=False, default=json_converter, ignore_nan=True ) return res
[docs] def df_values_to_json(df: pd.DataFrame) -> pd.DataFrame: """ Returns a `pandas.DataFrame` with all values serialized to json string. This is required before bulk loading into a Postgresql table with the psql_insert_copy method. See `to_json` function for details. Args: df (pd.DataFrame): pandas DataFrame Returns: pd.DataFrame: pandas DataFrame with the same shape and index, all values serialized as json strings. """ return df.map(to_json, na_action="ignore").fillna("null")
[docs] def serialize_nullable_integer_df(df: pd.DataFrame) -> pd.DataFrame: """Serializes the values of a DataFrame that contains numbers that represent possibly null (np.nan or None) integers. This is useful to prepare data before loading to integer Postgres columns, as pandas automatically converts integer Series to float dtype if they contain nulls. Args: df (pd.DataFrame): DataFrame of integer, possibly with None and np.nan values Returns: pd.DataFrame: same DataFrame converted to string dtype """ return df.map(lambda x: str(int(x)), na_action="ignore").where(df.notnull(), None)
[docs] def serialize_timedelta_df(df: pd.DataFrame) -> pd.DataFrame: """Serializes the values of a DataFrame that contains `timedelta` values. This is useful to prepare data before loading to `interval` Postgres columns, as sqlachemy does not support the timedelta dtype. Args: df (pd.DataFrame): DataFrame of timedeltas Returns: pd.DataFrame: same DataFrame converted to string dtype """ return df.astype("timedelta64[ns]").astype(str).replace(["NaT"], [None])
[docs] def drop_rows_already_in_table( df: pd.DataFrame, df_column_name: str, table: sqlalchemy.Table, table_column_name: str, connection: sqlalchemy.engine.base.Connection, logger: logging.Logger, ) -> pd.DataFrame: """Removes rows from the input DataFrame `df` in which the column `df_column_name` contains values that are already present in the column `table_column_name` of the table `table`, and returns the filtered DataFrame.""" df_n_rows = len(df) df_ids = tuple(df[df_column_name].unique()) df_n_ids = len(df_ids) statement = select(getattr(table.c, table_column_name)).where( getattr(table.c, table_column_name).in_(df_ids) ) df_ids_already_in_table = tuple( pd.read_sql(statement, connection)[table_column_name] ) # Remove keys already present in the database table from df res = df[~df[df_column_name].isin(df_ids_already_in_table)] # Remove possible duplicate ids in df res = res[~res[df_column_name].duplicated()] res_n_rows = len(res) res_n_ids = res[df_column_name].nunique() log = ( f"From {df_n_rows} rows with {df_n_ids} distinct {df_column_name} values, " + f"{res_n_rows} rows with {res_n_ids} distinct {df_column_name} values " + "are new and will be inserted in the database." ) logger.info(log) return res
[docs] def prepare_df_for_loading( df: pd.DataFrame, logger: logging.Logger, pg_array_columns: list = None, handle_array_conversion_errors: bool = True, value_on_array_conversion_error="{}", jsonb_columns: list = None, nullable_integer_columns: list = None, timedelta_columns: list = None, enum_columns: list = None, bytea_columns: list = None, ): if not ( jsonb_columns or pg_array_columns or nullable_integer_columns or timedelta_columns or enum_columns or bytea_columns ): return df df_ = df.copy(deep=True) # Serialize columns to be loaded into JSONB columns if jsonb_columns: logger.info("Serializing json columns") df_[jsonb_columns] = df_values_to_json(df_[jsonb_columns]) # Serialize columns to be loaded into Postgres ARRAY columns if pg_array_columns: logger.info("Serializing postgresql array columns") df_[pg_array_columns] = df_values_to_psql_arrays( df_[pg_array_columns], handle_errors=handle_array_conversion_errors, value_on_error=value_on_array_conversion_error, ) # Serialize columns that contain nullable integers (stored an float in python) if nullable_integer_columns: logger.info("Serializing nullable integer columns") df_[nullable_integer_columns] = serialize_nullable_integer_df( df_[nullable_integer_columns] ) if timedelta_columns: logger.info("Serializing timedelta columns") df_[timedelta_columns] = serialize_timedelta_df(df_[timedelta_columns]) if enum_columns: logger.info("Serializing enum columns") for enum_column in enum_columns: df_[enum_column] = df_[enum_column].map(lambda x: x.value if x else None) if bytea_columns: logger.info("Hexing bytea columns") for c in bytea_columns: df_[c] = df_[c].map(lambda x: r"\x" + x.hex()) return df_
[docs] def join_on_multiple_keys( left: pd.DataFrame, right: pd.DataFrame, or_join_keys: list, how: str = "inner", and_join_keys: list = None, coalesce_common_columns: bool = True, ): """ Join two pandas DataFrames, attempting to match rows on several keys by decreasing order of priority. Joins are performed successively with each of the keys listed in `or_join_keys`, and results are then concatenated to form the final result. This is different from joining on a composite key where all keys must match simultaneously : here, rows of left and right DataFrames are joined if at least one of the keys match. Joins are performed on the keys listed in `or_join_keys` by "decreasing order of priority" in the sense that, in order to be matched, rows of left and right MUST match on their highest priority non null key (which come first in the list) but MIGHT not match on lower priority keys (which come later in the list). During each of the joins on the individual keys, non-joining key pairs and, if any, columns common to both left and right DataFrames, are coalesced (from left to right) if `coalesce_common_columns` is `True` (the default). Optionally, the join condition can contain an additional equality clause on keys listed in `and_join_keys`. If `or_join_keys` is `['A', 'B']` and `and_join_keys` is `['C', 'D']`, the SQL equivalent of the join condition is : .. code-block:: SQL ON ( left.A = right.A AND left.C = right.C AND left.D = right.D ) OR ( ( left.A IS NULL OR right.A IS NULL ) AND left.B = right.B AND left.C = right.C AND left.D = right.D ) Args: left (pd.DataFrame): pandas DataFrame right (pd.DataFrame): pandas DataFrame or_join_keys (list): list of column names to use as join keys how (str, optional): 'inner', 'left', 'right' or 'outer'. Defaults to 'inner'. and_join_keys (list, optional): list of column names to use as additional join keys coalesce_common_columns (bool, optional): whether to coalesce values in the columns that are present in both DataFrames. Defaults to `True`. Returns: pd.DataFrame: result of join operation """ joins = [] left = left.copy(deep=True) right = right.copy(deep=True) common_columns = set.intersection(set(left.columns), set(right.columns)) keys_already_joined = set() and_join_keys = [] if and_join_keys is None else and_join_keys left_cols = list(left) right_cols = list(right) # Number rows for future use if how in ("left", "outer"): left_id = get_unused_col_name("left_row_number", left) left[left_id] = range(len(left)) if how in ("right", "outer"): right_id = get_unused_col_name("right_row_number", right) right[right_id] = range(len(right)) # Attempt to perform the join successively on each key for or_join_key in or_join_keys: join_keys = and_join_keys + [or_join_key] right_with_keys = right.dropna(subset=join_keys) left_with_keys = left.dropna(subset=join_keys) join = pd.merge( left_with_keys, right_with_keys, on=join_keys, how="inner", suffixes=("_left", "_right"), ) columns_to_merge = common_columns - set(join_keys) for column_to_merge in columns_to_merge: [l, r] = [f"{column_to_merge}_left", f"{column_to_merge}_right"] if column_to_merge in keys_already_joined: join = join[(join[r].isna()) | (join[l].isna())] if coalesce_common_columns: join[column_to_merge] = coalesce(join[[l, r]]) else: join[column_to_merge] = join[l] join = join.drop(columns=[l, r]) keys_already_joined.add(or_join_key) joins.append(join) # Concatenate all join results res = pd.concat(joins, axis=0) # Add unmatched rows if performing left, right or outer joins if how in ("left", "outer"): res = pd.concat([res, left.loc[~left[left_id].isin(res[left_id])]], axis=0) if how in ("right", "outer"): res = pd.concat([res, right.loc[~right[right_id].isin(res[right_id])]], axis=0) res.index = np.arange(0, len(res)) columns_order = left_cols + [col for col in right_cols if col not in left_cols] res = res[columns_order] return res
[docs] def left_isin_right_by_decreasing_priority( left: pd.DataFrame, right: pd.DataFrame ) -> pd.Series: """ Performs an operation similar to `pandas.DataFrame.isin` on multiple columns, with the differences that : - the columns are tested one by one (instead of being tested simultaneously as in the case of `pandas.DataFrame.isin`), the first column of `left` being tested against the first column of `right`, the second column of `left` being tested against the second column of `right`... - columns are considered to be sorted by decreasing priority, meaning that a match on 2 rows of `left` and `right` on a given column will be taken into account only if the columns of higher priority on those 2 rows have values that are either equal or null. Takes two DataFrames `left` and `right` with the same columns, returns a Series with the same index as the `left` DataFrame and whose values are : - `True` if the corresponding row in `left` has a match in `right` in at least one column - `False` if the corresponding row in `left` has no match in `right` This is typically useful to filter vessels' data based on some other vessels' data, both datasets being indexed with multiple identifiers (vessel_id, cfr, ircs, external immat...). Args: left (pd.DataFrame): DataFrame right (pd.DataFrame): DataFrame with values for which to test if they are present in `left` Returns: List[bool]: list of booleans with the same length as `left` """ assert list(left) == list(right) left = left.copy(deep=True) right = right.copy(deep=True) cols = list(left) id_col = get_unused_col_name("id", left) left[id_col] = np.arange(len(left)) isin_right_col = get_unused_col_name("isin_right", right) right[isin_right_col] = True res = join_on_multiple_keys(left, right, or_join_keys=cols, how="left") res = ( res.drop_duplicates(subset=[id_col]) .sort_values(id_col)[isin_right_col] .fillna(False) ) res.index = left.index return res
[docs] def drop_duplicates_by_decreasing_priority( df: pd.DataFrame, subset: List[str] ) -> pd.DataFrame: """Similar to `pandas.DataFrame.drop_duplicates(subset=subset)`, with the differences that: - the rows are deduplicated based on their values in the columns in `subset` one after the other and by decreasing priority, and not simultaneously - `NA` values on a key are not considered Rows having all `NA` values in all columns of `subset` are dropped. What is meant by "by decreasing priority" is that keys in `subset` are considered to be sorted by decreasing level of priority (for instance `A` and `B`, with `A` having the highest level of priority), and rows with distinct values on `B` but identical values on `A` will be considered duplicated, whereas rows with distinct values on `A` and identical values on `B` will not be considered duplicates. Hence, the first key in `subset` entirely determines whether rows are duplicates or not on all rows with non null `A`, and subsequent keys in `subset` only come into play on rows where `A` is null. This is typically useful to deduplicate data containing one row per vessel with potential duplicates but with multiple identifier columns (cfr, external immatriculation, ircs), some identifiers being more reliable than others. For instance, if two rows have the same CFR but different external immatriculation, it is reasonable to assume that it is a one the same vessel, whereas two rows wihout any information on CFR and different external immats should be considered as two distinct vessels. Args: df (pd.DataFrame): Input DataFrame id_cols (List[str]): List of column names to use as keys for the `drop_duplicates` operation, by decreasing level of priority Returns: pd.DataFrame: Copy of the input DataFrame with duplicate rows removed. """ try: assert isinstance(subset, list) except AssertionError: raise TypeError("`subset` must be a list.") try: assert len(subset) >= 1 except AssertionError: raise TypeError("`subset` must not be empty.") if len(subset) == 1: res = df.dropna(subset=subset).drop_duplicates(subset=subset) else: first_key_not_null = df.dropna(subset=[subset[0]]).drop_duplicates( subset=[subset[0]] ) first_key_null = drop_duplicates_by_decreasing_priority( df[df[subset[0]].isna()], subset=subset[1:] ) first_key_null = first_key_null[ ~left_isin_right_by_decreasing_priority( first_key_null[subset], first_key_not_null[subset] ) ] res = pd.concat([first_key_not_null, first_key_null]) return res
[docs] def try_get_factory(key: Hashable, error_value: Any = None): def try_get(d: Any) -> Any: """ Attempt to fetch an element from what is supposed to be dict (but may not be), return error_value if it fails (for any reason). This is useful to extract values from a series of dictionnaries which may not all contain the searched key. It is faster than checking for the presence of the key each time. """ try: return d[key] except: return error_value return try_get
[docs] def array_equals_row_on_window( arr: np.array, row: np.array, window_length: int ) -> np.array: """ Tests whether each row of an input 2D array is the last of a sequence of `window_length` consecutive rows equal to a given `row` 1D array, and returns the result as a float array with the same length as the input array. The output array is of `float` dtype and not `bool` dtype, because numpy `bool` arrays cannot contain null values. The values are `0.0` (representing `False`), `1.0` (representing `True`) and `np.nan` representing nulls. The first (`window_length` - 1) rows evaluate to `np.nan`, since the sliding window would need to know the values of the previous rows which are not given. Args: arr (np.array): 2D numpy array row (np.array): 1D numpy array with the same length as the number of columns in `arr` window_length (int): number of consecutive rows that must be equal to `row` for the result to be `True` Returns: np.array: 1D boolean array of the same length as the input arrays Examples: >>> arr = np.array([ [False, True], [False, True], [True, True], [False, True], [False, True], ]) >>> row = np.array([False, True]) >>> array_equals_row_on_window(arr, row, 2) array([nan, 1., 0., 0., 1.]) """ n_rows, n_columns = arr.shape # When the sliding window has more rows that the input array, return all nulls if n_rows < window_length: res = np.array([np.nan] * n_rows) else: strides = np.lib.stride_tricks.sliding_window_view( arr, (window_length, n_columns) ) res = (strides == row).all(axis=(1, 2, 3)) number_na_rows_to_add = window_length - 1 na_rows_to_add = np.array([np.nan] * number_na_rows_to_add) res = np.concatenate((na_rows_to_add, res)) return res.astype(float)
[docs] def back_propagate_ones(arr: np.array, steps: int) -> np.array: """ Given a 1D array with values `0.0`, `1.0` and `np.nan`, propagates `1.0` backward `steps` times. Args: arr (np.array): array containing `0.0`, `1.0` and `np.nan` values steps (int): number of steps that ones should be back-propagated returns: np.array: 1D array with the same dimensions as input, with ones back-propagated `steps` times. Examples: >>> arr = np.array([np.nan, 0., 0., 1., 0., 0., 1., 1., 0., 1.]) >>> back_propagate_ones(arr, 1) array([nan, 0., 1., 1., 0., 1., 1., 1., 1., 1.]) """ if steps == 0: return arr else: previous_step = back_propagate_ones(np.append(arr[1:], np.nan), steps - 1) tmp = np.concatenate((arr[:, None], previous_step[:, None]), axis=1) ones = np.equal(tmp, 1).any(axis=1) nans = np.isnan(tmp).any(axis=1) res = np.where((nans & (~ones)), np.nan, ones) return res
[docs] def rows_belong_to_sequence( arr: np.array, row: np.array, window_length: int ) -> np.array: """ Tests whether each row of an input 2D array belongs to a sequence of `window_length` consecutive rows equal to a given `row` 1D array, and returns the result as a float array with the same length as the input array. The output array is of `float` dtype and not `bool` dtype, because numpy `bool` arrays cannot contain null values. The values are `0.0` (representing `False`), `1.0` (representing `True`) and `np.nan` representing nulls. The first and last (`window_length` - 1) rows may be `np.nan`, since the rows before the beginning and after the end of the array are not known and might be needed to determine the result. Args: arr (np.array): 2D numpy array row (np.array): 1D numpy array with the same length as the number of columns in `arr` window_length (int): number of consecutive rows that must be equal to `row` for the result to be `True` Returns: np.array: 1D boolean array of the same length as the input arrays Examples: >>> arr = np.array([ [False, True], [False, True], [True, True], [False, True], [False, True], ]) >>> row = np.array([False, True]) >>> rows_belong_to_sequence(arr, row, 2) array([1., 1., 0., 1., 1.]) >>> arr = np.array([ [False, True], [True, True], [True, True], [False, True], [False, True], [False, False] ]) >>> row = np.array([False, True]) >>> rows_belong_to_sequence(arr, row, 2) array([nan, 0., 0., 1., 1., 0.]) """ ends_of_sequences = array_equals_row_on_window( arr, row, window_length=window_length, ) rows_known = back_propagate_ones(ends_of_sequences, steps=window_length - 1) # To test if rows at the beginning and at the end of the array could possibly # belong to a sequence `row` exceeding the boundaries of the array, we add rows to # the array and test again extended_arr = np.concatenate( ( row * np.ones((window_length - 1, len(row))), arr, row * np.ones((window_length - 1, len(row))), ) ) ends_of_sequences_extended = array_equals_row_on_window( extended_arr, row, window_length=window_length, ) rows_maybe = back_propagate_ones( ends_of_sequences_extended, steps=window_length - 1 )[window_length - 1 : -(window_length - 1)] res = np.where(np.isnan(rows_known) & rows_maybe.astype(bool), np.nan, rows_maybe) return res
[docs] def get_matched_groups(string: str, regex: re.Pattern) -> pd.Series: """ Matches the input `str` with the input `Pattern` and returns a pandas `Series` with the matched data. The index labels of the result `Series` are the group names `(?<group_name>...)` of the pattern. The values of the result `Series` are: - the match's group values, if the string matches the pattern - `None`, if the string does not matches the pattern Args: string (str): string to match regex (re.Pattern): pattern against which to match the string Returns: pd.Series: the match's group data """ assert isinstance(regex, re.Pattern) if isinstance(string, str): m = regex.match(string) else: m = None if m: result = pd.Series(m.groupdict()) else: result = pd.Series({i: None for i in regex.groupindex}) return result
[docs] def merge_dicts(list_of_dicts: List[dict]) -> dict: """ Merges a list of dicts as a single dict Args: list_of_dicts (List[dict]): List of dictionnaries Returns: dict: Dictionnary containing all the entries of the input dictionnaries """ return dict(ChainMap(*list_of_dicts))