import pandas as pd
from influxdb_client import InfluxDBClient
import psycopg2
import yaml
from msTools.models import CodeID, ActivityLeg, ActivityAll
from msTools import i18n
from msTools.timeutils import ensure_utc
from msGait.models import EffectiveMovement, EffectiveGait, ActivitySegment
from pydantic import ValidationError
from typing import Any
from psycopg2 import sql
import datetime
from pandas.api.types import DatetimeTZDtype
[docs]
class DataManager:
def __init__(self, config_path: str) -> None:
"""Initialize the DataManager and open database connections.
Args:
config_path: Path to the YAML configuration file.
"""
# Initialize attributes early so __del__ never fails even if init raises.
self.config: dict[str, Any] = {}
self.pg_conn: psycopg2.extensions.connection | None = None
self.influxdb_client: InfluxDBClient | None = None
self.bucket: str = ""
self.measurement: str = ""
self.config = self.load_config(config_path)
# Configure PostgreSQL connection
self.pg_conn = self._connect_postgresql()
# Configure InfluxDB client
self.influxdb_client = InfluxDBClient(
url=self.config["influxdb"]["url"],
token=self.config["influxdb"]["token"],
org=self.config["influxdb"]["org"],
timeout=self.config["influxdb"]["timeout"]
)
self.bucket: str = self.config["influxdb"]["bucket"]
self.measurement: str = self.config['influxdb']['measurement']
def __del__(self) -> None:
"""Ensure all connections are closed on deletion."""
# Never raise in destructors (it creates noisy 'Exception ignored in ...').
try:
self.close_influxdb()
except Exception:
pass
try:
self.close_pg()
except Exception:
pass
[docs]
def load_config(self, config_path: str) -> dict[str, Any]:
"""Load configuration values from a YAML file.
Args:
config_path: Path to the YAML configuration file.
Returns:
Parsed configuration dictionary.
"""
with open(config_path, "r", encoding="utf-8") as file:
return yaml.safe_load(file)
[docs]
def get_config(self, sect: str) -> dict[str, Any] | None:
"""Return one configuration section from the loaded config.
Args:
sect: Name of the section to retrieve.
Returns:
The requested configuration section, or ``None`` if it does not exist.
"""
if sect in self.config.keys():
return self.config.get(sect)
else:
return None
[docs]
def _connect_postgresql(self) -> psycopg2.extensions.connection:
"""Create and return a PostgreSQL connection.
Returns:
Open psycopg2 PostgreSQL connection.
"""
try:
return psycopg2.connect(
host=self.config["postgresql"]["host"],
port=self.config["postgresql"].get("port", 5432),
database=self.config["postgresql"]["database"],
user=self.config["postgresql"]["user"],
password=self.config["postgresql"]["password"],
options="-c timezone=UTC"
)
except psycopg2.OperationalError as e:
print(i18n._("PGSQL-CONN-ERR").format(e=e))
raise
[docs]
def close_pg(self) -> None:
"""Closes the PostgreSQL connection."""
if self.pg_conn is not None and getattr(self.pg_conn, "closed", 1) == 0:
self.pg_conn.close()
[docs]
def close_influxdb(self) -> None:
"""Closes the InfluxDB client."""
if self.influxdb_client is not None:
self.influxdb_client.close()
[docs]
def close_all(self) -> None:
"""Closes both PostgreSQL and InfluxDB connections."""
if self.pg_conn is not None and getattr(self.pg_conn, "closed", 1) == 0:
self.pg_conn.close()
if self.influxdb_client is not None:
self.influxdb_client.close()
[docs]
def get_influx_client(self) -> InfluxDBClient:
"""Return the active InfluxDB client instance.
Returns:
InfluxDB client used by the repository.
"""
return self.influxdb_client
[docs]
def check_and_create_tables(self, sql_file_path: str) -> None:
"""Create required PostgreSQL tables when they do not exist.
Args:
sql_file_path: Path to the SQL file containing the table definitions.
"""
try:
required_tables = [
"codeids",
"effective_movement",
"activity_leg",
"activity_all",
"fullref_sensor_codeid",
"effective_gait",
]
with self.pg_conn.cursor() as cursor:
for table_name in required_tables:
cursor.execute(f"""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = '{table_name}'
);
""")
exists = cursor.fetchone()[0]
if not exists:
print(i18n._("INFO_CREATE_TABLE").format(table=table_name, file=sql_file_path))
with open(sql_file_path, "r", encoding="utf-8") as sql_file:
sql_script = sql_file.read()
cursor.execute(sql_script)
self.pg_conn.commit()
else:
print(i18n._("INFO_TABLE_EXISTS").format(table=table_name))
except Exception as e:
self.pg_conn.rollback()
print(i18n._("PGSQL-TAB-ERR").format(e=e))
raise
[docs]
def get_codeids_in_range(self, start_datetime: str, end_datetime: str) -> list[str]:
"""Retrieve unique CodeIDs from InfluxDB inside a time range.
Args:
start_datetime: Start datetime as a string.
end_datetime: End datetime as a string.
Returns:
Sorted list of unique CodeID values found in the interval.
"""
try:
start = ensure_utc(start_datetime)
end = ensure_utc(end_datetime)
start_iso = start.isoformat().replace("+00:00", "Z")
end_iso = end.isoformat().replace("+00:00", "Z")
query = f'''
from(bucket: "{self.bucket}")
|> range(start: {start_iso}, stop: {end_iso})
|> filter(fn: (r) => r._measurement == "{self.measurement}")
|> filter(fn: (r) => exists r.CodeID)
|> keep(columns: ["CodeID"])
|> distinct(column: "CodeID")
'''
result = self.influxdb_client.query_api().query(
query, org=self.config['influxdb']['org']
)
codeids = sorted({
record.values.get("CodeID")
for table in result
for record in table.records
if record.values.get("CodeID") not in (None, "")
})
return codeids
except Exception as e:
print(i18n._("INFL-QRY-COD-ERR").format(e=e))
return []
[docs]
def fetch_data(self, query: str) -> pd.DataFrame:
"""Execute a SQL query in PostgreSQL and return the result as a DataFrame.
Args:
query: SQL query string.
Returns:
DataFrame containing the query results.
"""
try:
with self.pg_conn.cursor() as cursor:
cursor.execute(query)
columns = [desc[0] for desc in cursor.description]
data = cursor.fetchall()
return pd.DataFrame(data, columns=columns)
except Exception as e:
print(i18n._("PGSQL-QRY-GEN-ERR").format(e=e))
raise
[docs]
def segments_retrieval(
self,
fstart: str | None = None,
fend: str | None = None,
ids: list[int] | None = None,
verbose: int = 0
) -> pd.DataFrame:
"""Retrieve `activity_all` rows by explicit IDs or by overlapping time window.
Args:
fstart: Start datetime when retrieving by time range.
fend: End datetime when retrieving by time range.
ids: Explicit list of `activity_all` IDs to retrieve.
verbose: Verbosity level for console output.
Returns:
DataFrame with columns
[`id`, `start_time`, `end_time`, `duration`,
`codeid_ids`, `codeleg_ids`, `active_legs`].
Raises:
ValueError: If neither a valid ID list nor a valid time window is provided.
"""
query = """
SELECT id, start_time, end_time, duration,
codeid_ids, codeleg_ids, active_legs
FROM activity_all
"""
params = None
if ids is not None:
if len(ids) == 0:
if verbose >= 1:
print(i18n._("WARN_NO_SEGMENTS_FOUND"))
return pd.DataFrame(
columns=[
"id",
"start_time",
"end_time",
"duration",
"codeid_ids",
"codeleg_ids",
"active_legs",
]
)
if verbose >= 1:
print(i18n._("INFO_SEGMENTS_BY_IDS").format(ids=ids))
query += " WHERE id = ANY(%s) ORDER BY codeid_ids;"
params = (ids,)
else:
if not fstart or not fend:
raise ValueError(i18n._("ERR_MISSING_IDS_OR_WINDOW"))
if verbose >= 1:
print(i18n._("INFO_SEGMENTS_BY_RANGE").format(start=fstart, end=fend))
start_utc = ensure_utc(fstart)
end_utc = ensure_utc(fend)
query += """
WHERE start_time <= %s
AND end_time >= %s
ORDER BY codeid_ids;
"""
params = (end_utc, start_utc)
try:
with self.pg_conn.cursor() as cursor:
cursor.execute(query, params)
columns = [desc[0] for desc in cursor.description]
data = cursor.fetchall()
df = pd.DataFrame(data, columns=columns)
except Exception as e:
print(i18n._("PGSQL-QRY-GEN-ERR").format(e=e))
raise
if df.empty and verbose >= 1:
print(i18n._("WARN_NO_SEGMENTS_FOUND"))
else:
for col in df.columns:
if isinstance(df[col].dtype, DatetimeTZDtype):
df[col] = df[col].dt.tz_localize(None)
return df
[docs]
def recover_activity_all(self, act: pd.DataFrame, verbose: int = 0) -> pd.DataFrame:
"""Expand `activity_all` rows into per-leg rows with resolved CodeID values.
Args:
act: DataFrame containing `activity_all`-like rows.
verbose: Verbosity level for console output.
Returns:
DataFrame with one row per leg and columns
[`start_time`, `end_time`, `codeid_id`, `CodeID`, `foot`].
"""
if act.empty:
return pd.DataFrame(columns=["start_time", "end_time", "codeid_id", "CodeID", "foot"])
activity_leg_like = []
# Collect unique codeid_ids first to avoid N+1 queries
unique_codeid_ids = sorted({
int(codeid_id)
for _, row in act.iterrows()
for codeid_id in row["codeid_ids"]
if codeid_id is not None
})
codeid_map = {}
if unique_codeid_ids:
with self.pg_conn.cursor() as cursor:
cursor.execute(
"SELECT id, codeid FROM codeids WHERE id = ANY(%s);",
(unique_codeid_ids,)
)
codeid_map = {row_id: codeid for row_id, codeid in cursor.fetchall()}
for _, row in act.iterrows():
if verbose > 1:
print(i18n._("VB_REG_ACT_ALL").format(row=row))
for i, foot in enumerate(row["active_legs"]):
current_codeid_id = row["codeid_ids"][i]
resolved_codeid = codeid_map.get(int(current_codeid_id))
if resolved_codeid is None:
raise ValueError(
i18n._("ERR_CODEID_NOT_FOUND").format(id=current_codeid_id)
)
activity_leg_like.append({
"start_time": row["start_time"],
"end_time": row["end_time"],
"codeid_id": current_codeid_id,
"CodeID": resolved_codeid,
"foot": foot
})
df_legs = pd.DataFrame(activity_leg_like)
if verbose > 0:
print(i18n._("VB-ACT-ALL-LEGS").format(ns=df_legs.shape[0]))
if df_legs.shape[0] > 0:
for col in df_legs.columns:
if isinstance(df_legs[col].dtype, DatetimeTZDtype):
df_legs[col] = df_legs[col].dt.tz_localize(None)
return df_legs
[docs]
def store_codeid(self, codeid: str, verbose: int = 0) -> tuple[int, bool]:
"""Store a unique CodeID in PostgreSQL and return its identifier.
Args:
codeid: CodeID string to insert or recover.
verbose: Verbosity level for console output.
Returns:
Tuple containing the PostgreSQL ID and a boolean indicating
whether the row was newly inserted.
"""
try:
validated_codeid = CodeID(codeid=codeid)
with self.pg_conn.cursor() as cursor:
cursor.execute(
"INSERT INTO codeids (codeid) VALUES (%s) ON CONFLICT (codeid) DO NOTHING RETURNING id;",
(validated_codeid.codeid,)
)
result = cursor.fetchone()
if result:
new_id = result[0]
self.pg_conn.commit()
if verbose >= 2:
print(i18n._("INFO_CODEID_NEW").format(codeid=codeid, id=new_id))
return new_id, True
cursor.execute("SELECT id FROM codeids WHERE codeid = %s;", (validated_codeid.codeid,))
existing_id = cursor.fetchone()[0]
if verbose >= 2:
print(i18n._("INFO_CODEID_EXIST").format(codeid=codeid, id=existing_id))
return existing_id, False
except ValidationError as e:
print(i18n._("PGSQL-VAL-COD-ERR").format(e=e))
raise
except Exception as e:
self.pg_conn.rollback()
print(i18n._("PGSQL-INS-COD-ERR").format(e=e))
raise
[docs]
def _insert_row_returning_id(self, cursor, table_name: str, row: dict[str, Any]) -> int:
"""Insert one row safely into a table and return the generated ID.
Args:
cursor: Open PostgreSQL cursor.
table_name: Destination table name.
row: Row values to insert.
Returns:
Inserted row ID.
Raises:
RuntimeError: If the insert operation does not return an ID.
"""
column_names = list(row.keys())
query = sql.SQL(
"INSERT INTO {table} ({fields}) VALUES ({values}) RETURNING id"
).format(
table=sql.Identifier(table_name),
fields=sql.SQL(", ").join(sql.Identifier(col) for col in column_names),
values=sql.SQL(", ").join(sql.Placeholder() for _ in column_names),
)
cursor.execute(query, tuple(row[col] for col in column_names))
result = cursor.fetchone()
if result is None:
raise RuntimeError(f"Insert into {table_name} did not return an id.")
return result[0]
[docs]
def _find_existing_row_id(self, cursor, table_name: str, row: dict[str, Any]) -> int | None:
"""Find an equivalent existing row for idempotent tables.
Args:
cursor: Open PostgreSQL cursor.
table_name: Table to search.
row: Candidate row values.
Returns:
Existing row ID if found, otherwise ``None``.
"""
if table_name == "activity_leg":
cursor.execute(
"""
SELECT id
FROM activity_leg
WHERE codeid_id IS NOT DISTINCT FROM %s
AND foot IS NOT DISTINCT FROM %s
AND start_time IS NOT DISTINCT FROM %s
AND end_time IS NOT DISTINCT FROM %s
AND duration IS NOT DISTINCT FROM %s
AND mac IS NOT DISTINCT FROM %s
AND device_name IS NOT DISTINCT FROM %s
AND total_value IS NOT DISTINCT FROM %s
LIMIT 1
""",
(
row["codeid_id"],
row["foot"],
row["start_time"],
row["end_time"],
row["duration"],
row.get("mac"),
row.get("device_name"),
row.get("total_value"),
),
)
elif table_name == "activity_all":
cursor.execute(
"""
SELECT id
FROM activity_all
WHERE codeid_ids IS NOT DISTINCT FROM %s
AND codeleg_ids IS NOT DISTINCT FROM %s
AND start_time IS NOT DISTINCT FROM %s
AND end_time IS NOT DISTINCT FROM %s
AND duration IS NOT DISTINCT FROM %s
AND macs IS NOT DISTINCT FROM %s
AND device_names IS NOT DISTINCT FROM %s
AND active_legs IS NOT DISTINCT FROM %s
AND is_effective IS NOT DISTINCT FROM %s
LIMIT 1
""",
(
row["codeid_ids"],
row["codeleg_ids"],
row["start_time"],
row["end_time"],
row["duration"],
row.get("macs"),
row.get("device_names"),
row.get("active_legs"),
row.get("is_effective"),
),
)
elif table_name == "effective_movement":
cursor.execute(
"""
SELECT id
FROM effective_movement
WHERE codeid_id IS NOT DISTINCT FROM %s
AND start_time IS NOT DISTINCT FROM %s
AND end_time IS NOT DISTINCT FROM %s
AND duration IS NOT DISTINCT FROM %s
AND leg IS NOT DISTINCT FROM %s
LIMIT 1
""",
(
row["codeid_id"],
row["start_time"],
row["end_time"],
row["duration"],
row["leg"],
),
)
elif table_name == "effective_gait":
cursor.execute(
"""
SELECT id
FROM effective_gait
WHERE codeid_id IS NOT DISTINCT FROM %s
AND start_time IS NOT DISTINCT FROM %s
AND end_time IS NOT DISTINCT FROM %s
AND duration IS NOT DISTINCT FROM %s
LIMIT 1
""",
(
row["codeid_id"],
row["start_time"],
row["end_time"],
row["duration"],
),
)
else:
return None
result = cursor.fetchone()
return result[0] if result else None
[docs]
def _upsert_like_row_returning_id(self, cursor, table_name: str, row: dict[str, Any]) -> int:
"""Reuse an existing row ID when possible, otherwise insert a new row.
Args:
cursor: Open PostgreSQL cursor.
table_name: Destination table name.
row: Candidate row values.
Returns:
Existing or newly inserted row ID.
"""
existing_id = self._find_existing_row_id(cursor, table_name, row)
if existing_id is not None:
if table_name == "effective_gait":
self._update_effective_gait_row(cursor, existing_id, row)
return existing_id
return self._insert_row_returning_id(cursor, table_name, row)
[docs]
def store_data(self, table_name: str, data: pd.DataFrame, verbose: int = 1) -> list[int]:
"""Validate and store rows into a PostgreSQL table.
Args:
table_name: Destination table name.
data: DataFrame of rows to validate and store.
verbose: Verbosity level for console output.
Returns:
List of inserted or reused row IDs.
"""
if data.empty:
if verbose > 0:
print(i18n._("PGSQL-INS-TAB-NOD-ERR").format(table_name=table_name))
return []
try:
if verbose > 0:
print(i18n._("PGSQL-INS-TAB-INFO").format(table_name=table_name))
data_to_store = data.copy()
if "start_time" in data_to_store.columns:
data_to_store["start_time"] = data_to_store["start_time"].astype(str)
if "end_time" in data_to_store.columns:
data_to_store["end_time"] = data_to_store["end_time"].astype(str)
validated_rows = []
for _, row in data_to_store.iterrows():
row_dict = row.to_dict()
if table_name == "activity_leg":
validated_rows.append(ActivityLeg(**row_dict).model_dump())
elif table_name == "effective_movement":
validated_rows.append(EffectiveMovement(**row_dict).model_dump())
elif table_name == "activity_all":
if "codeleg_ids" in row_dict:
row_dict["codeleg_ids"] = [
-1 if v is None else int(v) for v in row_dict["codeleg_ids"]
]
validated_rows.append(ActivityAll(**row_dict).model_dump())
elif table_name == "fullref_sensor_codeid":
validated_rows.append(ActivitySegment(**row_dict).model_dump())
elif table_name == "effective_gait":
validated_rows.append(EffectiveGait(**row_dict).model_dump())
elif table_name == "codeids":
validated_rows.append(CodeID(**row_dict).model_dump())
else:
raise ValueError(i18n._("ERR_UNKNOWN_TABLE").format(table=table_name))
inserted_ids: list[int] = []
with self.pg_conn.cursor() as cursor:
for row in validated_rows:
if table_name in {"activity_leg", "activity_all", "effective_movement", "effective_gait"}:
row_id = self._upsert_like_row_returning_id(cursor, table_name, row)
else:
row_id = self._insert_row_returning_id(cursor, table_name, row)
inserted_ids.append(row_id)
self.pg_conn.commit()
if verbose > 0:
print(i18n._("PGSQL-INS-TAB-OK").format(table_name=table_name))
if verbose > 1:
print(i18n._("PGSQL-LST-INS").format(ids=inserted_ids))
return inserted_ids
except ValidationError as e:
print(i18n._("PGSQL-VAL-TAB-ERR").format(e=e))
return []
except Exception as e:
self.pg_conn.rollback()
print(i18n._("PGSQL-INS-TAB-ERR").format(e=e))
return []
[docs]
def get_real_codeid(self, codeid_id: int) -> str:
"""Retrieve the CodeID string associated with a numeric PostgreSQL ID.
Args:
codeid_id: Numeric ID stored in the `codeids` table.
Returns:
CodeID string associated with that identifier.
Raises:
ValueError: If the given ID does not exist in the `codeids` table.
"""
try:
query = "SELECT codeid FROM codeids WHERE id = %s;"
with self.pg_conn.cursor() as cursor:
cursor.execute(query, (codeid_id,))
result = cursor.fetchone()
if result:
return result[0]
else:
raise ValueError(i18n._("ERR_CODEID_NOT_FOUND").format(id=codeid_id))
except Exception as e:
print(i18n._("PGSQL-QRY-COD-ERR").format(e=e))
raise
[docs]
def get_codeid_id_by_value(self, codeid: str) -> int | None:
"""Retrieve the numeric PostgreSQL ID associated with a CodeID string.
Args:
codeid: CodeID string stored in the `codeids` table.
Returns:
Integer ID if found, otherwise ``None``.
"""
try:
with self.pg_conn.cursor() as cursor:
cursor.execute(
"SELECT id FROM codeids WHERE codeid = %s;",
(codeid,)
)
result = cursor.fetchone()
return result[0] if result else None
except Exception as e:
print(i18n._("PGSQL-QRY-COD-ERR").format(e=e))
raise
[docs]
def get_record_all_legs(self, clegs: set, clname: str = "codeleg_ids") -> pd.DataFrame:
"""Retrieve `activity_all` rows matching a set of array-valued leg references.
Args:
clegs: Set of leg-reference arrays to match.
clname: Name of the PostgreSQL array column to compare against.
Returns:
DataFrame containing the matching `activity_all` rows.
Raises:
ValueError: If no matching rows are found.
"""
try:
array_literals = [sql.SQL("ARRAY[{}]").format(
sql.SQL(', ').join(map(sql.Literal, pair))
) for pair in clegs]
# Build ARRAY[...] literals for the IN clause
in_clause = sql.SQL(', ').join(array_literals)
query = sql.SQL("SELECT * FROM activity_all WHERE {} IN ({})").format(
sql.Identifier(clname),in_clause)
with self.pg_conn.cursor() as cursor:
cursor.execute(query)
result = cursor.fetchall()
if result:
columns = [desc[0] for desc in cursor.description]
df = pd.DataFrame(result, columns=columns)
# Due to the lack of TZ info we must change
for col in df.columns:
if isinstance(df[col].dtype, DatetimeTZDtype):
df[col] = df[col].dt.tz_localize(None)
return df
else:
raise ValueError(i18n._("PGSQL-QRY-CLNAME-NONE").format(clname=clname,clegs=clegs))
except Exception as e:
print(i18n._("PGSQL-QRY-COD-ERR").format(e=e))
raise
[docs]
def get_activity_ids_by_start_date_range(
self,
start_datetime: str | datetime.datetime,
end_datetime: str | datetime.datetime
) -> list[int]:
"""Return distinct `activity_all` IDs whose start time falls inside a range.
Args:
start_datetime: Start of the time window as string or datetime.
end_datetime: End of the time window as string or datetime.
Returns:
Sorted list of matching `activity_all.id` values.
"""
try:
sdt = ensure_utc(start_datetime) # → aware UTC datetime
edt = ensure_utc(end_datetime)
with self.pg_conn.cursor() as cur:
cur.execute(
"""
SELECT DISTINCT id
FROM activity_all
WHERE start_time >= %s AND start_time <= %s
ORDER BY id
""",
(sdt, edt),
)
rows = cur.fetchall()
return [r[0] for r in rows]
except Exception as e:
print(i18n._("PGSQL-QRY-GEN-ERR").format(e=e))
return []
[docs]
def _update_effective_gait_row(
self,
cursor,
row_id: int,
row: dict[str, Any]
) -> None:
"""Update GPS enrichment fields for an existing `effective_gait` row.
Args:
cursor: Open PostgreSQL cursor.
row_id: Identifier of the row to update.
row: Dictionary containing the GPS-related values to persist.
"""
cursor.execute(
"""
UPDATE effective_gait
SET gps_points = %s,
gps_distance_m = %s,
gps_elapsed_sec = %s,
gps_avg_speed_m_s = %s,
gps_validated = %s
WHERE id = %s
""",
(
row.get("gps_points"),
row.get("gps_distance_m"),
row.get("gps_elapsed_sec"),
row.get("gps_avg_speed_m_s"),
row.get("gps_validated"),
row_id,
),
)