Source code for nomad.traj_gen

import pandas as pd
import numpy as np
import numpy.random as npr
from shapely.geometry import box, Point, MultiLineString
from shapely.ops import unary_union, linemerge
from shapely import distance as shp_distance
from datetime import timedelta
import warnings
import funkybob
import pyarrow as pa
import pyarrow.dataset as ds

from nomad.io.base import from_df, to_file

from nomad.city_gen import *
from nomad.filters import to_timestamp
from nomad.constants import DEFAULT_SPEEDS, FAST_SPEEDS, SLOW_SPEEDS, DEFAULT_STILL_PROBS
from nomad.constants import FAST_STILL_PROBS, SLOW_STILL_PROBS, ALLOWED_BUILDINGS, DEFAULT_STAY_PROBS

[docs] def parse_agent_attr(attr, N, name): """ Parse agent attribute (homes/workplaces/datetimes) into a callable that returns the i-th value. Parameters ---------- attr : str, list, pd.Timestamp, or None The attribute value. Can be: - None: returns None for all indices - str or pd.Timestamp: returns the same value for all indices - list: must have length N, returns the i-th element N : int Expected number of agents name : str Name of the attribute for error messages Returns ------- callable A function that takes an index i and returns the corresponding attribute value """ if attr is None: return lambda i: None elif isinstance(attr, (str, pd.Timestamp)): return lambda i: attr elif isinstance(attr, list): if len(attr) != N: raise ValueError(f"{name} must be a list of length {N}, got {len(attr)}") return lambda i: attr[i] else: raise ValueError(f"{name} must be a string, pd.Timestamp, list of length {N}, or None")
[docs] def sample_bursts_gaps(traj, beta_start=None, beta_durations=None, beta_ping=5, ha=3/4, pareto_prior=True, seed=None, output_bursts=False, deduplicate=True): """ Sample from simulated trajectory, drawn using hierarchical Poisson processes. Parameters ---------- traj: numpy array simulated trajectory from simulate_traj beta_start: float scale parameter (mean) of Exponential distribution modeling burst inter-arrival times where 1/beta_start is the rate of events (bursts) per minute. beta_durations: float scale parameter (mean) of Exponential distribution modeling burst durations. if beta_start and beta_durations are None, a single burst covering the whole trajectory is used. beta_ping: float scale parameter (mean) of Exponential distribution modeling ping inter-arrival times within a burst, where 1/beta_ping is the rate of events (pings) per minute. ha: float Mean horizontal-accuracy radius in 15 m blocks. The actual per-ping accuracy is random: ha >= 8 m / 15 m and follows a Pareto distribution with that mean. For each ping the spatial noise (epsilon_x, epsilon_y) is drawn i.i.d. from N(0, sigma^2), with sigma = HA / 1.515 so that abs(epsilon) <= HA with 68 percent probability. seed : int The seed for random number generation. output_bursts : bool If True, outputs the latent variables on when bursts start and end. deduplicate : bool If True, sampled times are also discretized to be in ticks """ rng = npr.default_rng(seed) # absolute window t0 = int(traj['timestamp'].iloc[0]) t_end = int(traj['timestamp'].iloc[-1]) # Step 1: generate ping_times (and bursts if requested) tz = traj['datetime'].dt.tz if output_bursts: ping_times, bursts = generate_ping_times( t0, t_end, beta_start=beta_start, beta_durations=beta_durations, beta_ping=beta_ping, seed=seed, return_bursts=True, tz=tz, ) else: ping_times = generate_ping_times( t0, t_end, beta_start=beta_start, beta_durations=beta_durations, beta_ping=beta_ping, seed=seed, ) # Step 2: thin trajectory sampled_traj = thin_traj_by_times(traj, ping_times, deduplicate=deduplicate) if sampled_traj.empty: empty = sampled_traj if output_bursts: return empty, pd.DataFrame(columns=['start_time','end_time']) return empty # Step 3: add horizontal noise rng = npr.default_rng(seed) n = len(sampled_traj) ha_realized, noise = _sample_horizontal_noise(n, ha=ha, rng=rng, pareto_prior=pareto_prior) sampled_traj['ha'] = ha_realized sampled_traj[['x', 'y']] += noise if output_bursts: burst_df = pd.DataFrame(bursts, columns=['start_time','end_time']) return sampled_traj, burst_df return sampled_traj
# ============================================================================= # AGENT CLASS # =============================================================================
[docs] class Agent: """ Represents an agent in the city simulation. Attributes ---------- still_probs : dict Dictionary containing probabilities of the agent staying still. speeds : dict Dictionary containing possible speeds of the agent. dt : float Time step duration. """ def __init__(self, identifier, city, home=None, workplace=None, still_probs=DEFAULT_STILL_PROBS, speeds=DEFAULT_SPEEDS, destination_diary=None, trajectory=None, diary=None, seed=0, x=None, y=None, location=None, datetime=None, timestamp=None): """ Initialize an agent in the city simulation, with optional existing data. Parameters ---------- identifier : str Agent identifier. city : City City object with buildings and topology. home : str, optional Building ID for the agent's home. If None, sampled from `city.buildings_gdf`. workplace : str, optional Building ID for the agent's workplace. If None, sampled from `city.buildings_gdf`. still_probs : dict, optional Per-building-type probabilities of staying still (default: DEFAULT_STILL_PROBS). speeds : dict, optional Per-building-type movement scalars (default: DEFAULT_SPEEDS). destination_diary : pandas.DataFrame, optional If provided, a DataFrame with columns ['datetime','timestamp','duration','location']. trajectory : pandas.DataFrame, optional If provided, a DataFrame with columns ['x','y','datetime','timestamp','identifier']. diary : pandas.DataFrame, optional If provided, a DataFrame with columns ['datetime','timestamp','duration','location']. seed : int, optional RNG seed used for sampling fallback home/work locations. """ rng = npr.default_rng(seed) self.identifier = identifier self.city = city if home is None: home = city.buildings_gdf[city.buildings_gdf['building_type'] == 'home'].sample(n=1, random_state=rng)['id'].iloc[0] if workplace is None: workplace = city.buildings_gdf[city.buildings_gdf['building_type'] == 'workplace'].sample(n=1, random_state=rng)['id'].iloc[0] home_building = city.get_building(identifier=home) workplace_building = city.get_building(identifier=workplace) if home_building is None or workplace_building is None: raise ValueError(f"Home {home} or workplace {workplace} not found in city buildings.") self.home = home self.workplace = workplace self.home_centroid = home_building['geometry'].iloc[0].centroid self.workplace_centroid = workplace_building['geometry'].iloc[0].centroid self.still_probs = still_probs self.speeds = speeds self.visit_freqs = None # Initialize last_ping if trajectory is not None: if x is not None or y is not None or location is not None or datetime is not None or timestamp is not None: warnings.warn("Both trajectory and position kwargs provided. Trajectory takes precedence.") self.last_ping = trajectory.iloc[-1] else: # Determine initial position if x is not None and y is not None: x_coord, y_coord = x, y elif location is not None: loc_centroid = self.city.buildings_gdf.loc[location, 'geometry'].centroid x_coord, y_coord = loc_centroid.x, loc_centroid.y else: x_coord, y_coord = self.home_centroid.x, self.home_centroid.y # Determine initial time if datetime is not None: init_time = pd.to_datetime(datetime) if not isinstance(datetime, pd.Timestamp) else datetime else: init_time = pd.Timestamp.now(tz='America/New_York') if timestamp is not None: init_timestamp = timestamp else: init_timestamp = to_timestamp(init_time) self.last_ping = pd.Series({ 'x': x_coord, 'y': y_coord, 'datetime': init_time, 'timestamp': init_timestamp, 'user_id': identifier }) self.destination_diary = destination_diary if destination_diary is not None else pd.DataFrame( columns=['datetime', 'timestamp', 'duration', 'location']) self.trajectory = trajectory self.dt = None self.diary = diary if diary is not None else pd.DataFrame( columns=['datetime', 'timestamp', 'duration', 'location', 'identifier']) self.sparse_traj = None # Trajectory simulation parameters (caching for performance) self._cached_bound_poly = None self._cached_path_ml = None self._previous_dest_building_row = None self._current_dest_building_row = None
[docs] def reset_trajectory(self, trajectory = True, sparse = True, last_ping = True, diary = True): """ Resets the agent's trajectories and diaries to the initial state. Keeps the agent's identifier, home, and workplace. This method is useful for reinitializing the agent after a simulation run. """ self.destination_diary = pd.DataFrame(columns=self.destination_diary.columns) self.dt = None # null cache for trajectory generation self._cached_path_ml = None self._cached_bound_poly = None self._cached_dest_id = None self._cached_bound_poly_blocks_set = None self._previous_dest_building_row = None self._current_dest_building_row = None if trajectory: self.trajectory = None if diary: self.diary = pd.DataFrame(columns=self.diary.columns) if last_ping: self.last_ping = None if sparse: self.sparse_traj = None
[docs] def plot_traj(self, ax, color='black', alpha=1, doors=True, address=True, heatmap=False): """ Plots the trajectory of the agent on the given axis. Parameters ---------- ax : matplotlib.axes.Axes The axis on which to plot the trajectory. color : str, optional The color of the trajectory. alpha : float, optional The transparency of the trajectory. doors : bool, optional Whether to plot doors of buildings. address : bool, optional Whether to plot the address of buildings. heatmap : bool, optional Whether to plot a heatmap of time spent in each building. """ if heatmap: self.city.plot_city(ax, doors=doors, address=address, zorder=1, heatmap_agent=self) else: ax.scatter(self.trajectory.x, self.trajectory.y, s=6, color=color, alpha=alpha, zorder=2) self.city.plot_city(ax, doors=doors, address=address, zorder=1)
def _sample_step(self, start_point, dt, rng): """ From a destination diary, generates a single (x, y) ping step towards the current destination building. Parameters ---------- start_point : tuple The coordinates of the current position as a tuple (x, y). dt : float The time step duration (minutes). rng : numpy.random.Generator Random number generator for reproducibility. Returns ------- coord : numpy.ndarray A numpy array of floats with shape (1, 2) representing the new coordinates. location : str or None The building ID if the step is a stay, or `None` if the step is a move. """ city = self.city # Resolve destination building geometry and attributes from cache brow = self._current_dest_building_row dest_type = brow['building_type'] # Determine start block and geometry start_block = tuple(np.floor(start_point).astype(int)) start_point_arr = np.asarray(start_point, dtype=float) # Check if agent is in building using integer truncation in_current_dest = False if self._current_dest_building_row is not None: in_current_dest = _point_in_blocks(start_point_arr, self._current_dest_building_row['blocks_set']) in_previous_dest = False if self._previous_dest_building_row is not None: in_previous_dest = _point_in_blocks(start_point_arr, self._previous_dest_building_row['blocks_set']) # If already at destination building area, stay-within-building dynamics if in_current_dest: # Clear cache when arriving at destination (bound_poly only for inter-building movement) self._cached_path_ml = None self._cached_bound_poly = None self._cached_dest_id = None location = brow['id'] p = self.still_probs.get(dest_type, 0.5) sigma = self.speeds.get(dest_type, 0.5) if rng.uniform() < p: coord = start_point_arr else: # Draw until coord falls inside building while True: coord = rng.normal(loc=start_point_arr, scale=sigma*np.sqrt(dt), size=2) if _point_in_blocks(coord, brow['blocks_set']): break return coord, location # Otherwise, move along streets toward destination door cell location = None start_segment = [] # If currently inside previous destination building, first move towards its door if in_previous_dest: prev_door_point = self._previous_dest_building_row['door_point'] start_segment = [tuple(start_point), prev_door_point] start_node = (int(self._previous_dest_building_row['door_cell_x']), int(self._previous_dest_building_row['door_cell_y'])) else: start_node = start_block # Resolve destination door coordinates for path computation dest_cell = (int(brow['door_cell_x']), int(brow['door_cell_y'])) # Check if cached geometry is valid for current destination use_cache = False if self._cached_bound_poly is not None and self._cached_dest_id == brow['id']: use_cache = True if use_cache: path_ml = self._cached_path_ml bound_poly = self._cached_bound_poly bound_poly_blocks_set = self._cached_bound_poly_blocks_set else: # Shortest path between street blocks (door cells) street_path = city.get_shortest_path(start_node, dest_cell) street_blocks = city.blocks_gdf.loc[street_path] # Build continuous path through block centers, include start/end segments centroids = street_blocks['geometry'].centroid path_segments = [start_segment + [(pt.x, pt.y) for pt in centroids] + [brow['door_point']]] path_ml = MultiLineString([linemerge(MultiLineString(path_segments))]) street_geom = unary_union(street_blocks['geometry']) # Use previous destination building geometry if agent is departing from it, otherwise use start block geometry if in_previous_dest and self._previous_dest_building_row is not None: start_geom = self._previous_dest_building_row['geometry'] else: start_geom = city.blocks_gdf.loc[start_block]['geometry'] bound_poly = unary_union([start_geom, street_geom]) # Build bound_poly_blocks_set from components if in_previous_dest and self._previous_dest_building_row is not None: start_blocks = self._previous_dest_building_row['blocks_set'] else: start_blocks = {start_block} bound_poly_blocks_set = start_blocks | set(street_path) # Cache the results self._cached_path_ml = path_ml self._cached_bound_poly = bound_poly self._cached_dest_id = brow['id'] self._cached_bound_poly_blocks_set = bound_poly_blocks_set # Transformed coordinates of current position along the path path_coord = _path_coords(path_ml, start_point_arr) heading_drift = 3.33 * dt sigma = 0.5 * dt / 1.96 while True: # Step in transformed (path-based) space step = rng.normal(loc=[heading_drift, 0], scale=sigma * np.sqrt(dt), size=2) path_coord = (path_coord[0] + step[0], 0.7 * path_coord[1] + step[1]) if path_coord[0] > path_ml.length: coord = np.array(brow['door_point']) else: coord = _cartesian_coords(path_ml, *path_coord) if _point_in_blocks(coord, bound_poly_blocks_set) or _point_in_blocks(coord, self._current_dest_building_row.get('blocks_set')): break return coord, location def _traj_from_dest_diary(self, dt, seed=0): """ Simulate a trajectory and update agent diary from destination_diary. """ rng = np.random.default_rng(seed) # random generator for steps city = self.city destination_diary = self.destination_diary trajectory_update = [] if self.diary.empty: current_entry = None else: current_entry = self.diary.iloc[-1].to_dict() self.diary = self.diary.iloc[:-1] tick_secs = int(60*dt) # Initialize previous destination building to building containing start ping, if any prev_ping = self.last_ping start_point = (prev_ping['x'], prev_ping['y']) start_block = tuple(np.floor(start_point).astype(int)) start_info = city.get_block(start_block) if start_info['building_id'] is not None: building_dict = city.buildings_gdf.loc[start_info['building_id']].to_dict() building_dict['blocks_set'] = set(building_dict['blocks']) self._previous_dest_building_row = building_dict else: self._previous_dest_building_row = None # Initialize current destination building to first entry first_building_id = destination_diary.iloc[0]['location'] building_dict = city.buildings_gdf.loc[first_building_id].to_dict() building_dict['blocks_set'] = set(building_dict['blocks']) self._current_dest_building_row = building_dict entry_update = [] for i in range(destination_diary.shape[0]): building_id = destination_diary.iloc[i]['location'] # Shift: previous = current, current = new destination (skip shift on first iteration) if i > 0: self._previous_dest_building_row = self._current_dest_building_row building_dict = city.buildings_gdf.loc[building_id].to_dict() building_dict['blocks_set'] = set(building_dict.get('blocks', [])) self._current_dest_building_row = building_dict duration_in_ticks = int(destination_diary.iloc[i]['duration'] / dt) for _ in range(duration_in_ticks): prev_ping = self.last_ping # define point start_point = (prev_ping['x'], prev_ping['y']) unix_timestamp = prev_ping['timestamp'] + tick_secs datetime = prev_ping['datetime'] + timedelta(seconds=tick_secs) coord, location = self._sample_step(start_point, dt, rng) ping = {'x': coord[0], 'y': coord[1], 'datetime': datetime, 'timestamp': unix_timestamp, 'user_id': self.identifier} trajectory_update.append(ping) self.last_ping = ping if current_entry is None: current_entry = {'datetime': datetime, 'timestamp': unix_timestamp, 'duration': dt, 'location': location, 'user_id': self.identifier} elif (current_entry['location'] != location): entry_update.append(current_entry) current_entry = {'datetime': datetime, 'timestamp': unix_timestamp, 'duration': dt, 'location': location, 'user_id': self.identifier} else: current_entry['duration'] += 1*dt # add one tick to the duration if self.trajectory is None: self.trajectory = pd.DataFrame(trajectory_update) else: self.trajectory = pd.concat([self.trajectory, pd.DataFrame(trajectory_update)], ignore_index=True) entry_update.append(current_entry) if (self.diary.empty): self.diary = pd.DataFrame(entry_update) else: self.diary = pd.concat([self.diary, pd.DataFrame(entry_update)], ignore_index=True) self.destination_diary = destination_diary.drop(destination_diary.index) return None def _initialize_visits_unif(self, rng=None, home_work_freq=20, initial_k=None, other_locs_freq=2): """Seed initial visit frequencies uniformly per building type Builds a fresh visit_freqs DataFrame from self.city.buildings_gdf and returns it. Does not mutate self.visit_freqs. Warns if an existing self.visit_freqs appears to already contain positive frequencies. """ if rng is None: rng = npr.default_rng() if initial_k is None: initial_k = {'retail': 4, 'workplace': 2, 'home': 2, 'park': 2} # Warn if existing frequencies are already set if getattr(self, 'visit_freqs', None) is not None: if (self.visit_freqs['freq'] > 0).any(): warnings.warn("Existing visit frequencies are non-zero; re-initializing will overwrite them.") bdf = self.city.buildings_gdf visit_freqs = pd.DataFrame({ 'id': bdf['id'].values, 'building_type': bdf['building_type'].values, 'freq': 0, }).set_index('id') # Strong prior for agent's home/work visit_freqs.loc[self.home, 'freq'] = int(home_work_freq) visit_freqs.loc[self.workplace, 'freq'] = int(home_work_freq) # Seed additional locations per type for btype, k in initial_k.items(): k = int(k) ids = visit_freqs.index[visit_freqs.building_type == btype] # Exclude agent's own home/work excl = [self.home, self.workplace] ids = ids.difference(excl) size = min(k, len(ids)) chosen = rng.choice(ids.to_numpy(), size=size, replace=False) visit_freqs.loc[chosen, 'freq'] = visit_freqs.loc[chosen, 'freq'] + int(other_locs_freq) return visit_freqs
[docs] def generate_dest_diary(self, end_time, epr_time_res = 15, stay_probs = DEFAULT_STAY_PROBS, rho = 0.4, gamma = 0.3, seed = 0, verbose = False): """Generate the destination diary (exploration + preferential return). Parameters ---------- end_time : pd.Timestamp Generate until this timestamp (inclusive). epr_time_res : int Time-step in minutes for each diary entry. stay_probs : dict Probability of staying put, keyed by building type. rho : float Exploration parameter; lower values bias toward exploration. gamma : float Preferential return parameter; controls decay by visit count. seed : int RNG seed. Notes ----- - Requires `city.grav` to be precomputed via `city.compute_gravity(...)` """ rng = npr.default_rng(seed) # Early check: gravity matrix required for EPR generation if self.city.grav is None: raise RuntimeError("city.grav is not available. Call city.compute_gravity() before trajectory generation.") # Validate last_ping exists if self.last_ping is None: raise RuntimeError( "Agent has no last_ping. This should not happen unless reset_trajectory(last_ping=True) was called." ) if end_time.tz is None: tz = getattr(self.last_ping['datetime'], 'tz', None) if tz is not None: end_time = end_time.tz_localize(tz) if isinstance(end_time, pd.Timestamp): end_time = to_timestamp(end_time) visit_freqs = self.visit_freqs if (visit_freqs is None) or (not (visit_freqs['freq'] > 0).any()): visit_freqs = self._initialize_visits_unif( rng=rng, home_work_freq=20, initial_k={'retail': 4, 'workplace': 2, 'home': 2, 'park': 2}, other_locs_freq=2, ) if self.destination_diary.empty: start_time_local = self.last_ping['datetime'] start_time = self.last_ping['timestamp'] curr_info = self.city.get_block((int(np.floor(self.last_ping['x'])), int(np.floor(self.last_ping['y'])))) curr = curr_info['building_id'] if curr_info['building_type'] is not None and curr_info['building_type'] != 'street' and curr_info['building_id'] is not None else self.home else: last_entry = self.destination_diary.iloc[-1] last_datetime = pd.to_datetime(last_entry.datetime) if not isinstance(last_entry.datetime, pd.Timestamp) else last_entry.datetime start_time_local = last_datetime + timedelta(minutes=int(last_entry.duration)) # Derive timestamp from datetime if not present if 'timestamp' in self.destination_diary.columns: start_time = int(last_entry.timestamp + last_entry.duration*60) else: start_time = to_timestamp(last_entry.datetime) + int(last_entry.duration*60) curr = last_entry.location # Check if start_time exceeds end_time if start_time > end_time: raise ValueError( f"Agent {self.identifier}: last_ping timestamp ({start_time}) is at or beyond end_time ({end_time}). " "No destinations will be generated. Consider providing an earlier last_ping or later end_time." ) return dest_update = [] # verbosity if verbose: print(f"Generating destination diary via EPR (rho={rho}, gamma={gamma}, epr_time_res={epr_time_res} min, seed={seed})") while start_time < end_time: curr_type = visit_freqs.loc[curr, 'building_type'] if curr in visit_freqs.index else 'home' allowed = allowed_buildings(start_time_local) x = visit_freqs.loc[(visit_freqs['building_type'].isin(allowed)) & (visit_freqs.freq > 0)] S = len(x) if len(x) > 0 else 1 # probability of exploring p_exp = rho*(S**(-gamma)) # Stay if (curr_type in allowed) & (rng.uniform() < stay_probs.get(curr_type, 0.5)): pass # Exploration elif rng.uniform() < p_exp: # Compute gravity probs from current door cell to unexplored candidates y = visit_freqs.loc[(visit_freqs['building_type'].isin(allowed)) & (visit_freqs.freq == 0)] if not y.empty: if callable(self.city.grav): probs = self.city.grav(curr).loc[y.index].values else: probs = self.city.grav.loc[curr, y.index].values probs = probs / probs.sum() curr = rng.choice(y.index, p=probs) else: # Preferential return curr = _choose_destination(visit_freqs, x, rng) visit_freqs.loc[curr, 'freq'] += 1 # Preferential return else: curr = _choose_destination(visit_freqs, x, rng) visit_freqs.loc[curr, 'freq'] += 1 # Update destination diary entry = {'datetime': start_time_local, 'timestamp': start_time, 'duration': epr_time_res, 'location': curr} dest_update.append(entry) start_time_local = start_time_local + timedelta(minutes=int(epr_time_res)) start_time = start_time + epr_time_res*60 # because start_time in seconds if self.destination_diary.empty: self.destination_diary = pd.DataFrame(dest_update) else: self.destination_diary = pd.concat( [self.destination_diary, pd.DataFrame(dest_update)], ignore_index=True) self.destination_diary = condense_destinations(self.destination_diary) self.visit_freqs = visit_freqs return None
[docs] def generate_trajectory(self, destination_diary=None, end_time=None, epr_time_res=15, dt=1, seed=0, step_seed=None, verbose=False, **kwargs): """ Generate a trajectory for an agent. Parameters ---------- destination_diary : pandas.DataFrame, optional (default=None) DataFrame containing 'location' and 'datetime' columns (required), and optionally 'timestamp' and 'duration'. If 'timestamp' is missing, it will be derived from 'datetime'. end_time : pd.Timestamp, optional The end time to generate the trajectory until. Required if destination_diary is empty. epr_time_res : int, optional The granularity of destination durations in epr generation (minutes). dt : float, optional Time step duration for trajectory simulation (minutes). seed : int, optional Random seed for reproducibility. step_seed : int, optional Random seed for trajectory steps. If None, uses seed. verbose : bool, optional Whether to print verbose warnings. kwargs : dict, optional Additional keyword arguments for setting initial position. Can include 'x', 'y', 'location', 'datetime', 'timestamp'. If 'x' and 'y' are provided, used directly. Otherwise, if 'location' is provided, uses that building's centroid. If neither, uses agent's home. Returns ------- None (updates self.trajectory) """ if self.dt is None: self.dt = dt if self.dt != dt: raise ValueError(f"dt ({dt}) does not match the agent's dt ({self.dt}).") # handle destination diary if destination_diary is not None: if not self.destination_diary.empty: warnings.warn("Overwriting existing destination_diary with new one.") self.destination_diary = destination_diary row = destination_diary.iloc[0] #first destination loc_centroid = self.city.buildings_gdf.loc[row['location'], 'geometry'].centroid x_coord, y_coord = loc_centroid.x, loc_centroid.y datetime = row['datetime'] timestamp = int(row.get('timestamp', to_timestamp(datetime))) self.last_ping = pd.Series({ 'x': x_coord, 'y': y_coord, 'datetime': datetime, 'timestamp': timestamp, 'user_id': self.identifier }) self.trajectory = pd.DataFrame([self.last_ping]) # Handle trajectory initialization and overrides if self.trajectory is None: # Allow overriding last_ping fields with kwargs if 'x' in kwargs or 'y' in kwargs or 'location' in kwargs or 'datetime' in kwargs or 'timestamp' in kwargs: if 'x' in kwargs and 'y' in kwargs: self.last_ping['x'] = kwargs['x'] self.last_ping['y'] = kwargs['y'] elif 'location' in kwargs: loc_centroid = self.city.buildings_gdf.loc[kwargs['location'], 'geometry'].centroid self.last_ping['x'] = loc_centroid.x self.last_ping['y'] = loc_centroid.y if 'datetime' in kwargs: self.last_ping['datetime'] = kwargs['datetime'] self.last_ping['timestamp'] = to_timestamp(self.last_ping['datetime']) if 'timestamp' in kwargs: self.last_ping['timestamp'] = kwargs['timestamp'] self.last_ping['datetime'] = pd.to_datetime(self.last_ping['timestamp'], unit='s') self.trajectory = pd.DataFrame([self.last_ping]) else: if 'x' in kwargs or 'y' in kwargs or 'location' in kwargs or 'datetime' in kwargs or 'timestamp' in kwargs: raise ValueError( "Keyword arguments conflict with existing trajectory. " "Use Agent.reset_trajectory() or do not provide keyword arguments." ) self.last_ping = self.trajectory.iloc[-1] if self.destination_diary.empty: if end_time is None: raise ValueError( "Destination diary is empty. Provide an end_time to generate a trajectory." ) self.generate_dest_diary(end_time=end_time, epr_time_res=epr_time_res, seed=seed) s = step_seed if step_seed else seed self._traj_from_dest_diary(dt=dt, seed=s) return None
[docs] def sample_trajectory(self, beta_start=None, beta_durations=None, beta_ping=5, seed=0, ha=3/4, pareto_prior=True, dt=None, output_bursts=False, deduplicate=True, replace_sparse_traj=False, cache_traj=False): """ Samples a sparse trajectory using a hierarchical non-homogeneous Poisson process. Parameters ---------- beta_start : float The rate parameter governing the Poisson Process controlling the start of the trajectory. beta_durations : float The rate parameter governing the Exponential controlling for the durations of the trajectory. beta_ping : float The rate parameter governing the Poisson Process controlling which pings are sampled. seed : int Random seed for reproducibility. ha : float Horizontal accuracy output_bursts : bool If True, outputs the latent variables on when bursts start and end. replace_sparse_traj : bool if True, replaces existing sparse_traj field with the new sparsified trajectory rather than appending. cache_traj : bool if True, empties the Agent's trajectory DataFrame. """ if not self.trajectory.timestamp.is_monotonic_increasing: raise ValueError("The input trajectory is not sorted chronologically.") result = sample_bursts_gaps( self.trajectory, beta_start, beta_durations, beta_ping, ha=ha, pareto_prior=pareto_prior, seed=seed, output_bursts=output_bursts, deduplicate=deduplicate ) if output_bursts: sparse_traj, burst_info = result else: sparse_traj = result if not sparse_traj.timestamp.is_monotonic_increasing: raise ValueError("The sampled trajectory is not sorted chronologically.") sparse_traj = sparse_traj.set_index('timestamp', drop=False) if self.sparse_traj is None or replace_sparse_traj: self.sparse_traj = sparse_traj else: self.sparse_traj = pd.concat([self.sparse_traj, sparse_traj], ignore_index=False) if not self.sparse_traj.timestamp.is_monotonic_increasing: raise ValueError("The aggregated sampled trajectory is not sorted chronologically.") if cache_traj: self.last_ping = self.trajectory.iloc[-1] self.trajectory = self.trajectory.loc[[]] # empty df if output_bursts: return burst_info
[docs] def condense_destinations(destination_diary, *, time_cols=None): """ Modifies a destination diary, joining consecutive entries for the same location into a single entry with the total duration. Parameters ---------- destination_diary : pandas.DataFrame Diary containing the destinations of the user. time_cols : dict, optional (keyword-only) Optional mapping for non-canonical column names. Expected keys: {'datetime': <col_name>, 'timestamp': <col_name>}. Defaults are 'datetime' and 'timestamp'. Returns ------- pandas.DataFrame Updated destination diary with canonical columns 'datetime' and 'timestamp'. """ if destination_diary.empty: return pd.DataFrame(columns=['datetime','timestamp','duration','location']) # Resolve column names dt_col = 'datetime' ts_col = 'timestamp' if time_cols: dt_col = time_cols.get('datetime', dt_col) ts_col = time_cols.get('timestamp', ts_col) # If inputs use local/unix names, allow mapping through kwargs only required = {'location', 'duration', dt_col, ts_col} missing = required - set(destination_diary.columns) if missing: raise KeyError(f"condense_destinations expected columns {required}, missing {missing}") df = destination_diary.copy() # Detect changes in location df['new_segment'] = df['location'].ne(df['location'].shift()) # Create segment identifiers for grouping df['segment_id'] = df['new_segment'].cumsum() # Aggregate data by segment with provided column names, then rename canonically condensed_df = df.groupby('segment_id').agg({ dt_col: 'first', ts_col: 'first', 'duration': 'sum', 'location': 'first' }).reset_index(drop=True) # Canonical column names in the output if dt_col != 'datetime': condensed_df = condensed_df.rename(columns={dt_col: 'datetime'}) if ts_col != 'timestamp': condensed_df = condensed_df.rename(columns={ts_col: 'timestamp'}) return condensed_df
[docs] def generate_ping_times(t0, t_end, *, beta_start=None, beta_durations=None, beta_ping=5, seed=None, return_bursts=False, tz=None): """Generate absolute ping timestamps (seconds) within [t0, t_end]. If return_bursts is True, also returns a list of (start_time, end_time) for bursts that produced at least one ping. If tz is provided, start/end are timezone-aware pandas Timestamps; otherwise they are Unix seconds (int). """ rng = npr.default_rng(seed) # convert minutes→seconds beta_ping_s = beta_ping * 60 beta_start_s = beta_start * 60 if beta_start is not None else None beta_dur_s = beta_durations * 60 if beta_durations is not None else None if beta_start_s is None and beta_dur_s is None: burst_start_points = np.array([0.0]) burst_end_points = np.array([t_end - t0], dtype=float) else: est_n = int(3 * (t_end - t0) / beta_start_s) + 10 inter_arrival_times = rng.exponential(scale=beta_start_s, size=est_n) burst_start_points = np.cumsum(inter_arrival_times) burst_start_points = burst_start_points[burst_start_points < (t_end - t0)] burst_durations = rng.exponential(scale=beta_dur_s, size=burst_start_points.size) burst_end_points = burst_start_points + burst_durations if burst_end_points.size > 0: burst_end_points[:-1] = np.minimum(burst_end_points[:-1], burst_start_points[1:]) burst_end_points[-1] = min(burst_end_points[-1], t_end - t0) ping_times_chunks: list[np.ndarray] = [] bursts_out = [] if return_bursts else None for start, end in zip(burst_start_points, burst_end_points): dur = end - start if dur <= 0: continue est_pings = int(3 * dur / beta_ping_s) + 10 ping_intervals = rng.exponential(scale=beta_ping_s, size=est_pings) times_rel = np.cumsum(ping_intervals) times_rel = times_rel[times_rel < dur] if times_rel.size: ping_times_chunks.append(t0 + start + times_rel) if return_bursts: if tz is not None: sdt = pd.to_datetime(t0 + start, unit='s', utc=True).tz_convert(tz) edt = pd.to_datetime(t0 + end, unit='s', utc=True).tz_convert(tz) else: sdt = int(t0 + start) edt = int(t0 + end) bursts_out.append([sdt, edt]) if not ping_times_chunks: if return_bursts: return np.array([], dtype=int), [] return np.array([], dtype=int) ping = np.concatenate(ping_times_chunks).astype(int) if return_bursts: return ping, bursts_out return ping
[docs] def thin_traj_by_times(traj, ping_times, *, deduplicate=True): """Apply ping_times to a dense traj via searchsorted thinning.""" if ping_times.size == 0: return pd.DataFrame(columns=traj.columns) traj_ts = traj['timestamp'].to_numpy() tz = traj['datetime'].dt.tz idx = np.searchsorted(traj_ts, ping_times, side='right') - 1 valid = idx >= 0 idx = idx[valid] ping_times = ping_times[valid] if deduplicate: _, keep = np.unique(idx, return_index=True) idx = idx[keep] ping_times = ping_times[keep] sampled_traj = traj.iloc[idx].copy() sampled_traj['timestamp'] = ping_times sampled_traj['datetime'] = ( pd.to_datetime(ping_times, unit='s', utc=True) .tz_convert(tz) ) return sampled_traj
def _sample_horizontal_noise(n, pareto_prior=True, ha=3/4, lower_bound=8/15, upper_bound=20, rng=None): """Sample per-ping horizontal accuracy and Gaussian noise (internal).""" if ha is None or ha==0: return np.zeros(n), np.zeros((n, 2)) if rng is None: rng = npr.default_rng() if pareto_prior: #for heavy tailed noise if ha <= lower_bound: raise ValueError(f"mean ha must exceed {lower_bound} blocks") alpha = ha / (ha - lower_bound) ha_realized = (rng.pareto(alpha, size=n) + 1) * lower_bound ha_realized = np.minimum(ha_realized, upper_bound/2, out=ha_realized) else: ha_realized= np.array([ha]*n) sigma = ha_realized / 1.515 noise = rng.standard_normal((n, 2)) * sigma[:, None] np.clip(noise, -upper_bound, upper_bound, out=noise) #300m return ha_realized, noise def _point_in_blocks(point_arr, blocks_set): x, y = point_arr ix = int(np.floor(x)) iy = int(np.floor(y)) # primary block if (ix, iy) in blocks_set: return True on_x = x == ix on_y = y == iy # boundary checks if on_x and (ix - 1, iy) in blocks_set: return True if on_y and (ix, iy - 1) in blocks_set: return True if on_x and on_y and (ix - 1, iy - 1) in blocks_set: return True return False def _cartesian_coords(multilines, distance, offset, eps=0.001): """ Converts path-based coordinates (distance along path, signed perpendicular offset) into cartesian coordinates on the plane. Parameters ---------- multilines : shapely.geometry.MultiLineString MultiLineString representing the street path. distance : float Distance along the path. offset : float Signed perpendicular offset from the path (positive to the left, negative to the right). eps : float, optional Small delta used to estimate the path's tangent direction. Returns ------- tuple Cartesian coordinates (x, y) corresponding to the input path-based coordinates. """ point_on_path = multilines.interpolate(distance) offset_point = multilines.interpolate(distance - eps) p = np.array([point_on_path.x, point_on_path.y]) q = np.array([offset_point.x, offset_point.y]) direction = p - q unit_direction = direction / np.linalg.norm(direction) # Rotate 90° counter-clockwise to get the normal vector normal = np.flip(unit_direction) * np.array([-1, 1]) return tuple(p + offset * normal) def _path_coords(multilines, point, eps=0.001): """ Given a MultiLineString and a cartesian point, returns the transformed coordinates: distance along the path and signed perpendicular offset. Parameters ---------- multilines : shapely.geometry.MultiLineString MultiLineString representing the street path. point : shapely.geometry.Point or tuple The cartesian point to transform. eps : float, optional Small delta used to estimate the path's tangent direction. Returns ------- tuple (distance_along_path, orthogonal_offset) """ if not isinstance(point, Point): point = Point(point) distance = multilines.project(point) point_on_path = multilines.interpolate(distance) offset_point = multilines.interpolate(distance - eps) p = np.array([point_on_path.x, point_on_path.y]) q = np.array([offset_point.x, offset_point.y]) direction = p - q unit_direction = direction / np.linalg.norm(direction) # Rotate 90° counter-clockwise to get the normal vector normal = np.flip(unit_direction) * np.array([-1, 1]) delta = np.array([point.x - p[0], point.y - p[1]]) offset = np.dot(delta, normal) return distance, offset
[docs] class Population: """ A class to represent a population of agents within a city. Contains methods to initialize agents and randomize their attributes and trajectories. Attributes ---------- roster : dict A dictionary to store agents with their identifiers as keys. city : City The city in which the population resides. dt : float The time step duration for the agents. Methods ------- add_agent: Adds an agent to the population. generate_agents: Generates N agents with randomized attributes. save_pop: Saves trajectories, homes, and diaries as Parquet files to S3. sample_step: Generates (x, y) pings from a destination diary. traj_from_dest_diary: Simulates a trajectory and updates the agent's travel diary. generate_dest_diary: Generates a destination diary using exploration and preferential return. generate_trajectory: Generates a trajectory for an agent. plot_population: Plots the population on a given axis. """ def __init__(self, city, dt=1): self.roster = {} self.city = city self.dt = dt
[docs] def add_agent(self, agent, verbose=True): """ Adds an agent to the population. If the agent identifier already exists in the population, it will be replaced. Parameters ---------- agent : Agent The agent to be added to the population. verbose : bool, optional If True, prints a message if the agent identifier already exists in the population. """ if verbose and agent.identifier in self.roster: print("Agent identifier already exists in population. Replacing corresponding agent.") self.roster[agent.identifier] = agent
[docs] def generate_agents(self, N, seed=0, name_count=2, agent_homes=None, agent_workplaces=None, datetimes=None): """ Generates N agents, with randomized attributes. """ master_rng = np.random.default_rng(seed) generator = funkybob.UniqueRandomNameGenerator(members=name_count, seed=seed) # Create efficient accessors for agent homes and workplaces get_home = parse_agent_attr(agent_homes, N, "agent_homes") get_workplace = parse_agent_attr(agent_workplaces, N, "agent_workplaces") get_datetime = parse_agent_attr(datetimes, N, "datetimes") for i in range(N): agent_seed = int(master_rng.integers(0, 2**32)) identifier = generator[i] agent = Agent(identifier=identifier, city=self.city, home=get_home(i), workplace=get_workplace(i), datetime=get_datetime(i), seed=agent_seed) self.add_agent(agent)
[docs] def save_pop(self, traj_cols=None, sparse_path=None, full_path=None, homes_path=None, diaries_path=None, dest_diaries_path=None, partition_cols=None, mixed_timezone_behavior="naive", filesystem=None, fmt='parquet', **kwargs): """ Save trajectories, homes, and diaries to local or S3 destinations. Parameters ---------- traj_cols : dict, optional Column mapping used to normalize trajectory data before writing. sparse_path : str or Path, optional Destination path for sparse trajectories. full_path : str or Path, optional Destination path for full (ground truth) trajectories. homes_path : str or Path, optional Destination path for the homes table. diaries_path : str or Path, optional Destination path for diaries. dest_diaries_path : str or Path, optional Destination path for destination diaries. partition_cols : list of str, optional Column names used to partition written datasets. mixed_timezone_behavior : str, optional Behavior passed to ``nomad.io.base.from_df`` for mixed timezone columns. filesystem : pyarrow.fs.FileSystem or None Optional filesystem object (e.g., s3fs.S3FileSystem). If None, inferred automatically. fmt : str, optional File format to write. **kwargs : dict, optional Additional static columns to include in the homes table. Each key-value pair represents a column name and its values. Values must be a list/array of length N (number of agents) or a single value to be repeated for all agents. """ if full_path: full_df = pd.concat([agent.trajectory for agent in self.roster.values()], ignore_index=True) if partition_cols and 'date' in partition_cols and 'date' not in full_df.columns: full_df['date'] = pd.to_datetime(full_df['timestamp'], unit='s').dt.date.astype(str) full_df = from_df(full_df, traj_cols=traj_cols, mixed_timezone_behavior=mixed_timezone_behavior) to_file(full_df, path=full_path, format=fmt, partition_by=partition_cols, filesystem=filesystem, existing_data_behavior='delete_matching') if sparse_path: sparse_df = pd.concat([agent.sparse_traj for agent in self.roster.values()], ignore_index=True) if partition_cols and 'date' in partition_cols and 'date' not in sparse_df.columns: sparse_df['date'] = pd.to_datetime(sparse_df['timestamp'], unit='s').dt.date.astype(str) sparse_df = from_df(sparse_df, traj_cols=traj_cols, mixed_timezone_behavior=mixed_timezone_behavior) to_file(sparse_df, path=sparse_path, format=fmt, partition_by=partition_cols, filesystem=filesystem, existing_data_behavior='delete_matching', traj_cols=traj_cols) if diaries_path: diaries_df = pd.concat([agent.diary for agent in self.roster.values()], ignore_index=True) if partition_cols and 'date' in partition_cols and 'date' not in diaries_df.columns: diaries_df['date'] = pd.to_datetime(diaries_df['timestamp'], unit='s').dt.date.astype(str) diaries_df = from_df(diaries_df, traj_cols=traj_cols, mixed_timezone_behavior=mixed_timezone_behavior) to_file(diaries_df, path=diaries_path, format=fmt, partition_by=partition_cols, filesystem=filesystem, existing_data_behavior='delete_matching', traj_cols=traj_cols) if dest_diaries_path: # TODO: from_df should be made compatible with destination diaries dest_diaries_list = [] for agent in self.roster.values(): if agent.destination_diary is not None and not agent.destination_diary.empty: df = agent.destination_diary.copy() df['identifier'] = agent.identifier dest_diaries_list.append(df) if dest_diaries_list: dest_diaries_df = pd.concat(dest_diaries_list, ignore_index=True) dest_diaries_df['date'] = pd.to_datetime(dest_diaries_df['datetime'], unit='s').dt.date.astype(str) dest_diaries_df = from_df(dest_diaries_df, traj_cols=traj_cols, mixed_timezone_behavior=mixed_timezone_behavior) to_file(dest_diaries_df, path=dest_diaries_path, format=fmt, partition_by=partition_cols, filesystem=filesystem, existing_data_behavior='delete_matching', traj_cols=traj_cols) if homes_path: homes_df = self._build_agent_static_data(**kwargs) table = pa.Table.from_pandas(homes_df, preserve_index=False) ds.write_dataset(table, base_dir=str(homes_path), format=fmt, partitioning_flavor='hive', filesystem=filesystem, existing_data_behavior='delete_matching')
def _build_agent_static_data(self, **static_columns): """Build DataFrame with agent static data (user_id, homes, workplaces, user-level attributes).""" N = len(self.roster) # Process static columns processed_static = self._process_static_columns(static_columns, N) # Build base data base_data = [] for agent_id, agent in self.roster.items(): ts = agent.last_ping['datetime'] iso_date = ts.date().isoformat() base_data.append({ 'user_id': agent_id, 'home': agent.home, 'workplace': agent.workplace, 'date': iso_date }) # Create base DataFrame homes_df = pd.DataFrame(base_data) # Add static columns for col_name, col_values in processed_static.items(): homes_df[col_name] = col_values return homes_df def _process_static_columns(self, static_columns, N): """Process static columns, validating lengths and handling single values.""" processed = {} for col_name, col_values in static_columns.items(): if isinstance(col_values, (list, tuple, np.ndarray)): if len(col_values) != N: raise ValueError(f"Static column '{col_name}' has length {len(col_values)}, " f"but expected length {N} (number of agents)") processed[col_name] = col_values else: # Single value - repeat for all agents processed[col_name] = [col_values] * N return processed
[docs] def reproject_to_mercator(self, sparse_traj=True, full_traj=False, diaries=False, poi_data=None): """ Reproject all agent trajectories from city block coordinates to Web Mercator. Uses the city's stored transformation parameters (block_side_length, web_mercator_origin_x/y). Parameters ---------- sparse_traj : bool, default True Whether to reproject sparse trajectories full_traj : bool, default False Whether to reproject full trajectories diaries : bool, default False Whether to reproject diaries (must have x, y columns) poi_data : pd.DataFrame, optional DataFrame with building coordinates (building_id, x, y) to join with diaries. If not provided, derived from city's buildings_gdf using door coordinates. """ for agent in self.roster.values(): if sparse_traj and agent.sparse_traj is not None: agent.sparse_traj = self.city.to_mercator(agent.sparse_traj) if full_traj and agent.trajectory is not None: agent.trajectory = self.city.to_mercator(agent.trajectory) if diaries and agent.diary is not None: # Derive poi_data from city's buildings_gdf if not provided if poi_data is None: bdf = self.city.buildings_gdf poi_data = pd.DataFrame({ 'building_id': bdf['id'].values, 'x': (bdf['door_cell_x'].astype(float) + 0.5).values, 'y': (bdf['door_cell_y'].astype(float) + 0.5).values }) agent.diary = agent.diary.merge(poi_data, left_on='location', right_on='building_id', how='left') agent.diary = agent.diary.drop(columns=['building_id']) agent.diary = self.city.to_mercator(agent.diary)
# ============================================================================= # AUXILIARY METHODS # ============================================================================= def _choose_destination(visit_freqs, x, rng): """ Select destination using preferential return from allowed, visited buildings. Falls back to uniform random selection if no visited buildings are available. Parameters ---------- visit_freqs : pandas.DataFrame DataFrame with building IDs as index and 'freq' column x : pandas.DataFrame Subset of visit_freqs with allowed, visited buildings (freq > 0) rng : numpy.random.Generator Random number generator Returns ------- str Building ID of selected destination """ if not x.empty and x['freq'].sum() > 0: return rng.choice(x.index, p=x['freq']/x['freq'].sum()) else: return rng.choice(visit_freqs.index)
[docs] def allowed_buildings(local_ts): """ Finds allowed buildings for the timestamp """ hour = local_ts.hour return ALLOWED_BUILDINGS[hour]