pipeline.src.processing

Functions

get_unused_col_name(→ str)

If col_name is not already a column name of the DataFrame df, returns col_name; otherwise, appends a numeric suffix until an unused name is found.

is_a_value(→ bool)

Returns False if pd.isna(x), True otherwise.

concatenate_values(→ List)

Filters the input pandas Series to keep only distinct non-null values and returns the result as a python list.

concatenate_columns(→ pandas.Series)

For each row in the input DataFrame, the distinct non-null values contained in the columns input_col_names are stored in a list, and these lists are returned as a pandas Series.

coalesce(→ pandas.Series)

Combines the input DataFrame's columns into one by taking the first non-null value in each row, from left to right.

get_first_non_null_column_name(→ pandas.Series)

Returns a Series with the same index as the input DataFrame, whose values are the name of the first column with a non-null value in each row.

remove_nones_from_dict(→ dict)

Takes a dictionary and removes None values from it.

remove_nones_from_list(→ list)

Takes a list and removes None values from it.

df_to_dict_series(df[, result_colname, remove_nulls])

Converts a pandas DataFrame into a Series with the same index as the input DataFrame, whose values are dictionaries mapping column names to values.

explode_dicts(→ pandas.DataFrame)

Expands a column of dicts into one column per dict key.

explode_lists_of_dicts(→ pandas.DataFrame)

Expands a column of lists of dicts into one row per dict, one column per key.

zeros_ones_to_bools(→ Union[pandas.Series, ...)

Converts a pandas DataFrame or Series containing str, int or float values, possibly null, to False, True and np.nan values.

to_pgarr(→ Union[str, None])

Converts a python list, set or numpy.ndarray to a string with Postgresql array syntax.

df_values_to_psql_arrays(→ pandas.DataFrame)

Returns a pandas.DataFrame with all values serialized as strings with Postgresql array syntax.

json_converter(x)

Converter for types not natively handled by json.dumps

to_json(→ str)

Converts python object to json string.

df_values_to_json(→ pandas.DataFrame)

Returns a pandas.DataFrame with all values serialized to json string.

serialize_nullable_integer_df(→ pandas.DataFrame)

Serializes the values of a DataFrame that contains numbers that represent possibly null integers.

serialize_timedelta_df(→ pandas.DataFrame)

Serializes the values of a DataFrame that contains timedelta values.

drop_rows_already_in_table(→ pandas.DataFrame)

Removes rows from the input DataFrame df in which the column df_column_name contains values already present in a column of a database table.

prepare_df_for_loading(df, logger[, pg_array_columns, ...])

join_on_multiple_keys(left, right, or_join_keys[, ...])

Join two pandas DataFrames, attempting to match rows on several keys by decreasing order of priority.

left_isin_right_by_decreasing_priority(→ pandas.Series)

Performs an operation similar to pandas.DataFrame.isin on multiple columns, with the columns tested one by one by decreasing priority.

drop_duplicates_by_decreasing_priority(→ pandas.DataFrame)

Similar to pandas.DataFrame.drop_duplicates(subset=subset), with the keys in subset considered one after the other by decreasing priority.

try_get_factory(key[, error_value])

array_equals_row_on_window(→ numpy.array)

Tests whether each row of an input 2D array is the last of a sequence of window_length consecutive rows equal to a given 1D array.

back_propagate_ones(→ numpy.array)

Given a 1D array with values 0.0, 1.0 and np.nan, propagates 1.0 backward steps times.

rows_belong_to_sequence(→ numpy.array)

Tests whether each row of an input 2D array belongs to a sequence of window_length consecutive rows equal to a given 1D array.

get_matched_groups(→ pandas.Series)

Matches the input str with the input Pattern and returns a pandas Series with the matched groups.

merge_dicts(→ dict)

Merges a list of dicts into a single dict.

Module Contents

pipeline.src.processing.get_unused_col_name(col_name: str, df: pandas.DataFrame) → str

If col_name is not already a column name of the DataFrame df, returns col_name. Otherwise, appends a suffix to col_name, trying _0, _1, _2, … until an unused column name is found.

Parameters:
  • col_name (str) – desired column name

  • df (pd.DataFrame) – DataFrame for which we want to ensure the column name is not already used

Returns:

column name

Return type:

str

Examples

>>> get_unused_col_name("id", pd.DataFrame({"idx": [1, 2, 3]}))
'id'
>>> get_unused_col_name("id", pd.DataFrame({"id": [1, 2, 3]}))
'id_0'
>>> get_unused_col_name("id", pd.DataFrame({"id": [1, 2, 3], "id_0": [4, 5, 6]}))
'id_1'
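A minimal sketch of the lookup described above. It assumes the "_0", "_1", … suffix scheme shown in the examples and is not the module's actual source:

```python
import pandas as pd


def get_unused_col_name(col_name: str, df: pd.DataFrame) -> str:
    # the desired name is free: use it as-is
    if col_name not in df.columns:
        return col_name
    # otherwise try col_name_0, col_name_1, ... until a free name is found
    i = 0
    while f"{col_name}_{i}" in df.columns:
        i += 1
    return f"{col_name}_{i}"
```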

pipeline.src.processing.is_a_value(x) → bool

Returns False if pd.isna(x), True otherwise.

Note: the same result could be obtained simply by checking pd.isna(x). However, checking whether x is None first improves performance on DataFrames containing many None values, because pd.isna(x) is slower than an identity check against None.

Parameters:

x – Anything

Returns:

False if pd.isna(x), True otherwise

Return type:

bool
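The short-circuit described in the note can be sketched as follows. This sketch assumes x is a scalar; pd.isna returns an array for list-like inputs, and `not` on such an array would fail.

```python
import pandas as pd


def is_a_value(x) -> bool:
    # cheap identity check first: pd.isna is only called for non-None values
    if x is None:
        return False
    return not pd.isna(x)
```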

pipeline.src.processing.concatenate_values(row: pandas.Series) → List

Filters the input pandas Series to keep only distinct non-null values and returns the result as a python list.

Parameters:

row (pd.Series) – pandas Series

Returns:

list of distinct non-null values in row

Return type:

List

pipeline.src.processing.concatenate_columns(df: pandas.DataFrame, input_col_names: List) → pandas.Series

For each row in the input DataFrame, the distinct non-null values contained in the columns input_col_names are stored in a list. A pandas Series with the same length as the input DataFrame is then constructed with these lists as values.

Parameters:
  • df (pd.DataFrame) – input DataFrame

  • input_col_names (List) – the names of the columns to use

Returns:

resulting Series

Return type:

pd.Series
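A possible pandas implementation of concatenate_values and concatenate_columns is sketched below. The assumption that values keep their order of first appearance comes from using pd.unique. The original functions may order values differently.

```python
from typing import List

import pandas as pd


def concatenate_values(row: pd.Series) -> List:
    # drop nulls, then keep distinct values in order of first appearance
    return list(pd.unique(row.dropna()))


def concatenate_columns(df: pd.DataFrame, input_col_names: List) -> pd.Series:
    # apply row by row; list results are returned as a Series of lists
    return df[input_col_names].apply(concatenate_values, axis=1)
```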

pipeline.src.processing.coalesce(df: pandas.DataFrame) → pandas.Series

Combines the input DataFrame’s columns into one by taking the first non-null value in each row, in the order of the DataFrame’s columns from left to right.

Returns a pandas Series with the combined results.

Parameters:

df (pd.DataFrame) – input pandas DataFrame

Returns:

Series containing the first non-null value in each row of the DataFrame, taken in order of the DataFrame’s columns from left to right.

Return type:

pd.Series
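One way to get a row-wise left-to-right coalesce in pandas is to back-fill along the columns and keep the first column. This is a sketch of the technique, not necessarily how the module does it. The resulting Series is named after the first column.

```python
import pandas as pd


def coalesce(df: pd.DataFrame) -> pd.Series:
    # bfill(axis=1) fills each null with the next non-null value to its right,
    # so the first column ends up holding the first non-null value of each row
    return df.bfill(axis=1).iloc[:, 0]
```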

pipeline.src.processing.get_first_non_null_column_name(df: pandas.DataFrame, result_labels: None | dict = None) → pandas.Series

Returns a Series with the same index as the input DataFrame, whose values are the name of the first column (or the corresponding label, if provided) with a non-null value in each row, from left to right.

Rows in which all values are null evaluate to None.

Parameters:
  • df (pd.DataFrame) – input pandas DataFrame

  • result_labels (dict) – if provided, must be a mapping of column names to the corresponding labels in the result.

Returns:

Series containing the name of the first column with a non-null value in each row of the DataFrame, from left to right

Return type:

pd.Series
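A sketch of the behavior described above follows. It relies on numpy's argmax returning the first maximum, which here is the first True. The handling of result_labels follows the parameter description, and the implementation details are assumptions.

```python
import pandas as pd


def get_first_non_null_column_name(df: pd.DataFrame, result_labels: dict = None) -> pd.Series:
    not_null = df.notna().to_numpy()
    first_idx = not_null.argmax(axis=1)  # index of the first True in each row
    has_value = not_null.any(axis=1)     # rows that are entirely null get None
    labels = []
    for idx, ok in zip(first_idx, has_value):
        if not ok:
            labels.append(None)
            continue
        name = df.columns[idx]
        labels.append(result_labels[name] if result_labels else name)
    return pd.Series(labels, index=df.index, dtype=object)
```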

pipeline.src.processing.remove_nones_from_dict(d: dict) → dict

Takes a dictionary and removes None values from it.

Parameters:

d (dict) – a dictionary

Returns:

the input dictionary, with all top-level None values removed (nested containers are left unchanged, as in the example).

Return type:

dict

Examples

>>> d = {
...     "a": 1,
...     "b": [1, 2, None],
...     "c": {"key": "value", "key2": None},
...     "d": None,
... }
>>> remove_nones_from_dict(d)
{'a': 1, 'b': [1, 2, None], 'c': {'key': 'value', 'key2': None}}

pipeline.src.processing.remove_nones_from_list(li: list) → list

Takes a list and removes None values from it.

Parameters:

li (list) – a list

Returns:

the input list, with all None values removed.

Return type:

list

Examples

>>> li = [1, 3, None, "a", "b", None]
>>> remove_nones_from_list(li)
[1, 3, 'a', 'b']
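Both helpers can be written as comprehensions. The sketch below assumes, as the examples show, that only top-level None values are removed and nested containers are returned untouched:

```python
def remove_nones_from_dict(d: dict) -> dict:
    # only top-level values are inspected; nested dicts/lists are kept as-is
    return {k: v for k, v in d.items() if v is not None}


def remove_nones_from_list(li: list) -> list:
    return [x for x in li if x is not None]
```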

pipeline.src.processing.df_to_dict_series(df: pandas.DataFrame, result_colname: str = 'json_col', remove_nulls: bool = False)

Converts a pandas DataFrame into a Series with the same index as the input DataFrame and whose values are dictionaries like:

{
    "column_1" : value,
    "column_2": value,
}

Parameters:
  • df (pd.DataFrame) – input DataFrame

  • result_colname (Union[str, None]) – optional, name of the result Series

  • remove_nulls (bool) – if set to True, null values are recursively removed from the dictionaries

Returns:

pandas Series

Return type:

pd.Series
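A minimal sketch built on DataFrame.to_dict(orient="records"). The recursive null removal uses two hypothetical helpers, _is_null and _drop_nulls. The assumption is that "null" means None or float NaN, and that only dicts and lists are traversed.

```python
import math

import pandas as pd


def _is_null(x) -> bool:
    # hypothetical helper: None or a float NaN (np.float64 is a float subclass)
    return x is None or (isinstance(x, float) and math.isnan(x))


def _drop_nulls(obj):
    # hypothetical helper: recursively removes nulls from dicts and lists
    if isinstance(obj, dict):
        return {k: _drop_nulls(v) for k, v in obj.items() if not _is_null(v)}
    if isinstance(obj, list):
        return [_drop_nulls(v) for v in obj if not _is_null(v)]
    return obj


def df_to_dict_series(df: pd.DataFrame, result_colname: str = "json_col", remove_nulls: bool = False) -> pd.Series:
    records = df.to_dict(orient="records")  # one {column: value} dict per row
    if remove_nulls:
        records = [_drop_nulls(r) for r in records]
    return pd.Series(records, index=df.index, name=result_colname)
```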

pipeline.src.processing.explode_dicts(df: pandas.DataFrame, column_name: str) → pandas.DataFrame

Expands a column of dicts into one column per dict key.

The original column is dropped from the result.

Parameters:
  • df (pd.DataFrame) – input DataFrame

  • column_name (str) – name of the column containing dicts

Returns:

DataFrame with the dict column replaced by its expanded keys

Return type:

pd.DataFrame

pipeline.src.processing.explode_lists_of_dicts(df: pandas.DataFrame, column_name: str) → pandas.DataFrame

Expands a column of lists of dicts into one row per dict, one column per key.

Each dict in each list becomes a row, with dict keys as columns. The index is reset. The original column is dropped from the result.

Parameters:
  • df (pd.DataFrame) – input DataFrame

  • column_name (str) – name of the column containing lists of dicts

Returns:

DataFrame with one row per dict and dict keys as columns

Return type:

pd.DataFrame
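Both expansions can be sketched with standard pandas calls. explode_dicts builds a DataFrame from the list of dicts, and explode_lists_of_dicts first uses DataFrame.explode to get one row per dict. The sketch assumes that lists are non-empty and that dict keys do not collide with existing column names (join raises on overlapping columns).

```python
import pandas as pd


def explode_dicts(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    # one column per dict key; keys missing from a given dict become NaN
    expanded = pd.DataFrame(df[column_name].tolist(), index=df.index)
    return df.drop(columns=column_name).join(expanded)


def explode_lists_of_dicts(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    # one row per dict, with a fresh 0..n-1 index, then one column per key
    exploded = df.explode(column_name).reset_index(drop=True)
    return explode_dicts(exploded, column_name)
```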

pipeline.src.processing.zeros_ones_to_bools(x: pandas.Series | pandas.DataFrame) → pandas.Series | pandas.DataFrame

Converts a pandas DataFrame or Series containing str, int or float values, possibly including null values (None and np.nan), to a DataFrame or Series containing False, True and np.nan values.

Values 1, 1.0, “1” and any other non-zero number are converted to True. Values 0, 0.0 and “0” are converted to False. Values None and np.nan are converted to np.nan.

This is useful to convert boolean data extracted from Oracle databases, since Oracle does not have a boolean data type and boolean data is often stored as “0”s and “1”s. It also helps in situations in which pandas data structures should contain nullable boolean data: in pandas / numpy, the bool dtype is not nullable, which can be tricky to handle.
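A minimal sketch of this conversion. It assumes that pd.to_numeric is an acceptable way to parse the str values (strings other than numbers would raise), and that an object dtype is used so that booleans and np.nan can coexist. _series_to_bools is a hypothetical helper.

```python
import numpy as np
import pandas as pd


def _series_to_bools(s: pd.Series) -> pd.Series:
    # "1" -> 1.0, "0" -> 0.0, None -> NaN
    numeric = pd.to_numeric(s)
    # non-zero -> True, zero -> False; nulls are put back as np.nan
    return numeric.ne(0).astype(object).where(numeric.notna(), np.nan)


def zeros_ones_to_bools(x):
    if isinstance(x, pd.DataFrame):
        return x.apply(_series_to_bools)  # column by column
    return _series_to_bools(x)
```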

pipeline.src.processing.to_pgarr(x: list | set | numpy.ndarray, handle_errors: bool = False, value_on_error: str | None = None) → str | None

Converts a python list, set or numpy.ndarray to a string with Postgresql array syntax.

Elements of the list-like input argument are converted to str, stripped of leading and trailing whitespace, and finally filtered to keep only non-empty strings.

This transformation is required on the elements of a DataFrame’s columns that contain collections before bulk inserting the DataFrame into Postgresql with the psql_insert_copy method.

Parameters:
  • x (list, set or numpy.ndarray) – iterable to serialize as Postgres array

  • handle_errors (bool) – if True, returns value_on_error instead of raising ValueError when the input is of an unexpected type

  • value_on_error (str or None) – value to return on errors, if handle_errors is True

Returns:

string with Postgresql Array compatible syntax

Return type:

str or None

Raises:

ValueError – when handle_errors is False and x is not list-like.

Examples

>>> to_pgarr([1, 2, "a ", "b", "", " "])
'{1,2,a,b}'
>>> to_pgarr(["a,b", "c"])
'{"a,b",c}'
>>> to_pgarr(None)
Traceback (most recent call last):
    ...
ValueError
>>> to_pgarr(None, handle_errors=True, value_on_error="{}")
'{}'
>>> to_pgarr(np.nan, handle_errors=True, value_on_error=None)
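The sketch below reproduces the examples above. The quoting rule is an assumption based on the PostgreSQL array literal syntax: elements that contain braces, commas, double quotes, backslashes or whitespace, and the word NULL, are double-quoted, with " and \ escaped by a backslash. The original function may quote differently. Note that the element order of a set is not deterministic.

```python
import numpy as np

# characters that force an element to be double-quoted in an array literal
_SPECIAL_CHARS = set('{},"\\')


def _quote_element(s: str) -> str:
    # hypothetical helper applying PostgreSQL array quoting rules
    if s.upper() == "NULL" or any(c in _SPECIAL_CHARS or c.isspace() for c in s):
        escaped = s.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    return s


def to_pgarr(x, handle_errors: bool = False, value_on_error=None):
    if not isinstance(x, (list, set, np.ndarray)):
        if handle_errors:
            return value_on_error
        raise ValueError(f"Expected list, set or numpy.ndarray, got {type(x).__name__}")
    # str, strip, drop empty strings, then quote where needed
    elements = (str(e).strip() for e in x)
    return "{" + ",".join(_quote_element(e) for e in elements if e) + "}"
```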

pipeline.src.processing.df_values_to_psql_arrays(df: pandas.DataFrame, handle_errors: bool = False, value_on_error: str | None = None) → pandas.DataFrame

Returns a pandas.DataFrame with all values serialized as strings with Postgresql array syntax. All values must be of type list, set or numpy array. Other values raise a ValueError, which may be handled if handle_errors is set to True.

See to_pgarr for details on error handling.

This is required before bulk loading a pandas.DataFrame into a Postgresql table with the psql_insert_copy method.

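Parameters:
  • df (pd.DataFrame) – pandas DataFrame

  • handle_errors (bool) – if True, values that are not list-like are replaced by value_on_error instead of raising ValueError

  • value_on_error (str or None) – value to use on errors, if handle_errors is True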

Returns:

pandas DataFrame with the same shape and index, all values serialized as strings with Postgresql array syntax.

Return type:

pd.DataFrame

Examples

>>> df_values_to_psql_arrays(pd.DataFrame({'a': [[1, 2], ['a', 'b']]}))
       a
0  {1,2}
1  {a,b}

pipeline.src.processing.json_converter(x)

Converter for types not natively handled by json.dumps

pipeline.src.processing.to_json(x: Any) → str

Converts python object to json string.
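The documentation does not say which types json_converter handles. One common pattern is to pass a converter as the default hook of json.dumps, which calls it only for objects it cannot serialize itself. In the sketch below, the numpy, set and date types are illustrative assumptions, not the module's actual list.

```python
import datetime
import json
from typing import Any

import numpy as np


def json_converter(x):
    # Assumption: illustrative set of types; json.dumps calls this only for
    # objects it cannot serialize natively
    if isinstance(x, np.bool_):
        return bool(x)
    if isinstance(x, np.integer):
        return int(x)
    if isinstance(x, np.floating):
        return float(x)
    if isinstance(x, np.ndarray):
        return x.tolist()
    if isinstance(x, (set, frozenset)):
        return list(x)
    if isinstance(x, datetime.date):  # also covers datetime.datetime
        return x.isoformat()
    raise TypeError(f"Object of type {type(x).__name__} is not JSON serializable")


def to_json(x: Any) -> str:
    return json.dumps(x, default=json_converter)
```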

pipeline.src.processing.df_values_to_json(df: pandas.DataFrame) → pandas.DataFrame

Returns a pandas.DataFrame with all values serialized as JSON strings.

This is required before bulk loading into a Postgresql table with the psql_insert_copy method.

See to_json function for details.

Parameters:

df (pd.DataFrame) – pandas DataFrame

Returns:

pandas DataFrame with the same shape and index, all values serialized as json strings.

Return type:

pd.DataFrame

pipeline.src.processing.serialize_nullable_integer_df(df: pandas.DataFrame) → pandas.DataFrame

Serializes the values of a DataFrame that contains numbers representing possibly null (np.nan or None) integers. This is useful to prepare data before loading into integer Postgres columns. pandas automatically converts integer Series to float dtype when they contain nulls, and float representations such as “1.0” are not accepted as input for integer columns.

Parameters:

df (pd.DataFrame) – DataFrame of integers, possibly with None and np.nan values

Returns:

same DataFrame converted to string dtype

Return type:

pd.DataFrame
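A minimal sketch of this serialization. It assumes that nulls should stay None, so that they load as NULL, and that every non-null value is integral. _serialize_nullable_int is a hypothetical helper.

```python
import pandas as pd


def _serialize_nullable_int(value):
    # 3.0 -> "3"; nulls stay None
    return None if pd.isna(value) else str(int(value))


def serialize_nullable_integer_df(df: pd.DataFrame) -> pd.DataFrame:
    # map every value of every column
    return df.apply(lambda col: col.map(_serialize_nullable_int))
```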

pipeline.src.processing.serialize_timedelta_df(df: pandas.DataFrame) → pandas.DataFrame

Serializes the values of a DataFrame that contains timedelta values. This is useful to prepare data before loading into interval Postgres columns, as SQLAlchemy does not support the timedelta dtype.

Parameters:

df (pd.DataFrame) – DataFrame of timedeltas

Returns:

same DataFrame converted to string dtype

Return type:

pd.DataFrame

pipeline.src.processing.drop_rows_already_in_table(df: pandas.DataFrame, df_column_name: str, table: sqlalchemy.Table, table_column_name: str, connection: sqlalchemy.engine.base.Connection, logger: logging.Logger) → pandas.DataFrame

Removes rows from the input DataFrame df in which the column df_column_name contains values that are already present in the column table_column_name of the table table, and returns the filtered DataFrame.

pipeline.src.processing.prepare_df_for_loading(df: pandas.DataFrame, logger: logging.Logger, pg_array_columns: list = None, handle_array_conversion_errors: bool = True, value_on_array_conversion_error='{}', jsonb_columns: list = None, nullable_integer_columns: list = None, timedelta_columns: list = None, enum_columns: list = None, bytea_columns: list = None)

pipeline.src.processing.join_on_multiple_keys(left: pandas.DataFrame, right: pandas.DataFrame, or_join_keys: list, how: str = 'inner', and_join_keys: list = None, coalesce_common_columns: bool = True)

Join two pandas DataFrames, attempting to match rows on several keys by decreasing order of priority.

Joins are performed successively with each of the keys listed in or_join_keys, and results are then concatenated to form the final result. This is different from joining on a composite key where all keys must match simultaneously: here, rows of left and right DataFrames are joined if at least one of the keys matches.

Joins are performed on the keys listed in or_join_keys by “decreasing order of priority” in the sense that, in order to be matched, rows of left and right MUST match on their highest priority non-null key (which comes first in the list) but need NOT match on lower priority keys (which come later in the list).

During each of the joins on the individual keys, non-joining key pairs and, if any, columns common to both left and right DataFrames, are coalesced (from left to right) if coalesce_common_columns is True (the default).

Optionally, the join condition can contain an additional equality clause on keys listed in and_join_keys.

If or_join_keys is [‘A’, ‘B’] and and_join_keys is [‘C’, ‘D’], the SQL equivalent of the join condition is:

ON
    (
        left.A = right.A AND
        left.C = right.C AND
        left.D = right.D
    ) OR
    (
        (
            left.A IS NULL OR
            right.A IS NULL
        ) AND
        left.B = right.B AND
        left.C = right.C AND
        left.D = right.D
    )

Parameters:
  • left (pd.DataFrame) – pandas DataFrame

  • right (pd.DataFrame) – pandas DataFrame

  • or_join_keys (list) – list of column names to use as join keys

  • how (str, optional) – ‘inner’, ‘left’, ‘right’ or ‘outer’. Defaults to ‘inner’.

  • and_join_keys (list, optional) – list of column names to use as additional join keys

  • coalesce_common_columns (bool, optional) – whether to coalesce values in the columns that are present in both DataFrames. Defaults to True.

Returns:

result of join operation

Return type:

pd.DataFrame
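The join condition above can be sketched in pandas by merging once per key and keeping a match on a lower-priority key only when every higher-priority key is null on at least one side. The function name or_join_inner is hypothetical. This simplified sketch supports inner joins only, always coalesces common columns (left value first), and drops null join keys, because SQL equality never matches nulls. It is not the module's implementation.

```python
import pandas as pd


def or_join_inner(left: pd.DataFrame, right: pd.DataFrame, or_join_keys: list, and_join_keys=()) -> pd.DataFrame:
    parts = []
    for i, key in enumerate(or_join_keys):
        on = [key, *and_join_keys]
        l = left.dropna(subset=on)
        r = right.dropna(subset=on)
        merged = l.merge(r, on=on, how="inner", suffixes=("_left", "_right"))
        # a match on `key` only counts if each higher-priority key is null on
        # at least one side (otherwise the rows had to match on that key)
        for higher in or_join_keys[:i]:
            merged = merged[merged[f"{higher}_left"].isna() | merged[f"{higher}_right"].isna()]
        merged = merged.copy()
        # coalesce columns present in both inputs, taking the left value first
        common = [c for c in left.columns if c in right.columns and c not in on]
        for c in common:
            merged[c] = merged[f"{c}_left"].combine_first(merged[f"{c}_right"])
            merged = merged.drop(columns=[f"{c}_left", f"{c}_right"])
        parts.append(merged)
    return pd.concat(parts, ignore_index=True)
```

With the example below, the rows with B = "b3" are not joined, because both sides have a non-null but different A.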

pipeline.src.processing.left_isin_right_by_decreasing_priority(left: pandas.DataFrame, right: pandas.DataFrame) → pandas.Series

Performs an operation similar to pandas.DataFrame.isin on multiple columns, with the following differences:

  • the columns are tested one by one (instead of simultaneously, as pandas.DataFrame.isin does): the first column of left is tested against the first column of right, the second column of left against the second column of right, and so on

  • columns are considered to be sorted by decreasing priority, meaning that a match between a row of left and a row of right on a given column is taken into account only if, on those two rows, the columns of higher priority have values that are either equal or null.

Takes two DataFrames left and right with the same columns, returns a Series with the same index as the left DataFrame and whose values are :

  • True if the corresponding row in left has a match in right in at least one column

  • False if the corresponding row in left has no match in right

This is typically useful to filter vessels’ data based on some other vessels’ data, both datasets being indexed with multiple identifiers (vessel_id, cfr, ircs, external immat…).

Parameters:
  • left (pd.DataFrame) – DataFrame

  • right (pd.DataFrame) – DataFrame in which to look for the values of left

Returns:

Series of booleans with the same index as left

Return type:

pd.Series

pipeline.src.processing.drop_duplicates_by_decreasing_priority(df: pandas.DataFrame, subset: List[str]) → pandas.DataFrame

Similar to pandas.DataFrame.drop_duplicates(subset=subset), with the following differences:

  • the rows are deduplicated based on their values in the columns in subset one after the other and by decreasing priority, and not simultaneously

  • NA values on a key are ignored: two rows are never considered duplicates because they are both NA on the same key

Rows with NA values in all columns of subset are dropped.

What is meant by “by decreasing priority” is that keys in subset are considered to be sorted by decreasing level of priority (for instance A and B, with A having the highest level of priority). Rows with distinct values on B but identical values on A are considered duplicates, whereas rows with distinct values on A and identical values on B are not. Hence, the first key in subset entirely determines whether rows with a non-null A are duplicates, and subsequent keys in subset only come into play on rows where A is null.

This is typically useful to deduplicate data containing one row per vessel with potential duplicates and multiple identifier columns (cfr, external immatriculation, ircs), some identifiers being more reliable than others. For instance, if two rows have the same CFR but different external immatriculations, it is reasonable to assume that they describe one and the same vessel, whereas two rows without any information on CFR and with different external immatriculations should be considered as two distinct vessels.

Parameters:
  • df (pd.DataFrame) – Input DataFrame

  • subset (List[str]) – List of column names to use as keys for the drop_duplicates operation, by decreasing level of priority

Returns:

Copy of the input DataFrame with duplicate rows removed.

Return type:

pd.DataFrame
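The cascade described above can be sketched as follows. Rows with a value on the current key are deduplicated on that key alone, and rows that are null on it fall through to the next key. This sketch rests on two assumptions. First, rows falling through are compared only with each other and not with rows kept at earlier steps. Second, the original row order can be restored by sorting the index. The real function may behave differently on both points.

```python
from typing import List

import pandas as pd


def drop_duplicates_by_decreasing_priority(df: pd.DataFrame, subset: List[str]) -> pd.DataFrame:
    kept = []
    remaining = df
    for key in subset:
        has_key = remaining[key].notna()
        # among rows with a value on this key, that key alone decides duplication
        kept.append(remaining[has_key].drop_duplicates(subset=key))
        # rows that are null on this key are handled by the next key
        remaining = remaining[~has_key]
    # whatever remains is null on every key of subset and is dropped
    return pd.concat(kept).sort_index()
```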

pipeline.src.processing.try_get_factory(key: Hashable, error_value: Any = None)

pipeline.src.processing.array_equals_row_on_window(arr: numpy.array, row: numpy.array, window_length: int) → numpy.array

Tests whether each row of an input 2D array is the last of a sequence of window_length consecutive rows equal to a given row 1D array, and returns the result as a float array with the same length as the input array.

The output array is of float dtype and not bool dtype, because numpy bool arrays cannot contain null values. The values are 0.0 (representing False), 1.0 (representing True) and np.nan (representing nulls).

The first (window_length - 1) rows evaluate to np.nan, since the sliding window would need to know the values of the previous rows which are not given.

Parameters:
  • arr (np.array) – 2D numpy array

  • row (np.array) – 1D numpy array with the same length as the number of columns in arr

  • window_length (int) – number of consecutive rows that must be equal to row for the result to be True

Returns:

1D float array (values 0.0, 1.0 and np.nan) with the same length as arr

Return type:

np.array

Examples

>>> arr = np.array([
...     [False, True],
...     [False, True],
...     [True, True],
...     [False, True],
...     [False, True],
... ])
>>> row = np.array([False, True])
>>> array_equals_row_on_window(arr, row, 2)
array([nan,  1.,  0.,  0.,  1.])
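A possible vectorized approach is sketched below, using numpy.lib.stride_tricks.sliding_window_view (numpy 1.20 or later). Each row is compared with row, and then each window of window_length consecutive comparisons is checked for being all True. This reproduces the example above but is not necessarily how the module computes it.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def array_equals_row_on_window(arr: np.ndarray, row: np.ndarray, window_length: int) -> np.ndarray:
    # True where a row of arr equals `row` element-wise
    matches = np.all(arr == row, axis=1)
    # the first (window_length - 1) rows stay NaN: their window is incomplete
    result = np.full(len(arr), np.nan)
    if len(arr) >= window_length:
        windows = sliding_window_view(matches, window_length)
        result[window_length - 1:] = windows.all(axis=1)
    return result
```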

pipeline.src.processing.back_propagate_ones(arr: numpy.array, steps: int) → numpy.array

Given a 1D array with values 0.0, 1.0 and np.nan, propagates each 1.0 backward to the steps elements that precede it.

Parameters:
  • arr (np.array) – array containing 0.0, 1.0 and np.nan values

  • steps (int) – number of steps that ones should be back-propagated

Returns:

1D array with the same shape as the input, in which each 1.0 has been propagated backward over steps elements.

Return type:

np.array

Examples

>>> arr = np.array([np.nan,  0.,  0.,  1.,  0.,  0.,  1.,  1.,  0.,  1.])
>>> back_propagate_ones(arr, 1)
array([nan,  0.,  1.,  1.,  0.,  1.,  1.,  1.,  1.,  1.])
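A minimal sketch using shifted comparisons against the original array. Comparing against the original array, and not the partially updated result, keeps ones from cascading further than steps positions. This sketch also assumes that an np.nan directly followed (within steps) by a 1.0 is overwritten with 1.0. The example above does not show that case.

```python
import numpy as np


def back_propagate_ones(arr: np.ndarray, steps: int) -> np.ndarray:
    result = arr.astype(float)  # copy, so the input is left untouched
    for s in range(1, steps + 1):
        # element i becomes 1.0 if element i + s of the original array is 1.0
        result[:-s] = np.where(arr[s:] == 1.0, 1.0, result[:-s])
    return result
```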

pipeline.src.processing.rows_belong_to_sequence(arr: numpy.array, row: numpy.array, window_length: int) → numpy.array

Tests whether each row of an input 2D array belongs to a sequence of window_length consecutive rows equal to a given row 1D array, and returns the result as a float array with the same length as the input array.

The output array is of float dtype and not bool dtype, because numpy bool arrays cannot contain null values. The values are 0.0 (representing False), 1.0 (representing True) and np.nan (representing nulls).

The first and last (window_length - 1) rows may be np.nan, since the rows before the beginning and after the end of the array are not known and might be needed to determine the result.

Parameters:
  • arr (np.array) – 2D numpy array

  • row (np.array) – 1D numpy array with the same length as the number of columns in arr

  • window_length (int) – number of consecutive rows that must be equal to row for the result to be True

Returns:

1D float array (values 0.0, 1.0 and np.nan) with the same length as arr

Return type:

np.array

Examples

>>> arr = np.array([
...     [False, True],
...     [False, True],
...     [True, True],
...     [False, True],
...     [False, True],
... ])
>>> row = np.array([False, True])
>>> rows_belong_to_sequence(arr, row, 2)
array([1., 1., 0., 1., 1.])
>>> arr = np.array([
...     [False, True],
...     [True, True],
...     [True, True],
...     [False, True],
...     [False, True],
...     [False, False],
... ])
>>> row = np.array([False, True])
>>> rows_belong_to_sequence(arr, row, 2)
array([nan,  0.,  0.,  1.,  1.,  0.])

pipeline.src.processing.get_matched_groups(string: str, regex: re.Pattern) → pandas.Series

Matches the input str with the input Pattern and returns a pandas Series with the matched data.

The index labels of the result Series are the group names (?P<group_name>…) of the pattern.

The values of the result Series are:

  • the match’s group values, if the string matches the pattern

  • None, if the string does not match the pattern

Parameters:
  • string (str) – string to match

  • regex (re.Pattern) – pattern against which to match the string

Returns:

the match’s group data

Return type:

pd.Series
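A minimal sketch of this behavior. It assumes that "matches" means re.Pattern.match, which is anchored at the start of the string. The original might use fullmatch or search instead.

```python
import re

import pandas as pd


def get_matched_groups(string: str, regex: re.Pattern) -> pd.Series:
    match = regex.match(string)
    if match is None:
        # same index labels as a successful match, all values None
        return pd.Series({name: None for name in regex.groupindex}, dtype=object)
    return pd.Series(match.groupdict(), dtype=object)
```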

pipeline.src.processing.merge_dicts(list_of_dicts: List[dict]) → dict

Merges a list of dicts into a single dict.

Parameters:

list_of_dicts (List[dict]) – List of dictionaries

Returns:

Dictionary containing all the entries of the input dictionaries

Return type:

dict