import pandas as pd
from datetime import datetime
from msTools.data_manager import DataManager
from msTools import i18n
from msTools.timeutils import ensure_utc
class CodeIDProcessor:
    """Process CodeIDs from raw InfluxDB data into semantic activity frames.

    The pipeline implemented by this class is:

    1. ``fetch_codeid_data`` pulls per-minute sample counts from InfluxDB.
    2. ``identify_activity_segments`` groups the counts of one foot into
       contiguous segments.
    3. ``build_activity_leg_frames`` produces raw and merge-ready frames for
       both legs.
    4. ``build_activity_all_frame`` intersects both legs in time
       (``inter_segs``) and assembles the bilateral rows
       (``merge_activity_legs_to_all``).
    5. ``save_to_postgresql`` hands a frame to the ``DataManager``.
    """

    # Column layout returned by ``identify_activity_segments``.
    _SEGMENT_COLUMNS = (
        "time_from",
        "time_until",
        "CodeID",
        "DeviceName",
        "Foot",
        "total_value",
        "mac",
    )
    # Column layout of an *empty* merge-ready leg frame.
    _LEG_MERGE_COLUMNS = (
        "time_from",
        "time_until",
        "CodeID",
        "device_name",
        "foot",
        "total_value",
        "mac",
        "codeid_id",
        "codeleg_id",
    )
    # Column layout returned by ``inter_segs``.
    _INTERSECTION_COLUMNS = (
        "time_from",
        "time_until",
        "R1_id",
        "R2_id",
        "codeid_id_1",
        "codeid_id_2",
    )
    # Per-leg columns carried into the bilateral frame.
    _LEG_DETAIL_COLUMNS = ("CodeID", "device_name", "foot", "mac", "codeleg_id")
    # Zone applied to timestamps that arrive without tz information.
    # NOTE(review): data fetched by ``fetch_codeid_data`` is queried in UTC;
    # confirm that naive input really is Europe/Madrid wall-clock time.
    _NAIVE_TIME_ZONE = "Europe/Madrid"

    def __init__(self, data_manager: "DataManager", verbose: int = 0) -> None:
        """Initialize the CodeID processor.

        Args:
            data_manager: DataManager instance used for database interactions.
            verbose: Verbosity level for console output.
        """
        self.data_manager = data_manager
        self.influx_client = data_manager.get_influx_client()
        self.bucket = data_manager.bucket
        self.verbose = verbose

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _flux_string(value) -> str:
        """Escape *value* for safe embedding inside a Flux ``"..."`` literal.

        Backslashes, double quotes and the ``${`` interpolation opener are
        escaped so the value cannot terminate the literal or inject
        expressions into the query.
        """
        text = str(value)
        # Backslash must be handled first so later escapes are not doubled.
        text = text.replace("\\", "\\\\")
        text = text.replace('"', '\\"')
        return text.replace("${", "\\${")

    @staticmethod
    def _format_mac(addr):
        """Return *addr* as colon-separated hex pairs.

        The part after the last ``-`` is split into two-character groups.
        Values that already contain ``:`` and non-string values (e.g. NaN)
        are returned unchanged.
        """
        if not isinstance(addr, str) or ":" in addr:
            return addr
        raw = addr.split("-")[-1]
        return ":".join(raw[i:i + 2] for i in range(0, len(raw), 2))

    @classmethod
    def _empty_segments(cls) -> pd.DataFrame:
        """Return an empty frame with the segment column layout."""
        return pd.DataFrame(columns=list(cls._SEGMENT_COLUMNS))

    @staticmethod
    def _group_segments(block: pd.DataFrame, thresh: float) -> pd.DataFrame:
        """Group rows into segments based on time gaps and device changes.

        Args:
            block: Time-sorted DataFrame for one foot.
            thresh: Gap in seconds above which a new segment starts.

        Returns:
            One row per segment with first/last timestamp, identifying
            columns of the first sample and the summed ``_value``.
        """
        gap_seconds = block["_time"].diff().dt.total_seconds()
        # A new segment starts after a long gap or when the device changes.
        # The first row always starts one (its shifted DeviceName is NaN).
        starts_new = (gap_seconds > thresh) | (
            block["DeviceName"] != block["DeviceName"].shift()
        )
        block = block.assign(_time_diff=gap_seconds, group=starts_new.cumsum())
        return block.groupby("group").agg(
            time_from=pd.NamedAgg(column="_time", aggfunc="first"),
            time_until=pd.NamedAgg(column="_time", aggfunc="last"),
            CodeID=pd.NamedAgg(column="CodeID", aggfunc="first"),
            DeviceName=pd.NamedAgg(column="DeviceName", aggfunc="first"),
            Foot=pd.NamedAgg(column="Foot", aggfunc="first"),
            total_value=pd.NamedAgg(column="_value", aggfunc="sum"),
            mac=pd.NamedAgg(column="mac", aggfunc="first"),
        ).reset_index(drop=True)

    def _build_leg(
        self,
        sensor_data: pd.DataFrame,
        codeid_id: int,
        gap_threshold_seconds: float,
        foot: str,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Return ``(raw_segments, merge_ready_segments)`` for one foot."""
        segments = self.identify_activity_segments(
            sensor_data, gap_threshold_seconds, foot
        )
        if not segments.empty:
            # Segments made of a single sample have zero duration; drop them.
            duration = (
                segments["time_until"] - segments["time_from"]
            ).dt.total_seconds()
            segments = segments.loc[duration > 0].copy()
        if segments.empty:
            return segments, pd.DataFrame(columns=list(self._LEG_MERGE_COLUMNS))
        merge_ready = segments.rename(
            columns={"DeviceName": "device_name", "Foot": "foot"}
        ).copy()
        merge_ready["codeid_id"] = codeid_id
        return segments, merge_ready

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def fetch_codeid_data(
        self,
        codeid: str,
        start_datetime: datetime,
        end_datetime: datetime
    ) -> pd.DataFrame:
        """Fetch aggregated sensor-count data for a given CodeID from InfluxDB.

        The query counts ``Ax`` samples per one-minute window and skips empty
        windows.

        Args:
            codeid: Unique CodeID string.
            start_datetime: Start of the time range.
            end_datetime: End of the time range.

        Returns:
            DataFrame containing the InfluxDB query results sorted by
            ``_time``. An empty DataFrame is returned when there is no data
            or when the query fails (the error is printed, not raised).
        """
        # Flux expects RFC3339 timestamps; use the "Z" suffix for UTC.
        start_str = ensure_utc(start_datetime).isoformat().replace("+00:00", "Z")
        end_str = ensure_utc(end_datetime).isoformat().replace("+00:00", "Z")
        # Escape interpolated values so they cannot break out of the literal.
        bucket = self._flux_string(self.bucket)
        code = self._flux_string(codeid)
        query = f'''
from(bucket: "{bucket}")
  |> range(start: {start_str}, stop: {end_str})
  |> filter(fn: (r) => r["CodeID"] == "{code}" and r["_field"] == "Ax")
  |> aggregateWindow(every: 1m, fn: count, createEmpty: false)
  |> keep(columns: ["_time", "CodeID", "_field", "_value", "Foot", "lat", "lng", "mac", "DeviceName"])
'''
        try:
            result = self.influx_client.query_api().query(
                org=self.data_manager.config['influxdb']['org'], query=query
            )
            data = [record.values for table in result for record in table.records]
            df = pd.DataFrame(data)
            if df.empty or "_time" not in df.columns:
                if self.verbose >= 1:
                    print(i18n._("NO_DATA_CODEID").format(codeid=codeid))
                return pd.DataFrame()
            df = df.sort_values("_time")
            if self.verbose >= 2:
                print(i18n._("DATA_RETRIEVED_CODEID").format(
                    codeid=codeid, rows=len(df)
                ))
            return df
        except Exception as e:
            # Best-effort boundary: report the failure and return no data.
            print(i18n._("ERR_INFLUX_FETCH").format(
                codeid=codeid, error=str(e)
            ))
            return pd.DataFrame()

    def identify_activity_segments(
        self,
        df: pd.DataFrame,
        threshold_seconds: float = 70,
        foot: str = "Left"
    ) -> pd.DataFrame:
        """Identify contiguous activity windows based on time gaps.

        Args:
            df: Raw count DataFrame containing a `_time` column.
            threshold_seconds: Maximum gap in seconds allowed inside one segment.
            foot: Foot to filter (`"Left"` or `"Right"`).

        Returns:
            DataFrame with columns
            [`time_from`, `time_until`, `CodeID`, `DeviceName`, `Foot`,
            `total_value`, `mac`]. It is empty (same columns) when there is
            no usable input.
        """
        if df.empty:
            if self.verbose >= 1:
                print(i18n._("MSG_NO_DATA_DF"))
            return self._empty_segments()

        # Work on a copy; unparseable timestamps become NaT and are dropped.
        df = df.copy()
        df["_time"] = pd.to_datetime(df["_time"], errors="coerce")
        df = df.dropna(subset=["_time"])
        if df.empty:
            if self.verbose >= 1:
                print(i18n._("MSG_NO_DATA_DF"))
            return self._empty_segments()

        if df["_time"].dt.tz is None:
            df["_time"] = df["_time"].dt.tz_localize(self._NAIVE_TIME_ZONE)

        # Drop columns that are irrelevant for segmentation, sort by time.
        clean = df.drop(
            columns=['result', 'table', '_field', 'lng', 'lat'], errors='ignore'
        ).sort_values("_time")
        filtered = clean[clean['Foot'] == foot]
        grouped = self._group_segments(filtered, threshold_seconds)
        return grouped if not grouped.empty else self._empty_segments()

    def build_activity_leg_frames(
        self,
        sensor_data: pd.DataFrame,
        codeid_id: int,
        gap_threshold_seconds: float = 80.0,
    ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Build left/right activity_leg frames bottom-up from raw sensor counts.

        Zero-duration segments are discarded. The merge-ready frames rename
        ``DeviceName``/``Foot`` to ``device_name``/``foot`` and add
        ``codeid_id``. ``codeleg_id`` is not populated here; it must be added
        by the caller before ``build_activity_all_frame`` is used, because
        ``merge_activity_legs_to_all`` selects that column.

        Args:
            sensor_data (pd.DataFrame): Raw aggregated sensor data for one CodeID.
            codeid_id (int): Internal PostgreSQL id for the CodeID.
            gap_threshold_seconds (float): Maximum temporal gap to keep samples
                inside the same activity segment.

        Returns:
            tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
                - activity_seg_left: raw left-leg segments
                - activity_seg_right: raw right-leg segments
                - activity_seg_left_merge: left-leg segments prepared for downstream merge
                - activity_seg_right_merge: right-leg segments prepared for downstream merge
        """
        activity_seg_left, activity_seg_left_merge = self._build_leg(
            sensor_data, codeid_id, gap_threshold_seconds, "Left"
        )
        activity_seg_right, activity_seg_right_merge = self._build_leg(
            sensor_data, codeid_id, gap_threshold_seconds, "Right"
        )
        return (
            activity_seg_left,
            activity_seg_right,
            activity_seg_left_merge,
            activity_seg_right_merge,
        )

    def build_activity_all_frame(
        self,
        activity_seg_right_merge: pd.DataFrame,
        activity_seg_left_merge: pd.DataFrame,
    ) -> pd.DataFrame:
        """Build activity_all bottom-up from right/left activity_leg frames.

        Args:
            activity_seg_right_merge (pd.DataFrame): Right-leg activity_leg-like frame.
            activity_seg_left_merge (pd.DataFrame): Left-leg activity_leg-like frame.

        Returns:
            pd.DataFrame: activity_all-like DataFrame ready to be stored. A
            column-less empty DataFrame is returned when either leg is empty
            or the legs never overlap in time.
        """
        if activity_seg_right_merge.empty or activity_seg_left_merge.empty:
            return pd.DataFrame()
        intersections = self.inter_segs(
            activity_seg_right_merge, activity_seg_left_merge
        )
        if intersections.empty:
            return pd.DataFrame()
        return self.merge_activity_legs_to_all(
            activity_seg_right_merge,
            activity_seg_left_merge,
            intersections,
        )

    def inter_segs(
        self,
        sg1: pd.DataFrame,
        sg2: pd.DataFrame
    ) -> pd.DataFrame:
        """Compute temporal intersections between two sets of segments.

        Every segment of ``sg1`` is compared with every segment of ``sg2``.
        Two segments intersect when each starts no later than the other ends,
        so segments that merely touch yield a zero-length intersection.

        Args:
            sg1: DataFrame of segments for leg 1.
            sg2: DataFrame of segments for leg 2.

        Returns:
            DataFrame of overlapping intervals with columns `time_from`,
            `time_until`, `R1_id`/`R2_id` (index labels of the source rows in
            ``sg1``/``sg2``) and `codeid_id_1`/`codeid_id_2`.
        """
        out_cols = list(self._INTERSECTION_COLUMNS)
        if sg1.empty or sg2.empty:
            return pd.DataFrame(columns=out_cols)

        # Keep the original index labels so rows can be joined back later;
        # rename_axis also covers frames whose index already has a name.
        a = sg1.rename_axis('R1_id').reset_index()
        b = sg2.rename_axis('R2_id').reset_index()
        cross = a.merge(b, how='cross', suffixes=('_1', '_2'))

        overlapping = (
            (cross['time_from_1'] <= cross['time_until_2'])
            & (cross['time_from_2'] <= cross['time_until_1'])
        )
        # Explicit copy: new columns are added below.
        intr = cross.loc[overlapping].copy()
        if intr.empty:
            return pd.DataFrame(columns=out_cols)

        # Intersection = [latest start, earliest end], computed column-wise.
        intr['time_from'] = intr['time_from_1'].where(
            intr['time_from_1'] >= intr['time_from_2'], intr['time_from_2']
        )
        intr['time_until'] = intr['time_until_1'].where(
            intr['time_until_1'] <= intr['time_until_2'], intr['time_until_2']
        )
        return intr[out_cols].reset_index(drop=True)

    def merge_activity_legs_to_all(
        self,
        act_segR: pd.DataFrame,
        act_segL: pd.DataFrame,
        inter: pd.DataFrame
    ) -> pd.DataFrame:
        """Merge left and right leg activity segments into an activity_all frame.

        List-valued columns are ordered ``[left, right]``.

        Args:
            act_segR: Right-leg segment DataFrame; its index labels are the
                values referenced by ``inter['R1_id']``.
            act_segL: Left-leg segment DataFrame; its index labels are the
                values referenced by ``inter['R2_id']``.
            inter: DataFrame of bilateral temporal intersections.

        Returns:
            DataFrame ready to be inserted into the `activity_all` table, with
            columns `start_time`, `end_time`, `codeid_id_1`, `codeid_id_2`,
            `is_effective`, `duration`, `macs`, `codeid_ids`, `codeleg_ids`,
            `device_names` and `active_legs`.

        Raises:
            KeyError: If a leg frame lacks one of `CodeID`, `device_name`,
                `foot`, `mac` or `codeleg_id`.
        """
        leg_cols = list(self._LEG_DETAIL_COLUMNS)
        right = act_segR[leg_cols].add_suffix('_R')
        left = act_segL[leg_cols].add_suffix('_L')

        # Attach the leg details to each intersection via the stored labels.
        merged = inter.merge(
            right, left_on='R1_id', right_index=True
        ).merge(
            left, left_on='R2_id', right_index=True
        )
        df = merged[[
            'time_from', 'time_until',
            'CodeID_R', 'device_name_R', 'foot_R', 'mac_R',
            'CodeID_L', 'device_name_L', 'foot_L', 'mac_L',
            'codeid_id_1', 'codeid_id_2', 'codeleg_id_R', 'codeleg_id_L',
        ]].copy()

        # Normalise every MAC individually so mixed formats are handled.
        df['mac_R'] = df['mac_R'].map(self._format_mac)
        df['mac_L'] = df['mac_L'].map(self._format_mac)

        def paired(first: str, second: str) -> pd.Series:
            """Return a column of two-item lists ``[first, second]``."""
            values = [
                list(pair)
                for pair in zip(df[first].tolist(), df[second].tolist())
            ]
            return pd.Series(values, index=df.index, dtype=object)

        df['is_effective'] = False
        df['duration'] = (df['time_until'] - df['time_from']).dt.total_seconds()
        df['macs'] = paired('mac_L', 'mac_R')
        df['codeid_ids'] = paired('codeid_id_2', 'codeid_id_1')
        df['codeleg_ids'] = paired('codeleg_id_L', 'codeleg_id_R')
        df['device_names'] = paired('device_name_L', 'device_name_R')
        df['active_legs'] = paired('foot_L', 'foot_R')

        df = df.rename(columns={'time_from': 'start_time', 'time_until': 'end_time'})
        # Per-leg intermediates are no longer needed once the lists exist.
        return df.drop(columns=[
            'CodeID_R', 'device_name_R', 'foot_R', 'mac_R',
            'CodeID_L', 'device_name_L', 'foot_L', 'mac_L',
            'codeleg_id_L', 'codeleg_id_R',
        ])

    def save_to_postgresql(self, table_name: str, df: pd.DataFrame) -> None:
        """Save a processed DataFrame to PostgreSQL using DataManager.

        Nothing is stored for an empty frame. Storage errors are printed,
        not raised.

        Args:
            table_name: Destination table name.
            df: DataFrame to insert.
        """
        if df.empty:
            print(i18n._("MSG_NO_DATA_SAVE").format(table=table_name))
            return
        try:
            self.data_manager.store_data(table_name, df)
        except Exception as e:
            print(i18n._("ERR_SAVE_TABLE").format(
                table=table_name, error=str(e)
            ))