import numpy as np
import pandas as pd
from pandas.api.types import is_integer_dtype
import geopandas as gpd
import pyproj
from shapely.geometry import Polygon, Point
from shapely import wkt
import warnings
import h3
import nomad.io.base as loader
from nomad.constants import DEFAULT_SCHEMA, SEC_PER_UNIT
def _timestamp_handling(
ts,
output_type,
timezone=None
):
"""
Convert timestamp to either pandas Timestamp or UNIX timestamp, with optional timezone handling.
Parameters
----------
ts : str, int, float, pd.Timestamp, or np.datetime64
The input timestamp to be converted.
output_type : str
Desired output type: "pd.timestamp" or "unix".
timezone : str, optional
Timezone to localize or convert the timestamp to. If None, no timezone conversion is applied.
Returns
-------
pd.Timestamp or int
Converted timestamp in the desired format.
Notes
------
Using tz_localize is intentional. It will raise a TypeError
if the timestamp is already timezone-aware, which correctly signals
a caller error (e.g., providing a timezone for data that already has one).
"""
if ts is None:
return None
if isinstance(ts, str):
ts = pd.to_datetime(ts, errors="coerce")
elif isinstance(ts, (pd.Timestamp, np.datetime64)):
ts = pd.to_datetime(ts)
elif isinstance(ts, (int, np.integer, float, np.floating)):
ts = pd.to_datetime(ts, unit='s', errors="coerce")
else:
raise TypeError("Unsupported input type for timestamp conversion.")
if timezone:
ts = ts.tz_localize(timezone)
if output_type == "pd.timestamp":
return ts
elif output_type == "unix":
return int(ts.timestamp())
else:
raise ValueError("Invalid ts_output value. Use 'pd.timestamp' or 'unix'.")
[docs]
def to_timestamp(datetime, tz_offset=None):
"""
Convert a datetime Series or scalar into UNIX timestamps (seconds).
Parameters
----------
datetime : pd.Series, str, pd.Timestamp, or scalar
tz_offset : pd.Series, optional
Returns
-------
pd.Series or int
UNIX timestamps as nullable Int64 values (seconds since epoch)
for non-scalar inputs.
Returns scalar int if input was scalar.
"""
# Handle scalar inputs for NumPy 2.0 compatibility
is_scalar = not isinstance(datetime, (pd.Series, pd.Index))
if is_scalar:
datetime = pd.Series([datetime])
# Validate input type
if not (
pd.api.types.is_datetime64_any_dtype(datetime) or
pd.api.types.is_string_dtype(datetime) or
(pd.api.types.is_object_dtype(datetime) and loader._is_series_of_timestamps(datetime))
):
raise TypeError(
f"Input must be datetime64, string, or Series of Timestamps; got {datetime.dtype}."
)
if tz_offset is not None and not is_integer_dtype(tz_offset):
tz_offset = tz_offset.astype('int64')
# 1) tz-aware datetime64[ns, tz]
if isinstance(datetime.dtype, pd.DatetimeTZDtype):
# convert to UTC, drop tz, downcast to seconds, then int
dt_utc = datetime.dt.tz_convert('UTC').dt.tz_localize(None)
unix_s = dt_utc.astype('datetime64[s]').astype('int64')
result = unix_s if tz_offset is None else unix_s - tz_offset
return result.iloc[0] if is_scalar else result.astype('Int64')
# 2) tz-naive datetime64 (any unit)
if pd.api.types.is_datetime64_dtype(datetime):
# downcast any-unit to seconds, then int
unix_s = datetime.astype('datetime64[s]').astype('int64')
if tz_offset is None:
warnings.warn(
"Input is timezone-naive; assuming UTC. "
"Pass tz_offset or localize if needed."
)
result = unix_s
else:
result = unix_s - tz_offset
return result.iloc[0] if is_scalar else result.astype('Int64')
# 3) strings
if pd.api.types.is_string_dtype(datetime):
parsed = pd.to_datetime(datetime, errors="coerce", utc=True)
# drop tz and downcast
dt_ns = parsed.dt.tz_convert('UTC').dt.tz_localize(None)
unix_s = dt_ns.astype('datetime64[s]').astype('int64')
has_tz = datetime.str.contains(r'(?:Z|[+\-]\d{2}:\d{2})$', regex=True, na=False).any()
if not has_tz and tz_offset is None:
warnings.warn(
"String datetimes appear timezone-naive; assuming UTC."
)
if tz_offset is None:
result = unix_s
else:
# If original strings were timezone-annotated, do not apply tz_offset again
result = unix_s if has_tz else (unix_s - tz_offset)
return result.iloc[0] if is_scalar else result.astype('Int64')
# 4) object dtype of pandas.Timestamp
f = np.frompyfunc(lambda x: int(x.timestamp()), 1, 1)
unix_s = pd.Series(f(datetime).astype('int64'), index=datetime.index)
result = unix_s if tz_offset is None else unix_s - tz_offset
return result.iloc[0] if is_scalar else result.astype('Int64')
[docs]
def to_yyyymmdd(time_values, tz_offset=None):
"""
Convert datetimes/timestamps to integer YYYYMMDD.
Accepts heterogeneous inputs and optional per-row timezone offsets.
If tz_offset is provided (seconds), the date is computed in that local time;
otherwise dates are computed in UTC.
Parameters
----------
time_values : pd.Series
Series of datetime64, strings, pandas.Timestamp objects, or Unix seconds.
tz_offset : pd.Series or scalar, optional
Seconds offset from UTC to local time (e.g., -18000 for UTC-5). If provided,
the conversion uses local dates; otherwise UTC dates.
Returns
-------
pd.Series
Integer dates encoded as YYYYMMDD (dtype Int64, NA-friendly).
"""
s = pd.Series(time_values)
# Normalize to Unix seconds in UTC
if pd.api.types.is_integer_dtype(s):
unix_s = s.astype('int64')
elif pd.api.types.is_datetime64_any_dtype(s) or pd.api.types.is_string_dtype(s) or (
pd.api.types.is_object_dtype(s) and loader._is_series_of_timestamps(s)
):
unix_s = to_timestamp(s)
elif pd.api.types.is_object_dtype(s):
# attempt to parse mixed object (strings/None/Timestamps)
parsed = pd.to_datetime(s, errors='coerce', utc=True)
dt_ns = parsed.dt.tz_convert('UTC').dt.tz_localize(None)
unix_s = dt_ns.astype('datetime64[s]').astype('int64')
else:
raise TypeError("Unsupported input type for to_yyyymmdd: expected datetime-like or integer Unix seconds.")
# Shift to local time if tz_offset provided
if tz_offset is not None:
# Broadcast scalar or align Series
if isinstance(tz_offset, pd.Series):
local_unix = unix_s + tz_offset.astype('int64')
else:
local_unix = unix_s + int(tz_offset)
else:
local_unix = unix_s
dt = pd.to_datetime(local_unix, unit='s', errors='coerce')
y = dt.dt.year.astype('Int64')
m = dt.dt.month.astype('Int64')
d = dt.dt.day.astype('Int64')
yyyymmdd = y * 10000 + m * 100 + d
return yyyymmdd
[docs]
def to_zoned_datetime(utc_timestamps, timezone_offset):
naive_dt = loader.naive_datetime_from_unix_and_offset(utc_timestamps, timezone_offset)
zoned_str = loader._naive_to_localized_str(naive_dt, timezone_offset)
# mixed timezones
return pd.to_datetime(zoned_str, utc=False, errors='raise')
def _dup_per_freq_mask(sec, periods=1, freq='min', keep='first'):
bins = sec // (periods * SEC_PER_UNIT[freq])
if isinstance(sec, pd.Series):
return ~pd.Series(bins, index=sec.index).duplicated(keep=keep)
return ~pd.Series(bins).duplicated(keep=keep).to_numpy()
def _fmt_from_freq(f):
return {"s": "%Y-%m-%d %H:%M:%S",
"min": "%Y-%m-%d %H:%M",
"h": "%Y-%m-%d %H:00",
"d": "%Y-%m-%d",
"w": "%Y-%m-%d"}.get(f.lower(), "%Y-%m-%d %H:%M:%S")
[docs]
def downsample(df,
periods=1,
freq='min',
keep='first',
traj_cols=None,
verbose=False,
**kwargs):
"""
Down-sample *df* so that each user contributes at most one row
in every consecutive ``periods × freq`` window.
Parameters
----------
df : pandas.DataFrame
The input data.
periods : int, default 1
Size of the window expressed in multiples of *freq*; must be ≥ 1.
freq : {'s', 'min', 'h', 'd', 'w'}, default 'min'
Unit of the window: second, minute, hour, day, or week
(lower-case aliases).
keep : {'first', 'last', False}, default 'first'
Which duplicate inside each window to retain, matching
``pandas.Series.duplicated`` semantics.
traj_cols : dict, optional
Mapping from the standard keys `'timestamp'`, `'datetime'`,
`'user_id'`, and `'tz_offset'` to the actual column names in *df*.
Any key may be absent if the corresponding column is not present.
verbose : bool, default False
When True, prints the fraction of rows removed and the window size.
**kwargs
Shorthand overrides for entries in *traj_cols*
Returns
-------
pandas.DataFrame
A view of *df* containing the surviving rows.
Raises
------
ValueError
If *periods* is not a positive integer or *freq* is invalid.
KeyError
If no suitable time column is found after parsing *traj_cols*.
"""
if not isinstance(periods, (int, np.integer)) or periods < 1:
raise ValueError("periods must be an integer ≥ 1")
freq = freq.lower()
if freq not in SEC_PER_UNIT:
raise ValueError("freq must be one of 's', 'min', 'h', 'd', 'w'")
t_key, use_dt = loader._fallback_time_cols_dt(df.columns, traj_cols, kwargs)
traj_cols = loader._parse_traj_cols(df.columns, traj_cols, kwargs)
loader._has_time_cols(df.columns, traj_cols)
uid = traj_cols['user_id']
multi = uid in df.columns and df[uid].nunique() > 1
if use_dt:
window = f"{periods}{freq}"
if multi:
mask = df.groupby(uid)[traj_cols[t_key]].transform(
lambda s: ~s.dt.floor(window).duplicated(keep=keep))
else:
mask = ~df[traj_cols[t_key]].dt.floor(window).duplicated(keep=keep)
else:
sec = df[traj_cols[t_key]]
if traj_cols['tz_offset'] in df.columns:
sec = sec + df[traj_cols['tz_offset']]
if multi:
mask = sec.groupby(df[uid]).transform(
lambda s: _dup_per_freq_mask(s, periods, freq, keep))
else:
mask = _dup_per_freq_mask(sec, periods, freq, keep)
if verbose:
pct = 100 * (1 - mask.sum() / len(mask))
print(f"{pct:.3f}% of rows removed by downsampling to {periods}{freq} windows per user.")
return df[mask]
[docs]
def to_tessellation(
data,
index,
res,
data_crs=None,
traj_cols=None,
**kwargs
):
"""
Project coordinates from data_crs to crs_to, with robust column handling.
Parameters
----------
data : pd.DataFrame
Data to project.
index : str
One of 'h3', 'geohash', or 's2'.
data_crs : str or CRS, optional
Source CRS (default: inferred).
traj_cols : dict, optional
Mapping of logical column names to actual columns.
**kwargs
Passed to trajectory column parsing.
"""
coord_key1, coord_key2, use_lon_lat = loader._fallback_spatial_cols(data.columns, traj_cols, kwargs)
if not use_lon_lat:
if data_crs is None:
raise ValueError("data_crs must be specified for projected coordinates.")
lon_col, lat_col = to_projection(data, crs_to="EPSG:4326", data_crs=data_crs, traj_cols=traj_cols, **kwargs)
else:
traj_cols = loader._parse_traj_cols(data.columns, traj_cols, kwargs)
lon_col, lat_col = data[traj_cols['longitude']], data[traj_cols['latitude']]
if index == "h3":
out = pd.concat([lat_col, lon_col], axis=1)
out.columns = ["latitude", "longitude"]
h3_cell = out.apply(lambda row: h3.latlng_to_cell(lat=row['latitude'], lng=row['longitude'], res=res), axis=1)
h3_cell.name = "h3_cell"
return h3_cell
elif index == "geohash":
out = pd.concat([lat_col, lon_col], axis=1)
out.columns = ["latitude", "longitude"]
geohash_cell = out.apply(lambda row: pygeohash.encode(row['latitude'], row['longitude'], precision=res), axis=1)
geohash_cell.name = "geohash_cell"
return geohash_cell
elif index == "s2":
# S2 support needs an external package (e.g., s2sphere)
raise NotImplementedError("S2 tessellation is not implemented.")
else:
raise ValueError(f"Unknown tessellation index: {index}")
[docs]
def to_projection(
data,
crs_to,
data_crs=None,
traj_cols=None,
**kwargs
):
"""
Project coordinates from data_crs to crs_to, with robust column handling.
Warns if coordinate columns and CRS type appear mismatched.
Parameters
----------
data : pd.DataFrame
Data to project.
crs_to : str or CRS
Output CRS (required).
data_crs : str or CRS, optional
Source CRS (default: inferred).
traj_cols : dict, optional
Mapping of logical column names to actual columns.
**kwargs
Passed to trajectory column parsing.
Returns
-------
pd.Series, pd.Series
Projected x and y as Series, aligned to data.index
Note
-------
To assign directly, use np.column_stack. For example
df[['lon','lat']] = np.column_stack(to_projection(...))
"""
coord_key1, coord_key2, use_lon_lat = loader._fallback_spatial_cols(data.columns, traj_cols, kwargs)
traj_cols = loader._parse_traj_cols(data.columns, traj_cols, kwargs)
# CRS detection and warnings
if data_crs is None:
if use_lon_lat:
data_crs = "EPSG:4326"
else:
raise ValueError("data_crs must be specified for projected coordinates.")
crs_obj = pyproj.CRS(data_crs)
# Warn if mismatch between columns and CRS
if crs_obj.is_projected and use_lon_lat:
warnings.warn(
f"data_crs '{data_crs}' is projected but latitude/longitude columns were selected ({coord_key1}, {coord_key2})."
)
if crs_obj.is_geographic and not use_lon_lat:
warnings.warn(
f"data_crs '{data_crs}' is geographic (lat/lon) but x/y columns were selected ({coord_key1}, {coord_key2})."
)
if crs_to is None:
raise ValueError("crs_to must be specified.")
points = gpd.points_from_xy(data[traj_cols[coord_key1]], data[traj_cols[coord_key2]])
gseries = gpd.GeoSeries(points, crs=data_crs)
projected = gseries.to_crs(crs_to)
if pyproj.CRS(crs_to).is_projected:
out_coord_1 = "x"
out_coord_2 = "y"
else:
out_coord_1 = "longitude"
out_coord_2 = "latitude"
# Return unnamed series to satisfy existing tests
return pd.Series(projected.x, name=None), pd.Series(projected.y, name=None)
def _filtered_users(
traj,
start_time,
end_time,
polygon,
min_active_days,
min_pings_per_day,
traj_cols,
input_x,
input_y,
time_col,
crs
):
"""
Helper function that returns a series containing users who have at least
k distinct days with at least m pings in the polygon within the timeframe T0 to T1.
"""
# Filter by time range (this logic would not necessarily remove pings outside the timeframe.
# Rather, it use pings inside the timeframe to determine whether a user is sufficient "complete".)
# traj_filtered = traj[(traj[time_col] >= start_time) & (traj[time_col] <= end_time)].copy()
# traj_filtered[time_col] = pd.to_datetime(traj_filtered[time_col])
traj_filtered = traj.copy()
if traj_filtered.empty:
return pd.Series()
# Filter points inside the polygon
if polygon is not None:
traj_filtered = _in_geo(traj_filtered, input_x, input_y, polygon, crs)
else:
traj_filtered['in_geo'] = True
if traj_cols['tz_offset'] not in traj_filtered.columns:
traj_filtered[traj_cols['tz_offset']] = 0
warnings.warn(
f"The trajectory dataframe does not have a tz_offset (timezone offset) column."
"UTC (tz_offset=0) will be assumed.")
if traj_cols['datetime'] not in traj_filtered.columns:
traj_filtered[traj_cols['datetime']] = loader.naive_datetime_from_unix_and_offset(
traj_filtered[traj_cols[time_col]], traj_filtered[traj_cols['tz_offset']]
)
traj_filtered['date'] = pd.to_datetime(traj_filtered[traj_cols['datetime']].dt.date)
# Count pings per user per date inside the polygon
daily_ping_counts = (
traj_filtered[traj_filtered['in_geo']]
.groupby([traj_cols['user_id'], 'date'])
.size()
.reset_index(name='ping_count')
)
# Filter users who have at least `m` (`min_pings_per_day`) pings on a given day
users_with_m_pings = daily_ping_counts[daily_ping_counts['ping_count'] >= min_pings_per_day]
# Count distinct days per user that satisfy the `m` pings condition
users_with_k_days = (
users_with_m_pings
.groupby(traj_cols['user_id'])['date']
.nunique()
.reset_index(name='days_in_polygon')
)
# Select users who have at least `k` (`min_active_days`) such days
filtered_users = users_with_k_days[users_with_k_days['days_in_polygon'] >= min_active_days][traj_cols['user_id']]
return filtered_users
[docs]
def q_filter(df: pd.DataFrame,
qbar: float,
traj_cols: dict = None,
user_id: str = DEFAULT_SCHEMA["user_id"],
timestamp: str = DEFAULT_SCHEMA["timestamp"]):
"""
Computes the q statistic for each user as the proportion of unique hours with pings
over the total observed hours (last hour - first hour) and filters users where q > qbar.
Parameters
----------
df : pd.DataFrame
Input DataFrame with user_id and timestamp columns.
qbar : float
The threshold q value; users with q > qbar will be retained.
traj_cols : dict, optional
Dictionary containing column mappings,
e.g., {"user_id": "user_id", "timestamp": "timestamp"}.
user_id : str, optional
Name of the user_id column (default is "user_id").
timestamp : str, optional
Name of the timestamp column (default is "timestamp").
Returns
-------
pd.Series
A Series containing the user IDs for users whose q_stat > qbar.
"""
user_col = traj_cols.get("user_id", user_id) if traj_cols else user_id
datetime_col = traj_cols.get("timestamp", timestamp) if traj_cols else timestamp
user_q_stats = df.groupby(user_col).apply(
lambda group: _compute_q_stat(group, datetime_col)
).reset_index(name='q_stat')
# Filter users where q > qbar
filtered_users = user_q_stats[user_q_stats['q_stat'] > qbar][user_col]
return filtered_users
[docs]
def within(
df,
within,
poly_crs=None,
data_crs=None,
traj_cols=None,
**kwargs
):
"""
Return a filtered DataFrame containing only rows within the given polygon.
All arguments are passed to is_within.
"""
mask = is_within(
df,
within,
traj_cols=traj_cols,
poly_crs=poly_crs,
data_crs=data_crs,
**kwargs
)
return df.loc[mask]
[docs]
def is_within(
df,
within,
poly_crs=None,
data_crs=None,
traj_cols=None,
**kwargs,
):
"""
Filter a DataFrame to include only points within the given polygon.
Parameters
----------
df : pd.DataFrame or GeoDataFrame
Trajectory data.
within : shapely Polygon/MultiPolygon or WKT string
Polygon defining the spatial filter.
traj_cols : dict, optional
Mapping of logical trajectory column names to actual columns.
poly_crs : CRS or str, optional
CRS of the polygon.
data_crs : CRS or str, optional
CRS of the DataFrame coordinates.
**kwargs
Additional parameters for trajectory columns resolution.
Returns
-------
pd.Series
Boolean mask for which points are in the polygon within
"""
if within is None:
raise ValueError("A polygon or WKT string must be provided in 'within'.")
# Normalize polygon geometry
if isinstance(within, str):
poly = wkt.loads(within)
elif isinstance(within, Polygon):
poly = within
elif isinstance(within, gpd.GeoSeries):
poly = within.unary_union
elif isinstance(within, gpd.GeoDataFrame):
poly = within.geometry.unary_union
else:
raise TypeError("within must be WKT, shapely Polygon, GeoSeries or GeoDataFrame.")
# Determine coordinate columns
coord_key1, coord_key2, use_lat_lon = loader._fallback_spatial_cols(df.columns, traj_cols, kwargs)
traj_cols = loader._parse_traj_cols(df.columns, traj_cols, kwargs)
coord_col1 = traj_cols[coord_key1]
coord_col2 = traj_cols[coord_key2]
# Handle CRS defaults
if data_crs is None:
if use_lat_lon:
data_crs = "EPSG:4326"
warnings.warn("data_crs not provided; assuming EPSG:4326 for lon/lat.")
else:
raise ValueError(
"data_crs must be supplied for projected x/y columns or provide lat/lon instead."
)
data_crs = pyproj.CRS(data_crs)
poly_crs_final = getattr(within, "crs", None) or poly_crs
if poly_crs_final is not None:
src_crs = pyproj.CRS(poly_crs_final)
if not src_crs.equals(data_crs):
poly = gpd.GeoSeries([poly], crs=src_crs).to_crs(data_crs).iloc[0]
else:
warnings.warn("Polygon CRS unspecified; assuming it matches data_crs.")
# Construct GeoSeries of points from df coordinates
pts = gpd.GeoSeries(gpd.points_from_xy(df[coord_col1], df[coord_col2]), crs=data_crs)
return pts.within(poly).set_axis(df.index)
[docs]
def coverage_matrix(data,
periods=1,
freq="h",
start=None,
end=None,
offset_col=0,
relative=False,
str_from_time=False,
traj_cols=None,
**kwargs):
"""
Matrix of 0/1 flags; rows=user (or the single Series), columns=bucket start.
"""
if isinstance(data, pd.Series):
return _q_series(data, periods, freq, start, end, offset_col=offset_col)
if isinstance(data, pd.Series):
hits = _q_series(data, periods, freq, start, end, offset_col=offset_col).astype(int)
if str_from_time:
if is_integer_dtype(hits.index.dtype):
hits.index = pd.to_datetime(hits.index, unit='s')
hits.index = hits.index.strftime(_fmt_from_freq(freq))
return hits
df = data
t_key, _ = loader._fallback_time_cols_dt(df.columns, traj_cols, kwargs)
traj_cols = loader._parse_traj_cols(df.columns, traj_cols, kwargs)
loader._has_time_cols(df.columns, traj_cols)
uid = traj_cols["user_id"]
ts = traj_cols[t_key]
off_name = traj_cols["tz_offset"]
if off_name in df.columns:
offset_col = df[off_name]
if (start is not None) or (end is not None):
relative = False
if not relative and (start is None or end is None):
start = start or df[ts].min()
end = end or df[ts].max()
hit_map = {}
for user, grp in df.groupby(uid, sort=False):
off = offset_col.loc[grp.index] if isinstance(offset_col, pd.Series) else offset_col
s = None if relative else start
e = None if relative else end
hit_map[user] = _q_series(grp[ts], periods, freq, s, e, offset_col=off)
if not hit_map: # empty dataset edge-case
return pd.DataFrame(dtype=int)
hit_df = pd.concat(hit_map, axis=1).T.astype(int) # rows=user
if str_from_time:
if is_integer_dtype(hit_df.columns.dtype):
hit_df.columns = pd.to_datetime(hit_df.columns, unit='s')
hit_df.columns = hit_df.columns.strftime(_fmt_from_freq(freq))
return hit_df
[docs]
def completeness(data,
periods=1,
freq="h",
*,
start=None,
end=None,
offset_col=0,
relative=False,
str_from_time=True,
agg_freq=None,
traj_cols=None,
**kwargs):
"""
Measure trajectory completeness as the fraction of expected time intervals
('buckets') containing at least one observation.
Parameters
----------
data : pandas.Series or pandas.DataFrame
Trajectory data containing timestamps, either as:
- A pandas Series of Unix-second integers or datetime64 values.
- A DataFrame, from which timestamp and user columns are identified
via `traj_cols` or default column naming conventions.
periods : int, default 1
Number of units of `freq` per bucket (must be ≥ 1). For example,
`periods=3, freq='h'` results in 3-hour buckets.
freq : {'s', 'min', 'h', 'd', 'w'}, default 'h'
Time resolution used to define buckets: seconds ('s'), minutes ('min'),
hours ('h'), days ('d'), or weeks ('w').
start, end : scalar, optional
Explicit time bounds to define the bucket range. If either is omitted,
the range is inferred from the data. Ignored if `relative=True`.
relative : bool, default False
If False, completeness is measured within a common time span shared
by all users. If True, each user's completeness is computed only within
their own individual time span (from their first to their last record).
offset_col : pandas.Series or int, default 0
Offset in seconds to apply to timestamps (useful for handling time zones).
If a `tz_offset` column is present in the data and indicated via
`traj_cols` or `kwargs`, this argument is ignored.
agg_freq : str, optional
Aggregation frequency (e.g., 'D' for daily, 'W' for weekly, 'M' for monthly).
If specified, returns completeness aggregated at this frequency instead
of overall completeness.
traj_cols : dict, optional
Mapping from standard keys ('timestamp', 'datetime', 'user_id',
'tz_offset') to column names in `data`. If omitted, defaults are used.
**kwargs
Shorthand overrides for entries in `traj_cols`.
Returns
-------
float or pandas.Series or pandas.DataFrame
- If input is a single Series and `agg_freq=None`, returns a single float.
- If input is a DataFrame and `agg_freq=None`, returns a Series indexed by user_id.
- If `agg_freq` is specified, returns completeness aggregated by the specified
frequency, either as a Series (single user) or DataFrame (rows per user,
columns per aggregation bucket).
"""
hits = coverage_matrix(
data, periods, freq,
start=start, end=end,
offset_col=offset_col,
relative=relative,
traj_cols=traj_cols,
**kwargs
)
if agg_freq is None:
return hits.mean(axis=1) if isinstance(hits, pd.DataFrame) else hits.mean()
if isinstance(hits.columns, pd.DatetimeIndex):
buckets = hits.columns.floor(agg_freq)
else: # unix seconds (integers)
agg_step = SEC_PER_UNIT[agg_freq.lower()]
buckets = (hits.columns // agg_step) * agg_step
# Single series input
if isinstance(hits, pd.Series):
return hits.groupby(buckets).mean()
# DataFrame input
hits.columns = buckets
hit_df = hits.T.groupby(hits.columns).mean().T
if str_from_time:
if is_integer_dtype(hit_df.columns.dtype):
hit_df.columns = pd.to_datetime(hit_df.columns, unit='s')
hit_df.columns = hit_df.columns.strftime(_fmt_from_freq(agg_freq))
return hit_df
def _q_series(time_col, periods, freq, start=None, end=None, offset_col=0):
"""Return the per-bucket Boolean hits array for a single Series of timestamps."""
# Validate frequency
freq = freq.lower()
if freq not in SEC_PER_UNIT:
raise ValueError("freq must be one of 's', 'min', 'h', 'd', 'w'")
if is_integer_dtype(time_col): # unix seconds path
if not time_col.is_monotonic_increasing:
raise ValueError("time_col must be sorted in ascending order.")
sec = time_col + offset_col # offset may be scalar or vector
s_min = _timestamp_handling(start, "unix") or int(sec.min())
s_max = _timestamp_handling(end, "unix") or int(sec.max())
return _q_array(sec, s_min, s_max, periods, freq)
# datetime64 path (tz-aware respected by .floor)
tz = getattr(time_col.dt, "tz", None)
start = _timestamp_handling(start, "pd.timestamp", tz) or time_col.min()
end = _timestamp_handling(end, "pd.timestamp", tz) or time_col.max()
window = f"{periods}{freq}"
bucket_start = pd.date_range(start.floor(window), end, freq=window, inclusive="left")
hits = pd.Series(False, index=bucket_start)
active = time_col[(time_col >= start) & (time_col < end)].dt.floor(window).unique()
hits.loc[active] = True
return hits
def _q_array(sec, start_timestamp, end_timestamp, periods=1, freq='h'):
"""True/False array: one flag per (periods×freq) bucket."""
step = periods * SEC_PER_UNIT[freq]
first = (start_timestamp // step) * step
bin_starts = np.arange(first, end_timestamp, step)
pos = np.searchsorted(sec, bin_starts, side='left')
all_pos = np.append(pos, len(sec))
# np.diff computes `pos[i+1] - pos[i]`. If > 0, the bucket had data.
hits = np.diff(all_pos) > 0
return pd.Series(hits, index=bin_starts) if isinstance(sec, pd.Series) else hits