pipeline.src.processing
=======================

.. py:module:: pipeline.src.processing


Functions
---------

.. autoapisummary::

   pipeline.src.processing.get_unused_col_name
   pipeline.src.processing.is_a_value
   pipeline.src.processing.concatenate_values
   pipeline.src.processing.concatenate_columns
   pipeline.src.processing.coalesce
   pipeline.src.processing.get_first_non_null_column_name
   pipeline.src.processing.remove_nones_from_dict
   pipeline.src.processing.remove_nones_from_list
   pipeline.src.processing.df_to_dict_series
   pipeline.src.processing.explode_dicts
   pipeline.src.processing.explode_lists_of_dicts
   pipeline.src.processing.zeros_ones_to_bools
   pipeline.src.processing.to_pgarr
   pipeline.src.processing.df_values_to_psql_arrays
   pipeline.src.processing.json_converter
   pipeline.src.processing.to_json
   pipeline.src.processing.df_values_to_json
   pipeline.src.processing.serialize_nullable_integer_df
   pipeline.src.processing.serialize_timedelta_df
   pipeline.src.processing.drop_rows_already_in_table
   pipeline.src.processing.prepare_df_for_loading
   pipeline.src.processing.join_on_multiple_keys
   pipeline.src.processing.left_isin_right_by_decreasing_priority
   pipeline.src.processing.drop_duplicates_by_decreasing_priority
   pipeline.src.processing.try_get_factory
   pipeline.src.processing.array_equals_row_on_window
   pipeline.src.processing.back_propagate_ones
   pipeline.src.processing.rows_belong_to_sequence
   pipeline.src.processing.get_matched_groups
   pipeline.src.processing.merge_dicts


Module Contents
---------------

.. py:function:: get_unused_col_name(col_name: str, df: pandas.DataFrame) -> str

   If `col_name` is not already a column name of the DataFrame `df`, returns
   `col_name`. Otherwise, appends an underscore and a number to `col_name`,
   trying 0, 1, 2, ... until an unused column name is found.

   :param col_name: desired column name
   :type col_name: str
   :param df: DataFrame for which we want to ensure the column name is not already used
   :type df: pd.DataFrame

   :returns: column name
   :rtype: str

   .. rubric:: Examples

   >>> get_unused_col_name("id", pd.DataFrame({"idx": [1, 2, 3]}))
   'id'
   >>> get_unused_col_name("id", pd.DataFrame({"id": [1, 2, 3]}))
   'id_0'
   >>> get_unused_col_name("id", pd.DataFrame({"id": [1, 2, 3], "id_0": [4, 5, 6]}))
   'id_1'


.. py:function:: is_a_value(x) -> bool

   Returns `False` if `pd.isna(x)`, `True` otherwise.

   NB: The same result could be obtained simply with `not pd.isna(x)`, but
   checking whether `x` is `None` before calling `pd.isna(x)` improves
   performance on DataFrames containing many `None` values, since calling
   `pd.isna(x)` is slower than checking `x is None`.

   :param x: Anything

   :returns: `False` if `pd.isna(x)`, `True` otherwise
   :rtype: bool


.. py:function:: concatenate_values(row: pandas.Series) -> List

   Filters the input pandas Series to keep only distinct non-null values and
   returns the result as a python ``list``.

   :param row: pandas ``Series``
   :type row: pd.Series

   :returns: list of distinct non-null values in row
   :rtype: List


.. py:function:: concatenate_columns(df: pandas.DataFrame, input_col_names: List) -> pandas.Series

   For each row in the input DataFrame, the distinct non-null values contained
   in the columns `input_col_names` are stored in a list. A pandas Series of the
   same length as the input DataFrame is then constructed with these lists as
   values.

   :param df: input DataFrame
   :type df: pd.DataFrame
   :param input_col_names: the names of the columns to use
   :type input_col_names: List

   :returns: resulting Series
   :rtype: pd.Series


.. py:function:: coalesce(df: pandas.DataFrame) -> pandas.Series

   Combines the input DataFrame's columns into one by taking, in each row, the
   first non-null value in the order of the DataFrame's columns from left to
   right. Returns a pandas Series with the combined results.

   :param df: input pandas DataFrame
   :type df: pd.DataFrame

   :returns: Series containing the first non-null value in each row of the DataFrame, taken in order of the DataFrame's columns from left to right.
   :rtype: pd.Series


.. py:function:: get_first_non_null_column_name(df: pandas.DataFrame, result_labels: Union[None, dict] = None) -> pandas.Series

   Returns a Series with the same index as the input DataFrame, whose values
   are the name of the first column (or the corresponding label, if provided)
   with a non-null value in each row, from left to right. Rows with all null
   values return `None`.

   :param df: input pandas DataFrame
   :type df: pd.DataFrame
   :param result_labels: if provided, must be a mapping of column names to the corresponding labels in the result.
   :type result_labels: dict, optional

   :returns: Series containing the name of the first column with a non-null value in each row of the DataFrame, from left to right
   :rtype: pd.Series


.. py:function:: remove_nones_from_dict(d: dict) -> dict

   Takes a dictionary and removes the entries whose value is ``None``.

   :param d: a dictionary
   :type d: dict

   :returns: the input dictionary, with its ``None``-valued entries removed.
   :rtype: dict

   .. rubric:: Examples

   >>> d = {
   ...     "a": 1,
   ...     "b": [1, 2, None],
   ...     "c": {"key": "value", "key2": None},
   ...     "d": None,
   ... }
   >>> remove_nones_from_dict(d)
   {'a': 1, 'b': [1, 2, None], 'c': {'key': 'value', 'key2': None}}


.. py:function:: remove_nones_from_list(li: list) -> list

   Takes a list and removes ``None`` values from it.

   :param li: a list
   :type li: list

   :returns: the input list, with all ``None`` values removed.
   :rtype: list

   .. rubric:: Examples

   >>> li = [1, 3, None, "a", "b", None]
   >>> remove_nones_from_list(li)
   [1, 3, 'a', 'b']


.. py:function:: df_to_dict_series(df: pandas.DataFrame, result_colname: str = 'json_col', remove_nulls: bool = False)

   Converts a pandas DataFrame into a Series with the same index as the input
   DataFrame and whose values are dictionaries like:

   .. code-block:: python

      {
          "column_1": value,
          "column_2": value,
      }

   :param df: input DataFrame
   :type df: pd.DataFrame
   :param result_colname: optional, name of the result Series
   :type result_colname: Union[str, None]
   :param remove_nulls: if set to ``True``, ``null`` values are recursively removed from the dictionaries
   :type remove_nulls: bool

   :returns: pandas Series
   :rtype: pd.Series


.. py:function:: explode_dicts(df: pandas.DataFrame, column_name: str) -> pandas.DataFrame

   Expands a column of dicts into one column per dict key. The original column
   is dropped from the result.

   :param df: input DataFrame
   :type df: pd.DataFrame
   :param column_name: name of the column containing dicts
   :type column_name: str

   :returns: DataFrame with the dict column replaced by its expanded keys
   :rtype: pd.DataFrame


.. py:function:: explode_lists_of_dicts(df: pandas.DataFrame, column_name: str) -> pandas.DataFrame

   Expands a column of lists of dicts into one row per dict and one column per
   dict key. The index is reset, and the original column is dropped from the
   result.

   :param df: input DataFrame
   :type df: pd.DataFrame
   :param column_name: name of the column containing lists of dicts
   :type column_name: str

   :returns: DataFrame with one row per dict and dict keys as columns
   :rtype: pd.DataFrame


.. py:function:: zeros_ones_to_bools(x: Union[pandas.Series, pandas.DataFrame]) -> Union[pandas.Series, pandas.DataFrame]

   Converts a pandas DataFrame or Series containing `str`, `int` or `float`
   values, possibly including null (`None` and `np.nan`) values, to an object
   of the same type containing `True`, `False` and `np.nan` values:

   - values 1, 1.0, "1" and any other non-zero number are converted to `True`;
   - values 0, 0.0 and "0" are converted to `False`;
   - values `None` and `np.nan` are converted to `np.nan`.
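
   A minimal sketch of these conversion rules, assuming every non-null value can
   be parsed with ``float()``. The helper names are hypothetical, and this is not
   necessarily how `zeros_ones_to_bools` is implemented:

   .. code-block:: python

      import numpy as np
      import pandas as pd


      def _to_bool_or_nan(value):
          # Nulls (None or np.nan) stay null; the cheap `is None` check comes first
          if value is None or pd.isna(value):
              return np.nan
          # "1", 1, 1.0 and any other non-zero number -> True; "0", 0, 0.0 -> False
          return float(value) != 0.0


      def zeros_ones_to_bools_sketch(x):
          # Element-wise conversion of a Series, or of each column of a DataFrame
          if isinstance(x, pd.DataFrame):
              return x.apply(lambda col: col.map(_to_bool_or_nan))
          return x.map(_to_bool_or_nan)

   When nulls are present, the result has ``object`` dtype, which is what lets
   `True`, `False` and `np.nan` coexist in the same column.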
   `zeros_ones_to_bools` is useful to convert boolean data extracted from Oracle
   databases, since Oracle does not have a boolean data type and boolean data is
   often stored as "0"s and "1"s, or to handle situations in which pandas data
   structures should contain nullable boolean data (in pandas / numpy, the
   `bool` dtype is not nullable, and this can be tricky to handle).


.. py:function:: to_pgarr(x: Union[list, set, numpy.ndarray], handle_errors: bool = False, value_on_error: Union[str, None] = None) -> Union[str, None]

   Converts a python `list`, `set` or `numpy.ndarray` to a string with
   Postgresql array syntax.

   Elements of the list-like input argument are converted to `str`, then
   stripped of leading and trailing blank spaces, and finally filtered to keep
   only non-empty strings.

   This transformation is required on the elements of a DataFrame's columns
   that contain collections before bulk inserting the DataFrame into Postgresql
   with the psql_insert_copy method.

   :param x: iterable to serialize as Postgres array
   :type x: list, set or numpy.ndarray
   :param handle_errors: if ``True``, returns ``value_on_error`` instead of raising ``ValueError`` when the input is of an unexpected type
   :type handle_errors: bool
   :param value_on_error: value to return on errors, if ``handle_errors`` is ``True``
   :type value_on_error: str or None

   :returns: string with Postgresql array compatible syntax, or ``value_on_error`` on handled errors
   :rtype: str or None

   :raises ValueError: when ``handle_errors`` is ``False`` and ``x`` is not list-like.

   .. rubric:: Examples

   >>> to_pgarr([1, 2, "a ", "b", "", " "])
   '{1,2,a,b}'
   >>> to_pgarr(["a,b", "c"])
   '{"a,b",c}'
   >>> to_pgarr(None)
   Traceback (most recent call last):
       ...
   ValueError
   >>> to_pgarr(None, handle_errors=True, value_on_error="{}")
   '{}'
   >>> to_pgarr(np.nan, handle_errors=True, value_on_error=None)


.. py:function:: df_values_to_psql_arrays(df: pandas.DataFrame, handle_errors: bool = False, value_on_error: Union[str, None] = None) -> pandas.DataFrame

   Returns a `pandas.DataFrame` with all values serialized as strings with
   Postgresql array syntax.
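
   The element-level rules described for `to_pgarr` (stringify, strip, drop
   empty strings, quote elements that need it), applied to every cell of a
   DataFrame, can be sketched as below. The quoting rule used here (wrapping
   elements that contain braces, commas, double quotes, backslashes or spaces,
   or that equal ``NULL``, in double quotes, and backslash-escaping ``"`` and
   ``\``) is an assumption based on PostgreSQL's array literal syntax, and the
   helper names are hypothetical:

   .. code-block:: python

      import numpy as np
      import pandas as pd

      # Characters that force an element to be double-quoted (assumed rule)
      _SPECIAL_CHARS = set('{},"\\ ')


      def _quote_element(element):
          if element.upper() == "NULL" or any(c in _SPECIAL_CHARS for c in element):
              escaped = element.replace("\\", "\\\\").replace('"', '\\"')
              return f'"{escaped}"'
          return element


      def to_pgarr_sketch(x, handle_errors=False, value_on_error=None):
          if not isinstance(x, (list, set, np.ndarray)):
              if handle_errors:
                  return value_on_error
              raise ValueError(f"Expected a list, set or numpy array, got {type(x).__name__}")
          # Convert to str, strip blanks, then keep only non-empty strings
          elements = [str(e).strip() for e in x]
          return "{" + ",".join(_quote_element(e) for e in elements if e) + "}"


      def df_values_to_psql_arrays_sketch(df, handle_errors=False, value_on_error=None):
          # Serialize every cell, column by column; shape and index are preserved
          return df.apply(
              lambda col: col.map(lambda v: to_pgarr_sketch(v, handle_errors, value_on_error))
          )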
   All values must be of type list, set or numpy array. Other values raise
   errors, which may be handled if `handle_errors` is set to `True`. See
   `to_pgarr` for details on error handling.

   This is required before bulk loading a pandas.DataFrame into a Postgresql
   table with the psql_insert_copy method.

   :param df: pandas DataFrame
   :type df: pd.DataFrame
   :param handle_errors: passed to `to_pgarr`
   :type handle_errors: bool
   :param value_on_error: passed to `to_pgarr`
   :type value_on_error: str or None

   :returns: pandas DataFrame with the same shape and index, all values serialized as strings with Postgresql array syntax.
   :rtype: pd.DataFrame

   .. rubric:: Examples

   >>> df_values_to_psql_arrays(pd.DataFrame({'a': [[1, 2], ['a', 'b']]}))
          a
   0  {1,2}
   1  {a,b}


.. py:function:: json_converter(x)

   Converter for types not natively handled by `json.dumps`.


.. py:function:: to_json(x: Any) -> str

   Converts a python object to a json string.


.. py:function:: df_values_to_json(df: pandas.DataFrame) -> pandas.DataFrame

   Returns a `pandas.DataFrame` with all values serialized to json strings.
   This is required before bulk loading into a Postgresql table with the
   psql_insert_copy method.

   See the `to_json` function for details.

   :param df: pandas DataFrame
   :type df: pd.DataFrame

   :returns: pandas DataFrame with the same shape and index, all values serialized as json strings.
   :rtype: pd.DataFrame


.. py:function:: serialize_nullable_integer_df(df: pandas.DataFrame) -> pandas.DataFrame

   Serializes the values of a DataFrame that contains numbers representing
   possibly null (`np.nan` or `None`) integers.

   This is useful to prepare data before loading it into integer Postgres
   columns, as pandas automatically converts integer Series to float dtype if
   they contain nulls.

   :param df: DataFrame of integers, possibly with `None` and `np.nan` values
   :type df: pd.DataFrame

   :returns: same DataFrame converted to string dtype
   :rtype: pd.DataFrame


.. py:function:: serialize_timedelta_df(df: pandas.DataFrame) -> pandas.DataFrame

   Serializes the values of a DataFrame that contains `timedelta` values.
   This is useful to prepare data before loading it into `interval` Postgres
   columns, as SQLAlchemy does not support the timedelta dtype.

   :param df: DataFrame of timedeltas
   :type df: pd.DataFrame

   :returns: same DataFrame converted to string dtype
   :rtype: pd.DataFrame


.. py:function:: drop_rows_already_in_table(df: pandas.DataFrame, df_column_name: str, table: sqlalchemy.Table, table_column_name: str, connection: sqlalchemy.engine.base.Connection, logger: logging.Logger) -> pandas.DataFrame

   Removes rows from the input DataFrame `df` in which the column
   `df_column_name` contains values that are already present in the column
   `table_column_name` of the table `table`, and returns the filtered
   DataFrame.


.. py:function:: prepare_df_for_loading(df: pandas.DataFrame, logger: logging.Logger, pg_array_columns: list = None, handle_array_conversion_errors: bool = True, value_on_array_conversion_error='{}', jsonb_columns: list = None, nullable_integer_columns: list = None, timedelta_columns: list = None, enum_columns: list = None, bytea_columns: list = None)


.. py:function:: join_on_multiple_keys(left: pandas.DataFrame, right: pandas.DataFrame, or_join_keys: list, how: str = 'inner', and_join_keys: list = None, coalesce_common_columns: bool = True)

   Joins two pandas DataFrames, attempting to match rows on several keys by
   decreasing order of priority.

   Joins are performed successively with each of the keys listed in
   `or_join_keys`, and the results are then concatenated to form the final
   result. This is different from joining on a composite key, where all keys
   must match simultaneously: here, rows of the left and right DataFrames are
   joined if at least one of the keys matches.

   Joins are performed on the keys listed in `or_join_keys` by "decreasing
   order of priority" in the sense that, in order to be matched, rows of left
   and right MUST match on the highest-priority key that is non-null in both
   rows (keys listed first have the highest priority), but need not match on
   lower-priority keys (which come later in the list).
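
   A minimal sketch of this two-pass matching, restricted to an inner join on
   two keys ``A`` (higher priority) and ``B``, with no `and_join_keys`. The
   function name ``or_join_two_keys`` and the column names are hypothetical,
   and this is not the function's actual implementation:

   .. code-block:: python

      import pandas as pd


      def or_join_two_keys(left, right):
          suffixes = ("_left", "_right")
          # Pass 1: match on the highest-priority key A (nulls dropped, since
          # pandas would otherwise match NaN keys with each other)
          on_a = left.dropna(subset=["A"]).merge(
              right.dropna(subset=["A"]), on="A", suffixes=suffixes
          )
          on_a["B"] = on_a["B_left"].combine_first(on_a["B_right"])  # coalesce, left first
          on_a = on_a.drop(columns=["B_left", "B_right"])

          # Pass 2: match on B, keeping only pairs where A is null on at least one side
          on_b = left.dropna(subset=["B"]).merge(
              right.dropna(subset=["B"]), on="B", suffixes=suffixes
          )
          on_b = on_b[on_b["A_left"].isna() | on_b["A_right"].isna()]
          on_b = on_b.assign(A=on_b["A_left"].combine_first(on_b["A_right"]))
          on_b = on_b.drop(columns=["A_left", "A_right"])

          # Concatenate the results of both passes
          result = pd.concat([on_a, on_b], ignore_index=True)
          other_cols = [c for c in result.columns if c not in ("A", "B")]
          return result[["A", "B"] + other_cols]

   Because pass 2 discards pairs whose ``A`` values are both non-null, two rows
   that disagree on ``A`` are never joined, even if they share the same ``B``.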
   During each of the joins on the individual keys, non-joining key pairs and,
   if any, columns common to both the left and right DataFrames are coalesced
   (from left to right) if `coalesce_common_columns` is `True` (the default).

   Optionally, the join condition can contain an additional equality clause on
   the keys listed in `and_join_keys`.

   If `or_join_keys` is `['A', 'B']` and `and_join_keys` is `['C', 'D']`, the
   SQL equivalent of the join condition is:

   .. code-block:: SQL

      ON (
          left.A = right.A
          AND left.C = right.C
          AND left.D = right.D
      ) OR (
          (left.A IS NULL OR right.A IS NULL)
          AND left.B = right.B
          AND left.C = right.C
          AND left.D = right.D
      )

   :param left: pandas DataFrame
   :type left: pd.DataFrame
   :param right: pandas DataFrame
   :type right: pd.DataFrame
   :param or_join_keys: list of column names to use as join keys, by decreasing order of priority
   :type or_join_keys: list
   :param how: 'inner', 'left', 'right' or 'outer'. Defaults to 'inner'.
   :type how: str, optional
   :param and_join_keys: list of column names to use as additional join keys
   :type and_join_keys: list, optional
   :param coalesce_common_columns: whether to coalesce values in the columns that are present in both DataFrames. Defaults to `True`.
   :type coalesce_common_columns: bool, optional

   :returns: result of join operation
   :rtype: pd.DataFrame


.. py:function:: left_isin_right_by_decreasing_priority(left: pandas.DataFrame, right: pandas.DataFrame) -> pandas.Series

   Performs an operation similar to `pandas.DataFrame.isin` on multiple
   columns, with the following differences:

   - the columns are tested one by one (instead of simultaneously, as in
     `pandas.DataFrame.isin`): the first column of `left` is tested against the
     first column of `right`, the second column of `left` against the second
     column of `right`, and so on;
   - the columns are considered to be sorted by decreasing priority, meaning
     that a match between 2 rows of `left` and `right` on a given column is
     taken into account only if the higher-priority columns of those 2 rows
     have values that are either equal or null.

   Takes two DataFrames `left` and `right` with the same columns, and returns a
   Series with the same index as the `left` DataFrame whose values are:

   - `True` if the corresponding row in `left` has a match in `right` in at
     least one column
   - `False` if the corresponding row in `left` has no match in `right`

   This is typically useful to filter vessels' data based on some other
   vessels' data, both datasets being indexed with multiple identifiers
   (vessel_id, cfr, ircs, external immat...).

   :param left: DataFrame
   :type left: pd.DataFrame
   :param right: DataFrame in which to look for the values of `left`
   :type right: pd.DataFrame

   :returns: Series of booleans with the same index as `left`
   :rtype: pd.Series


.. py:function:: drop_duplicates_by_decreasing_priority(df: pandas.DataFrame, subset: List[str]) -> pandas.DataFrame

   Similar to `pandas.DataFrame.drop_duplicates(subset=subset)`, with the
   following differences:

   - the rows are deduplicated based on their values in the columns in
     `subset` one after the other and by decreasing priority, and not
     simultaneously
   - `NA` values on a key are not considered

   Rows having `NA` values in all columns of `subset` are dropped.

   What is meant by "by decreasing priority" is that the keys in `subset` are
   considered to be sorted by decreasing level of priority (for instance `A`
   and `B`, with `A` having the highest level of priority): rows with distinct
   values on `B` but identical values on `A` will be considered duplicates,
   whereas rows with distinct values on `A` and identical values on `B` will
   not be considered duplicates.
   Hence, the first key in `subset` entirely determines whether rows are
   duplicates or not on all rows with a non-null `A`, and subsequent keys in
   `subset` only come into play on rows where `A` is null.

   This is typically useful to deduplicate data containing one row per vessel,
   with potential duplicates and multiple identifier columns (cfr, external
   immatriculation, ircs), some identifiers being more reliable than others.
   For instance, if two rows have the same CFR but different external
   immatriculations, it is reasonable to assume that they describe one and the
   same vessel, whereas two rows without any information on CFR and with
   different external immatriculations should be considered as two distinct
   vessels.

   :param df: Input DataFrame
   :type df: pd.DataFrame
   :param subset: List of column names to use as keys for the `drop_duplicates` operation, by decreasing level of priority
   :type subset: List[str]

   :returns: Copy of the input DataFrame with duplicate rows removed.
   :rtype: pd.DataFrame


.. py:function:: try_get_factory(key: Hashable, error_value: Any = None)


.. py:function:: array_equals_row_on_window(arr: numpy.array, row: numpy.array, window_length: int) -> numpy.array

   Tests whether each row of an input 2D array is the last of a sequence of
   `window_length` consecutive rows equal to a given `row` 1D array, and
   returns the result as a float array with the same length as the input
   array.

   The output array is of `float` dtype and not `bool` dtype, because numpy
   `bool` arrays cannot contain null values. The values are `0.0`
   (representing `False`), `1.0` (representing `True`) and `np.nan`
   (representing nulls).

   The first (`window_length` - 1) rows evaluate to `np.nan`, since the
   sliding window would need the values of the preceding rows, which are not
   given.
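
   One possible NumPy implementation of this sliding-window test is sketched
   below, assuming NumPy 1.20 or later (for
   ``numpy.lib.stride_tricks.sliding_window_view``). The function name is
   hypothetical, and this is not necessarily how `array_equals_row_on_window`
   is implemented:

   .. code-block:: python

      import numpy as np
      from numpy.lib.stride_tricks import sliding_window_view


      def array_equals_row_on_window_sketch(arr, row, window_length):
          # 1.0 where a row of `arr` equals `row` on every column, 0.0 otherwise
          row_matches = (arr == row).all(axis=1).astype(float)
          # The first window_length - 1 rows have no complete window: they stay NaN
          result = np.full(len(arr), np.nan)
          if len(arr) >= window_length:
              # A window ending at row i is all-matching iff its minimum is 1.0
              windows = sliding_window_view(row_matches, window_length)
              result[window_length - 1:] = windows.min(axis=1)
          return result

   Taking the minimum of each window of 0.0/1.0 match flags is equivalent to
   requiring that every row in the window equals `row`.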
   :param arr: 2D numpy array
   :type arr: np.array
   :param row: 1D numpy array with the same length as the number of columns in `arr`
   :type row: np.array
   :param window_length: number of consecutive rows that must be equal to `row` for the result to be `True`
   :type window_length: int

   :returns: 1D float array (`0.0`, `1.0` or `np.nan` values) with the same length as `arr`
   :rtype: np.array

   .. rubric:: Examples

   >>> arr = np.array([
   ...     [False, True],
   ...     [False, True],
   ...     [True, True],
   ...     [False, True],
   ...     [False, True],
   ... ])
   >>> row = np.array([False, True])
   >>> array_equals_row_on_window(arr, row, 2)
   array([nan, 1., 0., 0., 1.])


.. py:function:: back_propagate_ones(arr: numpy.array, steps: int) -> numpy.array

   Given a 1D array with values `0.0`, `1.0` and `np.nan`, propagates `1.0`
   values backward `steps` times.

   :param arr: array containing `0.0`, `1.0` and `np.nan` values
   :type arr: np.array
   :param steps: number of steps that ones should be back-propagated
   :type steps: int

   :returns: 1D array with the same dimensions as the input, with ones back-propagated `steps` times.
   :rtype: np.array

   .. rubric:: Examples

   >>> arr = np.array([np.nan, 0., 0., 1., 0., 0., 1., 1., 0., 1.])
   >>> back_propagate_ones(arr, 1)
   array([nan, 0., 1., 1., 0., 1., 1., 1., 1., 1.])


.. py:function:: rows_belong_to_sequence(arr: numpy.array, row: numpy.array, window_length: int) -> numpy.array

   Tests whether each row of an input 2D array belongs to a sequence of
   `window_length` consecutive rows equal to a given `row` 1D array, and
   returns the result as a float array with the same length as the input
   array.

   The output array is of `float` dtype and not `bool` dtype, because numpy
   `bool` arrays cannot contain null values. The values are `0.0`
   (representing `False`), `1.0` (representing `True`) and `np.nan`
   (representing nulls).

   The first and last (`window_length` - 1) rows may be `np.nan`, since the
   rows before the beginning and after the end of the array are not known and
   might be needed to determine the result.
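
   The documented examples of `rows_belong_to_sequence` can be reproduced by
   computing the window-end flags of `array_equals_row_on_window` and then
   back-propagating their ones over `window_length` - 1 steps, as
   `back_propagate_ones` does. Treating `rows_belong_to_sequence` as this
   composition is an assumption; the sketch below implements only the
   back-propagation step, and its function name is hypothetical:

   .. code-block:: python

      import numpy as np


      def back_propagate_ones_sketch(arr, steps):
          arr = np.asarray(arr, dtype=float)
          result = arr.copy()
          for step in range(1, steps + 1):
              # Value found `step` positions later; positions past the end get NaN
              ahead = np.full(len(arr), np.nan)
              if step < len(arr):
                  ahead[:-step] = arr[step:]
              # A 1.0 up to `steps` positions ahead turns the current value into 1.0
              result = np.where(ahead == 1.0, 1.0, result)
          return result

   For instance, with `window_length` = 2, the window-end flags
   ``[nan, 1., 0., 0., 1.]`` of the first example back-propagate by one step to
   ``[1., 1., 0., 1., 1.]``, which is the documented result.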
   :param arr: 2D numpy array
   :type arr: np.array
   :param row: 1D numpy array with the same length as the number of columns in `arr`
   :type row: np.array
   :param window_length: number of consecutive rows that must be equal to `row` for the result to be `True`
   :type window_length: int

   :returns: 1D float array (`0.0`, `1.0` or `np.nan` values) with the same length as `arr`
   :rtype: np.array

   .. rubric:: Examples

   >>> arr = np.array([
   ...     [False, True],
   ...     [False, True],
   ...     [True, True],
   ...     [False, True],
   ...     [False, True],
   ... ])
   >>> row = np.array([False, True])
   >>> rows_belong_to_sequence(arr, row, 2)
   array([1., 1., 0., 1., 1.])
   >>> arr = np.array([
   ...     [False, True],
   ...     [True, True],
   ...     [True, True],
   ...     [False, True],
   ...     [False, True],
   ...     [False, False],
   ... ])
   >>> row = np.array([False, True])
   >>> rows_belong_to_sequence(arr, row, 2)
   array([nan, 0., 0., 1., 1., 0.])


.. py:function:: get_matched_groups(string: str, regex: re.Pattern) -> pandas.Series

   Matches the input `str` with the input `Pattern` and returns a pandas
   `Series` with the matched data.

   The index labels of the result `Series` are the group names
   ``(?P<name>...)`` of the pattern.

   The values of the result `Series` are:

   - the match's group values, if the string matches the pattern
   - `None`, if the string does not match the pattern

   :param string: string to match
   :type string: str
   :param regex: pattern against which to match the string
   :type regex: re.Pattern

   :returns: the match's group data
   :rtype: pd.Series


.. py:function:: merge_dicts(list_of_dicts: List[dict]) -> dict

   Merges a list of dicts into a single dict.

   :param list_of_dicts: List of dictionaries
   :type list_of_dicts: List[dict]

   :returns: Dictionary containing all the entries of the input dictionaries
   :rtype: dict
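
A minimal sketch of the behavior documented for `get_matched_groups`, assuming
the pattern is applied with ``re.Pattern.match`` (anchored at the start of the
string, unlike ``search``) and that a non-matching string yields one ``None``
per named group. The function name ``get_matched_groups_sketch`` is
hypothetical:

.. code-block:: python

   import re

   import pandas as pd


   def get_matched_groups_sketch(string, regex):
       match = regex.match(string)
       if match is None:
           # No match: one None per named group, so the index is always the same
           return pd.Series({name: None for name in regex.groupindex}, dtype=object)
       # groupdict() maps each named group to its matched text (None if it did not participate)
       return pd.Series(match.groupdict(), dtype=object)

Keeping the same index whether or not the string matches makes it easy to
apply such a function row by row and stack the results into a DataFrame.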