from shapely.geometry import box, Polygon, LineString, MultiLineString, Point, MultiPolygon
from shapely import wkt
from shapely.affinity import scale, translate, rotate as shapely_rotate
from shapely.ops import unary_union
from shapely.prepared import prep
import pickle
import pandas as pd
import numpy as np
import numpy.random as npr
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
from matplotlib import cm
from matplotlib.colors import Normalize
from matplotlib.cm import ScalarMappable
from matplotlib.patches import Patch
import networkx as nx
import geopandas as gpd
import os
import time
import pdb
import warnings
from typing import List, Tuple, Set
from pyproj import CRS
from nomad.map_utils import blocks_to_mercator, mercator_to_blocks, blocks_to_mercator_gdf
from nomad.constants import TYPE_PRIORITY
# =============================================================================
# STREET CLASS
# =============================================================================
[docs]
class Street:
"""
Class for street block in the city through which individuals move from building to building.
Attributes
----------
coordinates : tuple
A tuple representing the (x, y) coordinates of the street block.
geometry : shapely.geometry.polygon.Polygon
A polygon representing the geometry of the street block.
id : str
A unique identifier for the street block, formatted as 's-x{coordinates[0]}-y{coordinates[1]}'.
Methods
-------
(none)
"""
def __init__(self, coordinates: tuple = None, geometry: Polygon = None):
if coordinates is not None:
self.coordinates = coordinates
self.geometry = box(coordinates[0], coordinates[1], coordinates[0] + 1, coordinates[1] + 1)
elif geometry is not None:
if not geometry.is_valid:
raise ValueError("Invalid street geometry")
bounds = geometry.bounds
if abs(bounds[2] - bounds[0] - 1) > 0.01 or abs(bounds[3] - bounds[1] - 1) > 0.01:
raise ValueError("Street geometry must represent a 1x1 block")
self.coordinates = (int(bounds[0]), int(bounds[1]))
self.geometry = geometry
else:
raise ValueError("Either coordinates or geometry must be provided")
self.id = f's-x{self.coordinates[0]}-y{self.coordinates[1]}'
# =============================================================================
# CITY CLASS
# =============================================================================
[docs]
class City:
"""
Class for representing a city containing buildings, streets, and methods for city management.
Attributes
----------
buildings : dict
A dictionary of Building objects with their IDs as keys.
streets : dict
A dictionary of Street objects with their coordinates as keys.
buildings_outline : shapely.geometry.polygon.Polygon
A polygon representing the combined geometry of all buildings in the city.
city_boundary : shapely.geometry.polygon.Polygon
A polygon representing the boundary of the city.
dimensions : tuple
A tuple representing the dimensions of the city (width, height).
street_graph : dict
A dictionary representing the graph of streets with their neighbors.
shortest_paths : dict
A dictionary containing the shortest paths between all pairs of streets.
gravity : pandas.DataFrame
A DataFrame containing the gravity values between all pairs of streets.
"""
def __init__(self,
dimensions: tuple = (0,0),
manual_streets: bool = False,
name: str = "Garden City",
block_side_length: float = 15.0,
web_mercator_origin_x: float = -4265699.0,
web_mercator_origin_y: float = 4392976.0,
rotation_deg: float = 0.0,
offset_x: int = 0,
offset_y: int = 0):
"""
Initializes the City object with given dimensions and optionally manual streets.
Parameters
----------
dimensions : tuple
A tuple representing the dimensions of the city (width, height).
manual_streets : bool
If True, streets must be manually added to the city.
If False, all blocks are initialized as streets until it is populated with a building.
name : str, optional
Name of the city (default: "Garden City").
block_side_length : float, optional
Side length of city blocks in meters (default: 15.0).
web_mercator_origin_x : float, optional
False easting for Web Mercator projection (default: -4265699.0).
web_mercator_origin_y : float, optional
False northing for Web Mercator projection (default: 4392976.0).
rotation_deg : float, optional
Rotation applied to input data in degrees counterclockwise (default: 0.0).
offset_x : int, optional
Grid offset in block units along x-axis (default: 0).
offset_y : int, optional
Grid offset in block units along y-axis (default: 0).
"""
self.name = name
self.block_side_length = block_side_length
self.web_mercator_origin_x = web_mercator_origin_x
self.web_mercator_origin_y = web_mercator_origin_y
self.rotation_deg = rotation_deg
self.offset_x = offset_x
self.offset_y = offset_y
self.buildings_outline = Polygon()
self.manual_streets = manual_streets
self.city_boundary = box(0, 0, dimensions[0], dimensions[1])
self.dimensions = dimensions
self.buildings_gdf = gpd.GeoDataFrame(
columns=['id','building_type','door_cell_x','door_cell_y','door_point','size','geometry','blocks'],
geometry='geometry', crs=None
)
self.buildings_gdf.set_index('id', inplace=True, drop=False)
self.buildings_gdf.index.name = None
if manual_streets:
self.blocks_gdf = gpd.GeoDataFrame(
columns=['coord_x','coord_y','building_id','building_type','geometry'],
geometry='geometry', crs=None
)
self.blocks_gdf.set_index(['coord_x', 'coord_y'], inplace=True, drop=False)
self.blocks_gdf.index.names = [None, None]
self.streets_gdf = gpd.GeoDataFrame(
columns=['coord_x','coord_y','id','geometry'],
geometry='geometry', crs=None
)
self.streets_gdf.set_index(['coord_x', 'coord_y'], inplace=True, drop=False)
self.streets_gdf.index.names = [None, None]
else:
self.blocks_gdf = self._init_blocks_gdf()
self.streets_gdf = self._derive_streets_from_blocks()
# Convenience properties are defined below for GDF-first access
# Precomputed building-to-building gravity (optional, built on demand)
self.grav = None
self.door_dist = None
# Precomputed shortest paths (optional, built on demand via compute_shortest_paths)
self.shortest_paths = None
self.street_graph = None
# Hub network for efficient gravity computation (optional, built on demand)
self.hubs = None
self.hub_df = None
def _derive_streets_from_blocks(self):
"""
Derives streets GeoDataFrame from blocks_gdf rows tagged as streets.
"""
if self.blocks_gdf.empty:
self.blocks_gdf = self._init_blocks_gdf()
streets = self.blocks_gdf[self.blocks_gdf['building_type'] == 'street'].copy()
if not streets.empty:
streets['id'] = streets['coord_x'].astype(int).astype(str) + '-y' + streets['coord_y'].astype(int).astype(str)
streets['id'] = 's-x' + streets['id']
streets_gdf = gpd.GeoDataFrame(streets, geometry='geometry', crs=self.blocks_gdf.crs)
streets_gdf.set_index(['coord_x', 'coord_y'], inplace=True, drop=False)
streets_gdf.index.names = [None, None]
return streets_gdf
def _init_blocks_gdf(self):
"""Initialize grid of unit blocks covering current dimensions."""
width, height = self.dimensions
if width <= 0 or height <= 0:
return gpd.GeoDataFrame(columns=['coord_x','coord_y','building_id','building_type','geometry'], geometry='geometry', crs=None)
x_coords = np.arange(width) # must be in city_block units
y_coords = np.arange(height) # must be in city_block units
# Create a MultiIndex from the product of x and y coordinates
multi_index = pd.MultiIndex.from_product([x_coords, y_coords], names=['coord_x', 'coord_y'])
# Create a DataFrame from the MultiIndex
df = pd.DataFrame(index=multi_index).reset_index()
# Add other columns
df['building_type'] = 'street'
df['building_id'] = None
df['geometry'] = df.apply(lambda row: box(row['coord_x'], row['coord_y'], row['coord_x']+1, row['coord_y']+1), axis=1)
blocks_gdf = gpd.GeoDataFrame(df, geometry='geometry', crs=None)
blocks_gdf.set_index(['coord_x', 'coord_y'], inplace=True, drop=False)
blocks_gdf.index.names = [None, None]
return blocks_gdf
@property
def buildings_df(self):
return self.buildings_gdf
@property
def streets_df(self):
return self.streets_gdf
[docs]
def add_street(self, coords):
"""
Adds a street to the city at the specified (x, y) coordinates.
"""
x, y = coords
# ensure blocks_gdf marks as street
if hasattr(self, 'blocks_gdf'):
mask = (self.blocks_gdf['coord_x'] == x) & (self.blocks_gdf['coord_y'] == y)
if mask.any():
self.blocks_gdf.loc[mask, ['building_id','building_type']] = [None, 'street']
else:
self.blocks_gdf = pd.concat([self.blocks_gdf, gpd.GeoDataFrame([{
'coord_x': x,
'coord_y': y,
'building_id': None,
'building_type': None,
'geometry': box(x, y, x+1, y+1)
}], geometry='geometry', crs=self.blocks_gdf.crs)], ignore_index=True)
# update streets_gdf
if hasattr(self, 'streets_gdf'):
sid = f's-x{x}-y{y}'
row = {'coord_x': x, 'coord_y': y, 'id': sid, 'geometry': box(x, y, x+1, y+1)}
self.streets_gdf = pd.concat([self.streets_gdf, gpd.GeoDataFrame([row], geometry='geometry', crs=self.streets_gdf.crs)], ignore_index=True)
[docs]
def add_building(self, building_type, door, geom=None, blocks=None, gdf_row=None):
"""
Adds a building to the city with the specified type, door location, and geometry.
Parameters
----------
building_type : str
The type of the building ('home', 'workplace', 'retail', 'park').
door : tuple
A tuple representing the (x, y) coordinates of the door of the building.
geom : shapely.geometry.polygon.Polygon, optional
The geometry of the building (can be a box or MultiPolygon).
blocks : list of tuples, optional
A list of (x, y) coordinates representing the blocks occupied by the building.
gdf_row : geopandas.GeoDataFrame or pandas.Series, optional
A single row GeoDataFrame or Series containing building information.
Raises
------
ValueError
If the door is not on an existing street, if the building overlaps with existing buildings,
or if the geometry/blocks do not align with the grid.
"""
if gdf_row is not None:
if isinstance(gdf_row, pd.Series):
gdf_row = gpd.GeoDataFrame([gdf_row], geometry='geometry')
elif not isinstance(gdf_row, gpd.GeoDataFrame) or len(gdf_row) != 1:
raise ValueError("gdf_row must be a GeoDataFrame with exactly one row or a pandas Series.")
# Use explicit column if present; otherwise rely on provided argument
building_type = gdf_row.iloc[0]['building_type']
door = (gdf_row.iloc[0]['door_cell_x'], gdf_row.iloc[0]['door_cell_y'])
geom = gdf_row.iloc[0]['geometry'] if 'geometry' in gdf_row.columns else geom
# If geom is a list of blocks, reassign it to blocks
if isinstance(geom, list):
blocks = geom
geom = None
# If geom is None and blocks are provided, construct geometry from blocks
if geom is None and blocks is not None and len(blocks) > 0:
block_polys = [box(x, y, x+1, y+1) for x, y in blocks]
geom = MultiPolygon(block_polys).envelope if len(block_polys) > 1 else block_polys[0]
# Validate that blocks align with grid
for x, y in blocks:
if (x, y) not in self.blocks_gdf.index:
raise ValueError(f"Block ({x}, {y}) does not align with city grid.")
elif geom is not None and blocks is None:
# Validate that geometry aligns with grid by checking bounds
minx, miny, maxx, maxy = geom.bounds
if not (minx.is_integer() and miny.is_integer() and maxx.is_integer() and maxy.is_integer()):
raise ValueError(f"Geometry bounds {geom.bounds} do not align with integer grid.")
# Derive blocks from geometry bounds for validation
blocks = [(int(x), int(y)) for x in range(int(minx), int(maxx))
for y in range(int(miny), int(maxy))
if minx <= x < maxx and miny <= y < maxy]
for x, y in blocks:
if (x, y) not in self.blocks_gdf.index:
raise ValueError(f"Derived block ({x}, {y}) from geometry does not align with city grid.")
elif geom is None and (blocks is None or len(blocks) == 0):
raise ValueError("Either geom or blocks must be provided.")
# Compute door centroid via intersection with target street
door_poly = box(door[0], door[1], door[0]+1, door[1]+1)
door_line = geom.intersection(door_poly)
# Require true line adjacency (not area overlap) to the door cell
if door_line.is_empty or not isinstance(door_line, LineString):
raise ValueError(f"Door {door} must be adjacent to new building.")
else:
door_centroid = (door_line.centroid.x, door_line.centroid.y)
# Note: street network adjacency is no longer required. Door adjacency
# is validated via door_line above.
if blocks is not None and len(blocks) > 0:
existing_building_blocks = set()
if hasattr(self, 'blocks_gdf') and not self.blocks_gdf.empty:
existing_building_blocks = set(
self.blocks_gdf[(self.blocks_gdf['building_type'].notna()) & (self.blocks_gdf['building_type'] != 'street')].index.tolist()
)
if set(blocks) & existing_building_blocks:
raise ValueError(
"New building overlaps with existing buildings."
)
else:
if self.buildings_outline.contains(geom) or self.buildings_outline.overlaps(geom):
raise ValueError(
"New building overlaps with existing buildings."
)
# Derive blocks from geom if not provided
if blocks is None:
minx, miny, maxx, maxy = geom.bounds
blocks = [(int(x), int(y)) for x in range(int(minx), int(maxx) + 1)
for y in range(int(miny), int(maxy) + 1)
if minx <= x < maxx and miny <= y < maxy]
# add building
if gdf_row is not None and isinstance(gdf_row, gpd.GeoDataFrame) and 'id' in gdf_row.columns:
building_id = str(gdf_row.iloc[0]['id'])
else:
# pick building block adjacent to door using adjacency helper
candidate = None
if blocks is not None and len(blocks) > 0:
mask = self.check_adjacent(blocks, door) # list[bool]
idx = next((i for i, m in enumerate(mask) if m), None)
candidate = blocks[idx] if idx is not None else blocks[0]
if candidate is None:
candidate = door
building_id = f"{building_type[0]}-x{int(candidate[0])}-y{int(candidate[1])}"
self.buildings_outline = unary_union([self.buildings_outline, geom])
# Append to buildings_gdf
new_row = gpd.GeoDataFrame([
{
'id': building_id,
'building_type': building_type,
'door_cell_x': door[0],
'door_cell_y': door[1],
'door_point': door_centroid,
'size': len(blocks),
'geometry': geom,
'blocks': list(blocks)
}
], geometry='geometry', crs=self.buildings_gdf.crs)
new_row.set_index('id', inplace=True, drop=False)
new_row.index.name = None
# Avoid FutureWarning by assigning directly when empty
if self.buildings_gdf.empty:
self.buildings_gdf = new_row
else:
# Preserve id index across concatenations
self.buildings_gdf = pd.concat([self.buildings_gdf, new_row], axis=0, ignore_index=False)
# blocks are no longer streets
block_index = pd.MultiIndex.from_tuples(blocks)
if hasattr(self, 'blocks_gdf') and not self.blocks_gdf.empty:
self.blocks_gdf.loc[block_index, 'building_id'] = building_id
self.blocks_gdf.loc[block_index, 'building_type'] = building_type
if hasattr(self, 'streets_gdf') and not self.streets_gdf.empty:
drop_index = self.streets_gdf.index.intersection(block_index)
if len(drop_index):
self.streets_gdf = self.streets_gdf.drop(index=drop_index)
# expand city boundary if necessary
minx, miny, maxx, maxy = geom.bounds
if maxx > self.dimensions[0] or maxy > self.dimensions[1]:
new_width = max(self.dimensions[0], int(np.ceil(maxx)))
new_height = max(self.dimensions[1], int(np.ceil(maxy)))
self.dimensions = (new_width, new_height)
self.city_boundary = box(0, 0, new_width, new_height)
self.blocks_gdf = self._init_blocks_gdf()
self.streets_gdf = self._derive_streets_from_blocks()
[docs]
def add_buildings_from_gdf(self, buildings_gdf):
"""
Initialize or add multiple buildings from a GeoDataFrame.
Parameters
----------
buildings_gdf : geopandas.GeoDataFrame
A GeoDataFrame containing building information with columns for type, door coordinates, and geometry.
Raises
------
ValueError
If the GeoDataFrame lacks required columns or contains invalid data.
"""
required_columns = ['building_type', 'door_cell_x', 'door_cell_y', 'geometry']
if not all(col in buildings_gdf.columns for col in required_columns):
raise ValueError(f"GeoDataFrame must contain columns: {required_columns}")
for _, row in buildings_gdf.iterrows():
self.add_building(
building_type=row['building_type'],
door=(row['door_cell_x'], row['door_cell_y']),
geom=row['geometry'],
gdf_row=buildings_gdf.loc[[row.name]]
)
[docs]
def get_block(self, coordinates):
"""
Returns information about the block at the given coordinates.
Parameters
----------
coordinates : tuple
A tuple representing the (x, y) coordinates of the block.
Returns
-------
dict
A dictionary containing the kind of block ('street' or 'building'), the ID of the building if
applicable, the type of building if applicable, and the geometry of the block.
"""
if not isinstance(coordinates, tuple) or len(coordinates) != 2:
raise ValueError("Coordinates must be a tuple of (x, y).")
x, y = coordinates
if not (0 <= x < self.dimensions[0] and 0 <= y < self.dimensions[1]):
raise ValueError(f"Coordinates {coordinates} out of city bounds {self.dimensions}")
row = self.blocks_gdf.loc[(x, y)]
return {
'building_type': row['building_type'],
'building_id': row['building_id'],
'geometry': row['geometry']
}
[docs]
def check_adjacent(self, geom1, geom2, graph=None):
"""Adjacency on a grid of 1x1 blocks. Returns list[bool]."""
# Case 1: geom2 is a block cell (x, y)
if isinstance(geom2, tuple):
bx, by = int(geom2[0]), int(geom2[1])
if graph is not None:
nb = set(graph.neighbors((bx, by)))
else:
nb = {(bx+1, by), (bx-1, by), (bx, by+1), (bx, by-1)}
if isinstance(geom1, tuple):
cx, cy = int(geom1[0]), int(geom1[1])
return [((cx, cy) in nb)]
if isinstance(geom1, list):
return [((int(b[0]), int(b[1])) in nb) for b in geom1]
if isinstance(geom1, (Polygon, MultiPolygon)):
cell_poly = box(bx, by, bx+1, by+1)
return [isinstance(geom1.intersection(cell_poly), LineString)]
raise TypeError("geom1 must be a block tuple, list of block tuples, or shapely (Multi)Polygon when geom2 is a block")
# Case 2: geom2 is a geometry
if hasattr(geom2, 'intersection'):
if isinstance(geom1, tuple):
cx, cy = int(geom1[0]), int(geom1[1])
poly = box(cx, cy, cx+1, cy+1)
return [isinstance(poly.intersection(geom2), LineString)]
if isinstance(geom1, list):
return [isinstance(box(int(b[0]), int(b[1]), int(b[0])+1, int(b[1])+1).intersection(geom2), LineString) for b in geom1]
return [isinstance(geom1.intersection(geom2), LineString)]
raise TypeError("Unsupported types: geom1 must be block tuple/list or shapely geometry; geom2 must be block tuple or shapely geometry")
[docs]
def get_street_graph(self):
"""Build the street graph needed for routing."""
if self.street_graph is not None:
return self.street_graph
self.street_graph = self._build_adjacency_graph(self.streets_gdf)
return self.street_graph
def _build_adjacency_graph(self, gdf):
"""Build grid-adjacency graph from GeoDataFrame with coord_x, coord_y."""
nodes_df = gdf[['coord_x', 'coord_y']].copy()
edge_list = []
for dx, dy in [(1, 0), (0, 1)]:
shifted = nodes_df.copy()
shifted['nx'] = nodes_df['coord_x'] + dx
shifted['ny'] = nodes_df['coord_y'] + dy
# Merge to find neighbor pairs
merged = shifted.merge(
nodes_df.rename(columns={'coord_x': 'nx', 'coord_y': 'ny'}),
on=['nx', 'ny'], how='inner'
)
edge_list.append(
merged[['coord_x', 'coord_y', 'nx', 'ny']].rename(columns={'coord_x': 'x', 'coord_y': 'y'})
)
edges_df = pd.concat(edge_list, ignore_index=True)
# Create graph
G = nx.Graph()
G.add_nodes_from([(int(x), int(y)) for x, y in nodes_df.values])
edge_list = [((int(r.x), int(r.y)), (int(r.nx), int(r.ny))) for r in edges_df.itertuples(index=False)]
G.add_edges_from(edge_list, weight=1)
return G
[docs]
def compute_shortest_paths(self, callable_only=False):
"""
Compute shortest paths between street blocks.
Parameters
----------
callable_only : bool, default False
If False, precompute all-pairs shortest paths (only for small cities)
If True, store on-demand callable (placeholder for hub-based routing)
Notes
-----
Callable mode currently uses nx.shortest_path() on demand—this is a
PLACEHOLDER. Production use with _sample_step requires hub-based routing
for scalability (billions of calls). Current implementation too slow for
large-scale trajectory generation.
Stores result in self.shortest_paths as dict or callable
"""
G = self.get_street_graph()
if callable_only:
# TODO: Replace with hub-based routing for production use
# Current on-demand nx.shortest_path is too slow for billion+ calls in _sample_step
def compute_path(start_coord, end_coord):
try:
return nx.shortest_path(G, start_coord, end_coord)
except nx.NetworkXNoPath:
return []
self.shortest_paths = compute_path
else:
# Dense storage for small cities only
self.shortest_paths = dict(nx.all_pairs_shortest_path(G))
# ---------------------------------------------------------------------
# Shortcut ("highway") network for fast on-demand routing
# ---------------------------------------------------------------------
def _build_hub_network(self, hub_size=100):
"""
Build a sparse shortcut routing structure that enables near-instant shortest
path queries with low memory:
- Select a well-distributed subset of street blocks as hubs
- Precompute hub-to-hub distances in street_graph
"""
G = self.get_street_graph()
hubs = self._select_hubs(hub_size)
# Build edge list with hub tuples as indices
rows = []
for origin in hubs:
distances = nx.single_source_shortest_path_length(G, origin)
for dest in hubs:
if origin == dest:
continue
rows.append({'origin': origin, 'dest': dest, 'distance': distances[dest]})
edge_list = pd.DataFrame(rows)
# Pivot to dense adjacency matrix with hub tuples as row/column indices
hub_df = edge_list.pivot(index='origin', columns='dest', values='distance').fillna(0).astype(int)
self.hubs = hubs
self.hub_df = hub_df
return hub_df
def _select_hubs(self, hub_size = 100):
"""
Compute evenly spaced street blocks as hubs by partitioning coordinates into quantile buckets
and selecting one hub from each bucket pair. Only selects from largest connected component
to ensure all hubs are mutually reachable.
"""
G = self.get_street_graph()
if not nx.is_connected(G):
largest_component = max(nx.connected_components(G), key=len)
streets_subset = self.streets_gdf.loc[self.streets_gdf.index.intersection(largest_component)]
else:
streets_subset = self.streets_gdf
n_buckets = int(np.sqrt(hub_size))
# Create quantile buckets (not added to the dataframe)
x_buckets = pd.qcut(streets_subset['coord_x'], n_buckets, labels=False, duplicates='drop')
y_buckets = pd.qcut(streets_subset['coord_y'], n_buckets, labels=False, duplicates='drop')
# Select one hub from each (x_bucket, y_bucket) pair
hubs = []
for x_bucket in range(n_buckets):
for y_bucket in range(n_buckets):
subset = streets_subset[(x_buckets == x_bucket) & (y_buckets == y_bucket)]
if not subset.empty:
hub_coord = subset.index[0]
# Convert to Python int tuple to match graph node format
hubs.append((int(hub_coord[0]), int(hub_coord[1])))
return hubs
# ---------------------------------------------------------------------
# Building-to-building gravity
# ---------------------------------------------------------------------
def _pairwise_manhattan(self, x1, y1, x2, y2):
"""
Compute pairwise Manhattan distances between two sets of coordinates.
"""
dx = np.abs(x1[:, None] - x2[None, :])
dy = np.abs(y1[:, None] - y2[None, :])
return dx + dy
[docs]
def compute_gravity(self, exponent=2.0, callable_only=False, n_chunks=10, use_proxy_hub_distance=True):
"""Precompute building-to-building gravity from door-to-door distances.
Parameters
----------
exponent : float
The gravity decay exponent (default 2.0)
callable_only : bool
If True, store callable function instead of dense matrix (default False)
n_chunks : int
Number of chunks for computing nearby doors in hub mode (default 10)
use_proxy_hub_distance : bool
If True, use hub-based distance approximation (fast, scales to large cities).
If False, compute true graph distances between all door pairs (slow but exact,
suitable for small cities with <200 buildings). Default True.
Notes
-----
Hub mode approximates ``dist(i, j)`` as the distance from
``door_i`` to ``hub_i``, plus the graph distance from ``hub_i`` to
``hub_j``, plus the distance from ``hub_j`` to ``door_j``.
True mode computes ``shortest_path_length(door_i, door_j)`` on the
street graph.
Stores result in self.grav as DataFrame (callable_only=False) or callable (callable_only=True)
Persistence:
- Hub mode (use_proxy_hub_distance=True): Gravity infrastructure is saved by
save_geopackage() and restored by from_geopackage(load_gravity=True). The
city.grav callable works immediately after loading.
- True distance mode (use_proxy_hub_distance=False): Not persisted. Must call
compute_gravity(..., use_proxy_hub_distance=False) after loading. This is
fast for small cities (<200 buildings) where this mode is intended.
"""
building_ids = self.buildings_gdf['id'].to_numpy()
door_x = self.buildings_gdf['door_cell_x'].astype(int).to_numpy()
door_y = self.buildings_gdf['door_cell_y'].astype(int).to_numpy()
if use_proxy_hub_distance:
if self.hubs is None:
hub_size = int(len(self.buildings_gdf) / 1000) + 5
warnings.warn(f"Hub network not available. Initializing with {hub_size} hubs.")
self._build_hub_network(hub_size=hub_size)
hubs = np.array(self.hubs)
door_to_hub_dist = self._pairwise_manhattan(door_x, door_y, hubs[:, 0], hubs[:, 1])
closest_hub_idx = door_to_hub_dist.argmin(axis=1).astype(np.int32)
dist_to_closest_hub = door_to_hub_dist.min(axis=1).astype(np.int32)
self.grav_hub_info = pd.DataFrame({
'closest_hub_idx': closest_hub_idx,
'dist_to_hub': dist_to_closest_hub
}, index=building_ids)
# Compute close pairs in chunks to avoid memory issues
n = len(building_ids)
chunk_size = max(1, n // n_chunks)
bid_i_list = []
bid_j_list = []
dist_list = []
for i_start in range(0, n, chunk_size):
i_end = min(i_start + chunk_size, n)
chunk_dist = self._pairwise_manhattan(
door_x[i_start:i_end], door_y[i_start:i_end],
door_x, door_y
).astype(np.int32)
min_hub_dist = np.minimum(
dist_to_closest_hub[i_start:i_end, None],
dist_to_closest_hub[None, :]
)
is_close = chunk_dist <= min_hub_dist
i_local, j_global = np.where(is_close)
i_global = i_local + i_start
upper_mask = i_global < j_global
if upper_mask.any():
bid_i_list.append(building_ids[i_global[upper_mask]])
bid_j_list.append(building_ids[j_global[upper_mask]])
dist_list.append(chunk_dist[i_local[upper_mask], j_global[upper_mask]])
if bid_i_list:
self.mh_dist_nearby_doors = pd.Series(
np.concatenate(dist_list),
index=pd.MultiIndex.from_arrays([
np.concatenate(bid_i_list),
np.concatenate(bid_j_list)
])
)
else:
self.mh_dist_nearby_doors = pd.Series([], dtype=np.int32, index=pd.MultiIndex.from_arrays([[], []]))
# Fix: Buildings sharing same door have Manhattan distance 0, set to 1 to avoid inf gravity
door_groups = self.buildings_gdf.groupby(['door_cell_x', 'door_cell_y'])['id']
same_door_mask = door_groups.transform('size') > 1
same_door_buildings = set(self.buildings_gdf[same_door_mask]['id'].values)
if same_door_buildings and len(self.mh_dist_nearby_doors) > 0:
zero_dist_mask = (self.mh_dist_nearby_doors == 0)
for (bid_i, bid_j) in self.mh_dist_nearby_doors[zero_dist_mask].index:
if bid_i in same_door_buildings and bid_j in same_door_buildings:
self.mh_dist_nearby_doors.loc[(bid_i, bid_j)] = 1
if callable_only:
bid_to_idx = {bid: i for i, bid in enumerate(building_ids)}
hub_to_hub = self.hub_df.values
def compute_gravity_row(building_id):
idx = bid_to_idx[building_id]
hub_i = closest_hub_idx[idx]
dist_to_hub_i = dist_to_closest_hub[idx]
distances = dist_to_hub_i + hub_to_hub[hub_i, closest_hub_idx] + dist_to_closest_hub
for (bid_i, bid_j), d in self.mh_dist_nearby_doors.items():
if bid_i == building_id:
distances[bid_to_idx[bid_j]] = d
elif bid_j == building_id:
distances[bid_to_idx[bid_i]] = d
distances[idx] = 1
gravity_row = 1.0 / (distances ** exponent)
gravity_row[idx] = 0.0
return pd.Series(gravity_row, index=building_ids)
self.grav = compute_gravity_row
else:
hub_to_hub = self.hub_df.values
dist_matrix = dist_to_closest_hub[:, None] + hub_to_hub[closest_hub_idx[:, None], closest_hub_idx[None, :]] + dist_to_closest_hub[None, :]
bid_to_idx = {bid: i for i, bid in enumerate(building_ids)}
for (bid_i, bid_j), d in self.mh_dist_nearby_doors.items():
i, j = bid_to_idx[bid_i], bid_to_idx[bid_j]
dist_matrix[i, j] = dist_matrix[j, i] = d
np.fill_diagonal(dist_matrix, 1)
gravity = 1.0 / (dist_matrix ** exponent)
np.fill_diagonal(gravity, 0.0)
self.grav = pd.DataFrame(gravity, index=building_ids, columns=building_ids)
else:
G = self.get_street_graph()
door_coords = list(zip(door_x, door_y))
rows = []
for i, (bid_i, door_i) in enumerate(zip(building_ids, door_coords)):
distances = nx.single_source_shortest_path_length(G, door_i)
for j, (bid_j, door_j) in enumerate(zip(building_ids, door_coords)):
if i >= j:
continue
dist = distances.get(door_j, np.inf)
rows.append({'bid_i': bid_i, 'bid_j': bid_j, 'distance': dist})
if rows:
dist_df = pd.DataFrame(rows)
dist_matrix = np.full((len(building_ids), len(building_ids)), np.inf)
bid_to_idx = {bid: i for i, bid in enumerate(building_ids)}
for row in dist_df.itertuples(index=False):
i, j = bid_to_idx[row.bid_i], bid_to_idx[row.bid_j]
dist_matrix[i, j] = dist_matrix[j, i] = row.distance
else:
dist_matrix = np.full((len(building_ids), len(building_ids)), np.inf)
# Fix: Buildings sharing same door have distance 0, set to 1 to avoid inf gravity
dist_matrix[dist_matrix == 0] = 1
np.fill_diagonal(dist_matrix, 1)
gravity = 1.0 / (dist_matrix ** exponent)
np.fill_diagonal(gravity, 0.0)
if callable_only:
grav_df = pd.DataFrame(gravity, index=building_ids, columns=building_ids)
def compute_gravity_row(building_id):
return grav_df.loc[building_id]
self.grav = compute_gravity_row
else:
self.grav = pd.DataFrame(gravity, index=building_ids, columns=building_ids)
[docs]
def save(self, filename):
"""
Saves the city object to a file.
Parameters
----------
filename : str
The name of the file to save the city object to.
"""
with open(filename, 'wb') as file:
pickle.dump(self, file)
[docs]
def plot_city(self, ax, doors=True, address=True, zorder=1, heatmap_agent=None, colors=None, alpha=1, legend=False):
"""
Plots the city on a given matplotlib axis.
Parameters
----------
ax : matplotlib.axes.Axes
The axis on which to plot the city.
doors : bool
Whether to plot doors of buildings.
address : bool
Whether to plot the address of buildings.
zorder : int
The z-order of the plot.
heatmap_agent : Agent
The agent for which to plot a heatmap of time spent in each building.
colors : dict
A dictionary mapping building types to colors for plotting.
legend : bool
Whether to display a legend for building types.
"""
# Draw city boundary
x, y = self.city_boundary.exterior.xy
ax.plot(np.array(x), np.array(y), linewidth=2, color='black')
# Colors
if colors is None:
colors = {
'street': 'white',
'home': 'skyblue',
'workplace': '#C9A0DC',
'retail': 'lightgrey',
'park': 'lightgreen',
'default': 'lightcoral'
}
# Streets: fill each street cell
if hasattr(self, 'streets_gdf') and not self.streets_gdf.empty:
for _, s in self.streets_gdf.iterrows():
sx, sy = s.geometry.exterior.xy
ax.fill(sx, sy, facecolor=colors['street'], linewidth=0.5, zorder=zorder)
# Buildings
if heatmap_agent is not None and not self.buildings_gdf.empty:
weights = heatmap_agent.diary.groupby('location').duration.sum()
norm = Normalize(vmin=0, vmax=max(weights)/2 if len(weights) else 1)
base_color = np.array([1, 0, 0])
for _, b in self.buildings_gdf.iterrows():
geom = b.geometry
weight = weights.get(b['id'], 0)
a = norm(weight) if weight > 0 else 0
if isinstance(geom, (Polygon, MultiPolygon)):
polys = [geom] if isinstance(geom, Polygon) else list(geom.geoms)
for poly in polys:
bx, by = poly.exterior.xy
ax.fill(bx, by, facecolor=base_color, alpha=a,
edgecolor='black', linewidth=0.5,
zorder=zorder)
ax.plot(bx, by, color='black', alpha=1, linewidth=0.5, zorder=zorder + 1)
for interior_ring in poly.interiors:
x_int, y_int = interior_ring.xy
ax.plot(x_int, y_int, color='black', linewidth=0.5, zorder=zorder + 1)
ax.fill(x_int, y_int, facecolor='white', zorder=zorder + 1)
if doors:
door_x = float(b['door_cell_x'])
door_y = float(b['door_cell_y'])
door_line = geom.intersection(box(door_x, door_y, door_x+1, door_y+1))
scaled = scale(door_line, xfact=0.25, yfact=0.25, origin=door_line.centroid)
if isinstance(scaled, LineString):
dx, dy = scaled.xy
ax.plot(dx, dy, linewidth=2, color='white', zorder=zorder)
if address:
door_coord = (int(b['door_cell_x']), int(b['door_cell_y']))
bbox = ax.get_window_extent().transformed(plt.gcf().dpi_scale_trans.inverted())
axes_width_in_inches = bbox.width
axes_data_range = ax.get_xlim()[1] - ax.get_xlim()[0]
fontsize = (axes_width_in_inches / axes_data_range) * 13
ax.text(door_coord[0] + 0.15, door_coord[1] + 0.15,
f"{door_coord[0]}, {door_coord[1]}",
ha='left', va='bottom', fontsize=fontsize, color='black')
sm = ScalarMappable(cmap=cm.Reds, norm=norm)
sm.set_array([])
plt.colorbar(sm, ax=ax, shrink=0.5, aspect=10, pad=0.02).set_label('Minutes Spent')
elif not self.buildings_gdf.empty:
for _, b in self.buildings_gdf.iterrows():
geom = b.geometry
bcolor = colors.get(b['building_type'], colors['default'])
if isinstance(geom, (Polygon, MultiPolygon)):
polys = [geom] if isinstance(geom, Polygon) else list(geom.geoms)
for poly in polys:
bx, by = poly.exterior.xy
ax.fill(bx, by, facecolor=bcolor,
edgecolor='black', linewidth=0.5, alpha=alpha,
zorder=zorder)
for interior_ring in poly.interiors:
x_int, y_int = interior_ring.xy
ax.plot(x_int, y_int, color='black', linewidth=0.5, zorder=zorder + 1)
ax.fill(x_int, y_int, facecolor='white', zorder=zorder + 1)
if doors:
door_x = float(b['door_cell_x'])
door_y = float(b['door_cell_y'])
door_line = geom.intersection(box(door_x, door_y, door_x+1, door_y+1))
scaled = scale(door_line, xfact=0.25, yfact=0.25, origin=door_line.centroid)
if isinstance(scaled, LineString):
dx, dy = scaled.xy
ax.plot(dx, dy, linewidth=2, color='white', zorder=zorder + 2)
elif isinstance(scaled, MultiLineString):
for single_line in scaled.geoms:
dx, dy = single_line.xy
ax.plot(dx, dy, linewidth=2, color='white', zorder=zorder + 2)
if address:
door_coord = (int(b['door_cell_x']), int(b['door_cell_y']))
bbox = ax.get_window_extent().transformed(plt.gcf().dpi_scale_trans.inverted())
axes_width_in_inches = bbox.width
axes_data_range = ax.get_xlim()[1] - ax.get_xlim()[0]
fontsize = (axes_width_in_inches / axes_data_range) * 13
ax.text(door_coord[0] + 0.15, door_coord[1] + 0.15,
f"{door_coord[0]}, {door_coord[1]}",
ha='left', va='bottom', fontsize=fontsize, color='black')
# Add legend if requested
if legend and not self.buildings_gdf.empty:
building_types = self.buildings_gdf['building_type'].unique()
legend_elements = []
for btype in building_types:
if btype in colors:
legend_elements.append(Patch(facecolor=colors[btype], edgecolor='black', label=btype.capitalize()))
if legend_elements:
ax.legend(handles=legend_elements, loc='upper right', framealpha=0.9)
ax.set_aspect('equal')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
[docs]
def save_geopackage(self, gpkg_path, persist_blocks=True,
persist_city_properties=True, persist_gravity_data=True,
reverse_affine=False,
edges_path=None, street_graphml_path=None):
"""Save city to GeoPackage with all spatial objects for simulation continuity.
Parameters
----------
gpkg_path : str
Path to GeoPackage (.gpkg) for buildings, streets, and auxiliary layers
persist_blocks : bool, default False
If True, save blocks_gdf layer
persist_city_properties : bool, default True
If True, save city properties (name, block_side_length, origins, etc.)
persist_gravity_data : bool, default True
If True, save gravity infrastructure (hubs, distances, nearby doors)
reverse_affine : bool, default False
If True, transform geometries from garden city units to Web Mercator meters
edges_path : str, optional
Path to save edges parquet (ox, oy, dx, dy, distance, gravity)
street_graphml_path : str, optional
Path to save street graph as GraphML
Notes
-----
This method saves everything needed to reload and continue a simulation.
Use from_geopackage() to reload.
"""
from nomad.map_utils import blocks_to_mercator_gdf
if reverse_affine:
buildings_gdf = blocks_to_mercator_gdf(
self.buildings_gdf,
block_size=self.block_side_length,
false_easting=self.web_mercator_origin_x,
false_northing=self.web_mercator_origin_y,
rotation_deg=self.rotation_deg,
drop_garden_cols=False
)
streets_gdf = blocks_to_mercator_gdf(
self.streets_gdf,
block_size=self.block_side_length,
false_easting=self.web_mercator_origin_x,
false_northing=self.web_mercator_origin_y,
rotation_deg=self.rotation_deg,
drop_garden_cols=False
)
else:
buildings_gdf = self.buildings_gdf
streets_gdf = self.streets_gdf
# Convert door_point tuple to two columns for GeoPackage serialization
if 'door_point' in buildings_gdf.columns:
buildings_gdf['door_point_x'] = buildings_gdf['door_point'].apply(lambda p: p[0])
buildings_gdf['door_point_y'] = buildings_gdf['door_point'].apply(lambda p: p[1])
buildings_gdf = buildings_gdf.drop(columns=['door_point'])
buildings_gdf.to_file(gpkg_path, layer='buildings', driver='GPKG')
streets_gdf.to_file(gpkg_path, layer='streets', driver='GPKG')
if persist_blocks and hasattr(self, 'blocks_gdf') and not self.blocks_gdf.empty:
self.blocks_gdf.to_file(gpkg_path, layer='blocks', driver='GPKG')
# Persist city properties
if persist_city_properties:
city_props_gdf = gpd.GeoDataFrame({
'name': [self.name],
'block_side_length': [self.block_side_length],
'web_mercator_origin_x': [self.web_mercator_origin_x],
'web_mercator_origin_y': [self.web_mercator_origin_y],
'city_boundary': [self.city_boundary],
'buildings_outline': [self.buildings_outline],
'geometry': [self.city_boundary] # Primary geometry column
}, crs=self.buildings_gdf.crs)
city_props_gdf.to_file(gpkg_path, layer='city_properties', driver='GPKG')
# Persist gravity infrastructure
if persist_gravity_data and hasattr(self, 'hubs') and self.hubs is not None:
# Save hubs as DataFrame
hubs_df = pd.DataFrame(self.hubs, columns=['hub_x', 'hub_y'])
hubs_gdf = gpd.GeoDataFrame(
hubs_df,
geometry=gpd.points_from_xy(hubs_df['hub_x'], hubs_df['hub_y']),
crs=self.buildings_gdf.crs
)
hubs_gdf.to_file(gpkg_path, layer='hubs', driver='GPKG')
# Save hub_df (hub-to-hub distances) as flattened array
if hasattr(self, 'hub_df') and self.hub_df is not None:
n_hubs = len(self.hubs)
hub_dist_flat = self.hub_df.values.flatten()
hub_dist_df = pd.DataFrame({
'hub_dist_flat': hub_dist_flat,
'n_hubs': [n_hubs] * len(hub_dist_flat)
})
hub_dist_gdf = gpd.GeoDataFrame(hub_dist_df, geometry=[Point(0, 0)] * len(hub_dist_df), crs=self.buildings_gdf.crs)
hub_dist_gdf.to_file(gpkg_path, layer='hub_distances', driver='GPKG')
# Save grav_hub_info (building -> closest hub info)
if hasattr(self, 'grav_hub_info') and self.grav_hub_info is not None:
grav_hub_gdf = gpd.GeoDataFrame(
self.grav_hub_info.reset_index(),
geometry=[Point(0, 0)] * len(self.grav_hub_info),
crs=self.buildings_gdf.crs
)
grav_hub_gdf.to_file(gpkg_path, layer='grav_hub_info', driver='GPKG')
# Save mh_dist_nearby_doors (nearby door pairs)
if hasattr(self, 'mh_dist_nearby_doors') and self.mh_dist_nearby_doors is not None and len(self.mh_dist_nearby_doors) > 0:
nearby_df = self.mh_dist_nearby_doors.reset_index()
nearby_df.columns = ['bid_i', 'bid_j', 'dist']
nearby_gdf = gpd.GeoDataFrame(
nearby_df,
geometry=[Point(0, 0)] * len(nearby_df),
crs=self.buildings_gdf.crs
)
nearby_gdf.to_file(gpkg_path, layer='nearby_doors', driver='GPKG')
# Optional edges persistence (parquet)
if edges_path and hasattr(self, 'gravity') and self.gravity is not None and len(self.gravity) > 0:
edges_df = self.gravity.reset_index()
# Split tuple origins/dests into integer columns
edges_df[['ox', 'oy']] = pd.DataFrame(edges_df['origin'].tolist(), index=edges_df.index)
edges_df[['dx', 'dy']] = pd.DataFrame(edges_df['dest'].tolist(), index=edges_df.index)
edges_df = edges_df.drop(columns=['origin', 'dest'])
edges_df.to_parquet(edges_path, index=False)
# Optional GraphML persistence of the internal grid street graph
if street_graphml_path:
try:
G = self.get_street_graph()
# Remap tuple nodes to string ids and attach coordinates as attributes
H = nx.Graph()
for (x, y) in G.nodes:
node_id = f"{int(x)}_{int(y)}"
H.add_node(node_id, x=int(x), y=int(y))
for u, v, data in G.edges(data=True):
uid = f"{int(u[0])}_{int(u[1])}"
vid = f"{int(v[0])}_{int(v[1])}"
w = data.get('weight', 1)
H.add_edge(uid, vid, weight=int(w))
nx.write_graphml(H, street_graphml_path)
except Exception:
# Silently ignore GraphML write issues to avoid breaking primary persistence
pass
[docs]
@classmethod
def from_geodataframes(cls, buildings_gdf, streets_gdf, blocks_gdf=None, edges_df=None, city_boundary=None):
"""Construct a City from buildings and streets GeoDataFrames."""
if city_boundary is not None:
bounds = city_boundary.bounds
width, height = int(np.ceil(bounds[2])), int(np.ceil(bounds[3]))
else:
max_street_x = int(streets_gdf['coord_x'].max())
max_street_y = int(streets_gdf['coord_y'].max())
max_door_x = int(buildings_gdf['door_cell_x'].max())
max_door_y = int(buildings_gdf['door_cell_y'].max())
width = max(max_street_x, max_door_x) + 1
height = max(max_street_y, max_door_y) + 1
city = cls(dimensions=(width, height), manual_streets=True)
if city_boundary is not None:
city.city_boundary = city_boundary
# Adopt input GeoDataFrames with required columns
city.buildings_gdf = gpd.GeoDataFrame(buildings_gdf, geometry='geometry', crs=buildings_gdf.crs)
if 'building_type' not in city.buildings_gdf.columns:
raise KeyError("buildings_gdf must contain 'building_type'.")
missing_cols = set(['id','door_cell_x','door_cell_y','door_point']) - set(city.buildings_gdf.columns)
if missing_cols:
raise KeyError(f"buildings_gdf missing required columns: {missing_cols}. All buildings must have door_point computed from door_line intersection.")
# Ensure consistent index on id
city.buildings_gdf.set_index('id', inplace=True, drop=False)
city.buildings_gdf.index.name = None
city.buildings_outline = unary_union(city.buildings_gdf.geometry.values) if not city.buildings_gdf.empty else Polygon()
# Blocks/street layers
if blocks_gdf is not None and not blocks_gdf.empty:
city.blocks_gdf = gpd.GeoDataFrame(blocks_gdf, geometry='geometry', crs=blocks_gdf.crs)
city.blocks_gdf['coord_x'] = city.blocks_gdf['coord_x'].astype(int)
city.blocks_gdf['coord_y'] = city.blocks_gdf['coord_y'].astype(int)
city.blocks_gdf.set_index(['coord_x', 'coord_y'], inplace=True, drop=False)
city.blocks_gdf.index.names = [None, None]
city.streets_gdf = city._derive_streets_from_blocks()
else:
city.streets_gdf = gpd.GeoDataFrame(streets_gdf, geometry='geometry', crs=streets_gdf.crs)
city.streets_gdf.set_index(['coord_x', 'coord_y'], inplace=True, drop=False)
city.streets_gdf.index.names = [None, None]
# Regenerate blocks from dimensions (integer grid: 0 to width-1, 0 to height-1)
blocks = [{'coord_x': x, 'coord_y': y, 'building_id': None,
'building_type': 'street', 'geometry': box(x, y, x+1, y+1)}
for x in range(width) for y in range(height)]
city.blocks_gdf = gpd.GeoDataFrame(blocks, geometry='geometry', crs=city.buildings_gdf.crs)
city.blocks_gdf.set_index(['coord_x', 'coord_y'], inplace=True, drop=False)
city.blocks_gdf.index.names = [None, None]
# Vectorized update of blocks_gdf for building blocks
if not city.buildings_gdf.empty:
building_bounds = city.buildings_gdf.geometry.bounds
building_ids = city.buildings_gdf['id'].values
building_types = city.buildings_gdf['building_type'].values
for i, (bid, btype) in enumerate(zip(building_ids, building_types)):
minx, miny, maxx, maxy = building_bounds.iloc[i]
mask = (city.blocks_gdf['coord_x'] >= int(np.floor(minx))) & (city.blocks_gdf['coord_x'] < int(np.ceil(maxx))) & \
(city.blocks_gdf['coord_y'] >= int(np.floor(miny))) & (city.blocks_gdf['coord_y'] < int(np.ceil(maxy)))
city.blocks_gdf.loc[mask, ['building_id', 'building_type']] = [bid, btype]
# Populate blocks column in buildings_gdf
if not city.blocks_gdf.empty:
building_blocks_map = {}
for building_id in city.blocks_gdf['building_id'].dropna().unique():
mask = city.blocks_gdf['building_id'] == building_id
building_blocks_map[building_id] = city.blocks_gdf[mask].index.tolist()
city.buildings_gdf['blocks'] = city.buildings_gdf['id'].map(building_blocks_map)
# Recompute derived attributes, or preload from edges if provided
city.buildings_outline = unary_union(city.buildings_gdf.geometry.values) if not city.buildings_gdf.empty else Polygon()
if edges_df is not None and not edges_df.empty:
# Expect columns: ox, oy, dx, dy, distance, gravity
if all(c in edges_df.columns for c in ['ox','oy','dx','dy']):
# Set gravity from edges
tmp = edges_df.copy()
tmp['origin'] = list(zip(tmp['ox'].astype(int), tmp['oy'].astype(int)))
tmp['dest'] = list(zip(tmp['dx'].astype(int), tmp['dy'].astype(int)))
cols = ['origin','dest'] + [c for c in ['distance','gravity'] if c in tmp.columns]
city.gravity = tmp[cols].set_index(['origin','dest'])
# Build graph from edges
G = nx.Graph()
for r in tmp[['ox','oy','dx','dy']].itertuples(index=False):
G.add_edge((int(r.ox), int(r.oy)), (int(r.dx), int(r.dy)), weight=1)
city.street_graph = G
city.shortest_paths = dict(nx.all_pairs_shortest_path(G))
else:
city.get_street_graph()
else:
city.get_street_graph()
return city
[docs]
@classmethod
def from_geopackage(cls, gpkg_path, edges_path=None, poi_cols=None, load_gravity=True):
b_gdf = gpd.read_file(gpkg_path, layer='buildings')
# Optional explicit renaming via poi_cols (e.g., {'building_type':'type'})
if poi_cols:
rename_map = {}
for target, src in poi_cols.items():
if target not in b_gdf.columns:
if src in b_gdf.columns:
rename_map[src] = target
else:
raise KeyError(f"poi_cols refers to missing source column '{src}' for target '{target}'.")
if rename_map:
b_gdf = b_gdf.rename(columns=rename_map)
# Enforce required columns strictly
required_building_cols = ['id', 'building_type', 'door_cell_x', 'door_cell_y', 'geometry']
missing = [c for c in required_building_cols if c not in b_gdf.columns]
if missing:
raise KeyError(f"'buildings' layer missing required columns {missing}. Provide explicit mappings via column_map kwargs, e.g., building_type='type'.")
# Reconstruct door_point tuple
if 'door_point_x' in b_gdf.columns and 'door_point_y' in b_gdf.columns:
b_gdf['door_point'] = list(zip(b_gdf['door_point_x'], b_gdf['door_point_y']))
b_gdf = b_gdf.drop(columns=['door_point_x', 'door_point_y'])
elif 'door_point' in b_gdf.columns:
# Handle old format: WKT string -> tuple
def parse_door_point(val):
if isinstance(val, tuple):
return val
elif isinstance(val, str):
pt = wkt.loads(val)
return (pt.x, pt.y)
else:
raise ValueError(f"door_point must be tuple or WKT string, got {type(val)}")
b_gdf['door_point'] = b_gdf['door_point'].apply(parse_door_point)
else:
raise KeyError("buildings_gdf missing door_point column. It must be present, reconstructible from door_point_x/door_point_y, or parseable from WKT string.")
s_gdf = gpd.read_file(gpkg_path, layer='streets')
bl_gdf = None
props_gdf = None
try:
bl_gdf = gpd.read_file(gpkg_path, layer='blocks')
except Exception:
pass
try:
props_gdf = gpd.read_file(gpkg_path, layer='city_properties')
except Exception:
pass
city_props = {}
city_boundary = None
if props_gdf is not None and not props_gdf.empty:
props = props_gdf.iloc[0]
city_props['name'] = props.get('name', 'Garden City')
city_props['block_side_length'] = props.get('block_side_length', 15.0)
city_props['web_mercator_origin_x'] = props.get('web_mercator_origin_x', -4265699.0)
city_props['web_mercator_origin_y'] = props.get('web_mercator_origin_y', 4392976.0)
boundary = props.get('city_boundary')
if boundary is not None:
city_boundary = wkt.loads(boundary) if isinstance(boundary, str) else boundary
city_props['city_boundary'] = city_boundary
outline = props.get('buildings_outline')
if outline is not None:
city_props['buildings_outline'] = wkt.loads(outline) if isinstance(outline, str) else outline
edges_df = pd.read_parquet(edges_path) if edges_path and os.path.exists(edges_path) else None
city = cls.from_geodataframes(b_gdf, s_gdf, bl_gdf, edges_df, city_boundary=city_boundary)
for key, value in city_props.items():
if key != 'city_boundary': # Already set in from_geodataframes
setattr(city, key, value)
# Load gravity infrastructure if requested
if load_gravity:
try:
hubs_gdf = gpd.read_file(gpkg_path, layer='hubs')
city.hubs = list(zip(hubs_gdf['hub_x'].astype(int), hubs_gdf['hub_y'].astype(int)))
hub_dist_gdf = gpd.read_file(gpkg_path, layer='hub_distances')
hub_dist_gdf = hub_dist_gdf.drop(columns=['geometry'])
n_hubs = int(hub_dist_gdf['n_hubs'].iloc[0])
hub_dist_flat = hub_dist_gdf['hub_dist_flat'].values.astype(np.int32)
hub_dist_matrix = hub_dist_flat.reshape((n_hubs, n_hubs))
city.hub_df = pd.DataFrame(hub_dist_matrix, index=city.hubs, columns=city.hubs)
grav_hub_gdf = gpd.read_file(gpkg_path, layer='grav_hub_info')
grav_hub_gdf = grav_hub_gdf.drop(columns=['geometry'])
city.grav_hub_info = grav_hub_gdf.set_index('index')
city.grav_hub_info.index.name = None
city.grav_hub_info['closest_hub_idx'] = city.grav_hub_info['closest_hub_idx'].astype(np.int32)
city.grav_hub_info['dist_to_hub'] = city.grav_hub_info['dist_to_hub'].astype(np.int32)
try:
nearby_gdf = gpd.read_file(gpkg_path, layer='nearby_doors')
nearby_gdf = nearby_gdf.drop(columns=['geometry'])
city.mh_dist_nearby_doors = pd.Series(
nearby_gdf['dist'].values.astype(np.int32),
index=pd.MultiIndex.from_arrays([nearby_gdf['bid_i'], nearby_gdf['bid_j']])
)
except Exception:
city.mh_dist_nearby_doors = pd.Series([], dtype=np.int32, index=pd.MultiIndex.from_arrays([[], []]))
# Reconstruct callable gravity function
building_ids = city.buildings_gdf['id'].to_numpy()
bid_to_idx = {bid: i for i, bid in enumerate(building_ids)}
hub_to_hub = city.hub_df.values
closest_hub_idx = city.grav_hub_info['closest_hub_idx'].to_numpy()
dist_to_closest_hub = city.grav_hub_info['dist_to_hub'].to_numpy()
def compute_gravity_row(building_id, exponent=2.0):
idx = bid_to_idx[building_id]
hub_i = closest_hub_idx[idx]
dist_to_hub_i = dist_to_closest_hub[idx]
distances = dist_to_hub_i + hub_to_hub[hub_i, closest_hub_idx] + dist_to_closest_hub
for (bid_i, bid_j), d in city.mh_dist_nearby_doors.items():
if bid_i == building_id:
distances[bid_to_idx[bid_j]] = d
elif bid_j == building_id:
distances[bid_to_idx[bid_i]] = d
distances[idx] = 1 # Temporary non-zero to avoid divide-by-zero warning
gravity_row = 1.0 / (distances ** exponent)
gravity_row[idx] = 0.0 # Self-gravity is always 0
return pd.Series(gravity_row, index=building_ids)
city.grav = compute_gravity_row
except Exception:
pass
return city
[docs]
def get_building(self, identifier=None, door_coords=None, any_coords=None):
"""
Retrieve a building by string ID, door coordinates, or any coordinates within the city's bounds.
Parameters:
-----------
identifier : str, optional
The string ID of the building (e.g., 'h-x8-y8').
door_coords : tuple, optional
The (x, y) coordinates of the building's door.
any_coords : tuple, optional
Any (x, y) coordinates within the city's bounds to find a building that contains this point.
Returns:
--------
geopandas.GeoDataFrame or None
A GeoDataFrame with a single row containing building details if found, None otherwise.
"""
if identifier is not None:
if identifier in self.buildings_gdf.index:
return self.buildings_gdf.loc[[identifier]]
return None
elif door_coords is not None:
# Match by door_cell_x/door_cell_y
df = self.buildings_gdf
mask = (df['door_cell_x'] == door_coords[0]) & (df['door_cell_y'] == door_coords[1])
res = df[mask]
if not res.empty:
return res.iloc[[0]]
elif any_coords is not None:
point = Point(any_coords)
building = self.buildings_gdf[self.buildings_gdf.geometry.contains(point)]
if not building.empty:
return building
return None
[docs]
def get_building_coordinates(self):
"""
Get building coordinates as a DataFrame with building_id, x, y, building_type, size columns.
Returns
-------
pd.DataFrame
DataFrame with columns: building_id, x, y, building_type, size
"""
if self.buildings_gdf.empty:
return pd.DataFrame(columns=['building_id','x','y','building_type','size'])
centroids = self.buildings_gdf.geometry.centroid
out = pd.DataFrame({
'building_id': self.buildings_gdf['id'].values,
'x': centroids.x.values,
'y': centroids.y.values,
'building_type': self.buildings_gdf['building_type'].values,
'size': self.buildings_gdf['size'].values
})
return out
[docs]
def to_mercator(self, data):
"""
Convert city block coordinates to Web Mercator coordinates using city's parameters.
Parameters
----------
data : pd.DataFrame
DataFrame with 'x', 'y' columns in city block coordinates
Returns
-------
pd.DataFrame
DataFrame with 'x', 'y' columns updated to Web Mercator coordinates.
If 'ha' column exists, it is also scaled.
"""
rotation_origin = None
# Use stored rotation_origin if available (from metadata file)
# Otherwise fall back to boundary centroid
if hasattr(self, 'rotation_origin') and self.rotation_origin is not None:
rotation_origin = self.rotation_origin
elif hasattr(self, 'boundary_polygon') and self.boundary_polygon is not None:
centroid = self.boundary_polygon.centroid
rotation_origin = (centroid.x, centroid.y)
return blocks_to_mercator(
data,
block_size=self.block_side_length,
false_easting=self.web_mercator_origin_x,
false_northing=self.web_mercator_origin_y,
rotation_deg=self.rotation_deg,
rotation_origin=rotation_origin
)
[docs]
def from_mercator(self, data):
"""
Convert Web Mercator coordinates back to city block coordinates using city's parameters.
Parameters
----------
data : pd.DataFrame
DataFrame with 'x', 'y' columns in Web Mercator coordinates
Returns
-------
pd.DataFrame
DataFrame with 'x', 'y' columns updated to city block coordinates.
If 'ha' column exists, it is also scaled back.
"""
return mercator_to_blocks(
data,
block_size=self.block_side_length,
false_easting=self.web_mercator_origin_x,
false_northing=self.web_mercator_origin_y
)
[docs]
def to_file(self, buildings_path=None, streets_path=None,
street_graphml_path=None, driver='GeoJSON',
to_crs=None, reverse_affine=False):
"""
Export city layers to file with optional coordinate transformations.
Parameters
----------
buildings_path : str, optional
Path to save buildings layer
streets_path : str, optional
Path to save streets layer
street_graphml_path : str, optional
Path to save street graph as GraphML
driver : str, default 'GeoJSON'
Output format: 'GeoJSON', 'GPKG', 'Parquet', 'ESRI Shapefile'
to_crs : str or CRS, optional
Target CRS for reprojection (e.g., 'EPSG:4326')
reverse_affine : bool, default False
If True, transform from garden city units to Web Mercator meters
using self.block_side_length, offset_x, offset_y, rotation_deg
Notes
-----
For GeoJSON output, to_crs='EPSG:4326' is required if reverse_affine=True.
"""
if street_graphml_path:
G = self.get_street_graph()
nx.write_graphml(G, street_graphml_path)
def transform_gdf(gdf):
if reverse_affine:
gdf = blocks_to_mercator_gdf(
gdf,
block_size=self.block_side_length,
false_easting=self.web_mercator_origin_x,
false_northing=self.web_mercator_origin_y,
rotation_deg=self.rotation_deg,
drop_garden_cols=True
)
else:
gdf = gdf.copy()
if to_crs:
gdf = gdf.to_crs(to_crs)
if driver == 'GeoJSON':
if gdf.crs and gdf.crs.to_epsg() != 4326:
raise ValueError(
f"GeoJSON requires EPSG:4326. Use to_crs='EPSG:4326'."
)
return gdf
if buildings_path:
gdf = transform_gdf(self.buildings_gdf)
gdf.to_file(buildings_path, driver=driver)
if streets_path:
gdf = transform_gdf(self.streets_gdf)
gdf.to_file(streets_path, driver=driver)
[docs]
def get_shortest_path(self, start_coord: tuple, end_coord: tuple, plot: bool = False, ax=None):
"""Return a block path between two street cells.
Parameters
----------
start_coord : tuple[int, int]
Starting street block (x, y).
end_coord : tuple[int, int]
Ending street block (x, y).
plot : bool, optional
Plot the resulting path on the city map when True.
ax : matplotlib.axes.Axes, optional
Target axis used when `plot=True`.
Returns
-------
list[tuple[int, int]]
Sequence of street blocks from start to end.
Raises
------
ValueError
If either coordinate is not a valid street block or if shortest_paths
has not been computed.
Notes
-----
Requires compute_shortest_paths() to be called first. Uses precomputed
paths (dict) or on-demand callable depending on mode.
"""
# Check if coordinates are street blocks
if not ((self.streets_gdf['coord_x'] == start_coord[0]) & (self.streets_gdf['coord_y'] == start_coord[1])).any():
raise ValueError(f"Start coordinate {start_coord} must be a street block.")
if not ((self.streets_gdf['coord_x'] == end_coord[0]) & (self.streets_gdf['coord_y'] == end_coord[1])).any():
raise ValueError(f"End coordinate {end_coord} must be a street block.")
# Auto-initialize callable if not set (lazy initialization for convenience)
if self.shortest_paths is None:
warnings.warn("shortest_paths not initialized. Auto-initializing with callable_only=True. Call compute_shortest_paths() explicitly.", UserWarning)
self.compute_shortest_paths(callable_only=True)
# Get path from either dict or callable
if callable(self.shortest_paths):
path = self.shortest_paths(start_coord, end_coord)
else:
# Dict mode: lookup precomputed path
if start_coord not in self.shortest_paths:
raise ValueError(f"Start coordinate {start_coord} is not in precomputed shortest path graph.")
if start_coord not in self.shortest_paths:
raise ValueError(f"End coordinate {end_coord} is not among shortest paths from {start_coord}.")
path = self.shortest_paths[start_coord][end_coord]
if plot:
if ax is None:
fig, ax = plt.subplots(figsize=(10, 10))
else:
fig = ax.figure
# Plot city streets and buildings
self.plot_city(ax=ax)
# Plot the path as a MultiLineString through block centers
path_centers = [(x + 0.5, y + 0.5) for x, y in path]
path_lines = MultiLineString([LineString([(path_centers[i][0], path_centers[i][1]), (path_centers[i+1][0], path_centers[i+1][1])]) for i in range(len(path_centers)-1)])
gpd.GeoSeries(path_lines).plot(ax=ax, color='red', linewidth=2, label='Shortest Path')
# Highlight start and end points
start_point = Point(path_centers[0])
end_point = Point(path_centers[-1])
gpd.GeoSeries(start_point).plot(ax=ax, color='green', markersize=100, label='Start')
gpd.GeoSeries(end_point).plot(ax=ax, color='blue', markersize=100, label='End')
ax.legend()
ax.set_title(f'Shortest Path from {start_coord} to {end_coord}')
plt.show(block=False)
plt.close(fig)
return path
[docs]
def get_distance_fast(self, start_coord: tuple, end_coord: tuple):
"""Return shortest path distance (number of edges) between two street cells.
Parameters
----------
start_coord : tuple[int, int]
Starting street block (x, y).
end_coord : tuple[int, int]
Ending street block (x, y).
Returns
-------
int
Number of edges in the shortest path from start to end.
Notes
-----
Uses the same shortest_paths mechanism as get_shortest_path(), but returns
only the distance (len(path) - 1) for efficiency.
"""
path = self.get_shortest_path(start_coord, end_coord, plot=False)
return len(path) - 1
# ---------------------------------------------------------------------
# DEPRECATED: Cached path retrieval (kept for potential future use)
# ---------------------------------------------------------------------
[docs]
def get_paths_fast(self, start_coord: tuple, end_coord: tuple):
"""
DEPRECATED: Return shortest path with LRU-style caching.
This method wraps get_shortest_path() with a simple cache. May be useful
for future optimizations but currently not sufficient for large-scale
trajectory generation needs.
Parameters
----------
start_coord : tuple[int, int]
Starting street block (x, y).
end_coord : tuple[int, int]
Ending street block (x, y).
Returns
-------
list[tuple[int, int]]
Sequence of street blocks from start to end, or empty list if no path.
"""
if not (isinstance(start_coord, tuple) and isinstance(end_coord, tuple)):
raise ValueError("Coordinates must be tuples of (x, y).")
# Initialize cache
if not hasattr(self, '_paths_cache'):
self._paths_cache = {}
MAX_CACHE = 50000
key = (start_coord, end_coord)
if key in self._paths_cache:
return self._paths_cache[key]
# Compute path
try:
path = self.get_shortest_path(start_coord, end_coord)
except Exception:
path = []
# Cache with simple capacity control (drop half when full)
self._paths_cache[key] = path
if len(self._paths_cache) > MAX_CACHE:
# drop roughly half oldest by arbitrary order
for k in list(self._paths_cache.keys())[:MAX_CACHE // 2]:
self._paths_cache.pop(k, None)
return path
def _add_to_adjacency_graph(self, G, coord):
"""Add node and cardinal edges to existing graph."""
G.add_node(coord)
x, y = coord
for dx, dy in [(1, 0), (-1, 0), (0, 1), (0, -1)]:
neighbor = (x + dx, y + dy)
if neighbor in G:
G.add_edge(coord, neighbor, weight=1)
def _connect_building_door_to_streets(self, building_id, max_depth=5):
"""Connect building door to street network by converting blocks along shortest path."""
building_row = self.buildings_gdf.loc[building_id]
door_x = int(building_row['door_cell_x'])
door_y = int(building_row['door_cell_y'])
door_coord = (door_x, door_y)
# Get blocks within Manhattan distance
candidates = [(x, y) for x in range(door_x - max_depth, door_x + max_depth + 1)
for y in range(door_y - max_depth, door_y + max_depth + 1)
if abs(x - door_x) + abs(y - door_y) <= max_depth]
# Filter for non-building blocks: streets OR None blocks (void blocks for pathfinding)
non_building_mask = (self.blocks_gdf['building_type'] == 'street') | (self.blocks_gdf['building_type'].isna())
non_building = self.blocks_gdf[non_building_mask]
nearby = non_building[non_building.index.isin(candidates)]
block_graph = self._build_adjacency_graph(nearby)
# If door_coord is not in graph, it's outside Manhattan distance - cannot connect
if door_coord not in block_graph:
return False
paths = nx.single_source_shortest_path(block_graph, source=door_coord, cutoff=max_depth)
for target, path in paths.items():
if target != door_coord and target in self.streets_gdf.index:
new_coords = [c for c in path if c not in self.streets_gdf.index]
for coord in path:
self.blocks_gdf.loc[coord, 'building_type'] = 'street'
if new_coords:
new_gdf = gpd.GeoDataFrame([
{'coord_x': x, 'coord_y': y, 'geometry': box(x, y, x+1, y+1),
'building_type': 'street', 'building_id': None}
for x, y in new_coords
], geometry='geometry', crs=self.streets_gdf.crs)
new_gdf.set_index(['coord_x', 'coord_y'], inplace=True, drop=False)
new_gdf.index.names = [None, None]
self.streets_gdf = gpd.GeoDataFrame(pd.concat([self.streets_gdf, new_gdf]), geometry='geometry', crs=self.streets_gdf.crs)
# Invalidate cached graph so it rebuilds from updated streets_gdf
self.street_graph = None
return True
return False
# =============================================================================
# RANDOM CITY CLASS
# =============================================================================
[docs]
class RandomCityGenerator:
def __init__(self, width, height, street_spacing=5, park_ratio=0.1, home_ratio=0.4, work_ratio=0.3, retail_ratio=0.2, seed=42, verbose=False):
self.seed = seed
self.width = width
self.height = height
self.street_spacing = street_spacing # Determines regular intervals for streets
self.park_ratio = park_ratio
self.home_ratio = home_ratio
self.work_ratio = work_ratio
self.retail_ratio = retail_ratio
self.verbose = verbose
self.city = City(dimensions=(width, height))
self.occupied = np.zeros((self.width, self.height), dtype=bool) # NumPy array for efficiency
self.streets = self.generate_streets()
self.building_sizes = {
'home': [(2, 2), (1, 2), (2, 1), (1, 1)], # Mixed sizes
'workplace': [(1, 3), (3, 1), (4, 4), (3, 3), (4, 2), (2, 4)],
'retail': [(1, 3), (3, 1), (4, 4), (3, 3), (2, 4), (4, 2)],
'park': [(6, 6), (5, 5), (4, 4)]
}
[docs]
def generate_streets(self):
"""Predefine streets in a systematic grid pattern using a NumPy mask."""
streets = np.zeros((self.width, self.height), dtype=bool)
streets[::self.street_spacing, :] = True
streets[:, ::self.street_spacing] = True
return streets
[docs]
def get_block_type(self, x, y):
"""Dynamically assigns a block type instead of storing all in memory."""
npr.seed(self.seed + x * self.width + y) # Ensure consistency
return npr.choice(['home', 'workplace', 'retail', 'park'],
p=[self.home_ratio, self.work_ratio, self.retail_ratio, self.park_ratio])
[docs]
def fill_block(self, block_x, block_y, block_type):
"""Fills a block with multiple buildings of the specified type."""
# Define block bounds (interior cells, excluding street corridors)
bx0 = block_x
by0 = block_y
bx1 = min(block_x + self.street_spacing, self.width)
by1 = min(block_y + self.street_spacing, self.height)
# Track available cells (not streets, not occupied)
available = set()
for x in range(bx0, bx1):
for y in range(by0, by1):
if not self.streets[x, y] and not self.occupied[x, y]:
available.add((x, y))
if not available:
return
# Try to place multiple buildings of varying sizes
npr.seed(self.seed + block_x * self.width + block_y + 1000)
sizes = list(self.building_sizes[block_type])
npr.shuffle(sizes)
for width, height in sizes:
if not available:
break
# Try to find a position for this building size
attempts = 0
max_attempts = 20
while attempts < max_attempts and available:
attempts += 1
# Pick random starting position from available cells
start_cell = list(available)[npr.randint(0, len(available))]
sx, sy = start_cell
# Check if building fits
building_cells = []
fits = True
for dx in range(width):
for dy in range(height):
cell = (sx + dx, sy + dy)
if cell not in available:
fits = False
break
building_cells.append(cell)
if not fits:
break
if not fits or not building_cells:
continue
# Find door adjacent to this building
door = None
for cell in building_cells:
door = self.get_adjacent_street(cell)
if door is not None:
break
if door is None:
continue
# Place building
block_polys = [box(x, y, x+1, y+1) for x, y in building_cells]
geom = unary_union(block_polys) if len(block_polys) > 1 else block_polys[0]
try:
self.city.add_building(building_type=block_type, door=door, geom=geom, blocks=building_cells)
# Mark cells as occupied
for cell in building_cells:
self.occupied[cell] = True
available.discard(cell)
break # Successfully placed, move to next size
except ValueError as e:
if self.verbose:
print(f"Failed to place {width}x{height} building at ({sx}, {sy}): {e}")
continue
[docs]
def get_adjacent_street(self, location):
"""Finds the closest predefined street to assign as the door, ensuring it is within bounds."""
if not location:
return None
x, y = location
possible_streets = np.array([(x + dx, y + dy) for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]])
valid_mask = (possible_streets[:, 0] >= 0) & (possible_streets[:, 0] < self.width) & \
(possible_streets[:, 1] >= 0) & (possible_streets[:, 1] < self.height)
valid_streets = possible_streets[valid_mask]
# Filter to only streets according to the predefined mask
street_mask = np.array([self.streets[sx, sy] for sx, sy in valid_streets])
valid_streets = valid_streets[street_mask]
return tuple(valid_streets[0].tolist()) if valid_streets.size > 0 else None
[docs]
def place_buildings_in_blocks(self):
"""Fills each block completely with buildings using proportional distribution."""
block_list = [(x, y) for x in range(0, self.width, self.street_spacing)
for y in range(0, self.height, self.street_spacing)]
npr.shuffle(block_list)
for block_x, block_y in block_list:
block_type = self.get_block_type(block_x, block_y)
self.fill_block(block_x, block_y, block_type)
[docs]
def generate_city(self):
"""Generates a systematically structured city where blocks are fully occupied with buildings."""
self.place_buildings_in_blocks()
self.city.get_street_graph()
# Always return the city object, even if no buildings were added
return self.city
# =============================================================================
# AUXILIARY METHODS
# =============================================================================
[docs]
def load(filename):
"""Load a city object from a file."""
with open(filename, 'rb') as file:
return pickle.load(file)
[docs]
def save(city, filename):
"""Save the city object to a file."""
with open(filename, 'wb') as file:
pickle.dump(city, file)
# =============================================================================
# RASTERIZATION UTILITIES (integrated from rasterization.py)
# =============================================================================
[docs]
def generate_canvas_blocks(boundary_polygon, block_size, crs="EPSG:3857", verbose=True, with_block_groups=False):
_t0 = time.time()
minx, miny, maxx, maxy = boundary_polygon.bounds
x_min = int(minx // block_size)
x_max = int(maxx // block_size) + 1
y_min = int(miny // block_size)
y_max = int(maxy // block_size) + 1
blocks = []
boundary_prep = prep(boundary_polygon)
for x in range(x_min, x_max):
x0 = x * block_size
x1 = (x + 1) * block_size
for y in range(y_min, y_max):
y0 = y * block_size
y1 = (y + 1) * block_size
block_geom = box(x0, y0, x1, y1)
if boundary_prep.intersects(block_geom):
blocks.append({'coord_x': x, 'coord_y': y, 'geometry': block_geom})
if not blocks:
blocks_gdf = gpd.GeoDataFrame(columns=['coord_x','coord_y','geometry'], geometry='geometry', crs=crs)
else:
blocks_gdf = gpd.GeoDataFrame(blocks, geometry='geometry', crs=crs)
blocks_gdf.set_index(['coord_x','coord_y'], inplace=True, drop=False)
blocks_gdf.index.names = [None, None]
blocks_gdf['building_type'] = None
if with_block_groups:
# Define block groups as connected components of blocks not intersecting streets, discarding huge ones
pass
if verbose:
print(f"Generated {len(blocks_gdf):,} blocks (in {time.time()-_t0:.2f}s)")
return blocks_gdf
[docs]
def find_intersecting_blocks(geometries_gdf: gpd.GeoDataFrame, blocks_gdf: gpd.GeoDataFrame, predicate: str = 'intersects') -> pd.DataFrame:
if len(geometries_gdf) == 0 or len(blocks_gdf) == 0:
return pd.DataFrame(columns=['coord_x','coord_y','geometry_idx'])
blocks_reset = blocks_gdf.reset_index()
geometries_reset = geometries_gdf.reset_index()
intersections = gpd.sjoin(blocks_reset, geometries_reset, how='inner', predicate=predicate)
if len(intersections) == 0:
return pd.DataFrame(columns=['coord_x','coord_y','geometry_idx'])
if 'index_right' in intersections.columns:
idx_col = 'index_right'
else:
idx_col = geometries_reset.index.name if geometries_reset.index.name else 'index'
if idx_col not in intersections.columns:
idx_col = intersections.columns[-1]
result = intersections[['coord_x','coord_y', idx_col]].copy().rename(columns={idx_col:'geometry_idx'})
return result
[docs]
def assign_block_types(blocks_gdf, streets_gdf, buildings_gdf_input):
# Only assign street types - building types assigned when buildings are successfully created
street_intersections = find_intersecting_blocks(streets_gdf, blocks_gdf)
street_coords = set(zip(street_intersections['coord_x'], street_intersections['coord_y']))
blocks_gdf.loc[blocks_gdf.index.isin(street_coords), 'building_type'] = 'street'
[docs]
def find_connected_components(block_coords: List[Tuple[int,int]], connectivity: str = '4-connected') -> List[Set[Tuple[int,int]]]:
if not block_coords:
return []
block_set = set(block_coords)
neighbors = [(0,1),(0,-1),(1,0),(-1,0)] if connectivity=='4-connected' else [(0,1),(0,-1),(1,0),(-1,0),(1,1),(1,-1),(-1,1),(-1,-1)]
components = []
visited = set()
for start in block_coords:
if start in visited:
continue
component = set()
queue = [start]
visited.add(start)
while queue:
x,y = queue.pop(0)
component.add((x,y))
for dx,dy in neighbors:
nb = (x+dx, y+dy)
if nb in block_set and nb not in visited:
visited.add(nb)
queue.append(nb)
components.append(component)
return components
[docs]
def verify_street_connectivity(streets_gdf):
if len(streets_gdf) == 0:
return streets_gdf.copy(), {'total':0,'kept':0,'discarded':0}
street_coords = list(zip(streets_gdf['coord_x'], streets_gdf['coord_y']))
components = find_connected_components(street_coords, connectivity='4-connected')
if not components:
return streets_gdf.iloc[0:0].copy(), {'total':0,'kept':0,'discarded':0}
largest = max(components, key=len)
kept = set(largest)
result = streets_gdf[streets_gdf.apply(lambda r: (r['coord_x'], r['coord_y']) in kept, axis=1)].copy()
summary = {'total': len(streets_gdf), 'kept': len(result), 'discarded': len(streets_gdf)-len(result), 'n_components': len(components)}
return result, summary
[docs]
def assign_door_to_building(building_blocks, available_blocks, graph=None):
"""Find adjacent block that can serve as door (not occupied by buildings)."""
if graph is not None:
for x, y in building_blocks:
for nb in graph.neighbors((x, y)):
if nb in available_blocks:
return nb
else:
neighbors = [(0,1),(0,-1),(1,0),(-1,0)]
for x, y in building_blocks:
for dx, dy in neighbors:
nb = (x+dx, y+dy)
if nb in available_blocks:
return nb
return None
[docs]
class RasterCity(City):
def __init__(self, boundary_polygon, streets_gdf, buildings_gdf, block_side_length=15.0, crs="EPSG:3857", building_type_col='building_type', name="Rasterized City", resolve_overlaps=False, other_building_behavior="keep", rotation_deg=0.0, rotation_origin=None, verbose=True):
"""
Create a rasterized city from OSM data.
Coordinate System Flow
----------------------
INPUT: Web Mercator meters (EPSG:3857), possibly rotated
↓ Rotate by rotation_deg, translate by -offset to align grid at origin
INTERNAL: Garden city block units (coord_x, coord_y from 0, geometries in units)
↓ All rasterization, spatial queries, and simulation logic
OUTPUT: Transform back via reverse_affine_transformation in to_file()
Parameters
----------
boundary_polygon : shapely.geometry.Polygon
Boundary polygon for the city
streets_gdf : gpd.GeoDataFrame
Streets GeoDataFrame
buildings_gdf : gpd.GeoDataFrame
Buildings GeoDataFrame
block_side_length : float, optional
Side length of each block in meters (default: 15.0)
crs : str, optional
Coordinate reference system (default: "EPSG:3857")
building_type_col : str, optional
Column name for building type (default: 'building_type')
name : str, optional
Name of the city (default: "Rasterized City")
resolve_overlaps : bool, optional
If True, resolve overlapping buildings by removing overlapping blocks (default: False)
other_building_behavior : str, optional
How to handle buildings with type 'other': 'keep', 'filter', or 'randomize' (default: 'keep')
rotation_deg : float, optional
Rotation applied to input geometries in degrees counterclockwise (default: 0.0)
verbose : bool, optional
Print progress messages (default: True)
Note
----
Input geometries are transformed to garden city block units. A canvas is
generated to match rotated/shifted input data. Future: transform inputs
to match a garden city canvas for clearer flow.
"""
if other_building_behavior == "randomize":
raise NotImplementedError("randomize option not yet implemented")
minx, miny, maxx, maxy = boundary_polygon.bounds
grid_min_x = int(minx // block_side_length)
grid_min_y = int(miny // block_side_length)
grid_max_x = int(maxx // block_side_length) + 1
grid_max_y = int(maxy // block_side_length) + 1
grid_width = grid_max_x - grid_min_x
grid_height = grid_max_y - grid_min_y
super().__init__(
dimensions=(grid_width, grid_height),
manual_streets=True,
name=name,
block_side_length=block_side_length,
web_mercator_origin_x=minx,
web_mercator_origin_y=miny,
rotation_deg=rotation_deg,
offset_x=0,
offset_y=0
)
self.boundary_polygon = boundary_polygon
self.crs = crs
self.rotation_origin = rotation_origin # Store rotation origin for use in to_mercator
self.streets_gdf_input = streets_gdf.to_crs(crs)
buildings_gdf = buildings_gdf.to_crs(crs)
if building_type_col != 'building_type':
buildings_gdf = buildings_gdf.rename(columns={building_type_col: 'building_type'})
if 'building_type' not in buildings_gdf.columns:
raise ValueError(f"buildings_gdf must have a 'building_type' column (or specify building_type_col parameter)")
if other_building_behavior == "filter":
buildings_gdf = buildings_gdf[buildings_gdf['building_type'] != 'other']
self.buildings_gdf_input = buildings_gdf
self.blocks_gdf = generate_canvas_blocks(self.boundary_polygon, self.block_side_length, self.crs, verbose=verbose).reset_index(drop=True)
# Calculate offset and apply to all gdfs
minx, miny, maxx, maxy = self.boundary_polygon.bounds
self.offset_x = int(minx // self.block_side_length)
self.offset_y = int(miny // self.block_side_length)
offset_meters = (self.offset_x * self.block_side_length, self.offset_y * self.block_side_length)
# Update Web Mercator origins to match actual offset
self.web_mercator_origin_x = self.offset_x * self.block_side_length
self.web_mercator_origin_y = self.offset_y * self.block_side_length
self.blocks_gdf['coord_x'] = self.blocks_gdf['coord_x'] - self.offset_x
self.blocks_gdf['coord_y'] = self.blocks_gdf['coord_y'] - self.offset_y
self.blocks_gdf['building_id'] = None
# Convert geometries from meters to garden city units: translate then scale down
self.blocks_gdf['geometry'] = self.blocks_gdf['geometry'].translate(xoff=-offset_meters[0], yoff=-offset_meters[1])
self.blocks_gdf['geometry'] = self.blocks_gdf['geometry'].scale(
xfact=1/self.block_side_length,
yfact=1/self.block_side_length,
origin=(0, 0)
)
self.blocks_gdf.set_index(['coord_x','coord_y'], inplace=True, drop=False)
self.blocks_gdf.index.names = [None, None]
# Convert input geometries from meters to garden city units
self.streets_gdf_input['geometry'] = self.streets_gdf_input['geometry'].translate(xoff=-offset_meters[0], yoff=-offset_meters[1])
self.streets_gdf_input['geometry'] = self.streets_gdf_input['geometry'].scale(
xfact=1/self.block_side_length,
yfact=1/self.block_side_length,
origin=(0, 0)
)
self.buildings_gdf_input['geometry'] = self.buildings_gdf_input['geometry'].translate(xoff=-offset_meters[0], yoff=-offset_meters[1])
self.buildings_gdf_input['geometry'] = self.buildings_gdf_input['geometry'].scale(
xfact=1/self.block_side_length,
yfact=1/self.block_side_length,
origin=(0, 0)
)
self.resolve_overlaps = resolve_overlaps
self._rasterize(verbose=verbose)
# Connect buildings with non-street doors
non_street_doors = []
for building_id in self.buildings_gdf['id']:
building_row = self.buildings_gdf.loc[building_id]
door_coord = (int(building_row['door_cell_x']), int(building_row['door_cell_y']))
if door_coord not in self.streets_gdf.index:
non_street_doors.append(building_id)
if non_street_doors:
if verbose:
print(f" Connecting {len(non_street_doors)} buildings with non-street doors...")
failed_buildings = []
for building_id in non_street_doors:
success = self._connect_building_door_to_streets(building_id, max_depth=5)
if not success:
failed_buildings.append(building_id)
# Remove buildings that couldn't be connected and clear their blocks
if failed_buildings:
for building_id in failed_buildings:
building_blocks_mask = self.blocks_gdf['building_id'] == building_id
self.blocks_gdf.loc[building_blocks_mask, ['building_id', 'building_type']] = [None, None]
self.buildings_gdf = self.buildings_gdf.drop(building_id)
def _rasterize(self, verbose=True):
# Assigning block types could arguably be its own method, but keeping it here for now
# since it's a core part of the rasterization process that needs the input gdfs
_t1 = time.time()
if verbose:
print("Assigning block types...")
assign_block_types(self.blocks_gdf, self.streets_gdf_input, self.buildings_gdf_input)
if verbose:
print(f"Block types assigned (in {time.time()-_t1:.2f}s)")
_t2 = time.time()
if verbose:
print("Assigning streets...")
street_blocks = self.blocks_gdf[self.blocks_gdf['building_type'] == 'street'][['coord_x','coord_y','geometry']].copy()
if verbose:
print("Verifying street connectivity...")
connected_streets, summary = verify_street_connectivity(street_blocks)
if verbose:
print(f" Streets: {summary['kept']:,} kept, {summary['discarded']:,} discarded (in {time.time()-_t2:.2f}s)")
# Streets: create geometry from block coordinates (garden city units)
geometries = [box(x, y, x+1, y+1) for x, y in zip(connected_streets['coord_x'], connected_streets['coord_y'])]
self.streets_gdf = gpd.GeoDataFrame(connected_streets[['coord_x','coord_y']].copy(), geometry=geometries, crs=None)
self.streets_gdf['id'] = ('s-x' + self.streets_gdf['coord_x'].astype(int).astype(str) + '-y' + self.streets_gdf['coord_y'].astype(int).astype(str))
self.streets_gdf.set_index(['coord_x','coord_y'], inplace=True, drop=False)
self.streets_gdf.index.names = [None, None]
available_for_doors = self.blocks_gdf[self.blocks_gdf['building_type'] == 'street'].index
_t3 = time.time()
if verbose:
print("Adding buildings to city...")
added_building_blocks = set()
skipped_overlap = 0
resolved_overlap = 0
new_building_rows = []
building_types = [t for t in TYPE_PRIORITY if t in self.buildings_gdf_input['building_type'].values]
for building_type in building_types:
subset = self.buildings_gdf_input[self.buildings_gdf_input['building_type'] == building_type]
unassigned = self.blocks_gdf[self.blocks_gdf['building_type'].isna()]
joins = find_intersecting_blocks(subset, unassigned)
grouped = joins.groupby('geometry_idx')
for bidx, grp in grouped:
block_coords = list(zip(grp['coord_x'], grp['coord_y']))
if self.resolve_overlaps:
block_coords_set = set(block_coords) - added_building_blocks
if not block_coords_set:
resolved_overlap += 1
continue
block_coords = list(block_coords_set)
components = find_connected_components(block_coords, connectivity='4-connected')
for component in components:
building_blocks = list(component)
door = assign_door_to_building(building_blocks, available_for_doors)
if door is None:
continue
building_blocks_set = set(building_blocks)
if building_blocks_set & added_building_blocks:
skipped_overlap += 1
continue
block_polys = [box(x, y, x+1, y+1) for x,y in building_blocks]
building_geom = block_polys[0] if len(block_polys) == 1 else unary_union(block_polys)
# Pick building block adjacent to door for unique ID
mask = self.check_adjacent(building_blocks, door)
idx = next((i for i, m in enumerate(mask) if m), None)
candidate = building_blocks[idx] if idx is not None else building_blocks[0]
building_id = f"{building_type[0]}-x{int(candidate[0])}-y{int(candidate[1])}"
# Compute door centroid via intersection with adjacent building block (same logic as check_adjacent)
door_poly = box(door[0], door[1], door[0]+1, door[1]+1)
adjacent_block_poly = box(candidate[0], candidate[1], candidate[0]+1, candidate[1]+1)
door_line = adjacent_block_poly.intersection(door_poly)
door_centroid = (door_line.centroid.x, door_line.centroid.y)
new_building_rows.append({
'id': building_id,
'building_type': building_type,
'door_cell_x': door[0],
'door_cell_y': door[1],
'door_point': door_centroid,
'size': len(building_blocks),
'blocks': building_blocks,
'geometry': building_geom,
})
# Set building_id and building_type together when building is successfully created
for (cx, cy) in building_blocks:
self.blocks_gdf.loc[(cx, cy), ['building_id', 'building_type']] = [building_id, building_type]
added_building_blocks.update(building_blocks_set)
if new_building_rows:
nb_gdf = gpd.GeoDataFrame(new_building_rows, geometry='geometry', crs=None)
nb_gdf.set_index('id', inplace=True, drop=False)
nb_gdf.index.name = None
if self.buildings_gdf.empty:
self.buildings_gdf = nb_gdf
else:
self.buildings_gdf = pd.concat([self.buildings_gdf, nb_gdf], axis=0, ignore_index=False)
if verbose:
overlap_count = resolved_overlap if self.resolve_overlaps else skipped_overlap
print(f" Added {len(new_building_rows)} buildings, skipped {overlap_count} due to overlap (adding took {time.time()-_t3:.2f}s)")