"""Prefect flow building the Kepler.gl activity visualization (module ``pipeline.src.flows.activity_visualizations``)."""
import json
from datetime import datetime
import pandas as pd
from prefect import flow, get_run_logger, task
from config import STATIC_LOCATION
from src.generic_tasks import extract, load
from src.shared_tasks.dates import date_trunc, get_utcnow, make_relativedelta
@task
def extract_activity_overview_data(
    from_datetime_utc: datetime, to_datetime_utc: datetime
) -> pd.DataFrame:
    """Run ``data_warehouse/activity_overview.sql`` on the ``data_warehouse`` db.

    Args:
        from_datetime_utc: bound passed to the query as ``from_datetime_utc``.
        to_datetime_utc: bound passed to the query as ``to_datetime_utc``.

    Returns:
        pd.DataFrame: the query result, as returned by ``extract``.
    """
    query_params = {
        "from_datetime_utc": from_datetime_utc,
        "to_datetime_utc": to_datetime_utc,
    }
    return extract(
        db_name="data_warehouse",
        query_filepath="data_warehouse/activity_overview.sql",
        params=query_params,
    )
@task
def get_time_range(df: pd.DataFrame) -> list:
    """Return the ``[min, max]`` of ``df.far_week`` as epoch milliseconds.

    The result is used as the value of the Kepler.gl ``timeRange`` filter.
    Timestamps are truncated to whole seconds before being scaled to ms.

    Args:
        df: activity data with a datetime-like ``far_week`` column.

    Returns:
        list: ``[start_ms, end_ms]`` as two ``int``.

    Raises:
        ValueError: if ``far_week`` holds no non-null value (e.g. the extract
            returned no rows), instead of an opaque error from calling
            ``.timestamp()`` on a missing value.
    """
    start = df.far_week.min()
    end = df.far_week.max()
    if pd.isna(start) or pd.isna(end):
        raise ValueError(
            "Cannot compute the visualization time range: "
            "no non-null 'far_week' value in the activity data."
        )
    return [
        int(start.timestamp()) * 1000,
        int(end.timestamp()) * 1000,
    ]
# (filter id, column) pairs exposed as Kepler.gl multi-select side filters.
_ACTIVITY_MULTISELECT_FILTERS = (
    ("clf3bqx5c", "landing_department"),
    ("ihak66e6d", "economic_zone"),
    ("nggqwzrw9", "landing_facade"),
    ("tk82xui47", "segment"),
    ("btnzrlb1j", "facade"),
)

# Columns shown in the Kepler.gl tooltip, in display order.
_ACTIVITY_TOOLTIP_COLUMNS = (
    "far_week",
    "facade",
    "landing_facade",
    "landing_department",
    "economic_zone",
    "segment",
    "weight",
)

_UBER_VIZ_QUALITATIVE_4_COLORS = (
    "#12939A",
    "#DDB27C",
    "#88572C",
    "#FF991F",
    "#F15C17",
    "#223F9A",
    "#DA70BF",
    "#125C77",
    "#4DC19C",
    "#776E57",
    "#17B8BE",
    "#F6D18A",
    "#B7885E",
    "#FFCB99",
    "#F89570",
    "#829AE3",
    "#E79FD5",
    "#1E96BE",
    "#89DAC1",
    "#B3AD9E",
)

_GLOBAL_WARMING_COLORS = (
    "#5A1846",
    "#900C3F",
    "#C70039",
    "#E3611C",
    "#F1920E",
    "#FFC300",
)


def _uber_qualitative_color_range() -> dict:
    """Fresh Kepler.gl "Uber Viz Qualitative 4" color range."""
    return {
        "name": "Uber Viz Qualitative 4",
        "type": "qualitative",
        "category": "Uber",
        "colors": list(_UBER_VIZ_QUALITATIVE_4_COLORS),
    }


def _global_warming_color_range() -> dict:
    """Fresh Kepler.gl "Global Warming" color range."""
    return {
        "name": "Global Warming",
        "type": "sequential",
        "category": "Uber",
        "colors": list(_GLOBAL_WARMING_COLORS),
    }


def _kepler_filter(filter_id, column, filter_type, value, view, speed) -> dict:
    """One filter on the ``activity_dataset`` with the shared histogram settings."""
    return {
        "dataId": ["activity_dataset"],
        "id": filter_id,
        "name": [column],
        "type": filter_type,
        "value": value,
        "plotType": "histogram",
        "animationWindow": "free",
        "yAxis": None,
        "view": view,
        "speed": speed,
        "enabled": True,
    }


def _kepler_text_label() -> list:
    """Default (field-less) Kepler.gl text label settings for a layer."""
    return [
        {
            "field": None,
            "color": [255, 255, 255],
            "size": 18,
            "offset": [0, 0],
            "anchor": "start",
            "alignment": "center",
            "outlineWidth": 0,
            "outlineColor": [255, 0, 0, 255],
            "background": False,
            "backgroundColor": [0, 0, 200, 255],
        }
    ]


def _kepler_point_vis_config(opacity, color_range) -> dict:
    """visConfig shared by the point layers; only opacity and colors vary."""
    return {
        "radius": 10,
        "fixedRadius": False,
        "opacity": opacity,
        "outline": False,
        "thickness": 2,
        "strokeColor": None,
        "colorRange": color_range,
        "strokeColorRange": _global_warming_color_range(),
        "radiusRange": [0, 50],
        "filled": True,
    }


def _kepler_layer(
    layer_id, layer_type, label, color, columns, vis_config, visual_channels
) -> dict:
    """One visible layer drawn from ``activity_dataset``."""
    return {
        "id": layer_id,
        "type": layer_type,
        "config": {
            "dataId": "activity_dataset",
            "label": label,
            "color": color,
            "highlightColor": [252, 242, 26, 255],
            "columns": columns,
            "isVisible": True,
            "visConfig": vis_config,
            "hidden": False,
            "textLabel": _kepler_text_label(),
        },
        "visualChannels": visual_channels,
    }


@task
def make_config(time_range: list) -> dict:
    """Build the Kepler.gl map configuration for the activity dataset.

    Args:
        time_range: value of the ``far_week`` time-range filter; a
            ``[start, end]`` pair as returned by ``get_time_range``.

    Returns:
        dict: a Kepler.gl ``{"version": "v1", "config": {...}}`` object with
        the filters, the three layers (catches, catch -> port arcs, landings),
        the tooltip, the initial viewport and the dark map style.
    """
    filters = [
        _kepler_filter(
            "2texduyca", "far_week", "timeRange", time_range, "enlarged", 3.856
        )
    ]
    filters.extend(
        _kepler_filter(filter_id, column, "multiSelect", [], "side", 1)
        for filter_id, column in _ACTIVITY_MULTISELECT_FILTERS
    )

    catches_layer = _kepler_layer(
        "evl5q7a",
        "point",
        "Captures par segment",
        [221, 178, 124],
        {"lat": "latitude", "lng": "longitude"},
        _kepler_point_vis_config(0.16, _uber_qualitative_color_range()),
        {
            "colorField": {"name": "segment", "type": "string"},
            "colorScale": "ordinal",
            "strokeColorField": None,
            "strokeColorScale": "quantile",
            "sizeField": {"name": "weight", "type": "real"},
            "sizeScale": "sqrt",
        },
    )
    arcs_layer = _kepler_layer(
        "5v3jjcm",
        "arc",
        "Captures -> port par segment",
        [255, 254, 230],
        {
            "lat0": "latitude",
            "lng0": "longitude",
            "lat1": "landing_port_latitude",
            "lng1": "landing_port_longitude",
        },
        {
            "opacity": 0.02,
            "thickness": 2,
            "colorRange": _uber_qualitative_color_range(),
            "sizeRange": [0, 16.8],
            "targetColor": None,
        },
        {
            "colorField": {"name": "segment", "type": "string"},
            "colorScale": "ordinal",
            "sizeField": {"name": "weight", "type": "real"},
            "sizeScale": "linear",
        },
    )
    landings_layer = _kepler_layer(
        "ki3onk",
        "point",
        "Débarquements",
        [255, 254, 230],
        {"lat": "landing_port_latitude", "lng": "landing_port_longitude"},
        _kepler_point_vis_config(0.01, _global_warming_color_range()),
        {
            "colorField": None,
            "colorScale": "quantile",
            "strokeColorField": None,
            "strokeColorScale": "quantile",
            "sizeField": {"name": "weight", "type": "real"},
            "sizeScale": "sqrt",
        },
    )

    vis_state = {
        "filters": filters,
        "layers": [catches_layer, arcs_layer, landings_layer],
        "effects": [],
        "interactionConfig": {
            "tooltip": {
                "fieldsToShow": {
                    "activity_dataset": [
                        {"name": column, "format": None}
                        for column in _ACTIVITY_TOOLTIP_COLUMNS
                    ]
                },
                "compareMode": False,
                "compareType": "absolute",
                "enabled": True,
            },
            "brush": {"size": 0.5, "enabled": False},
            "geocoder": {"enabled": False},
            "coordinate": {"enabled": False},
        },
        "layerBlending": "normal",
        "overlayBlending": "normal",
        "splitMaps": [],
        "animationConfig": {"currentTime": None, "speed": 1},
        "editor": {"features": [], "visible": True},
    }

    map_state = {
        "bearing": 4.811688311688311,
        "dragRotate": True,
        "latitude": 49.327447079535155,
        "longitude": -5.124412558901727,
        "pitch": 0,
        "zoom": 5.343217870339727,
        "isSplit": False,
        "isViewportSynced": True,
        "isZoomLocked": False,
        "splitMapViewports": [],
    }

    building_grey = 15.035172933000911
    map_style = {
        "styleType": "dark-matter",
        "topLayerGroups": {},
        "visibleLayerGroups": {
            "label": True,
            "road": True,
            "border": False,
            "building": True,
            "water": True,
            "land": True,
            "3d building": False,
        },
        "threeDBuildingColor": [building_grey, building_grey, building_grey],
        "backgroundColor": [0, 0, 0],
        "mapStyles": {},
    }

    return {
        "version": "v1",
        "config": {
            "visState": vis_state,
            "mapState": map_state,
            "mapStyle": map_style,
        },
    }
[docs]
def _df_to_dict(df):
"""Create an input dict for Kepler.gl using a DataFrame object
Inputs:
- df: a DataFrame object
Returns:
- dictionary: a dictionary variable that can be used in Kepler.gl
"""
df_copy = df.copy()
# Convert all columns that aren't JSON serializable to strings
for col in df_copy.columns:
try:
# just check the first item in the colum
json.dumps(df_copy[col].iloc[0] if len(df_copy) > 0 else None)
except (TypeError, OverflowError):
df_copy[col] = df_copy[col].astype(str)
return df_copy.to_dict("split")
[docs]
def _normalize_data(data):
if isinstance(data, pd.DataFrame):
return _df_to_dict(data)
return data
def data_to_json(data):
    """Normalize a mapping of Kepler.gl datasets before it is JSON-encoded.

    Despite its name, this does not return a JSON string. It returns a new
    dict with the same keys, each value passed through ``_normalize_data``
    (which turns DataFrames into dicts and leaves other values unchanged).

    Args:
        data: ``None``, or a dict mapping a dataset id to its data.

    Returns:
        ``None`` if ``data`` is ``None``, otherwise the normalized dict.

    Raises:
        ValueError: if ``data`` is neither ``None`` nor a ``dict``.
    """
    if data is None:
        return None
    if not isinstance(data, dict):
        raise ValueError(
            "Data type incorrect, expecting a dictionary mapping "
            f"from data id to value, but got {type(data)}"
        )
    return {key: _normalize_data(value) for key, value in data.items()}
def _repr_html_(data=None, config=None, read_only=False, center_map=False):
    """Return the Kepler.gl HTML page with ``data`` and ``config`` embedded.

    Reads the ``keplergl.html`` template from ``STATIC_LOCATION`` and inserts,
    right after its ``<body>`` tag, a ``<script>`` assigning
    ``window.__keplerglDataConfig`` to a JSON object holding the config, the
    data normalized by ``data_to_json`` and the display options.

    Args:
        data: ``None`` or a dict mapping dataset ids to their data.
        config: Kepler.gl config object, serialized as-is.
        read_only: value of the ``readOnly`` option.
        center_map: value of the ``centerMap`` option.

    Returns:
        str: the complete HTML document.

    Raises:
        ValueError: if the template has no ``<body>`` tag.
    """
    body_tag = "<body>"
    with open(STATIC_LOCATION / "keplergl.html", "r", encoding="utf-8") as f:
        keplergl_html = f.read()
    body_start = keplergl_html.find(body_tag)
    if body_start == -1:
        # Without this, slicing with -1 would silently produce a broken page.
        raise ValueError("The keplergl.html template has no <body> tag.")
    keplergl_data = json.dumps(
        {
            "config": config,
            "data": data_to_json(data),
            "options": {"readOnly": read_only, "centerMap": center_map},
        }
    )
    # "</" can only occur inside JSON strings, where "<\/" decodes to the same
    # text; escaping it keeps a value like "</script>" from closing the
    # inline script early.
    keplergl_data = keplergl_data.replace("</", "<\\/")
    cmd = f"window.__keplerglDataConfig = {keplergl_data};"
    body_end = body_start + len(body_tag)
    return (
        keplergl_html[:body_start]
        + "<body><script>"
        + cmd
        + "</script>"
        + keplergl_html[body_end:]
    )
@task
def generate_html(df: pd.DataFrame, config: dict) -> str:
    """Render the activity data and Kepler.gl config as a standalone HTML page.

    A script that resizes the ``kepler-gl__map`` element to 100% width and
    height 100 ms after page load is appended just before the closing
    ``</body></html>``.

    Args:
        df: activity data; its ``far_week`` values must have ``isoformat``.
            The frame is copied, never modified.
        config: Kepler.gl config, as built by ``make_config``.

    Returns:
        str: the HTML document.

    Raises:
        ValueError: if the rendered page does not end with exactly one
            ``</body></html>``.
    """
    end_tag = "</body></html>"
    df = df.copy(deep=True)
    # Send weeks to Kepler.gl as ISO 8601 strings with a space separator.
    df["far_week"] = df.far_week.map(lambda ts: ts.isoformat(sep=" "))
    html = _repr_html_(data={"activity_dataset": df}, config=config)
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if html.count(end_tag) != 1 or not html.endswith(end_tag):
        raise ValueError(
            f"Unexpected Kepler.gl HTML: expected exactly one trailing {end_tag!r}."
        )
    truncated_html = html[: -len(end_tag)]
    resize_script_block = """<script>
window.onload = function() {
setTimeout(function() {
var element = document.getElementById('kepler-gl__map');
if (element) {
element.style.width = '100%';
element.style.height = '100%';
console.warn("Élément avec l'ID 'kepler-gl__map' a été mis à jour.");
} else {
console.warn("Élément avec l'ID 'kepler-gl__map' introuvable.");
}
}, 100);
};
</script>"""
    return truncated_html + resize_script_block + end_tag
@task
def create_data_frame_to_load(
    html: str, from_datetime_utc: datetime, to_datetime_utc: datetime
) -> pd.DataFrame:
    """Wrap the generated HTML and its time window in a one-row DataFrame.

    Args:
        html: the rendered visualization page.
        from_datetime_utc: start of the window, stored as ``start_datetime_utc``.
        to_datetime_utc: end of the window, stored as ``end_datetime_utc``.

    Returns:
        pd.DataFrame: columns ``start_datetime_utc``, ``end_datetime_utc``,
        ``html_file``, with a single row.
    """
    row_values = {
        "start_datetime_utc": from_datetime_utc,
        "end_datetime_utc": to_datetime_utc,
        "html_file": html,
    }
    return pd.DataFrame({column: [value] for column, value in row_values.items()})
@task
def load_activity_visualization(df: pd.DataFrame, truncate_table: bool):
    """Load ``df`` into ``public.activity_visualizations`` on ``monitorfish_remote``.

    Args:
        df: rows to load, e.g. from ``create_data_frame_to_load``.
        truncate_table: if true, ``load`` is called with ``how="replace"``;
            otherwise with ``how="upsert"``. In both cases ``end_datetime_utc``
            is given as the table / DataFrame id column.
    """
    if truncate_table:
        how = "replace"
    else:
        how = "upsert"
    load(
        df=df,
        table_name="activity_visualizations",
        schema="public",
        db_name="monitorfish_remote",
        logger=get_run_logger(),
        how=how,
        table_id_column="end_datetime_utc",
        df_id_column="end_datetime_utc",
    )
@flow(name="Monitorfish - Generate kepler activity viz")
def activity_visualizations_flow(
    start_months_ago: int = 12,
    end_months_ago: int = 0,
    truncate_table: bool = False,
    get_utcnow_fn=get_utcnow,
):
    """Extract activity data, render it as a Kepler.gl page and store the page.

    Args:
        start_months_ago: window start, in months before today.
        end_months_ago: window end, in months before today.
        truncate_table: forwarded to ``load_activity_visualization``.
        get_utcnow_fn: clock used to get "now" (presumably injectable so tests
            can pin the date — confirm).
    """
    # Both bounds are offsets from the current time truncated to the day.
    today = date_trunc(get_utcnow_fn(), "DAY")
    from_datetime_utc = today - make_relativedelta(months=start_months_ago)
    to_datetime_utc = today - make_relativedelta(months=end_months_ago)

    # Extract
    activity_data = extract_activity_overview_data(from_datetime_utc, to_datetime_utc)

    # Transform
    config = make_config(get_time_range(activity_data))
    html = generate_html(activity_data, config)

    # Load
    load_activity_visualization(
        create_data_frame_to_load(html, from_datetime_utc, to_datetime_utc),
        truncate_table,
    )