pipeline.src.processing
Functions
|
If col_name is not already a column name of the DataFrame df, returns |
|
Returns False if pd.isna(x), True otherwise. |
|
Filters the input pandas Series to keep only distinct non null values and returns |
|
For each row in the input DataFrame, the distinct and non null values contained in |
|
Combines the input DataFrame's columns into one by taking the non null value in |
|
Returns a Series with the same index as the input DataFrame, whose values are |
|
Takes a dictionary and removes |
|
Takes a list and removes |
|
Converts a pandas DataFrame into a Series with the same index as the input |
|
Expands a column of dicts into one column per dict key. |
|
Expands a column of lists of dicts into one row per dict, one column per key. |
|
Converts a pandas DataFrame or Series containing str, int or float values, |
|
Converts a python list, set or numpy.ndarray to a string with Postgresql |
|
Returns a pandas.DataFrame with all values serialized as strings
|
Converter for types not natively handled by json.dumps
|
Converts python object to json string. |
|
Returns a pandas.DataFrame with all values serialized to json string. |
|
Serializes the values of a DataFrame that contains numbers that represent |
|
Serializes the values of a DataFrame that contains timedelta values. |
|
Removes rows from the input DataFrame df in which the column df_column_name |
|
|
|
Join two pandas DataFrames, attempting to match rows on several keys by |
|
Performs an operation similar to pandas.DataFrame.isin on multiple columns, with |
|
Similar to pandas.DataFrame.drop_duplicates(subset=subset), with the |
|
|
|
Tests whether each row of an input 2D array is the last of a sequence of |
|
Given a 1D array with values 0.0, 1.0 and np.nan, propagates 1.0 backward |
|
Tests whether each row of an input 2D array belongs to a sequence of |
|
Matches the input str with the input Pattern and returns a pandas Series |
|
Merges a list of dicts as a single dict |
Module Contents
- pipeline.src.processing.get_unused_col_name(col_name: str, df: pandas.DataFrame) str[source]
If col_name is not already a column name of the DataFrame df, returns col_name. Otherwise, appends a numeric suffix to col_name, trying _0, _1, _2, … until an unused column name is found.
- Parameters:
col_name (str) – desired column name
df (pd.DataFrame) – DataFrame for which we want to ensure the column name is not already used
- Returns:
column name
- Return type:
str
Examples
>>> get_unused_col_name("id", pd.DataFrame({"idx": [1, 2, 3]}))
"id"
>>> get_unused_col_name("id", pd.DataFrame({"id": [1, 2, 3]}))
"id_0"
>>> get_unused_col_name("id", pd.DataFrame({"id": [1, 2, 3], "id_0": [4, 5, 6]}))
"id_1"
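The suffixing rule shown in the examples can be sketched as below. This is a minimal re-implementation inferred from the docstring only; the name get_unused_col_name_sketch is hypothetical and the actual source may differ.

```python
import pandas as pd


def get_unused_col_name_sketch(col_name: str, df: pd.DataFrame) -> str:
    """Hypothetical sketch: return col_name, or col_name_<n> if already used."""
    if col_name not in df.columns:
        return col_name
    i = 0
    # Try col_name_0, col_name_1, ... until a free name is found.
    while f"{col_name}_{i}" in df.columns:
        i += 1
    return f"{col_name}_{i}"
```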
- pipeline.src.processing.is_a_value(x) bool[source]
Returns False if pd.isna(x), True otherwise.
NB: The same result could be obtained by checking pd.isna(x) alone. Checking whether x is None first improves performance on DataFrames containing many None values, since pd.isna(x) is slower than an identity check against None.
- Parameters:
x – Anything
- Returns:
False if pd.isna(x), True otherwise
- Return type:
bool
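The short-circuit described in the NB can be sketched as follows. This is an illustration only: the name is_a_value_sketch is hypothetical, and the sketch assumes x is a scalar (pd.isna returns an array for list-like inputs).

```python
import numpy as np
import pandas as pd


def is_a_value_sketch(x) -> bool:
    """Hypothetical sketch: cheap None check first, pd.isna only if needed."""
    # Identity check against None avoids the slower pd.isna call.
    if x is None:
        return False
    # Assumes a scalar input, for which pd.isna returns a single boolean.
    return not pd.isna(x)
```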
- pipeline.src.processing.concatenate_values(row: pandas.Series) List[source]
Filters the input pandas Series to keep only distinct non null values and returns the result as a python list.
- Parameters:
row (pd.Series) – pandas Series
- Returns:
list of distinct non null values in row
- Return type:
List
- pipeline.src.processing.concatenate_columns(df: pandas.DataFrame, input_col_names: List) pandas.Series[source]
For each row in the input DataFrame, the distinct and non null values contained in the columns input_col_names are stored in a list. A pandas Series of the same length as the input DataFrame is then constructed with these lists as values.
- Parameters:
df (pd.DataFrame) – input DataFrame
input_col_names (List) – the names of the columns to use
- Returns:
resulting Series
- Return type:
pd.Series
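One way to obtain such a Series of lists is sketched below. The name concatenate_columns_sketch is hypothetical; the sketch assumes hashable cell values and keeps distinct values in first-seen order, which the docstring does not specify.

```python
import pandas as pd


def concatenate_columns_sketch(df: pd.DataFrame, input_col_names: list) -> pd.Series:
    """Hypothetical sketch: one list of distinct non null values per row."""
    values = []
    for _, row in df[input_col_names].iterrows():
        # dropna() removes None / NaN; unique() keeps first-seen order.
        values.append(row.dropna().unique().tolist())
    return pd.Series(values, index=df.index)
```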
- pipeline.src.processing.coalesce(df: pandas.DataFrame) pandas.Series[source]
Combines the input DataFrame’s columns into one by taking the first non null value in each row, in the order of the DataFrame’s columns from left to right.
Returns a pandas Series with the combined results.
- Parameters:
df (pd.DataFrame) – input pandas DataFrame
- Returns:
Series containing the first non null value in each row of the DataFrame, taken in order of the DataFrame’s columns from left to right.
- Return type:
pd.Series
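The left-to-right coalescing can be sketched with Series.where, as below. The name coalesce_sketch is hypothetical and the sketch assumes the DataFrame has at least one column; it is not necessarily how the module implements it.

```python
import numpy as np
import pandas as pd


def coalesce_sketch(df: pd.DataFrame) -> pd.Series:
    """Hypothetical sketch: first non null value of each row, left to right."""
    result = df.iloc[:, 0].copy()
    for i in range(1, df.shape[1]):
        # Keep values already found; fill remaining nulls from the next column.
        result = result.where(result.notna(), df.iloc[:, i])
    result.name = None
    return result
```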
- pipeline.src.processing.get_first_non_null_column_name(df: pandas.DataFrame, result_labels: None | dict = None) pandas.Series[source]
Returns a Series with the same index as the input DataFrame, whose values are the name of the first column (or the corresponding label, if provided) with a non-null value in each row, from left to right.
Rows with all null values return None.
- Parameters:
df (pd.DataFrame) – input pandas DataFrame
result_labels (dict) – if provided, must be a mapping of column names to the corresponding labels in the result.
- Returns:
Series containing the name of the first column with a non-null value in each row of the DataFrame, from left to right
- Return type:
pd.Series
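A row-wise sketch of this lookup is given below. The name first_non_null_column_name_sketch is hypothetical, and falling back to the raw column name when a column is missing from result_labels is an assumption not stated in the docstring.

```python
import numpy as np
import pandas as pd


def first_non_null_column_name_sketch(df: pd.DataFrame, result_labels=None) -> pd.Series:
    """Hypothetical sketch: name (or label) of the first non null column per row."""
    labels = result_labels or {}
    values = []
    for _, row in df.iterrows():
        non_null = row.dropna()
        if non_null.empty:
            # Rows with only null values yield None.
            values.append(None)
        else:
            name = non_null.index[0]
            # Assumption: columns absent from the mapping keep their own name.
            values.append(labels.get(name, name))
    return pd.Series(values, index=df.index, dtype=object)
```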
- pipeline.src.processing.remove_nones_from_dict(d: dict) dict[source]
Takes a dictionary and removes None values from it.
- Parameters:
d (dict) – a dictionary
- Returns:
the input dictionary, with all None removed.
- Return type:
dict
Examples
>>> d = {
...     "a": 1,
...     "b": [1, 2, None],
...     "c": {"key": "value", "key2": None},
...     "d": None,
... }
>>> remove_nones_from_dict(d)
{"a": 1, "b": [1, 2, None], "c": {"key": "value", "key2": None}}
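As the example shows, only top-level None values are removed; None values nested in lists or dicts are kept. A minimal sketch consistent with that example follows (the name remove_nones_from_dict_sketch is hypothetical):

```python
def remove_nones_from_dict_sketch(d: dict) -> dict:
    """Hypothetical sketch: drop keys whose value is None (top level only)."""
    return {key: value for key, value in d.items() if value is not None}
```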
- pipeline.src.processing.remove_nones_from_list(li: list) list[source]
Takes a list and removes None values from it.
- Parameters:
li (list) – a list
- Returns:
the input list, with all None removed.
- Return type:
list
Examples
>>> li = [1, 3, None, "a", "b", None]
>>> remove_nones_from_list(li)
[1, 3, "a", "b"]
- pipeline.src.processing.df_to_dict_series(df: pandas.DataFrame, result_colname: str = 'json_col', remove_nulls: bool = False)[source]
Converts a pandas DataFrame into a Series with the same index as the input DataFrame and whose values are dictionaries like:
{
    "column_1": value,
    "column_2": value,
}
- Parameters:
df (pd.DataFrame) – input DataFrame
result_colname (Union[str, None]) – optional, name of result Series
remove_nulls (bool) – if set to True, null values are recursively removed from the dictionaries
- Returns:
pandas Series
- Return type:
pd.Series
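The row-to-dict conversion can be sketched with DataFrame.to_dict, as below. The name df_to_dict_series_sketch is hypothetical, and the remove_nulls option is deliberately left out of this sketch.

```python
import pandas as pd


def df_to_dict_series_sketch(df: pd.DataFrame, result_colname: str = "json_col") -> pd.Series:
    """Hypothetical sketch: one {column: value} dict per row, same index as df."""
    # orient="records" yields a list with one dict per row.
    records = df.to_dict(orient="records")
    return pd.Series(records, index=df.index, name=result_colname)
```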
- pipeline.src.processing.explode_dicts(df: pandas.DataFrame, column_name: str) pandas.DataFrame[source]
Expands a column of dicts into one column per dict key.
The original column is dropped from the result.
- Parameters:
df (pd.DataFrame) – input DataFrame
column_name (str) – name of the column containing dicts
- Returns:
DataFrame with the dict column replaced by its expanded keys
- Return type:
pd.DataFrame
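A possible way to perform this expansion is sketched below. The name explode_dicts_sketch is hypothetical; the sketch assumes every cell of the column is a dict and that the dict keys do not collide with existing column names.

```python
import pandas as pd


def explode_dicts_sketch(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """Hypothetical sketch: replace a column of dicts by one column per key."""
    # Building a DataFrame from a list of dicts creates one column per key.
    expanded = pd.DataFrame(df[column_name].tolist(), index=df.index)
    return df.drop(columns=[column_name]).join(expanded)
```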
- pipeline.src.processing.explode_lists_of_dicts(df: pandas.DataFrame, column_name: str) pandas.DataFrame[source]
Expands a column of lists of dicts into one row per dict, one column per key.
Each dict in each list becomes a row, with dict keys as columns. The index is reset. The original column is dropped from the result.
- Parameters:
df (pd.DataFrame) – input DataFrame
column_name (str) – name of the column containing lists of dicts
- Returns:
DataFrame with one row per dict and dict keys as columns
- Return type:
pd.DataFrame
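The two steps (one row per dict, then one column per key) can be sketched with DataFrame.explode. The name explode_lists_of_dicts_sketch is hypothetical, and the sketch assumes every list is non-empty and contains only dicts.

```python
import pandas as pd


def explode_lists_of_dicts_sketch(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """Hypothetical sketch: one row per dict, one column per dict key."""
    # Step 1: one row per list element, with a fresh 0..n-1 index.
    exploded = df.explode(column_name).reset_index(drop=True)
    # Step 2: one column per dict key.
    expanded = pd.DataFrame(exploded[column_name].tolist(), index=exploded.index)
    return exploded.drop(columns=[column_name]).join(expanded)
```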
- pipeline.src.processing.zeros_ones_to_bools(x: pandas.Series | pandas.DataFrame) pandas.Series | pandas.DataFrame[source]
Converts a pandas DataFrame or Series containing str, int or float values, possibly including null values (None and np.nan), to a DataFrame or Series containing only True, False and np.nan values.
Values 1, 1.0, “1” and any non zero number are converted to True. Values 0, 0.0 and “0” are converted to False. Values None and np.nan are converted to np.nan.
Useful to convert boolean data extracted from Oracle databases, since Oracle does not have a boolean data type and boolean data is often stored as “0”s and “1”s, or to handle situations in which pandas data structures should contain nullable boolean data (in pandas / numpy, the bool dtype is not nullable, and this can be tricky to handle).
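The conversion rules above can be sketched for a single Series as follows. The name zeros_ones_to_bools_sketch is hypothetical, the DataFrame case is omitted, and the result is assumed to be an object-dtype Series so that it can hold np.nan alongside booleans.

```python
import numpy as np
import pandas as pd


def zeros_ones_to_bools_sketch(x: pd.Series) -> pd.Series:
    """Hypothetical sketch (Series only): 0 -> False, non zero -> True, null -> nan."""
    # Parse "0" / "1" strings and numbers alike; None becomes NaN.
    numeric = pd.to_numeric(x)
    result = (numeric != 0).astype(object)
    # NaN != 0 evaluates to True, so nulls must be restored explicitly.
    result[numeric.isna()] = np.nan
    return result
```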
- pipeline.src.processing.to_pgarr(x: list | set | numpy.ndarray, handle_errors: bool = False, value_on_error: str | None = None) str | None[source]
Converts a python list, set or numpy.ndarray to a string with Postgresql array syntax.
Elements of the list-like input argument are converted to string type, then stripped of leading and trailing blank spaces, and finally filtered to keep only non empty strings.
This transformation is required on the elements of a DataFrame’s columns that contain collections before bulk inserting the DataFrame into Postgresql with the psql_insert_copy method.
- Parameters:
x (list, set or numpy.ndarray) – iterable to serialize as Postgres array
handle_errors (bool) – if True, returns value_on_error instead of raising ValueError when the input is of an unexpected type
value_on_error (str or None) – value to return on errors, if handle_errors is True
- Returns:
string with Postgresql Array compatible syntax
- Return type:
str or None
- Raises:
ValueError – when handle_errors is False and x is not list-like.
Examples
>>> to_pgarr([1, 2, "a ", "b", "", " "])
"{1,2,a,b}"
>>> to_pgarr(["a,b", "c"])
'{"a,b",c}'
>>> to_pgarr(None)
ValueError
>>> to_pgarr(None, handle_errors=True, value_on_error="{}")
"{}"
>>> to_pgarr(np.nan, handle_errors=True, value_on_error=None)
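The steps described above (stringify, strip, drop empty strings, wrap in braces) can be sketched as below. The name to_pgarr_sketch is hypothetical. Quoting elements that contain a comma is inferred from the second example; handling of other special characters (quotes, braces, backslashes) is not covered by this sketch.

```python
import numpy as np


def to_pgarr_sketch(x, handle_errors: bool = False, value_on_error=None):
    """Hypothetical sketch: serialize a list, set or ndarray as a Postgres array literal."""
    if not isinstance(x, (list, set, np.ndarray)):
        if handle_errors:
            return value_on_error
        raise ValueError(f"Unexpected type: {type(x)}")
    # Convert to str, strip blanks, then keep only non empty strings.
    items = [str(element).strip() for element in x]
    items = [item for item in items if item]
    # Assumption: only commas trigger quoting, as in the docstring example.
    items = [f'"{item}"' if "," in item else item for item in items]
    return "{" + ",".join(items) + "}"
```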
- pipeline.src.processing.df_values_to_psql_arrays(df: pandas.DataFrame, handle_errors: bool = False, value_on_error: str | None = None) pandas.DataFrame[source]
Returns a pandas.DataFrame with all values serialized as strings with Postgresql array syntax. All values must be of type list, set or numpy array. Other values raise errors, which may be handled if handle_errors is set to True.
See to_pgarr for details on error handling.
This is required before bulk loading a pandas.DataFrame into a Postgresql table with the psql_insert_copy method.
- Parameters:
df (pd.DataFrame) – pandas DataFrame
- Returns:
pandas DataFrame with the same shape and index, all values serialized as strings with Postgresql array syntax.
- Return type:
pd.DataFrame
Examples
>>> df_values_to_psql_arrays(pd.DataFrame({'a': [[1, 2], ['a', 'b']]}))
       a
0  {1,2}
1  {a,b}
- pipeline.src.processing.json_converter(x)[source]
Converter for types not natively handled by json.dumps
- pipeline.src.processing.df_values_to_json(df: pandas.DataFrame) pandas.DataFrame[source]
Returns a pandas.DataFrame with all values serialized to json string.
This is required before bulk loading into a Postgresql table with the psql_insert_copy method.
See to_json function for details.
- Parameters:
df (pd.DataFrame) – pandas DataFrame
- Returns:
pandas DataFrame with the same shape and index, all values serialized as json strings.
- Return type:
pd.DataFrame
- pipeline.src.processing.serialize_nullable_integer_df(df: pandas.DataFrame) pandas.DataFrame[source]
Serializes the values of a DataFrame that contains numbers that represent possibly null (np.nan or None) integers. This is useful to prepare data before loading to integer Postgres columns, as pandas automatically converts integer Series to float dtype if they contain nulls.
- Parameters:
df (pd.DataFrame) – DataFrame of integers, possibly with None and np.nan values
- Returns:
same DataFrame converted to string dtype
- Return type:
pd.DataFrame
- pipeline.src.processing.serialize_timedelta_df(df: pandas.DataFrame) pandas.DataFrame[source]
Serializes the values of a DataFrame that contains timedelta values. This is useful to prepare data before loading to interval Postgres columns, as SQLAlchemy does not support the timedelta dtype.
- Parameters:
df (pd.DataFrame) – DataFrame of timedeltas
- Returns:
same DataFrame converted to string dtype
- Return type:
pd.DataFrame
- pipeline.src.processing.drop_rows_already_in_table(df: pandas.DataFrame, df_column_name: str, table: sqlalchemy.Table, table_column_name: str, connection: sqlalchemy.engine.base.Connection, logger: logging.Logger) pandas.DataFrame[source]
Removes rows from the input DataFrame df in which the column df_column_name contains values that are already present in the column table_column_name of the table table, and returns the filtered DataFrame.
- pipeline.src.processing.prepare_df_for_loading(df: pandas.DataFrame, logger: logging.Logger, pg_array_columns: list = None, handle_array_conversion_errors: bool = True, value_on_array_conversion_error='{}', jsonb_columns: list = None, nullable_integer_columns: list = None, timedelta_columns: list = None, enum_columns: list = None, bytea_columns: list = None)[source]
- pipeline.src.processing.join_on_multiple_keys(left: pandas.DataFrame, right: pandas.DataFrame, or_join_keys: list, how: str = 'inner', and_join_keys: list = None, coalesce_common_columns: bool = True)[source]
Join two pandas DataFrames, attempting to match rows on several keys by decreasing order of priority.
Joins are performed successively with each of the keys listed in or_join_keys, and results are then concatenated to form the final result. This is different from joining on a composite key where all keys must match simultaneously: here, rows of left and right DataFrames are joined if at least one of the keys matches.
Joins are performed on the keys listed in or_join_keys by “decreasing order of priority” in the sense that, in order to be matched, rows of left and right MUST match on their highest priority non null key (the one that comes first in the list) but MIGHT not match on lower priority keys (which come later in the list).
During each of the joins on the individual keys, non-joining key pairs and, if any, columns common to both left and right DataFrames, are coalesced (from left to right) if coalesce_common_columns is True (the default).
Optionally, the join condition can contain an additional equality clause on keys listed in and_join_keys.
If or_join_keys is ['A', 'B'] and and_join_keys is ['C', 'D'], the SQL equivalent of the join condition is:
ON (
    left.A = right.A
    AND left.C = right.C
    AND left.D = right.D
) OR (
    (left.A IS NULL OR right.A IS NULL)
    AND left.B = right.B
    AND left.C = right.C
    AND left.D = right.D
)
- Parameters:
left (pd.DataFrame) – pandas DataFrame
right (pd.DataFrame) – pandas DataFrame
or_join_keys (list) – list of column names to use as join keys
how (str, optional) – ‘inner’, ‘left’, ‘right’ or ‘outer’. Defaults to ‘inner’.
and_join_keys (list, optional) – list of column names to use as additional join keys
coalesce_common_columns (bool, optional) – whether to coalesce values in the columns that are present in both DataFrames. Defaults to True.
- Returns:
result of join operation
- Return type:
pd.DataFrame
- pipeline.src.processing.left_isin_right_by_decreasing_priority(left: pandas.DataFrame, right: pandas.DataFrame) pandas.Series[source]
Performs an operation similar to pandas.DataFrame.isin on multiple columns, with the differences that:
- the columns are tested one by one (instead of being tested simultaneously as in the case of pandas.DataFrame.isin), the first column of left being tested against the first column of right, the second column of left being tested against the second column of right…
- columns are considered to be sorted by decreasing priority, meaning that a match on 2 rows of left and right on a given column will be taken into account only if the columns of higher priority on those 2 rows have values that are either equal or null.
Takes two DataFrames left and right with the same columns, returns a Series with the same index as the left DataFrame and whose values are:
- True if the corresponding row in left has a match in right in at least one column
- False if the corresponding row in left has no match in right
This is typically useful to filter vessels’ data based on some other vessels’ data, both datasets being indexed with multiple identifiers (vessel_id, cfr, ircs, external immat…).
- Parameters:
left (pd.DataFrame) – DataFrame
right (pd.DataFrame) – DataFrame containing the values against which the rows of left are tested
- Returns:
Series of booleans with the same index as left
- Return type:
pd.Series
- pipeline.src.processing.drop_duplicates_by_decreasing_priority(df: pandas.DataFrame, subset: List[str]) pandas.DataFrame[source]
Similar to pandas.DataFrame.drop_duplicates(subset=subset), with the differences that:
- the rows are deduplicated based on their values in the columns in subset one after the other and by decreasing priority, and not simultaneously
- NA values on a key are not considered
Rows having all NA values in all columns of subset are dropped.
What is meant by “by decreasing priority” is that keys in subset are considered to be sorted by decreasing level of priority (for instance A and B, with A having the highest level of priority), and rows with distinct values on B but identical values on A will be considered duplicates, whereas rows with distinct values on A and identical values on B will not be considered duplicates. Hence, the first key in subset entirely determines whether rows are duplicates or not on all rows with non null A, and subsequent keys in subset only come into play on rows where A is null.
This is typically useful to deduplicate data containing one row per vessel with potential duplicates but with multiple identifier columns (cfr, external immatriculation, ircs), some identifiers being more reliable than others. For instance, if two rows have the same CFR but different external immatriculation, it is reasonable to assume that it is one and the same vessel, whereas two rows without any information on CFR and different external immats should be considered as two distinct vessels.
- Parameters:
df (pd.DataFrame) – Input DataFrame
subset (List[str]) – List of column names to use as keys for the drop_duplicates operation, by decreasing level of priority
- Returns:
Copy of the input DataFrame with duplicate rows removed.
- Return type:
pd.DataFrame
- pipeline.src.processing.array_equals_row_on_window(arr: numpy.array, row: numpy.array, window_length: int) numpy.array[source]
Tests whether each row of an input 2D array is the last of a sequence of window_length consecutive rows equal to a given row 1D array, and returns the result as a float array with the same length as the input array.
The output array is of float dtype and not bool dtype, because numpy bool arrays cannot contain null values. The values are 0.0 (representing False), 1.0 (representing True) and np.nan representing nulls.
The first (window_length - 1) rows evaluate to np.nan, since the sliding window would need to know the values of the previous rows which are not given.
- Parameters:
arr (np.array) – 2D numpy array
row (np.array) – 1D numpy array with the same length as the number of columns in arr
window_length (int) – number of consecutive rows that must be equal to row for the result to be True
- Returns:
1D float array (values 0.0, 1.0 and np.nan) with the same length as arr
- Return type:
np.array
Examples
>>> arr = np.array([
...     [False, True],
...     [False, True],
...     [True, True],
...     [False, True],
...     [False, True],
... ])
>>> row = np.array([False, True])
>>> array_equals_row_on_window(arr, row, 2)
array([nan, 1., 0., 0., 1.])
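The sliding-window test can be sketched with an explicit loop, as below. The name array_equals_row_on_window_sketch is hypothetical and the loop favours readability over speed; the actual implementation may well be vectorized.

```python
import numpy as np


def array_equals_row_on_window_sketch(arr: np.ndarray, row: np.ndarray, window_length: int) -> np.ndarray:
    """Hypothetical sketch: 1.0 where a row ends a run of window_length rows equal to `row`."""
    # True for each row of arr that is equal to `row` on every column.
    equal = (arr == row).all(axis=1)
    # The first (window_length - 1) positions stay at nan: their window is incomplete.
    result = np.full(len(arr), np.nan)
    for i in range(window_length - 1, len(arr)):
        window = equal[i - window_length + 1 : i + 1]
        result[i] = 1.0 if window.all() else 0.0
    return result
```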
- pipeline.src.processing.back_propagate_ones(arr: numpy.array, steps: int) numpy.array[source]
Given a 1D array with values 0.0, 1.0 and np.nan, propagates 1.0 backward steps times.
- Parameters:
arr (np.array) – array containing 0.0, 1.0 and np.nan values
steps (int) – number of steps that ones should be back-propagated
- Returns:
1D array with the same dimensions as input, with ones back-propagated steps times.
- Return type:
np.array
Examples
>>> arr = np.array([np.nan, 0., 0., 1., 0., 0., 1., 1., 0., 1.])
>>> back_propagate_ones(arr, 1)
array([nan, 0., 1., 1., 0., 1., 1., 1., 1., 1.])
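Back-propagation by one step amounts to setting a position to 1.0 whenever its right-hand neighbour is 1.0; repeating this steps times gives the result. The sketch below uses the hypothetical name back_propagate_ones_sketch and assumes that a np.nan directly preceding a 1.0 is overwritten by 1.0, which the docstring does not state explicitly.

```python
import numpy as np


def back_propagate_ones_sketch(arr: np.ndarray, steps: int) -> np.ndarray:
    """Hypothetical sketch: propagate 1.0 values backward `steps` times."""
    result = arr.astype(float)
    for _ in range(steps):
        # next_values[i] holds result[i + 1]; the last position has no successor.
        next_values = np.append(result[1:], np.nan)
        result = np.where(next_values == 1.0, 1.0, result)
    return result
```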
- pipeline.src.processing.rows_belong_to_sequence(arr: numpy.array, row: numpy.array, window_length: int) numpy.array[source]
Tests whether each row of an input 2D array belongs to a sequence of window_length consecutive rows equal to a given row 1D array, and returns the result as a float array with the same length as the input array.
The output array is of float dtype and not bool dtype, because numpy bool arrays cannot contain null values. The values are 0.0 (representing False), 1.0 (representing True) and np.nan representing nulls.
The first and last (window_length - 1) rows may be np.nan, since the rows before the beginning and after the end of the array are not known and might be needed to determine the result.
- Parameters:
arr (np.array) – 2D numpy array
row (np.array) – 1D numpy array with the same length as the number of columns in arr
window_length (int) – number of consecutive rows that must be equal to row for the result to be True
- Returns:
1D float array (values 0.0, 1.0 and np.nan) with the same length as arr
- Return type:
np.array
Examples
>>> arr = np.array([
...     [False, True],
...     [False, True],
...     [True, True],
...     [False, True],
...     [False, True],
... ])
>>> row = np.array([False, True])
>>> rows_belong_to_sequence(arr, row, 2)
array([1., 1., 0., 1., 1.])
>>> arr = np.array([
...     [False, True],
...     [True, True],
...     [True, True],
...     [False, True],
...     [False, True],
...     [False, False]
... ])
>>> row = np.array([False, True])
>>> rows_belong_to_sequence(arr, row, 2)
array([nan, 0., 0., 1., 1., 0.])
- pipeline.src.processing.get_matched_groups(string: str, regex: re.Pattern) pandas.Series[source]
Matches the input str with the input Pattern and returns a pandas Series with the matched data.
The index labels of the result Series are the names of the pattern’s named groups (?P<group_name>…).
The values of the result Series are:
- the match’s group values, if the string matches the pattern
- None, if the string does not match the pattern
- Parameters:
string (str) – string to match
regex (re.Pattern) – pattern against which to match the string
- Returns:
the match’s group data
- Return type:
pd.Series
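A minimal sketch of this behaviour is given below. The name get_matched_groups_sketch is hypothetical, and the use of Pattern.match (anchored at the start of the string) rather than search or fullmatch is an assumption.

```python
import re

import pandas as pd


def get_matched_groups_sketch(string: str, regex: re.Pattern) -> pd.Series:
    """Hypothetical sketch: named groups of the match as a Series, None if no match."""
    # groupindex maps each group name to its position in the pattern.
    group_names = sorted(regex.groupindex, key=regex.groupindex.get)
    match = regex.match(string)  # assumption: match at the start of the string
    if match is None:
        values = [None] * len(group_names)
    else:
        values = [match.group(name) for name in group_names]
    return pd.Series(values, index=group_names, dtype=object)
```

For instance, with the pattern (?P<year>\d{4})-(?P<month>\d{2}), the string "2021-03" yields a Series indexed by year and month, while a non-matching string yields None for both labels.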