import argparse
from datetime import datetime, timedelta, timezone
from msTools import i18n
from msGait.movement_detector import MovementDetector
[docs]
class VAction(argparse.Action):
"""
Custom argparse Action to handle cumulative verbosity (-v).
"""
def __call__(self, parser, namespace, values, option_string=None):
if values is None:
setattr(namespace, self.dest, getattr(namespace, self.dest) + 1)
else:
setattr(namespace, self.dest, int(values))
[docs]
def parse_range_list(rango_str: str) -> list[int]:
"""Convert a string like '1-271' or '1,5,10-15' into a list of integers.
Args:
rango_str: Range/list specification.
Returns:
list[int]: Sorted list of IDs.
"""
result = set()
# Dividir por comas para manejar múltiples segmentos (ej. '1,10-15')
for segment in rango_str.split(','):
if not segment:
continue
# Si contiene un guion, es un rango (ej. '10-15')
if '-' in segment:
try:
# Extraer inicio y fin del rango
start, end = map(int, segment.split('-'))
# Añadir el rango de enteros al resultado
result.update(range(start, end + 1))
except ValueError:
raise ValueError(f"Formato de rango inválido: {segment}. Debe ser 'inicio-fin'.")
# Si no hay guion, es un solo ID (ej. '5')
else:
try:
result.add(int(segment))
except ValueError:
raise ValueError(f"Formato de ID inválido: {segment}. Debe ser un número.")
return sorted(result) # Devolver la lista ordenada
[docs]
def _default_last_hours_window(hours_back: int) -> tuple[str, str]:
"""Return ISO timestamps (UTC) for the last ``hours_back`` hours window.
Args:
hours_back: Number of hours to look back from *now*.
Returns:
tuple[str, str]: (fstart, fend) as ISO strings with "Z" suffix.
"""
now = datetime.now(timezone.utc)
start = now - timedelta(hours=hours_back)
# Use Zulu format acceptable by Influx/our pipeline downstream
fstart = start.isoformat().replace("+00:00", "Z")
fend = now.isoformat().replace("+00:00", "Z")
return fstart, fend
[docs]
def main() -> None:
# 1) Pre‐parse only -l/--lang (to avoid showing help prematurely)
pre = argparse.ArgumentParser(add_help=False)
pre.add_argument(
"-l", "--lang", dest="lng", type=str, default="es",
help="Language: [en|es]"
)
pre_args, remaining = pre.parse_known_args()
# 2) Initialize translation using selected language
i18n.init_translation(pre_args.lng)
_ = i18n._
# 3) Build the real ArgumentParser, using _() for all user‐facing text
parser = argparse.ArgumentParser(
description=_("ARG_TIT_FIND_GAIT")
)
parser.add_argument(
"-i", "--ids", dest="act_all_ids", metavar="IDS", type=parse_range_list,
required=False,
help=_("ARG_LIST_ACT_ALL_IDS. Formato 1-271 o 1,5,10-15")
)
parser.add_argument(
"-c", "--config", dest="config_file", type=str, required=True,
help=_("ARG_STR_PATH_YAML")
)
parser.add_argument(
"-l", "--lang", dest="lng", type=str, default=pre_args.lng,
help=_("ARG_STR_LNG")
)
parser.add_argument(
"-o", "--output", dest="fout", type=str, default=None,
help=_("ARG_STR_FOUT")
)
parser.add_argument(
"-v", "--verbose", action=VAction, nargs="?", default=0, const=1,
help=_("ARG_VB_LEVEL")
)
parser.add_argument(
"--head-rows", dest="head_rows", type=int, default=8,
help=_("ARG_HEAD_ROWS")
)
parser.add_argument(
"--hours-back", dest="hours_back", type=int, default=25,
help="If --ids is omitted, look back the last N hours (default: 25)."
)
parser.add_argument(
"--save", dest="save", type=int, choices=[0,1], default=1,
help=_("ARG_SAVE")
)
# 4) Parse the remaining arguments
args = parser.parse_args(remaining)
# 5) If language was changed here, re‐initialize translation
if args.lng != pre_args.lng:
i18n.init_translation(args.lng)
_ = i18n._
# Determine retrieval mode: explicit IDs vs last-N-hours window
use_ids = args.act_all_ids is not None and len(args.act_all_ids) > 0
fstart, fend = (None, None)
if not use_ids:
# Fallback: last N hours window (defaults to 25h)
fstart, fend = _default_last_hours_window(args.hours_back)
if args.verbose >= 1:
print(_("FGAIT_USING_LAST_HOURS").format(n=args.hours_back))
# Initialize the MovementDetector (internally handles DataManager and segment retrieval)
# Supports either supplying IDs or date ranges
detector = MovementDetector(
config_file = args.config_file,
fstart = fstart,
fend = fend,
ids = (args.act_all_ids if use_ids else None),
verbose = args.verbose
)
try:
# If no leg data was retrieved, exit
if detector.df_legs.empty:
return
if args.verbose >= 1:
print(_("FGAIT_1ST"))
# Detect effective movements per leg
df_effective = detector.detect_effective_movement(
detector.df_legs,
args.fout,
args.verbose
)
if df_effective.empty:
print(_("FGAIT_NO_WALK"))
return
if args.verbose >= 2:
print(_("FGAIT_WKLS_FND"))
print(df_effective.head(args.head_rows))
# Optionally save effective_movement to PostgreSQL
if args.save == 1:
detector.save_to_postgresql("effective_movement", df_effective, args.verbose)
if args.verbose >= 1:
print(_("FGAIT_NUM_WALKS").format(ns=len(df_effective)))
# Detect simultaneous effective gait periods (both feet)
df_gait = detector.detect_effective_gait(df_effective, args.verbose)
df_gait = detector.validate_gait_with_gps(df_gait, args.verbose)
if df_gait.empty:
if args.verbose >= 1:
print(_("NO_GAIT_PERIODS"))
else:
if args.verbose >= 1:
print(_("GAIT_PERIODS_HEADER"))
if args.verbose >= 2:
df_string = df_gait.to_string(index=False)
indentation = " "
indented = "\n".join(indentation + line for line in df_string.splitlines())
print(indented)
if args.save == 1:
detector.save_to_postgresql("effective_gait", df_gait, args.verbose)
if args.verbose >= 1:
print(_("GAIT_SAVED_COUNT").format(n=len(df_gait)))
if args.verbose >= 1:
print(_("FGAIT_END"))
finally:
detector.close()
if __name__ == "__main__":
main()