msTools package

The msTools package provides the shared infrastructure used by the rest of the repository. It centralizes configuration loading, time normalization, internationalization, data validation through Pydantic models, and database access for both InfluxDB and PostgreSQL.

Architecture Overview

digraph class_msTools {
   rankdir=TB;
   graph [fontname="Helvetica"];
   node  [shape=record, fontname="Helvetica"];
   edge  [fontname="Helvetica"];

   DataManager [label="{DataManager|
     + __init__(config_path)\l
     + load_config(config_path)\l
     + get_config(sect)\l
     + get_influx_client()\l
     + get_codeids_in_range(start_datetime, end_datetime)\l
     + fetch_data(query)\l
     + segments_retrieval(fstart, fend, ids, verbose)\l
     + recover_activity_all(act, verbose)\l
     + store_codeid(codeid, verbose)\l
     + transform_activityleg(data)\l
     + store_data(table_name, data, verbose)\l
     + get_real_codeid(codeid_id)\l
     + get_codeid_id_by_value(codeid)\l
     + get_record_all_legs(clegs, clname)\l
     + get_activity_ids_by_start_date_range(start_datetime, end_datetime)\l
     + close_pg()\l
     + close_influxdb()\l
     + close_all()\l
   }"];

   Models [label="{Pydantic models|
     CodeID\l
     ActivityLeg\l
     ActivityAll\l
   }"];

   TimeUtils [label="{timeutils|
     + ensure_utc(ts)\l
   }"];

   I18N [label="{i18n|
     + detect_language(...)\l
     + available_languages(...)\l
     + init_translation(...)\l
     + set_locale_for_formatting(...)\l
     + gettext(...)\l
   }"];

   DataManager -> Models;
   DataManager -> TimeUtils;
   DataManager -> I18N;
}

DataManager and shared utilities

Core Components

DataManager (msTools.data_manager)

DataManager is the main integration layer of the project.

Its responsibilities include:

  • loading project configuration from config.yaml

  • opening and closing PostgreSQL and InfluxDB connections

  • retrieving CodeIDs from InfluxDB

  • retrieving bilateral activity_all windows from PostgreSQL

  • expanding bilateral windows into one row per leg

  • validating rows with Pydantic before insertion

  • storing semantic tables with idempotent behaviour for the main outputs

  • updating GPS-related fields in effective_gait when required

Important public methods include:

  • __init__(config_path: str) -> None

  • load_config(config_path: str) -> dict[str, Any]

  • get_config(sect: str) -> dict[str, Any] | None

  • get_influx_client() -> InfluxDBClient

  • get_codeids_in_range(start_datetime: str, end_datetime: str) -> list[str]

  • fetch_data(query: str) -> pandas.DataFrame

  • segments_retrieval(fstart: str | None = None, fend: str | None = None, ids: list[int] | None = None, verbose: int = 0) -> pandas.DataFrame

  • recover_activity_all(act: pandas.DataFrame, verbose: int = 0) -> pandas.DataFrame

  • store_codeid(codeid: str, verbose: int = 0) -> tuple[int, bool]

  • transform_activityleg(data: pandas.DataFrame) -> pandas.DataFrame

  • store_data(table_name: str, data: pandas.DataFrame, verbose: int = 1) -> list[int]

  • get_real_codeid(codeid_id: int) -> str

  • get_codeid_id_by_value(codeid: str) -> int | None

  • get_record_all_legs(clegs: set, clname: str = "codeleg_ids") -> pandas.DataFrame

  • get_activity_ids_by_start_date_range(start_datetime: str | datetime, end_datetime: str | datetime) -> list[int]

  • close_pg() -> None

  • close_influxdb() -> None

  • close_all() -> None

models (msTools.models)

The shared Pydantic models used before PostgreSQL insertion are:

  • CodeID

  • ActivityLeg

  • ActivityAll

These models help validate semantic records before they are persisted.

timeutils (msTools.timeutils)

This module provides:

  • ensure_utc(ts: str | pandas.Timestamp | datetime) -> pandas.Timestamp

It is used to normalize timestamps consistently before querying InfluxDB or PostgreSQL. Naive timestamps are interpreted as Europe/Madrid local time and then converted to UTC.
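The rule above can be illustrated with a minimal standard-library sketch. It is not the msTools implementation: it uses datetime and zoneinfo instead of pandas, and the helper name to_utc_sketch is hypothetical.

```python
from datetime import datetime, timezone
from typing import Union
from zoneinfo import ZoneInfo

MADRID = ZoneInfo("Europe/Madrid")  # local zone assumed for naive inputs


def to_utc_sketch(ts: Union[str, datetime]) -> datetime:
    """Hypothetical stand-in for ensure_utc, built on the standard library."""
    if isinstance(ts, str):
        ts = datetime.fromisoformat(ts)
    if ts.tzinfo is None:
        # Naive input: interpret it as Europe/Madrid wall-clock time.
        ts = ts.replace(tzinfo=MADRID)
    # Aware input (or freshly localized input): convert to UTC.
    return ts.astimezone(timezone.utc)


# Summer (CEST, UTC+2) and winter (CET, UTC+1) naive inputs.
print(to_utc_sketch("2025-07-01 12:00:00"))  # 2025-07-01 10:00:00+00:00
print(to_utc_sketch("2025-01-15 12:00:00"))  # 2025-01-15 11:00:00+00:00
```

Unlike the documented ensure_utc, this sketch does not raise on ambiguous or non-existent local times around DST transitions; zoneinfo resolves them silently.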

i18n (msTools.i18n)

This module provides lightweight internationalization helpers based on gettext:

  • detect_language(...)

  • available_languages(...)

  • init_translation(...)

  • set_locale_for_formatting(...)

  • gettext(...)

It allows the CLI tools and other modules to expose translated messages while keeping a single shared implementation.

How msTools fits into the pipeline

The package supports both main repository stages.

  1. Bottom-up semantic construction

    • raw wearable data is queried from InfluxDB

    • activity_leg is built per foot

    • bilateral overlaps are merged into activity_all

  2. Movement and gait detection

    • activity_all windows are read from PostgreSQL

    • bilateral windows are expanded into per-leg rows

    • downstream modules derive effective_movement

    • bilateral gait events are stored in effective_gait

    • effective_gait rows may be enriched with GPS metrics such as travelled distance, elapsed time, average speed, and a validation flag
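As an illustration of the merge step in stage 1, the sketch below combines overlapping per-foot intervals into bilateral windows. The exact merge rule used by the repository is not stated here, so the union-of-overlaps rule, the Window type, and the function name merge_bilateral are all assumptions.

```python
from typing import NamedTuple


class Window(NamedTuple):
    start: float      # illustrative numeric time axis
    end: float
    feet: frozenset   # which feet contributed to this window


def merge_bilateral(left, right):
    """Merge overlapping per-foot intervals into combined windows (assumed rule)."""
    tagged = [(s, e, "left") for s, e in left] + [(s, e, "right") for s, e in right]
    tagged.sort()
    merged = []
    for start, end, foot in tagged:
        if merged and start <= merged[-1].end:
            # Overlaps (or touches) the previous window: extend it.
            last = merged[-1]
            merged[-1] = Window(last.start, max(last.end, end), last.feet | {foot})
        else:
            merged.append(Window(start, end, frozenset({foot})))
    return merged


windows = merge_bilateral(left=[(0, 10), (30, 40)], right=[(5, 15), (50, 60)])
for w in windows:
    print(w.start, w.end, sorted(w.feet))
```

In this example the first left and right intervals overlap and become one window covering both feet, while the remaining intervals stay single-foot windows.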

Configuration

The package reads configuration from the project-level config.yaml.

Example:

influxdb:
  url: "https://<HOST>:8086"
  token: "<YOUR_TOKEN>"
  org: "<ORG>"
  bucket: "<BUCKET>"
  measurement: "<MEASUREMENT>"
  verify: false
  timeout: 900000

postgresql:
  host: "<PG_HOST>"
  port: 5432
  user: "<USER>"
  password: "<PASSWORD>"
  database: "<DB_NAME>"
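Once parsed, the configuration is a nested dictionary with one entry per section. The sketch below checks such a dictionary for missing keys before any connection is opened. The required-key sets are taken from the example above and are an assumption, as is the helper name missing_keys; msTools may enforce different rules.

```python
# Keys assumed to be required, based only on the example configuration.
REQUIRED_KEYS = {
    "influxdb": {"url", "token", "org", "bucket", "measurement"},
    "postgresql": {"host", "port", "user", "password", "database"},
}


def missing_keys(config: dict) -> dict:
    """Return {section: sorted missing keys} for incomplete sections."""
    problems = {}
    for section, required in REQUIRED_KEYS.items():
        present = set(config.get(section) or {})
        missing = sorted(required - present)
        if missing:
            problems[section] = missing
    return problems


# Dictionary shaped like the parsed YAML example; values are placeholders.
config = {
    "influxdb": {
        "url": "https://<HOST>:8086",
        "token": "<YOUR_TOKEN>",
        "org": "<ORG>",
        "bucket": "<BUCKET>",
        "measurement": "<MEASUREMENT>",
    },
    "postgresql": {"host": "<PG_HOST>", "port": 5432, "user": "<USER>"},
}
print(missing_keys(config))  # {'postgresql': ['database', 'password']}
```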

Notes

  • Semantic timestamps are handled with timezone awareness.

  • Database inserts are validated with Pydantic models.

  • The storage logic is idempotent for the main semantic tables.

  • effective_gait can include GPS enrichment fields: gps_points, gps_distance_m, gps_elapsed_sec, gps_avg_speed_m_s, and gps_validated.
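To make the relationship between the GPS fields concrete, here is a minimal sketch that derives them from a list of timestamped points. How msTools actually computes distance is not documented here; the haversine formula, the tuple layout, and the function names are assumptions. The gps_validated flag is left out because its rule is not described.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius, a common approximation


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two latitude/longitude points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def gps_summary(points):
    """points: list of (unix_seconds, lat, lon) tuples sorted by time."""
    distance = sum(
        haversine_m(a[1], a[2], b[1], b[2]) for a, b in zip(points, points[1:])
    )
    elapsed = points[-1][0] - points[0][0] if len(points) > 1 else 0.0
    return {
        "gps_points": len(points),
        "gps_distance_m": distance,
        "gps_elapsed_sec": elapsed,
        # Average speed is undefined when no time has elapsed.
        "gps_avg_speed_m_s": distance / elapsed if elapsed > 0 else None,
    }


summary = gps_summary([(0, 0.0, 0.0), (100, 0.0, 0.001)])
print(round(summary["gps_distance_m"], 1), summary["gps_elapsed_sec"])
```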

API Reference

Data manager

class msTools.data_manager.DataManager(config_path: str)

Bases: object

_connect_postgresql() -> connection

Create and return a PostgreSQL connection.

Returns:

Open psycopg2 PostgreSQL connection.

_find_existing_row_id(cursor, table_name: str, row: dict[str, Any]) -> int | None

Find an equivalent existing row for idempotent tables.

Parameters:
  • cursor – Open PostgreSQL cursor.

  • table_name – Table to search.

  • row – Candidate row values.

Returns:

Existing row ID if found, otherwise None.

_insert_row_returning_id(cursor, table_name: str, row: dict[str, Any]) -> int

Insert one row safely into a table and return the generated ID.

Parameters:
  • cursor – Open PostgreSQL cursor.

  • table_name – Destination table name.

  • row – Row values to insert.

Returns:

Inserted row ID.

Raises:

RuntimeError – If the insert operation does not return an ID.

_update_effective_gait_row(cursor, row_id: int, row: dict[str, Any]) -> None

Update GPS enrichment fields for an existing effective_gait row.

Parameters:
  • cursor – Open PostgreSQL cursor.

  • row_id – Identifier of the row to update.

  • row – Dictionary containing the GPS-related values to persist.

_upsert_like_row_returning_id(cursor, table_name: str, row: dict[str, Any]) -> int

Reuse an existing row ID when possible, otherwise insert a new row.

Parameters:
  • cursor – Open PostgreSQL cursor.

  • table_name – Destination table name.

  • row – Candidate row values.

Returns:

Existing or newly inserted row ID.
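The reuse-or-insert pattern described above can be sketched as follows. This is an illustration under stated assumptions, not the msTools code: it uses the standard-library sqlite3 module in place of PostgreSQL, the table demo_leg and the helper names are hypothetical, and matching on every supplied column is an assumed equivalence rule.

```python
import sqlite3


def find_existing_row_id(cur, table, row):
    """Return the id of a row whose columns all match, or None."""
    # Table and column names are interpolated, so they must come from trusted code.
    where = " AND ".join(f"{col} IS ?" for col in row)  # IS also matches NULLs in SQLite
    cur.execute(f"SELECT id FROM {table} WHERE {where} LIMIT 1", tuple(row.values()))
    hit = cur.fetchone()
    return hit[0] if hit else None


def upsert_like(cur, table, row):
    """Reuse an equivalent row if present, otherwise insert and return the new id."""
    existing = find_existing_row_id(cur, table, row)
    if existing is not None:
        return existing
    cols = ", ".join(row)
    marks = ", ".join("?" for _ in row)
    cur.execute(f"INSERT INTO {table} ({cols}) VALUES ({marks})", tuple(row.values()))
    return cur.lastrowid


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE demo_leg (id INTEGER PRIMARY KEY, codeid_id INTEGER, foot TEXT, start_time TEXT)"
)
row = {"codeid_id": 1, "foot": "left", "start_time": "2025-01-01T10:00:00+00:00"}
first = upsert_like(cur, "demo_leg", row)
second = upsert_like(cur, "demo_leg", row)  # same values: no second insert
print(first == second)  # True
```

With psycopg2 the placeholders would be %s rather than ?, and the generated ID would typically come from an INSERT ... RETURNING id statement instead of lastrowid.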

check_and_create_tables(sql_file_path: str) -> None

Create required PostgreSQL tables when they do not exist.

Parameters:

sql_file_path – Path to the SQL file containing the table definitions.

close_all() -> None

Close both the PostgreSQL and InfluxDB connections.

close_influxdb() -> None

Close the InfluxDB client.

close_pg() -> None

Close the PostgreSQL connection.

fetch_data(query: str) -> DataFrame

Execute a SQL query in PostgreSQL and return the result as a DataFrame.

Parameters:

query – SQL query string.

Returns:

DataFrame containing the query results.

get_activity_ids_by_start_date_range(start_datetime: str | datetime, end_datetime: str | datetime) -> list[int]

Return distinct activity_all IDs whose start time falls inside a range.

Parameters:
  • start_datetime – Start of the time window as string or datetime.

  • end_datetime – End of the time window as string or datetime.

Returns:

Sorted list of matching activity_all.id values.

get_codeid_id_by_value(codeid: str) -> int | None

Retrieve the numeric PostgreSQL ID associated with a CodeID string.

Parameters:

codeid – CodeID string stored in the codeids table.

Returns:

Integer ID if found, otherwise None.

get_codeids_in_range(start_datetime: str, end_datetime: str) -> list[str]

Retrieve unique CodeIDs from InfluxDB inside a time range.

Parameters:
  • start_datetime – Start datetime as a string.

  • end_datetime – End datetime as a string.

Returns:

Sorted list of unique CodeID values found in the interval.

get_config(sect: str) -> dict[str, Any] | None

Return one configuration section from the loaded config.

Parameters:

sect – Name of the section to retrieve.

Returns:

The requested configuration section, or None if it does not exist.

get_influx_client() -> InfluxDBClient

Return the active InfluxDB client instance.

Returns:

InfluxDB client used by the repository.

get_real_codeid(codeid_id: int) -> str

Retrieve the CodeID string associated with a numeric PostgreSQL ID.

Parameters:

codeid_id – Numeric ID stored in the codeids table.

Returns:

CodeID string associated with that identifier.

Raises:

ValueError – If the given ID does not exist in the codeids table.

get_record_all_legs(clegs: set, clname: str = 'codeleg_ids') -> DataFrame

Retrieve activity_all rows matching a set of array-valued leg references.

Parameters:
  • clegs – Set of leg-reference arrays to match.

  • clname – Name of the PostgreSQL array column to compare against.

Returns:

DataFrame containing the matching activity_all rows.

Raises:

ValueError – If no matching rows are found.

load_config(config_path: str) -> dict[str, Any]

Load configuration values from a YAML file.

Parameters:

config_path – Path to the YAML configuration file.

Returns:

Parsed configuration dictionary.

recover_activity_all(act: DataFrame, verbose: int = 0) -> DataFrame

Expand activity_all rows into per-leg rows with resolved CodeID values.

Parameters:
  • act – DataFrame containing activity_all-like rows.

  • verbose – Verbosity level for console output.

Returns:

DataFrame with one row per leg and columns [start_time, end_time, codeid_id, CodeID, foot].
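The expansion can be pictured with plain dictionaries instead of DataFrames. In this sketch the positional alignment between codeid_ids and active_legs is an assumption, and expand_to_legs and the lookup table are hypothetical names; in msTools the CodeID strings would be resolved from PostgreSQL.

```python
def expand_to_legs(rows, codeid_lookup):
    """Turn each bilateral window into one record per leg."""
    legs = []
    for row in rows:
        # Assumed: the i-th CodeID reference belongs to the i-th active leg.
        for codeid_id, foot in zip(row["codeid_ids"], row["active_legs"]):
            legs.append(
                {
                    "start_time": row["start_time"],
                    "end_time": row["end_time"],
                    "codeid_id": codeid_id,
                    "CodeID": codeid_lookup[codeid_id],
                    "foot": foot,
                }
            )
    return legs


# Illustrative data only; the CodeID strings are made up.
activity_all_rows = [
    {
        "start_time": "2025-01-01T10:00:00+00:00",
        "end_time": "2025-01-01T10:05:00+00:00",
        "codeid_ids": [1, 2],
        "active_legs": ["left", "right"],
    }
]
lookup = {1: "CODE_A", 2: "CODE_B"}
legs = expand_to_legs(activity_all_rows, lookup)
for leg in legs:
    print(leg["codeid_id"], leg["CodeID"], leg["foot"])
```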

segments_retrieval(fstart: str | None = None, fend: str | None = None, ids: list[int] | None = None, verbose: int = 0) -> DataFrame

Retrieve activity_all rows by explicit IDs or by overlapping time window.

Parameters:
  • fstart – Start datetime when retrieving by time range.

  • fend – End datetime when retrieving by time range.

  • ids – Explicit list of activity_all IDs to retrieve.

  • verbose – Verbosity level for console output.

Returns:

DataFrame with columns [id, start_time, end_time, duration, codeid_ids, codeleg_ids, active_legs].

Raises:

ValueError – If neither a valid ID list nor a valid time window is provided.
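The two retrieval modes can be sketched as a small query builder. This is an assumption-laden illustration: the SQL text, the precedence of ids over the time window, and the name build_segments_query are not taken from msTools. Only the column list, the table name, and the ValueError condition come from the description above.

```python
COLUMNS = "id, start_time, end_time, duration, codeid_ids, codeleg_ids, active_legs"


def build_segments_query(fstart=None, fend=None, ids=None):
    """Return (sql, params) for an ID lookup or an overlapping-window lookup."""
    if ids:
        # psycopg2 adapts a Python list to a PostgreSQL array for ANY(%s).
        sql = f"SELECT {COLUMNS} FROM activity_all WHERE id = ANY(%s) ORDER BY start_time"
        return sql, (list(ids),)
    if fstart and fend:
        # Two intervals overlap when each one starts before the other ends.
        sql = (
            f"SELECT {COLUMNS} FROM activity_all "
            "WHERE start_time <= %s AND end_time >= %s ORDER BY start_time"
        )
        return sql, (fend, fstart)
    raise ValueError("Provide a non-empty ids list or both fstart and fend.")


sql, params = build_segments_query(fstart="2025-01-01T00:00:00Z", fend="2025-01-02T00:00:00Z")
print(params)  # ('2025-01-02T00:00:00Z', '2025-01-01T00:00:00Z')
```

Passing values as parameters rather than formatting them into the SQL string keeps the query safe from injection.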

store_codeid(codeid: str, verbose: int = 0) -> tuple[int, bool]

Store a unique CodeID in PostgreSQL and return its identifier.

Parameters:
  • codeid – CodeID string to insert or recover.

  • verbose – Verbosity level for console output.

Returns:

Tuple containing the PostgreSQL ID and a boolean indicating whether the row was newly inserted.

store_data(table_name: str, data: DataFrame, verbose: int = 1) -> list[int]

Validate and store rows into a PostgreSQL table.

Parameters:
  • table_name – Destination table name.

  • data – DataFrame of rows to validate and store.

  • verbose – Verbosity level for console output.

Returns:

List of inserted or reused row IDs.

transform_activityleg(data: DataFrame) -> DataFrame

Transform raw leg-segment data into the schema expected by activity_leg.

Parameters:

data – DataFrame containing raw activity-leg segments.

Returns:

DataFrame normalized to the activity_leg storage schema.

Models

class msTools.models.ActivityAll(*, codeid_ids: list[int] = <factory>, codeleg_ids: list[int] = <factory>, start_time: str, end_time: str, duration: float, macs: list[str] = <factory>, active_legs: list[str] = <factory>, device_names: list[str] = <factory>, is_effective: bool = False)

Bases: BaseModel

active_legs: list[str]
codeid_ids: list[int]
codeleg_ids: list[int]
device_names: list[str]
duration: float
end_time: str
is_effective: bool
macs: list[str]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

start_time: str

class msTools.models.ActivityLeg(*, codeid_id: int, foot: str, start_time: str, end_time: str, duration: float, total_value: float, mac: str | None = None, device_name: str | None = None)

Bases: BaseModel

codeid_id: int
device_name: str | None
duration: float
end_time: str
foot: str
mac: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

start_time: str
total_value: float

class msTools.models.CodeID(*, codeid: str, id: int | None = None)

Bases: BaseModel

codeid: str
id: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

Time utilities

msTools.timeutils.ensure_utc(ts: str | Timestamp | datetime) -> Timestamp

Convert a timestamp-like value to a UTC-aware pandas Timestamp.

If the input is timezone-naive, it is assumed to be in Europe/Madrid before being converted to UTC. If the input already has timezone information, it is converted directly to UTC.

Parameters:

ts – Datetime-like value to normalize. Supported inputs include strings, pandas.Timestamp and datetime.datetime.

Returns:

A timezone-aware pandas.Timestamp normalized to UTC.

Raises:

ValueError – If the timestamp becomes ambiguous or invalid during timezone localization (for example, around DST transitions).

Internationalization

Lightweight i18n utilities using gettext and OS locales.

This module centralizes internationalization (i18n) setup for command-line scripts and libraries in the project. It provides:

  • Language detection based on explicit preference, environment variables, and system locale.

  • Loading of compiled gettext catalogs (.mo) from a standard path (locales/<lang>/LC_MESSAGES/<domain>.mo).

  • Global aliases _ and ngettext after initialization so you can mark translatable strings throughout your code.

  • Optional process locale configuration to influence date/number formatting (independent from gettext message language).

The code follows Google-style docstrings, making it easy to generate API documentation with Sphinx + napoleon.

Example

>>> from msTools import i18n
>>> i18n.init_translation("es", domain="msGait", localedir="locales")
<gettext.GNUTranslations ...>
>>> _ = i18n._
>>> print(_("HELLO"))
Hola

Notes

  • init_translation installs the alias _ into Python builtins.

  • If no .mo file is found, translations fall back gracefully to NullTranslations (no exception is raised and messages are returned as-is).
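The fallback behaviour can be reproduced with the standard-library gettext module alone. The sketch below points gettext at an empty temporary directory, so no catalog can be found; whether init_translation calls gettext.translation in exactly this way is an assumption.

```python
import gettext
import tempfile

with tempfile.TemporaryDirectory() as empty_localedir:
    # fallback=True returns NullTranslations instead of raising FileNotFoundError.
    trans = gettext.translation(
        "msGait", localedir=empty_localedir, languages=["es"], fallback=True
    )

print(type(trans).__name__)    # NullTranslations
print(trans.gettext("HELLO"))  # HELLO
```

Without fallback=True, the same call would raise an error when the catalog is missing, which is why the fallback matters for command-line tools that should keep working without compiled translations.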

msTools.i18n._(message: str) -> str

Translate a single message using the active catalog.

This is a small helper that proxies to the active translations object if available; otherwise it returns message unchanged. Most code should use the global alias _ installed by init_translation().

Parameters:

message – The source message string to translate.

Returns:

The translated string if a catalog is active; otherwise, message.

Example

>>> init_translation("en")
<gettext.GNUTranslations ...>
>>> gettext("HELLO")
'HELLO'

msTools.i18n.available_languages(localedir: str | PathLike = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/healthywear-gait-identification/checkouts/latest/locales'), *, domain: str = 'msGait') -> list[str]

List all languages for which a compiled catalog exists.

It scans <localedir>/<lang>/LC_MESSAGES/<domain>.mo and returns the <lang> parts found.

Parameters:
  • localedir – Base directory where locale folders live.

  • domain – Gettext domain (catalog name without extension).

Returns:

Sorted list of language directory names (e.g., ["en", "es"]).

Example

>>> available_languages("locales", domain="msGait")
['en', 'es']
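A directory scan of this kind can be sketched with pathlib. The function name list_catalog_languages is hypothetical and the glob-based approach is an assumption about how such a scan might be written; the directory layout is the one described above.

```python
import tempfile
from pathlib import Path


def list_catalog_languages(localedir, domain="msGait"):
    """Return sorted language folder names that contain <domain>.mo."""
    base = Path(localedir)
    return sorted(
        mo.parent.parent.name for mo in base.glob(f"*/LC_MESSAGES/{domain}.mo")
    )


# Build a throwaway locale tree: es and en have a catalog, fr does not.
with tempfile.TemporaryDirectory() as tmp:
    for lang in ("es", "en"):
        target = Path(tmp, lang, "LC_MESSAGES")
        target.mkdir(parents=True)
        (target / "msGait.mo").touch()
    Path(tmp, "fr", "LC_MESSAGES").mkdir(parents=True)
    found = list_catalog_languages(tmp)

print(found)  # ['en', 'es']
```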

msTools.i18n.detect_language(preferred: str | None = None) -> str

Detect a reasonable language code to use for message translation.

The function tries, in order: an explicit preferred value, environment variables (LC_ALL, LC_MESSAGES, LANG), and the system locale. The return value is normalized to a short language code (e.g., "es") whenever possible.

Parameters:

preferred – Explicit language preference such as "es", "es_ES", or "en_US". If provided, it takes precedence over environment and system settings.

Returns:

A language code string (e.g., "es" or "en"). If no suitable value can be determined, "en" is returned.

Example

>>> detect_language("es_ES")
'es'
>>> import os
>>> os.environ["LANG"] = "en_US.UTF-8"
>>> detect_language(None)
'en'
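The lookup order described for detect_language can be sketched as follows. This is an illustration rather than the module's code: the helper names are hypothetical, the environment is passed in as a mapping to keep the example deterministic, and treating "C" and "POSIX" as unset is an assumption.

```python
import locale
import os


def short_code(value):
    """Normalize 'es_ES.UTF-8' to 'es'; return None for empty, 'C' or 'POSIX'."""
    if not value:
        return None
    code = value.split(".")[0].split("@")[0].replace("-", "_").split("_")[0].lower()
    return None if code in ("", "c", "posix") else code


def detect_language_sketch(preferred=None, environ=None):
    """Try the explicit preference, then environment variables, then the system locale."""
    env = os.environ if environ is None else environ
    candidates = [preferred] + [env.get(k) for k in ("LC_ALL", "LC_MESSAGES", "LANG")]
    try:
        candidates.append(locale.getlocale()[0])
    except ValueError:
        pass  # unparsable system locale: fall through to the default
    for candidate in candidates:
        code = short_code(candidate)
        if code:
            return code
    return "en"


print(detect_language_sketch("es_ES", environ={}))                   # es
print(detect_language_sketch(None, environ={"LANG": "en_US.UTF-8"}))  # en
```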

msTools.i18n.gettext(message: str) -> str

Translate a single message using the active catalog.

This is a small helper that proxies to the active translations object if available; otherwise it returns message unchanged. Most code should use the global alias _ installed by init_translation().

Parameters:

message – The source message string to translate.

Returns:

The translated string if a catalog is active; otherwise, message.

Example

>>> init_translation("en")
<gettext.GNUTranslations ...>
>>> gettext("HELLO")
'HELLO'

msTools.i18n.init_translation(lang: str | None = None, *, domain: str = 'msGait', localedir: str | PathLike = PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/healthywear-gait-identification/checkouts/latest/locales')) -> NullTranslations

Initialize gettext and expose _/ngettext.

This function configures gettext for the given language and domain. It also installs the global alias _ into builtins (via trans.install()), and updates the module-level _ and ngettext references so that other modules importing msTools.i18n can immediately use them.

Parameters:
  • lang – Explicit language preference ("es", "en", "es_ES", …). If None, a language will be detected via detect_language().

  • domain – Gettext domain (catalog base name, without extension).

  • localedir – Base directory where locale folders live.

Returns:

The loaded gettext translations object. If no .mo is found, a NullTranslations object is returned (strings remain unchanged).

Example

>>> trans = init_translation("es", domain="msGait", localedir="locales")
>>> _ = trans.gettext  # bind the translation function explicitly
>>> print(_("HELLO"))
Hola

msTools.i18n.ngettext(s, p, n)

msTools.i18n.set_locale_for_formatting(lang: str | None) -> None

Attempt to set the process locale for regional formatting.

This affects functions that rely on the C locale (e.g., number and date formatting via locale or datetime.strftime). It does not affect gettext translations; use init_translation() for message language.

Parameters:

lang – Locale name such as "es_ES" or "en_US". If None or the desired locale is not available on the system, this function does nothing.

Example

>>> set_locale_for_formatting("es_ES")
>>> import datetime
>>> datetime.datetime(2025, 1, 2).strftime("%x")  # may print in Spanish format
'02/01/25'
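
The "do nothing when the locale is unavailable" behaviour can be sketched with the standard locale module. The function name try_set_locale, the boolean return value, and the attempt to append a UTF-8 suffix are assumptions added for illustration; the documented function returns None.

```python
import locale


def try_set_locale(lang):
    """Return True if a matching process locale was applied, False otherwise."""
    if not lang:
        return False
    # Many Linux systems only ship the encoding-qualified name, so try it first.
    for candidate in (f"{lang}.UTF-8", lang):
        try:
            locale.setlocale(locale.LC_ALL, candidate)
            return True
        except locale.Error:
            continue  # locale not installed on this system
    return False


applied = try_set_locale("es_ES")
print("locale applied" if applied else "locale unavailable, formatting unchanged")
```

Because setlocale changes state for the whole process, a call like this is usually made once at program start-up rather than inside library code.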