"""Source code for msGait.movement_detector."""

import pandas as pd
import numpy as np

from pandas import ExcelWriter
from pandas.api.types import DatetimeTZDtype

from msTools.data_manager import DataManager
from msTools import i18n
from msTools.timeutils import ensure_utc
from msGait.models import EffectiveMovement

from scipy.signal import welch
from pydantic import ValidationError


[docs] class MovementDetector: """Detects effective movement and gait periods using raw sensor data from a data manager.""" def __init__( self, config_file: str, sampling_rate: float | None = None, sect: str = "movement", fstart: str | None = None, fend: str | None = None, ids: list[int] | None = None, verbose: int = 1 ) -> None: """Initializes the movement detector, loads configurations and activity data. Args: config_file (str): Path to the YAML configuration file. sampling_rate (float | None): Sampling rate of the sensor data (in Hz). If None, it is read from config. sect (str): Section in the YAML config for movement detection parameters. fstart (str | None): Optional start timestamp for activity query. fend (str | None): Optional end timestamp for activity query. ids (list[int] | None): Optional list of segment IDs to retrieve. verbose (int): Verbosity level for logging (0 = silent, 1 = info, 2 = debug). """ self.verbose = verbose # Initialize DataManager self.data_manager = DataManager(config_path=config_file) self.ids = ids if self.ids is None and fstart is not None and fend is not None: try: resolved_ids = self.data_manager.get_activity_ids_by_start_date_range(fstart, fend) if verbose >= 1: if resolved_ids: print(f"[MovementDetector] {len(resolved_ids)} ID(s) found in activity_all for the time window.") else: print("[MovementDetector] No IDs found in activity_all for the requested time window.") if resolved_ids: self.ids = resolved_ids except Exception as e: if verbose >= 1: print(f"[MovementDetector] ID lookup by start_date failed: {e}") # Retrieve activity segments based on IDs or time range from activity_all table self.activity_all = self.data_manager.segments_retrieval( fstart=fstart, fend=fend, ids=self.ids, verbose=verbose ) if self.activity_all.empty and verbose >= 1: print(i18n._("FGAIT_NO_WINS")) # Retrieve full sensor-linked activity self.df_legs = self.data_manager.recover_activity_all( self.activity_all, verbose=verbose ) # Load detection 
parameters from config params = self.data_manager.get_config(sect) if params is None: raise ValueError(f"Missing configuration section: {sect}") self.sampling_rate = float( sampling_rate if sampling_rate is not None else params.get("sampling_rate", 47.0) ) self.resample_hz = float(params.get("resample_hz", 100.0)) self.window_size_samples = int(params.get("window_size_samples", 256)) self.min_window_fraction = float(params.get("min_window_fraction", 0.5)) self.min_effective_duration_sec = float(params.get("min_effective_duration_sec", 6.0)) self.min_gait_duration_sec = float(params.get("min_gait_duration_sec", 6.0)) self.freq_band = (params["freq_band_min"], params["freq_band_max"]) self.min_continuous_hits = params["min_continuous_hits"] self.accel_threshold = params.get("accel_threshold", 0.2) self.gyro_threshold = params.get("gyro_threshold", 50) self.accel_power_threshold = params.get("accel_power_threshold",0.1) self.gyro_power_threshold = params.get("gyro_power_threshold",1000) self.gps_resample_seconds = int(params.get("gps_resample_seconds", 10)) self.gps_padding_seconds = int(params.get("gps_padding_seconds", 15)) self.gps_min_points = int(params.get("gps_min_points", 2)) self.gps_min_distance_m = float(params.get("gps_min_distance_m", 3.0)) self.gps_min_speed_m_s = float(params.get("gps_min_speed_m_s", 0.2)) self.gps_max_speed_m_s = float(params.get("gps_max_speed_m_s", 3.0))
[docs] def close(self) -> None: """Closing all the opened connections""" self.data_manager.close_all()
[docs] def fetch_sensor_data(self, start_time: str, end_time: str, codeid_id: int, foot: str) -> pd.DataFrame: """Fetches raw sensor data from InfluxDB for a specific time interval and limb. Args: start_time (str): Start time in ISO format. end_time (str): End time in ISO format. codeid_id (int): Identifier to map to real CodeID. foot (str): 'Left' or 'Right'. Returns: pd.DataFrame: Sensor data with fields Ax, Ay, Az, Gx, Gy, Gz, and timestamps. """ try: codeid = self.data_manager.get_real_codeid(codeid_id) except ValueError as e: if self.verbose >= 1: print(i18n._("PGSQL-QRY-GEN-ERR").format(e=e)) return pd.DataFrame() start_time = ensure_utc(start_time).isoformat().replace("+00:00", "Z") end_time = ensure_utc(end_time).isoformat().replace("+00:00", "Z") query = f''' from(bucket: "{self.data_manager.bucket}") |> range(start: {start_time}, stop: {end_time}) |> filter(fn: (r) => r["CodeID"] == "{codeid}" and r["Foot"] == "{foot}") |> filter(fn: (r) => r["_field"] == "Ax" or r["_field"] == "Ay" or r["_field"] == "Az" or r["_field"] == "Gx" or r["_field"] == "Gy" or r["_field"] == "Gz") |> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn:"_value") ''' try: result = self.data_manager.influxdb_client.query_api().query( query=query, org=self.data_manager.config['influxdb']['org'] ) data = [record.values for table in result for record in table.records] return pd.DataFrame(data) except Exception as e: msg = str(e) if "cannot query an empty range" in msg: if self.verbose >= 2: print(i18n._("WARN_EMPTY_RANGE").format(codeid=codeid, foot=foot)) return pd.DataFrame() if self.verbose >= 1: print(i18n._("INFL-QRY-DATA-ERR").format(e=e)) return pd.DataFrame()
[docs] def fetch_gps_data( self, start_time: str, end_time: str, codeid_id: int ) -> pd.DataFrame: """Fetches GPS data from InfluxDB for a specific time interval. Args: start_time (str): Start time in ISO format. end_time (str): End time in ISO format. codeid_id (int): Identifier to map to real CodeID. Returns: pd.DataFrame: GPS data with columns '_time', 'lat', 'lng'. """ try: codeid = self.data_manager.get_real_codeid(codeid_id) except ValueError as e: if self.verbose >= 1: print(i18n._("PGSQL-QRY-GEN-ERR").format(e=e)) return pd.DataFrame(columns=["_time", "lat", "lng"]) start_time = ensure_utc(start_time).isoformat().replace("+00:00", "Z") end_time = ensure_utc(end_time).isoformat().replace("+00:00", "Z") query = f''' from(bucket: "{self.data_manager.bucket}") |> range(start: {start_time}, stop: {end_time}) |> filter(fn: (r) => r["CodeID"] == "{codeid}") |> filter(fn: (r) => r["_field"] == "Ax") |> keep(columns: ["_time", "CodeID", "lat", "lng"]) ''' try: result = self.data_manager.influxdb_client.query_api().query( query=query, org=self.data_manager.config["influxdb"]["org"] ) data = [record.values for table in result for record in table.records] df = pd.DataFrame(data) if df.empty: return pd.DataFrame(columns=["_time", "lat", "lng"]) df["_time"] = pd.to_datetime(df["_time"], errors="coerce") df = df.dropna(subset=["_time"]).copy() if "lat" not in df.columns or "lng" not in df.columns: return pd.DataFrame(columns=["_time", "lat", "lng"]) df["lat"] = pd.to_numeric(df["lat"], errors="coerce") df["lng"] = pd.to_numeric(df["lng"], errors="coerce") df = ( df.dropna(subset=["lat", "lng"]) .sort_values("_time") .drop_duplicates(subset=["_time", "lat", "lng"]) .reset_index(drop=True) ) return df[["_time", "lat", "lng"]] except Exception as e: msg = str(e) if "cannot query an empty range" in msg: return pd.DataFrame(columns=["_time", "lat", "lng"]) if self.verbose >= 1: print(i18n._("INFL-QRY-DATA-ERR").format(e=e)) return pd.DataFrame(columns=["_time", "lat", "lng"])
[docs] @staticmethod def _haversine_distance_m( lat1: np.ndarray, lon1: np.ndarray, lat2: np.ndarray, lon2: np.ndarray ) -> np.ndarray: """Compute haversine distance in meters between consecutive GPS points.""" earth_radius_m = 6371000.0 lat1_rad = np.radians(lat1) lon1_rad = np.radians(lon1) lat2_rad = np.radians(lat2) lon2_rad = np.radians(lon2) dlat = lat2_rad - lat1_rad dlon = lon2_rad - lon1_rad a = ( np.sin(dlat / 2.0) ** 2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2.0) ** 2 ) c = 2.0 * np.arctan2(np.sqrt(a), np.sqrt(1.0 - a)) return earth_radius_m * c
[docs] @staticmethod def _prepare_gps_track( df_gps: pd.DataFrame, resample_seconds: int ) -> pd.DataFrame: """Clean and downsample GPS track to a manageable cadence.""" if df_gps.empty: return pd.DataFrame(columns=["_time", "lat", "lng"]) work = df_gps.copy() work["_time"] = pd.to_datetime(work["_time"], errors="coerce", utc=True) work["lat"] = pd.to_numeric(work["lat"], errors="coerce") work["lng"] = pd.to_numeric(work["lng"], errors="coerce") work = work.dropna(subset=["_time", "lat", "lng"]).copy() if work.empty: return pd.DataFrame(columns=["_time", "lat", "lng"]) work = ( work.sort_values("_time") .drop_duplicates(subset="_time", keep="last") .set_index("_time") ) rule = f"{resample_seconds}s" work = work.resample(rule).first().dropna(subset=["lat", "lng"]).reset_index() return work[["_time", "lat", "lng"]]
[docs] @classmethod def _summarize_prepared_gps_track( cls, gps_track: pd.DataFrame, min_points: int, min_distance_m: float, min_speed_m_s: float, max_speed_m_s: float ) -> dict[str, int | float | bool]: """Summarize one already-prepared GPS window.""" if gps_track.empty or len(gps_track) < 2: return { "gps_points": int(len(gps_track)), "gps_distance_m": 0.0, "gps_elapsed_sec": 0.0, "gps_avg_speed_m_s": 0.0, "gps_validated": False, } lat1 = gps_track["lat"].to_numpy()[:-1] lon1 = gps_track["lng"].to_numpy()[:-1] lat2 = gps_track["lat"].to_numpy()[1:] lon2 = gps_track["lng"].to_numpy()[1:] distances = cls._haversine_distance_m(lat1, lon1, lat2, lon2) total_distance_m = float(np.nansum(distances)) elapsed_sec = float( (gps_track["_time"].iloc[-1] - gps_track["_time"].iloc[0]).total_seconds() ) avg_speed_m_s = total_distance_m / elapsed_sec if elapsed_sec > 0 else 0.0 gps_validated = ( len(gps_track) >= min_points and total_distance_m >= min_distance_m and min_speed_m_s <= avg_speed_m_s <= max_speed_m_s ) return { "gps_points": int(len(gps_track)), "gps_distance_m": total_distance_m, "gps_elapsed_sec": elapsed_sec, "gps_avg_speed_m_s": avg_speed_m_s, "gps_validated": gps_validated, }
[docs] def validate_gait_with_gps( self, df_gait: pd.DataFrame, verbose: int = 0 ) -> pd.DataFrame: """Validate detected gait episodes against GPS displacement. This is an external validation layer. It does not replace inertial gait detection; it enriches each gait episode with GPS support metrics. """ if df_gait.empty: return df_gait.copy() work = df_gait.copy() work["start_time"] = pd.to_datetime(work["start_time"], errors="coerce", utc=True) work["end_time"] = pd.to_datetime(work["end_time"], errors="coerce", utc=True) work = work.dropna(subset=["start_time", "end_time"]).copy() if work.empty: return work padding = pd.to_timedelta(self.gps_padding_seconds, unit="s") enriched_parts: list[pd.DataFrame] = [] for codeid_id, group in work.groupby("codeid_id"): fetch_start = group["start_time"].min() - padding fetch_end = group["end_time"].max() + padding gps_raw = self.fetch_gps_data(fetch_start, fetch_end, int(codeid_id)) gps_track = self._prepare_gps_track( gps_raw, self.gps_resample_seconds ) rows: list[dict[str, object]] = [] for _, gait_row in group.iterrows(): window_start = gait_row["start_time"] - padding window_end = gait_row["end_time"] + padding if gps_track.empty: window_track = gps_track else: window_track = gps_track[ (gps_track["_time"] >= window_start) & (gps_track["_time"] <= window_end) ].copy() gps_summary = self._summarize_prepared_gps_track( gps_track=window_track, min_points=self.gps_min_points, min_distance_m=self.gps_min_distance_m, min_speed_m_s=self.gps_min_speed_m_s, max_speed_m_s=self.gps_max_speed_m_s, ) output_row = gait_row.to_dict() output_row.update(gps_summary) rows.append(output_row) enriched_group = pd.DataFrame(rows) enriched_parts.append(enriched_group) if verbose >= 2: validated_count = int(enriched_group["gps_validated"].sum()) print( f"[GPS] codeid_id={codeid_id} | gait_rows={len(enriched_group)} | " f"gps_validated={validated_count}" ) return pd.concat(enriched_parts, ignore_index=True)
[docs] def resample_sensor_data(self, df: pd.DataFrame, target_hz: float) -> pd.DataFrame: """Resamples raw sensor data to a fixed temporal grid. Args: df (pd.DataFrame): Raw sensor data containing a '_time' column. target_hz (float): Target resampling frequency in Hz. Returns: pd.DataFrame: Resampled sensor data with a regular '_time' grid. """ if df.empty: return df if "_time" not in df.columns: raise ValueError(i18n._("ERR_MISSING_TIME_COLUMN")) numeric_cols = [col for col in ["Ax", "Ay", "Az", "Gx", "Gy", "Gz"] if col in df.columns] if not numeric_cols: return pd.DataFrame() work = df[["_time"] + numeric_cols].copy() work["_time"] = pd.to_datetime(work["_time"], errors="coerce") work = work.dropna(subset=["_time"]) if work.empty: return pd.DataFrame() work = ( work.sort_values("_time") .drop_duplicates(subset="_time", keep="last") .set_index("_time") ) if len(work) < 2: return work.reset_index() sample_period = pd.to_timedelta(1 / target_hz, unit="s") resampled = ( work.resample(sample_period) .mean() .interpolate(method="time", limit_direction="both") .reset_index() ) return resampled
[docs] def calculate_magnitude(self, df: pd.DataFrame) -> pd.DataFrame: """Calculates signal magnitudes for acceleration and gyroscope data. Args: df (pd.DataFrame): DataFrame containing raw sensor values. Returns: pd.DataFrame: Same DataFrame with added ``|a|`` and ``|g|`` columns. """ if {"Ax", "Ay", "Az", "Gx", "Gy", "Gz"}.issubset(df.columns): df["|a|"] = np.sqrt(df["Ax"]**2 + df["Ay"]**2 + df["Az"]**2) df["|g|"] = np.sqrt(df["Gx"]**2 + df["Gy"]**2 + df["Gz"]**2) else: if self.verbose >= 1: print(i18n._("MVDET-AG-NODAT")) return df
[docs] def is_effective_by_welch(self, signal: np.ndarray, power_threshold: float, sampling_rate: float) -> bool: """Determines if a signal contains sufficient power in a frequency band. Args: signal (np.ndarray): Input signal array. power_threshold (float): Minimum band power required to mark the segment as effective. sampling_rate (float): Sampling rate used for spectral analysis. Returns: bool: True if power within the target frequency band exceeds threshold. """ if len(signal) < 2: return False freqs, power = welch( signal, fs=sampling_rate, nperseg=min(len(signal), self.window_size_samples) ) band_power = power[(freqs >= self.freq_band[0]) & (freqs <= self.freq_band[1])].sum() return band_power >= power_threshold
[docs] def is_effective_by_time(self, signal: np.ndarray, threshold: float) -> bool: """Checks if the signal remains active for a continuous minimum duration. Args: signal (np.ndarray): Input signal array. threshold (float): Activity threshold. Returns: bool: True if signal is continuously active long enough. """ std_signal = np.std(signal) if std_signal < 0.01: return False active = np.abs(signal) > threshold consecutive = np.diff( np.where( np.concatenate(([active[0]], active[:-1] != active[1:], [True])) )[0] )[::2] return (len(consecutive) >= self.min_continuous_hits)
[docs] def detect_effective_movement( self, activity_windows: pd.DataFrame, output_filename: str | None = None, verbose: int = 0 ) -> pd.DataFrame: """Detects intervals of effective movement from sensor data. Args: activity_windows (pd.DataFrame): DataFrame containing rows with start_time, end_time, codeid_id, and foot. output_filename (str, optional): Path to an Excel file for exporting raw data (default is None). verbose (int): Verbosity level (0 = silent, 1 = info, 2 = debug). Returns: pd.DataFrame: Validated segments with effective movement data. """ def segment_fixed_windows(df: pd.DataFrame, window_size: int) -> list[tuple[pd.Timestamp, pd.Timestamp]]: """ Split the data into consecutive fixed-size windows (by sample count), optionally keeping the final partial window if it is large enough. Args: df (pd.DataFrame): Sorted DataFrame with a '_time' column. window_size (int): Number of samples per window. Returns: list[tuple[pd.Timestamp, pd.Timestamp]]: (start_time, end_time) for each window. 
""" if "_time" not in df.columns: raise ValueError(i18n._("ERR_MISSING_TIME_COLUMN")) df = df.sort_values("_time").reset_index(drop=True) segments = [] total_rows = len(df) full_limit = total_rows - window_size + 1 last_full_start = None for i in range(0, max(full_limit, 0), window_size): start_time = df.loc[i, "_time"] end_time = df.loc[i + window_size - 1, "_time"] segments.append((start_time, end_time)) last_full_start = i if total_rows == 0: return segments next_start = 0 if last_full_start is None else last_full_start + window_size remaining = total_rows - next_start if 0 < remaining < window_size: min_partial_samples = max(2, int(np.ceil(window_size * self.min_window_fraction))) if remaining >= min_partial_samples: start_time = df.loc[next_start, "_time"] end_time = df.loc[total_rows - 1, "_time"] segments.append((start_time, end_time)) return segments def merge_connected_segments(segments: list[tuple[pd.Timestamp, pd.Timestamp]], max_gap_sec=5.): """Merges temporally close segments into a single one. Args: segments (list[tuple[pd.Timestamp, pd.Timestamp]]): List of segment start and end times. max_gap_sec (float): Maximum allowed gap for merging segments. Returns: list[tuple[pd.Timestamp, pd.Timestamp]]: Merged list of segments. """ if not segments: return [] segments = sorted(segments, key=lambda x: x[0]) merged = [] current_start, current_end = segments[0] for start, end in segments[1:]: gap = (start - current_end).total_seconds() if gap <= max_gap_sec: current_end = max(current_end, end) else: merged.append((current_start, current_end)) current_start, current_end = start, end merged.append((current_start, current_end)) return merged # # End of aux fucntion. 
# Starting point for function detect_effective_movement() if "foot" not in activity_windows.columns: raise ValueError(i18n._("MVNT-ROOT-MISS")) results = [] writer = ExcelWriter(output_filename, engine="xlsxwriter") if output_filename else None for row in activity_windows.itertuples(index=False): segments = [] valid_segments = [] if verbose >= 1: print(i18n._("MSG_TMP_IDS").format(id=row.CodeID,ref=row.codeid_id, tstart=row.start_time,tend=row.end_time)) try: start = ensure_utc(row.start_time) end = ensure_utc(row.end_time) if end <= start: if verbose >= 2: print(f"Skipping invalid segment: start={start}, end={end}") continue except Exception: if self.verbose: print(i18n._("MVNT-TS-NOV").format(row=row)) continue if pd.isnull(start) or pd.isnull(end): if self.verbose: print(i18n._("MVNT-TS-NOV").format(row=row)) continue codeid_id = row.codeid_id foot = row.foot cid = getattr(row, "CodeID", codeid_id) if verbose > 1: print(i18n._("MVNT-QRY-DAT").format( cid=cid, frm=start, dur=(end - start).total_seconds() )) sensor_data = self.fetch_sensor_data(start, end, codeid_id, foot) sensor_data.drop(columns=['result', 'table', '_start', '_stop'], inplace=True, errors='ignore') for col in sensor_data.columns: if isinstance(sensor_data[col].dtype, DatetimeTZDtype): sensor_data[col] = sensor_data[col].dt.tz_localize(None) if sensor_data.empty: continue # Export to Excel if requested if writer: sheet_base = f"{codeid_id}_{foot}_{start.strftime('%H%M%S')}"[:25] max_rows = 1_000_000 for i in range((len(sensor_data) - 1) // max_rows + 1): part = sensor_data.iloc[i*max_rows:(i+1)*max_rows].copy() for col in part.select_dtypes(['datetimetz']).columns: part[col] = pd.to_datetime(part[col], utc=True).dt.tz_localize(None) sheet = f"{sheet_base}_{i+1}" part.to_excel(writer, sheet_name=sheet, index=False) if verbose > 0: print(i18n._("MVNT-XLSX-DAT").format(shn=sheet, file=output_filename)) if verbose > 1: print(i18n._("MVNT-QRY-REC").format(ns=sensor_data.shape[0])) sensor_data = 
self.resample_sensor_data(sensor_data, self.resample_hz) if sensor_data.empty: continue sensor_data = self.calculate_magnitude(sensor_data) if "|a|" not in sensor_data.columns or "|g|" not in sensor_data.columns: continue sensor_data = sensor_data.sort_values("_time").reset_index(drop=True) analysis_sampling_rate = self.resample_hz # Split into fixed-size analysis windows segments = segment_fixed_windows(sensor_data, self.window_size_samples) for seg_start, seg_end in segments: seg_data = sensor_data[(sensor_data["_time"] >= seg_start) & (sensor_data["_time"] <= seg_end)] acc_seg = seg_data["|a|"].to_numpy() gyro_seg = seg_data["|g|"].to_numpy() acc_ok = self.is_effective_by_welch( acc_seg, self.accel_power_threshold, analysis_sampling_rate ) and self.is_effective_by_time(acc_seg - 1, self.accel_threshold) gyro_ok = self.is_effective_by_welch( gyro_seg, self.gyro_power_threshold, analysis_sampling_rate ) and self.is_effective_by_time(gyro_seg, self.gyro_threshold) if acc_ok or gyro_ok: valid_segments.append((seg_start, seg_end)) merged_segments = merge_connected_segments(valid_segments, max_gap_sec=10) filtered_segments = [ (mstart, mend) for mstart, mend in merged_segments if (mend - mstart).total_seconds() >= self.min_effective_duration_sec ] for mstart, mend in filtered_segments: results.append({ "codeid_id": codeid_id, "start_time": mstart.isoformat(), "end_time": mend.isoformat(), "duration": (mend - mstart).total_seconds(), "leg": foot }) if verbose >= 3: print(i18n._("LST-SEGS").format(results=pd.DataFrame(results))) if verbose >= 2: print(i18n._("MVNT-WLK-FOOT").format(cid=codeid_id, foot=foot, dur=(end - start).total_seconds())) if writer: writer.close() try: validated = [EffectiveMovement(**r).model_dump() for r in results] return pd.DataFrame(validated) except ValidationError as e: print(i18n._("MVNT-VAL-EFF-ERR").format(e=e)) return pd.DataFrame()
[docs] def detect_effective_gait(self, df_effective: pd.DataFrame, verbose: int = 0) -> pd.DataFrame: """Detect overlapping periods of effective movement for both feet. Args: df_effective (pd.DataFrame): Movement segments with columns ['codeid_id', 'start_time', 'end_time', 'duration', 'leg']. verbose (int): Verbosity level. Returns: pd.DataFrame: Gait episodes, optionally enriched with GPS validation fields. """ base_columns = ["codeid_id", "start_time", "end_time", "duration"] gps_columns = [ "gps_points", "gps_distance_m", "gps_elapsed_sec", "gps_avg_speed_m_s", "gps_validated", ] output_columns = base_columns + gps_columns if df_effective.empty: return pd.DataFrame(columns=output_columns) df = df_effective.copy() def _parse_mixed_utc_naive(value): if pd.isnull(value): return pd.NaT ts = pd.Timestamp(value) if ts.tzinfo is None: return ts return ts.tz_convert("UTC").tz_localize(None) df["start_time"] = df["start_time"].apply(_parse_mixed_utc_naive) df["end_time"] = df["end_time"].apply(_parse_mixed_utc_naive) df = df.dropna(subset=["start_time", "end_time"]).copy() gait_rows: list[dict] = [] for codeid, grp in df.groupby("codeid_id"): left = grp[grp["leg"] == "Left"][["start_time", "end_time"]] right = grp[grp["leg"] == "Right"][["start_time", "end_time"]] if left.empty or right.empty: continue left = left.assign(_key=1) right = right.assign(_key=1) merged = left.merge(right, on="_key", suffixes=("_L", "_R")).drop(columns="_key") for _, row in merged.iterrows(): start_time = max(row["start_time_L"], row["start_time_R"]) end_time = min(row["end_time_L"], row["end_time_R"]) if start_time < end_time: duration = (end_time - start_time).total_seconds() if duration >= self.min_gait_duration_sec: gait_rows.append( { "codeid_id": codeid, "start_time": start_time, "end_time": end_time, "duration": duration, } ) df_gait = pd.DataFrame(gait_rows, columns=base_columns) if df_gait.empty: return pd.DataFrame(columns=output_columns) for col in gps_columns: if col not in 
df_gait.columns: df_gait[col] = pd.NA return df_gait[output_columns]
[docs] def save_to_postgresql(self, table_name: str, df: pd.DataFrame, verbose: int = 0) -> None: """Saves the given DataFrame to a PostgreSQL table using the DataManager. Args: table_name (str): Name of the destination table. df (pd.DataFrame): DataFrame to store. Returns: None """ if df.empty and verbose > 0: print(i18n._("PGSQL-INS-TAB-NOD-ERR").format(table_name=table_name)) return try: self.data_manager.store_data(table_name, df, verbose) except Exception as e: print(i18n._("PGSQL-INS-TAB-ERR").format(table_name=table_name, e=e))