Source code for ms_monitoring.find_mscodeids

import argparse
import sys
import pytz
from datetime import datetime, timedelta
import pandas as pd

from msTools import i18n
from msTools.data_manager import DataManager
from msCodeID.codeid_processor import CodeIDProcessor
from msTools.timeutils import ensure_utc


[docs] class VAction(argparse.Action): """ Custom argparse Action to handle cumulative verbosity (-v). """ def __call__(self, parser, namespace, values, option_string=None): if values is None: setattr(namespace, self.dest, getattr(namespace, self.dest) + 1) else: setattr(namespace, self.dest, int(values))
[docs] def main() -> None: # 1) Pre-parse only -l/--lang (so we don’t show help in the wrong language yet) pre = argparse.ArgumentParser(add_help=False) pre.add_argument( "-l", "--lang", dest="lng", type=str, default="es", help="Language: [en|es]" ) pre_args, remaining = pre.parse_known_args() # 2) Initialize translation with the chosen language i18n.init_translation(pre_args.lng) _ = i18n._ # 3) Build the fully localized ArgumentParser parser = argparse.ArgumentParser( description=_("DESC_FIND_MSCodeIDs") ) parser.add_argument( "-l", "--lang", dest="lng", type=str, default=pre_args.lng, help=_("ARG_STR_LNG") ) parser.add_argument( "-f", "--from", dest="from_date", metavar="FROM_DATE", type=str, required=False, help=_("ARG_STR_TIME_FROM") ) parser.add_argument( "-u", "--until", dest="until_date", metavar="UNTIL_DATE", type=str, required=False, help=_("ARG_STR_TIME_UNTIL") ) parser.add_argument( "-c", "--config", dest="config_file", metavar="CONFIG_FILE", type=str, required=True, help=_("ARG_STR_PATH_YAML") ) parser.add_argument( "-v", "--verbose", action=VAction, nargs="?", default=0, const=1, help=_("ARG_VB_LEVEL") ) parser.add_argument( "--head-rows", dest="head_rows", type=int, default=5, help=_("ARG_HEAD_ROWS") ) parser.add_argument( "--save", dest="save", type=int, choices=[0,1], default=1, help=_("ARG_SAVE") ) args = parser.parse_args(remaining) # If the user changed -l here, re-initialize translation if args.lng != pre_args.lng: i18n.init_translation(args.lng) _ = i18n._ # Initialize DataManager and CodeIDProcessor data_manager = DataManager(config_path=args.config_file) codeid_processor = CodeIDProcessor(data_manager, verbose=args.verbose) try: # Handle dates using ensure_utc() if args.from_date: start_datetime = ensure_utc(args.from_date) else: # default: midnight yesterday tmp = datetime.now() - timedelta(days=1) tmp = tmp.replace(hour=0, minute=0, second=0, microsecond=0) start_datetime = ensure_utc(tmp) if args.until_date: end_datetime = ensure_utc(args.until_date) else: end_datetime = ensure_utc(datetime.now(pytz.timezone("Europe/Madrid"))) end_datetime = ensure_utc(end_datetime) # If end is before start, error and exit if end_datetime < start_datetime: sys.stderr.write( _("ERR_END_BEFORE_START") .format(end=end_datetime, start=start_datetime) ) sys.exit(1) # Verbose info about date range if args.verbose >= 1: print( _("MSG_GET_MSCodeIDs_RANGE") .format( start=start_datetime.strftime('%Y-%m-%d %H:%M:%S'), end=end_datetime.strftime('%Y-%m-%d %H:%M:%S') ) ) # Fetch CodeIDs in the given date range codeids = data_manager.get_codeids_in_range( start_datetime.strftime("%Y-%m-%d %H:%M:%S"), end_datetime.strftime("%Y-%m-%d %H:%M:%S") ) if not codeids: print(_("MSG_NO_CODEIDS_FOUND")) return if args.verbose >= 1: print(_("MSG_FOUND_N_CODEIDS").format( n=len(codeids), start=args.from_date or start_datetime, end=args.until_date or end_datetime )) if args.verbose >= 3: print(_("LBL_CODEIDS_HEADER")) for cid in codeids: print(_("LIST_ITEM_CODEID").format(cid=cid)) # Process each CodeID for codeid in codeids: if args.verbose >= 1: print(_("MSG_PROCESSING_CODEID").format(codeid=codeid)) # Store the CodeID in PostgreSQL and get its internal ID if args.save == 1: try: codeid_id, is_new = data_manager.store_codeid(codeid, args.verbose) except Exception as e: print(_("ERR_STORING_CODEID").format(codeid=codeid, error=str(e))) continue else: # Dry-run mode: do not write to PostgreSQL. Use a placeholder ID. codeid_id, is_new = (-1, False) # Fetch sensor data for this CodeID try: sensor_data = codeid_processor.fetch_codeid_data( codeid, start_datetime, end_datetime ) except Exception as e: print(_("ERR_FETCH_DATA").format(codeid=codeid, error=str(e))) continue if sensor_data.empty: if args.verbose >= 1: print(_("MSG_NO_DATA_FOR_CODEID").format(codeid=codeid)) continue # Robust check: ensure 'Foot' column is present if 'Foot' not in sensor_data.columns: print(_("ERR_FOOT_MISSING").format(codeid=codeid), file=sys.stderr) continue # Identify activity segments with an 80-second gap threshold try: ( activity_segL, activity_segR, activity_segL_merge, activity_segR_merge, ) = codeid_processor.build_activity_leg_frames( sensor_data=sensor_data, codeid_id=codeid_id, gap_threshold_seconds=80.0, ) if activity_segL.empty: if args.verbose >= 1: print(_("WARN_NO_SEGMENTS_LEG").format(codeid=codeid, foot='Left')) activity_refL = None else: if args.save == 1: activity_refL = data_manager.transform_activityleg(activity_segL) else: activity_refL = None if activity_segR.empty: if args.verbose >= 1: print(_("WARN_NO_SEGMENTS_LEG").format(codeid=codeid, foot='Right')) activity_refR = None else: if args.save == 1: activity_refR = data_manager.transform_activityleg(activity_segR) else: activity_refR = None # Store activity_leg segments if not activity_segL.empty: if args.save == 1: ids = data_manager.store_data("activity_leg", activity_refL, verbose=args.verbose) if len(ids) != len(activity_refL): raise RuntimeError("Failed to store all Left activity_leg rows.") n_ref = len(activity_refL) else: ids = list(range(1, len(activity_segL_merge) + 1)) n_ref = len(activity_segL_merge) activity_segL_merge['codeleg_id'] = ids if args.verbose >= 2: print(_("INFO_SEGMENTS_STORED").format(n=n_ref)) print(activity_segL_merge.head(args.head_rows)) if not activity_segR.empty: if args.save == 1: ids = data_manager.store_data("activity_leg", activity_refR, verbose=args.verbose) if len(ids) != len(activity_refR): raise RuntimeError("Failed to store all Right activity_leg rows.") n_ref = len(activity_refR) else: ids = list(range(100000, 100000 + len(activity_segR_merge))) n_ref = len(activity_segR_merge) activity_segR_merge['codeleg_id'] = ids if args.verbose >= 2: print(_("INFO_SEGMENTS_STORED").format(n=n_ref)) print(activity_segR_merge.head(args.head_rows)) # Compute intersection between left/right segments merged = codeid_processor.build_activity_all_frame( activity_seg_right_merge=activity_segR_merge, activity_seg_left_merge=activity_segL_merge, ) if not merged.empty: if args.save == 1: data_manager.store_data("activity_all", merged, verbose=args.verbose) if args.verbose >= 2: print(_("INFO_MERGED_STORED").format(n=len(merged))) print(merged.head(args.head_rows)) except Exception as e: print(_("ERR_PROCESS_SEGMENTS").format(codeid=codeid, error=str(e))) # Final summary if args.verbose >= 1: print(_("INFO_ALL_PROCESSED")) finally: data_manager.close_all() del data_manager return
if __name__ == "__main__": main()