Source code for nomad.map_utils

"""
OpenStreetMap data utilities used to download, process, rotate, and persist
geospatial layers for the NOMAD library.
"""

import geopandas as gpd
import pandas as pd
import osmnx as ox
import numpy as np
from shapely.geometry import box
from shapely.affinity import rotate as shapely_rotate
import warnings
from osmnx._errors import InsufficientResponseError
import os
import pdb
from shapely.affinity import scale, rotate as shapely_rotate
from nomad.constants import (
    OSM_BUILDING_TO_SUBTYPE, OSM_AMENITY_TO_SUBTYPE, OSM_TOURISM_TO_SUBTYPE,
    PARK_TAGS, DEFAULT_CRS, DEFAULT_CATEGORY_SCHEMA, CATEGORY_SCHEMAS,
    STREET_HIGHWAY_TYPES, STREET_EXCLUDED_SERVICE_TYPES,
    STREET_EXCLUDE_COVERED, STREET_EXCLUDE_TUNNELS, STREET_EXCLUDED_SURFACES,
    INTERSECTION_CONSOLIDATION_TOLERANCE_M, STREET_MIN_LENGTH_M
)

from contextlib import contextmanager
def _clip_to_bbox(gdf, bbox, crs=DEFAULT_CRS):
    """Clip geometries to bounding box."""
    if len(gdf) == 0:
        return gdf.copy()
    
    # Create clip box
    west, south, east, north = bbox
    clip_box = box(west, south, east, north)
    clip_gdf = gpd.GeoDataFrame([1], geometry=[clip_box], crs=crs)
    
    # Convert to same CRS if needed
    if gdf.crs != crs:
        gdf_clipped = gdf.to_crs(crs)
    else:
        gdf_clipped = gdf.copy()
    
    # Perform intersection
    clipped = gdf_clipped.intersection(clip_gdf.geometry.iloc[0])
    
    # Filter out empty geometries
    valid_mask = ~clipped.is_empty
    result = gdf_clipped[valid_mask].copy()
    result.geometry = clipped[valid_mask]
    
    return result

# =============================================================================
# CATEGORIZATION FUNCTIONS
# =============================================================================

[docs] def get_category_for_subtype(subtype, schema= DEFAULT_CATEGORY_SCHEMA): """Return the category name for a subtype in the given schema. If the subtype is not present, returns 'unknown' for 'geolife_plus' or 'other' for all other schemas. """ mapping = CATEGORY_SCHEMAS[schema] default = 'unknown' if schema == 'geolife_plus' else 'other' return mapping.get(subtype, default)
def _classify_building(row, schema): # Alias for _classify_feature for backward compatibility. return _classify_feature(row, schema) def _parse_tag_values(value): # Normalize OSM tag value(s) to a list of lowercase strings. if value is None or (isinstance(value, float) and pd.isna(value)): return [] # Lists/arrays/tuples of values if isinstance(value, (list, tuple, set, np.ndarray)): values = list(value) else: # Split common multi-value delimiters used in OSM tags text = str(value) # Replace pipes and commas with semicolons for unified splitting for delim in ['|', ',']: text = text.replace(delim, ';') values = [v for v in (part.strip() for part in text.split(';')) if v] return [str(v).lower().strip() for v in values] def _choose_amenity_classification(row, schema): # Choose amenity-based classification; prefer non-parking when multiple exist. if 'amenity' not in row or pd.isna(row['amenity']): return None amenities = _parse_tag_values(row['amenity']) if not amenities: return None mapped = [(a, OSM_AMENITY_TO_SUBTYPE[a]) for a in amenities if a in OSM_AMENITY_TO_SUBTYPE] # Prefer the first non-parking amenity if present for amenity_value, subtype in mapped: if subtype != 'parking': category = get_category_for_subtype(subtype, schema) return (amenity_value, subtype, category) # Otherwise, if parking exists (and no better amenity), choose parking for a in amenities: if a == 'parking': subtype = 'parking' category = get_category_for_subtype(subtype, schema) return ('parking', subtype, category) return None def _classify_feature(row, schema, infer_building_types=False): # Classify a feature based on OSM tags; building > amenity > other tags. # Priority 1: building tag (but only if it's a specific building type, not 'yes') if "building" in row and pd.notna(row["building"]): osm_value = str(row["building"]).lower().strip() if osm_value == 'parking': # Building tag takes precedence: building=parking means parking. subtype = 'parking' category = get_category_for_subtype(subtype, schema) return ('parking', subtype, category) if osm_value != 'yes' and osm_value in OSM_BUILDING_TO_SUBTYPE: subtype = OSM_BUILDING_TO_SUBTYPE[osm_value] category = get_category_for_subtype(subtype, schema) return (osm_value, subtype, category) # For building=yes, continue to check other tags (amenity, leisure, etc.) # Priority 2: amenity tag (support multiple amenities; prefer non-parking) amenity_choice = _choose_amenity_classification(row, schema) if amenity_choice is not None: return amenity_choice # Priority 3: leisure tag (for parks and recreational areas) if "leisure" in row and pd.notna(row["leisure"]): osm_value = str(row["leisure"]).lower().strip() if osm_value in OSM_BUILDING_TO_SUBTYPE: subtype = OSM_BUILDING_TO_SUBTYPE[osm_value] category = get_category_for_subtype(subtype, schema) return (osm_value, subtype, category) else: # Leisure tag exists but not in mapping - preserve the leisure type default_category = 'unknown' if schema == 'geolife_plus' else 'other' return (osm_value, 'unknown', default_category) # Priority 4: landuse tag (for land use classifications) if "landuse" in row and pd.notna(row["landuse"]): osm_value = str(row["landuse"]).lower().strip() if osm_value in OSM_BUILDING_TO_SUBTYPE: subtype = OSM_BUILDING_TO_SUBTYPE[osm_value] category = get_category_for_subtype(subtype, schema) return (osm_value, subtype, category) else: # Landuse tag exists but not in mapping - preserve the landuse type default_category = 'unknown' if schema == 'geolife_plus' else 'other' return (osm_value, 'unknown', default_category) # Priority 5: tourism tag if "tourism" in row and pd.notna(row["tourism"]): osm_value = str(row["tourism"]).lower().strip() if osm_value in OSM_TOURISM_TO_SUBTYPE: subtype = OSM_TOURISM_TO_SUBTYPE[osm_value] category = get_category_for_subtype(subtype, schema) return (osm_value, subtype, category) # Priority 6: shop tag if "shop" in row and pd.notna(row["shop"]): osm_value = str(row["shop"]).lower().strip() if osm_value in OSM_BUILDING_TO_SUBTYPE: subtype = OSM_BUILDING_TO_SUBTYPE[osm_value] category = get_category_for_subtype(subtype, schema) return (osm_value, subtype, category) else: # Shop tag exists but not in mapping - preserve the shop type default_category = 'unknown' if schema == 'geolife_plus' else 'other' return (osm_value, 'unknown', default_category) # Priority 7: office tag if "office" in row and pd.notna(row["office"]): osm_value = str(row["office"]).lower().strip() if osm_value in OSM_BUILDING_TO_SUBTYPE: subtype = OSM_BUILDING_TO_SUBTYPE[osm_value] category = get_category_for_subtype(subtype, schema) return (osm_value, subtype, category) # Priority 8: healthcare tag if "healthcare" in row and pd.notna(row["healthcare"]): osm_value = str(row["healthcare"]).lower().strip() if osm_value in OSM_BUILDING_TO_SUBTYPE: subtype = OSM_BUILDING_TO_SUBTYPE[osm_value] category = get_category_for_subtype(subtype, schema) return (osm_value, subtype, category) # Priority 9: craft tag if "craft" in row and pd.notna(row["craft"]): osm_value = str(row["craft"]).lower().strip() if osm_value in OSM_BUILDING_TO_SUBTYPE: subtype = OSM_BUILDING_TO_SUBTYPE[osm_value] category = get_category_for_subtype(subtype, schema) return (osm_value, subtype, category) # Default case: unknown building default_category = 'unknown' if schema == 'geolife_plus' else 'other' # Optional inference for building=yes: after all other tags, # if no amenity and height < 20, label as residential if infer_building_types and "building" in row and pd.notna(row["building"]): building_value = str(row["building"]).lower().strip() if building_value == 'yes': has_amenity = ("amenity" in row and pd.notna(row["amenity"])) height = None if "height" in row and pd.notna(row["height"]): try: height = float(str(row["height"])) except (ValueError, TypeError): height = None # Require an additional residential signal to avoid mass overclassification landuse_val = str(row.get('landuse', '')).lower().strip() if 'landuse' in row and pd.notna(row['landuse']) else '' addr_fields = ['addr:housenumber', 'addr:housename', 'addr:unit'] has_addr = any((c in row and pd.notna(row[c])) for c in addr_fields) building_use = str(row.get('building:use', '')).lower().strip() if 'building:use' in row and pd.notna(row['building:use']) else '' is_residential_signal = ( landuse_val == 'residential' or ('residential' in building_use if building_use else False) or has_addr ) if (not has_amenity) and (height is not None and height < 20) and is_residential_signal: subtype = 'residential' category = get_category_for_subtype(subtype, schema) return ('yes', subtype, category) # Otherwise, keep unknown/other return ('yes', 'unknown', default_category) return ('unknown', 'unknown', default_category) def _categorize_features(gdf, schema, infer_building_types= False): # Apply categorization to all rows of a GeoDataFrame. if len(gdf) == 0: return gdf.copy() result = gdf.copy() classifications = result.apply(lambda row: _classify_feature(row, schema, infer_building_types), axis=1) result['osm_type'] = classifications.apply(lambda x: x[0]) result['subtype'] = classifications.apply(lambda x: x[1]) result['category'] = classifications.apply(lambda x: x[2]) return result # Derive secondary amenity-based subtypes when multiple amenities exist def _derive_secondary_subtypes(row, schema, primary_subtype): amenities = _parse_tag_values(row['amenity']) if 'amenity' in row and pd.notna(row['amenity']) else [] secondary = [] for a in amenities: if a in OSM_AMENITY_TO_SUBTYPE: subtype = OSM_AMENITY_TO_SUBTYPE[a] if subtype != 'parking' and subtype != primary_subtype and subtype not in secondary: secondary.append(subtype) return secondary[:2] def _add_secondary_subtypes_columns(gdf, schema): if len(gdf) == 0: return gdf sec = gdf.apply(lambda row: _derive_secondary_subtypes(row, schema, row.get('subtype')), axis=1) gdf = gdf.copy() gdf['subtype_2'] = sec.apply(lambda arr: arr[0] if isinstance(arr, list) and len(arr) > 0 else pd.NA) gdf['subtype_3'] = sec.apply(lambda arr: arr[1] if isinstance(arr, list) and len(arr) > 1 else pd.NA) return gdf # ============================================================================= # DOWNLOAD FUNCTIONS # ============================================================================= def _download_osm_features(bbox, tags, crs=DEFAULT_CRS): """Fetch OSM features for bbox/tags and return polygons in target CRS.""" try: features = ox.features_from_bbox(bbox=bbox, tags=tags) except InsufficientResponseError: return gpd.GeoDataFrame(columns=['geometry'], crs=crs) if len(features) == 0: return gpd.GeoDataFrame(columns=['geometry'], crs=crs) # Keep only area geometries features = features[features.geometry.apply(lambda g: g.geom_type in ['Polygon', 'MultiPolygon'])] return features.to_crs(crs)
[docs] def get_city_boundary_osm(name, simplify=True, crs="EPSG:4326"): """Fetch a city's boundary from OSM. Returns a tuple of (boundary_multipolygon, center_coordinates, population). If a boundary cannot be retrieved, returns (None, None, None). """ try: city_gdf = ox.geocode_to_gdf(name) if crs is not None: city_gdf = city_gdf.to_crs(crs) if len(city_gdf) == 0: return None, None, None geometry = city_gdf.geometry.iloc[0] if simplify: geometry = geometry.simplify(tolerance=0.001) # Get center coordinates center = geometry.centroid center_coords = (center.x, center.y) # Get population population = None if 'population' in city_gdf.columns: pop_val = city_gdf['population'].iloc[0] if not pd.isna(pop_val): population = int(pop_val) return geometry, center_coords, population except Exception as e: warnings.warn(f"Could not fetch city boundary for '{name}': {e}") return None, None, None
[docs] def download_osm_buildings(bbox_or_city, crs=DEFAULT_CRS, schema=DEFAULT_CATEGORY_SCHEMA, clip=False, infer_building_types=False, explode=False): """Download buildings + parks from OSM and categorize them. Parameters - bbox_or_city: bbox tuple, city name, or shapely polygon - crs: output CRS - schema: category schema name - clip: retained for API parity (bbox path uses Overpass bbox) - infer_building_types: heuristics for building="yes" - explode: explode MultiPolygons into Polygons """ # Determine if input is bbox, city name, or shapely polygon if isinstance(bbox_or_city, str): boundary, center, population = get_city_boundary_osm(bbox_or_city) if boundary is None: return gpd.GeoDataFrame() # Use city boundary directly instead of converting to bbox city_polygon = boundary elif hasattr(bbox_or_city, 'geom_type'): city_polygon = bbox_or_city bbox = None else: bbox = bbox_or_city city_polygon = None # by_chunks parameter is deprecated and ignored for simplicity and performance # Download buildings (including parking lots with building tags) building_tags = {"building": True} if city_polygon is not None: try: buildings = ox.features_from_polygon(city_polygon, building_tags) buildings = buildings.to_crs(crs) except InsufficientResponseError: buildings = gpd.GeoDataFrame(columns=['geometry'], crs=crs) else: buildings = _download_osm_features(bbox, building_tags, crs) # Download parking lots (even without building tags) - they should be buildings, not parks parking_tags = {"amenity": ["parking"]} if city_polygon is not None: try: parking_lots = ox.features_from_polygon(city_polygon, parking_tags) parking_lots = parking_lots.to_crs(crs) except InsufficientResponseError: parking_lots = gpd.GeoDataFrame(columns=['geometry'], crs=crs) else: parking_lots = _download_osm_features(bbox, parking_tags, crs) # Filter out underground parking from parking lots if 'parking' in parking_lots.columns: parking_lots = parking_lots[parking_lots['parking'] != 'underground'] if 'layer' in parking_lots.columns: parking_lots = parking_lots[parking_lots['layer'] != '-1'] # Combine buildings and parking lots if len(parking_lots) > 0: if len(buildings) > 0: buildings = gpd.GeoDataFrame( pd.concat([buildings, parking_lots], ignore_index=True), crs=crs ) else: buildings = parking_lots.copy() # EXCLUDE WATER FEATURES - they should not be buildings if 'natural' in buildings.columns: buildings = buildings[buildings['natural'] != 'water'] if 'waterway' in buildings.columns: buildings = buildings[buildings['waterway'].isna()] # Download parks and green spaces (NO PARKING LOTS, NO WATER - only leisure, natural) park_tags = {} for tag_key, tag_values in PARK_TAGS.items(): if tag_key not in ['landuse', 'amenity']: # Remove landuse and amenity (no parking) park_tags[tag_key] = tag_values if city_polygon is not None: try: parks = ox.features_from_polygon(city_polygon, park_tags) parks = parks.to_crs(crs) except InsufficientResponseError: parks = gpd.GeoDataFrame(columns=['geometry'], crs=crs) else: parks = _download_osm_features(bbox, park_tags, crs) # EXCLUDE WATER FEATURES FROM PARKS - they should not be parks either if 'natural' in parks.columns: parks = parks[parks['natural'] != 'water'] if 'waterway' in parks.columns: parks = parks[parks['waterway'].isna()] # Essential columns to keep essential_cols = ['geometry', 'osm_type', 'subtype', 'subtype_2', 'subtype_3', 'building_type', 'osm_category', 'addr:street', 'addr:city', 'addr:state', 'addr:housenumber', 'addr:postcode'] # Categorize buildings if len(buildings) > 0: classifications = buildings.apply(lambda row: _classify_feature(row, schema, infer_building_types), axis=1) buildings['osm_type'] = classifications.apply(lambda x: x[0]) buildings['subtype'] = classifications.apply(lambda x: x[1]) buildings['building_type'] = classifications.apply(lambda x: x[2]) # Keep schema-specific category name for reference (osm_category) buildings['osm_category'] = buildings['building_type'] buildings = _add_secondary_subtypes_columns(buildings, schema) buildings = buildings[[col for col in essential_cols if col in buildings.columns]] # Categorize parks if len(parks) > 0: classifications = parks.apply(lambda row: _classify_feature(row, schema, infer_building_types), axis=1) parks['osm_type'] = classifications.apply(lambda x: x[0]) parks['subtype'] = classifications.apply(lambda x: x[1]) parks['building_type'] = classifications.apply(lambda x: x[2]) # Keep schema-specific category name for reference (osm_category) parks['osm_category'] = parks['building_type'] parks = _add_secondary_subtypes_columns(parks, schema) parks = parks[[col for col in essential_cols if col in parks.columns]] # Combine only AFTER categorization to prevent parks from being misclassified as buildings if len(buildings) == 0 and len(parks) == 0: result = gpd.GeoDataFrame(columns=['osm_type', 'subtype', 'building_type', 'geometry'], crs=crs) elif len(buildings) == 0: result = parks elif len(parks) == 0: result = buildings else: result = gpd.GeoDataFrame(pd.concat([buildings, parks], ignore_index=True), crs=crs) # If requested, ensure all geometries are strictly inside the boundary by exploding then clipping if clip and len(result) > 0: # Build mask polygon (target CRS) if city_polygon is not None: mask = gpd.GeoDataFrame(geometry=[city_polygon], crs="EPSG:4326").to_crs(crs) else: mask = gpd.GeoDataFrame(geometry=[box(*bbox)], crs="EPSG:4326").to_crs(crs) # explode first, then clip result = result.explode(ignore_index=True) result = gpd.clip(result, mask) result = result[result.geometry.notna() & ~result.geometry.is_empty].reset_index(drop=True) # Optional explode for callers who want exploded geometries without clipping if (not clip) and explode and len(result) > 0: result = result.explode(ignore_index=True) return result
[docs] def download_osm_streets(bbox_or_city, crs=DEFAULT_CRS, clip=True, clip_to_gdf=None, explode=False, graphml_path=None): """Download filtered street network from OSM and return edges as GeoDataFrame.""" # Determine if input is bbox, city name, or shapely polygon if isinstance(bbox_or_city, str): boundary, center, population = get_city_boundary_osm(bbox_or_city) if boundary is None: return gpd.GeoDataFrame() city_polygon = boundary bbox = None elif hasattr(bbox_or_city, 'geom_type'): city_polygon = bbox_or_city bbox = None else: bbox = bbox_or_city city_polygon = None # Build custom Overpass filter (exact match on allowed highway types) highway_types = "|".join(STREET_HIGHWAY_TYPES) parts = [f'["highway"~"^({highway_types})$"]'] if STREET_EXCLUDED_SERVICE_TYPES: excluded_services = "|".join(STREET_EXCLUDED_SERVICE_TYPES) parts.append(f'["service"!~"{excluded_services}"]') if STREET_EXCLUDE_TUNNELS: parts.append('["tunnel"!="yes"]') if STREET_EXCLUDE_COVERED: parts.append('["covered"!="yes"]') if STREET_EXCLUDED_SURFACES: excluded_surfaces = "|".join(STREET_EXCLUDED_SURFACES) parts.append(f'["surface"!~"{excluded_surfaces}"]') custom_filter = "".join(parts) # by_chunks parameter is deprecated and ignored for simplicity and performance try: # Construct graph with query-time filtering and graph-level truncation if city_polygon is not None: G = ox.graph_from_polygon(city_polygon, custom_filter=custom_filter, simplify=True) else: G = ox.graph_from_bbox(bbox=bbox, custom_filter=custom_filter, truncate_by_edge=clip, simplify=True) # Optional additional truncation by another GDF's bounds if clip_to_gdf is not None and len(clip_to_gdf) > 0: cb = clip_to_gdf.total_bounds # (minx, miny, maxx, maxy) west2, south2, east2, north2 = cb[0], cb[1], cb[2], cb[3] G = ox.truncate.truncate_graph_bbox(G, north2, south2, east2, west2, truncate_by_edge=True) # Project to UTM for consolidation (requires meter-based CRS) Gp = ox.project_graph(G) Gc = ox.simplification.consolidate_intersections( Gp, tolerance=INTERSECTION_CONSOLIDATION_TOLERANCE_M, rebuild_graph=True ) if graphml_path: ox.save_graphml(Gc, filepath=str(graphml_path)) # Extract nodes/edges, prune short edges nodes_gdf, streets = ox.graph_to_gdfs(Gc) streets = streets[streets['length'] >= STREET_MIN_LENGTH_M] # Convert to target CRS streets = streets.to_crs(crs) except InsufficientResponseError: return gpd.GeoDataFrame(columns=['geometry'], crs=crs) except Exception as e: raise RuntimeError(f"Failed to download street network: {e}") if len(streets) == 0: return gpd.GeoDataFrame(columns=['geometry'], crs=crs) # Explode if requested if explode and len(streets) > 0: streets = streets.explode(ignore_index=True) return streets
# ============================================================================= # SIMPLE I/O HELPERS (GeoJSON, GeoParquet, Shapefile, GeoPackage) # =============================================================================
[docs] def save_geodata(gdf: gpd.GeoDataFrame, path: str, layer: str = None): """Persist a GeoDataFrame to disk based on file extension. Supported formats: - .geojson/.json -> GeoJSON - .parquet/.geoparquet -> GeoParquet - .shp -> ESRI Shapefile (multiple files created alongside) - .gpkg -> GeoPackage (layer optional, defaults to 'data') """ if gdf is None: raise ValueError("gdf cannot be None") ext = os.path.splitext(path)[1].lower() os.makedirs(os.path.dirname(path) or '.', exist_ok=True) if ext in ('.geojson', '.json'): gdf.to_file(path, driver='GeoJSON') elif ext in ('.parquet', '.geoparquet'): gdf.to_parquet(path) elif ext == '.shp': gdf.to_file(path, driver='ESRI Shapefile') elif ext == '.gpkg': gdf.to_file(path, layer=(layer or 'data'), driver='GPKG') else: raise ValueError(f"Unsupported geodata format: {ext}")
[docs] def load_geodata(path: str, layer: str = None) -> gpd.GeoDataFrame: """Load a GeoDataFrame from disk based on file extension. Supports .geojson/.json, .parquet/.geoparquet, .shp, .gpkg """ ext = os.path.splitext(path)[1].lower() if ext in ('.geojson', '.json', '.shp', '.gpkg'): kwargs = {} if ext == '.gpkg' and layer is not None: kwargs['layer'] = layer return gpd.read_file(path, **kwargs) elif ext in ('.parquet', '.geoparquet'): return gpd.read_parquet(path) else: raise ValueError(f"Unsupported geodata format: {ext}")
# ============================================================================= # GEOMETRY PROCESSING FUNCTIONS # =============================================================================
[docs] def remove_overlaps(gdf, exclude_categories= None): """Remove polygons fully contained within others, keeping one of any identical shapes. If exclude_categories is provided and a 'category' column exists, rows in those categories are not considered for removal. Uses spatial indexing for efficiency. """ if len(gdf) == 0: return gdf.copy() result = gdf.copy().reset_index(drop=True) # Remove duplicates first (keep one copy of identical geometries) result = result.drop_duplicates(subset=['geometry']) # If exclude_categories is specified, only process geometries not in those categories if exclude_categories is not None and 'category' in result.columns: to_process = result[~result['category'].isin(exclude_categories)].copy() excluded = result[result['category'].isin(exclude_categories)].copy() if len(to_process) == 0: return result # Nothing to process # Process only the non-excluded geometries processed = _remove_overlaps_internal(to_process) # Combine processed and excluded geometries final_result = gpd.GeoDataFrame( pd.concat([processed, excluded], ignore_index=True), crs=result.crs ) return final_result else: return _remove_overlaps_internal(result)
def _remove_overlaps_internal(gdf): # Perform overlap removal on a GeoDataFrame (helper for remove_overlaps). result = gdf.copy().reset_index(drop=True) # Step 1: Handle identical polygons to_remove = [] geometry_groups = {} for idx, geom in enumerate(result.geometry): geom_wkt = geom.wkt # Use WKT string as hashable key if geom_wkt not in geometry_groups: geometry_groups[geom_wkt] = [] geometry_groups[geom_wkt].append(idx) # For each group of identical geometries, keep only the first one for geom_wkt, indices in geometry_groups.items(): if len(indices) > 1: to_remove.extend(indices[1:]) # Keep first, remove rest # Step 2: Handle true containment (different geometries only) unique_geometries = set() for geom_wkt, indices in geometry_groups.items(): if len(indices) == 1: # Only unique geometries unique_geometries.add(indices[0]) if len(unique_geometries) > 1: # Use spatial join to find containment relationships among unique geometries unique_gdf = result.iloc[list(unique_geometries)].copy() temp_left = unique_gdf[['geometry']].copy() temp_right = unique_gdf[['geometry']].copy() # Find geometries that are contained within other geometries contained = gpd.sjoin(temp_left, temp_right, predicate='within', how='inner') # Remove self-containment contained = contained[contained.index != contained.index_right] # Process containment for unique geometries for idx in contained.index.unique(): if idx not in to_remove: # Don't double-remove containing_geoms = contained[contained.index == idx]['index_right'].values different_containers = [i for i in containing_geoms if i != idx] if different_containers: to_remove.append(idx) # Remove geometries that are duplicates or contained within different geometries result = result[~result.index.isin(to_remove)] return result
[docs] def rotate(gdf, rotation_deg=0.0, origin='centroid'): """Rotate all geometries around a single origin. Parameters - gdf: input GeoDataFrame - rotation_deg: degrees counterclockwise - origin: 'centroid' or a tuple of (x, y) """ if len(gdf) == 0: return gdf.copy() result = gdf.copy() # Determine rotation origin if origin == 'centroid': # Calculate centroid of all geometries combined all_geoms = result.geometry.union_all() origin_point = all_geoms.centroid origin_coords = (origin_point.x, origin_point.y) else: origin_coords = origin # Rotate all geometries around the same point if rotation_deg != 0: result.geometry = result.geometry.apply( lambda geom: shapely_rotate(geom, rotation_deg, origin=origin_coords) ) return result
# ============================================================================= # SUMMARY FUNCTIONS # =============================================================================
[docs] def get_category_summary(gdf): """Return a dict of building_type counts.""" return gdf['building_type'].value_counts().to_dict()
[docs] def get_subtype_summary(gdf): """Return a dict of subtype counts.""" if 'subtype' not in gdf.columns or len(gdf) == 0: return {} return gdf['subtype'].value_counts().to_dict()
# Removed unused prominence helper to keep module focused on core use cases
[docs] def rotate_streets_to_align(streets_gdf, k=200): """Estimate grid alignment from street bearings and rotate the network. Returns a tuple of (rotated_streets_gdf, rotation_degrees). """ if len(streets_gdf) == 0: return streets_gdf.copy(), 0.0 # Get random non-highway streets non_highway = streets_gdf[~streets_gdf['highway'].isin(['motorway', 'trunk', 'primary'])] if len(non_highway) > k: sample_streets = non_highway.sample(n=k) else: sample_streets = non_highway # Extract all segment angles efficiently all_coords = [] for geom in sample_streets.geometry: if hasattr(geom, 'coords'): all_coords.extend(list(geom.coords)) if len(all_coords) < 2: return streets_gdf.copy(), 0.0 # Vectorized angle calculation coords_array = np.array(all_coords) dx = coords_array[1:, 0] - coords_array[:-1, 0] dy = coords_array[1:, 1] - coords_array[:-1, 1] mask = (dx != 0) | (dy != 0) if not np.any(mask): return streets_gdf.copy(), 0.0 angles = np.arctan2(dy[mask], dx[mask]) angles = ((angles + np.pi/2) % np.pi) - np.pi/2 # Calculate rotation A, B = np.sum(np.cos(4 * angles)), np.sum(np.sin(4 * angles)) rotation_rad = -0.25 * np.arctan2(B, A) rotation_deg = np.degrees(rotation_rad) # Rotate all streets around common centroid all_geoms = streets_gdf.geometry.union_all() origin_coords = (all_geoms.centroid.x, all_geoms.centroid.y) rotated_streets = streets_gdf.copy() rotated_streets.geometry = rotated_streets.geometry.apply( lambda geom: shapely_rotate(geom, rotation_deg, origin=origin_coords) ) return rotated_streets, rotation_deg
# ============================================================================= # GARDEN CITY COORDINATE TRANSFORMATION UTILITIES # =============================================================================
[docs] def blocks_to_mercator(data, block_size=15.0, false_easting=-4265699.0, false_northing=4392976.0, rotation_deg=0.0, rotation_origin=None): """ Convert city block coordinates to Web Mercator coordinates. This function applies an affine transformation to convert abstract city block coordinates (in units of blocks) to Web Mercator projection coordinates (EPSG:3857) in meters. The transformation is: x_mercator = block_size * x_block + false_easting Parameters ---------- data : pd.DataFrame DataFrame with 'x', 'y' columns in city block coordinates block_size : float, default 15.0 Size of one city block in meters false_easting : float, default -4265699.0 False easting offset (x-origin) in Web Mercator meters false_northing : float, default 4392976.0 False northing offset (y-origin) in Web Mercator meters rotation_deg : float, default 0.0 Rotation to undo (degrees counterclockwise). If non-zero, rotates by -rotation_deg around rotation_origin after scaling and translation. rotation_origin : tuple of (x, y), optional Rotation origin in Web Mercator coordinates. If None and rotation_deg != 0, uses (false_easting, false_northing) as origin. Returns ------- pd.DataFrame DataFrame with 'x', 'y' columns updated to Web Mercator coordinates. If 'ha' column exists, it is also scaled by block_size. Examples -------- >>> import pandas as pd >>> df = pd.DataFrame({'x': [0, 1, 2], 'y': [0, 1, 2]}) >>> result = blocks_to_mercator(df, block_size=15, false_easting=-4265699, false_northing=4392976) >>> result['x'].tolist() [-4265699.0, -4265684.0, -4265669.0] """ # Validate required columns if 'x' not in data.columns or 'y' not in data.columns: raise ValueError("DataFrame must contain 'x' and 'y' columns") # Create a copy to avoid modifying original result = data.copy() # Apply affine transformation: mercator = block_size * block + origin result['x'] = block_size * result['x'] + false_easting result['y'] = block_size * result['y'] + false_northing # Undo rotation if specified if rotation_deg != 0.0: if rotation_origin is None: rotation_origin = (false_easting, false_northing) origin_x, origin_y = rotation_origin rotation_rad = np.radians(-rotation_deg) cos_r = np.cos(rotation_rad) sin_r = np.sin(rotation_rad) # Translate to origin, rotate, translate back x_centered = result['x'] - origin_x y_centered = result['y'] - origin_y result['x'] = x_centered * cos_r - y_centered * sin_r + origin_x result['y'] = x_centered * sin_r + y_centered * cos_r + origin_y # Scale horizontal accuracy if present if 'ha' in result.columns: result['ha'] = block_size * result['ha'] return result
[docs] def mercator_to_blocks(data, block_size=15.0, false_easting=-4265699.0, false_northing=4392976.0): """ Convert Web Mercator coordinates back to city block coordinates. This function applies the inverse affine transformation to convert Web Mercator projection coordinates (EPSG:3857) in meters back to abstract city block coordinates. The transformation is: x_block = (x_mercator - false_easting) / block_size Parameters ---------- data : pd.DataFrame DataFrame with 'x', 'y' columns in Web Mercator coordinates block_size : float, default 15.0 Size of one city block in meters false_easting : float, default -4265699.0 False easting offset (x-origin) in Web Mercator meters false_northing : float, default 4392976.0 False northing offset (y-origin) in Web Mercator meters Returns ------- pd.DataFrame DataFrame with 'x', 'y' columns updated to city block coordinates. If 'ha' column exists, it is also scaled back by dividing by block_size. Examples -------- >>> import pandas as pd >>> df = pd.DataFrame({'x': [-4265699.0, -4265684.0], 'y': [4392976.0, 4392991.0]}) >>> result = mercator_to_blocks(df, block_size=15, false_easting=-4265699, false_northing=4392976) >>> result['x'].tolist() [0.0, 1.0] """ # Validate required columns if 'x' not in data.columns or 'y' not in data.columns: raise ValueError("DataFrame must contain 'x' and 'y' columns") # Create a copy to avoid modifying original result = data.copy() # Apply inverse affine transformation: block = (mercator - origin) / block_size result['x'] = (result['x'] - false_easting) / block_size result['y'] = (result['y'] - false_northing) / block_size # Scale horizontal accuracy back if present if 'ha' in result.columns: result['ha'] = result['ha'] / block_size return result
[docs] def blocks_to_mercator_gdf(gdf, block_size, false_easting, false_northing, rotation_deg=0.0, drop_garden_cols=True): """ Transform GeoDataFrame from garden city block units to Web Mercator meters. Parameters ---------- gdf : gpd.GeoDataFrame Input with geometries in garden city block units block_size : float Block side length in meters false_easting : float Web Mercator origin x in meters false_northing : float Web Mercator origin y in meters rotation_deg : float, default 0.0 Rotation applied to input (will be undone by rotating -rotation_deg) drop_garden_cols : bool, default True If True, drop coord_x, coord_y, door_cell_x, door_cell_y columns Returns ------- gpd.GeoDataFrame Transformed GeoDataFrame with CRS='EPSG:3857' Notes ----- Transformation sequence: 1. Scale geometries by block_size (blocks → meters) 2. Translate by false_easting, false_northing (move to Web Mercator position) 3. Rotate by -rotation_deg (undo rotation around centroid) 4. Set CRS to EPSG:3857 5. Optionally drop garden city columns This is the GeoDataFrame equivalent of blocks_to_mercator() for DataFrames. """ from shapely.affinity import scale, rotate as shapely_rotate result = gdf.copy() if drop_garden_cols: drop_cols = [c for c in ['coord_x', 'coord_y', 'door_cell_x', 'door_cell_y', 'door_point_x', 'door_point_y'] if c in result.columns] result = result.drop(columns=drop_cols) # Scale from garden city units to meters result['geometry'] = result['geometry'].apply( lambda g: scale(g, xfact=block_size, yfact=block_size, origin=(0, 0)) ) # Translate to Web Mercator position result['geometry'] = result['geometry'].translate( xoff=false_easting, yoff=false_northing ) # Undo rotation (rotate by negative angle around centroid) if rotation_deg != 0.0: all_geoms = result.geometry.union_all() origin_point = all_geoms.centroid result['geometry'] = result['geometry'].apply( lambda g: shapely_rotate(g, -rotation_deg, origin=(origin_point.x, origin_point.y)) ) result = result.set_crs('EPSG:3857', allow_override=True) return result
[docs] def mercator_to_blocks_gdf(gdf, block_size, false_easting, false_northing, rotation_deg=0.0): """ Transform GeoDataFrame from Web Mercator meters to garden city block units. Parameters ---------- gdf : gpd.GeoDataFrame Input with geometries in Web Mercator (EPSG:3857) block_size : float Block side length in meters false_easting : float Web Mercator origin x in meters false_northing : float Web Mercator origin y in meters rotation_deg : float, default 0.0 Rotation to apply (degrees counterclockwise) Returns ------- gpd.GeoDataFrame Transformed GeoDataFrame with CRS=None (garden city units) Notes ----- Transformation sequence (inverse of blocks_to_mercator_gdf): 1. Rotate by rotation_deg around centroid 2. Translate by -false_easting, -false_northing (move from Web Mercator position) 3. Scale geometries by 1/block_size (meters → blocks) 4. Set CRS to None (abstract units) This is the GeoDataFrame equivalent of mercator_to_blocks() for DataFrames. """ from shapely.affinity import scale, rotate as shapely_rotate result = gdf.copy() # Apply rotation around centroid if rotation_deg != 0.0: all_geoms = result.geometry.union_all() origin_point = all_geoms.centroid result['geometry'] = result['geometry'].apply( lambda g: shapely_rotate(g, rotation_deg, origin=(origin_point.x, origin_point.y)) ) # Translate from Web Mercator position result['geometry'] = result['geometry'].translate( xoff=-false_easting, yoff=-false_northing ) # Scale from meters to garden city units result['geometry'] = result['geometry'].apply( lambda g: scale(g, xfact=1.0/block_size, yfact=1.0/block_size, origin=(0, 0)) ) result = result.set_crs(None, allow_override=True) return result