pyhld/centerlines_from_homes.py
2024-04-19 14:29:59 -05:00

91 lines
3.6 KiB
Python
Executable File

#!/usr/bin/env python3
import osmnx as ox
import geopandas as gpd
from shapely.geometry import LineString, box
print("Getting road centerlines...")
# Load home points from a shapefile
home_points_gdf = gpd.read_file('home_points.shp')
# Check if there's at least one feature in the DataFrame
if not home_points_gdf.empty:
# Check if CRS is projected, if not, prompt to check the CRS
if home_points_gdf.crs.is_geographic:
print("Warning: The CRS of the shapefile is geographic. Buffering might be inaccurate.")
else:
print(f"Using projected CRS {home_points_gdf.crs} for buffering.")
# Determine the units of the CRS
crs_units = home_points_gdf.crs.axis_info[0].unit_name
print(f"CRS units: {crs_units}")
# Calculate the total bounds of the home points and extend by 1000 feet on each side
extension_distance_feet = 1000 # Distance in feet
extension_distance = extension_distance_feet # Default in feet
# Convert the extension distance from feet to the CRS units if necessary
if crs_units == 'meter':
extension_distance = extension_distance_feet * 0.3048 # Convert feet to meters
minx, miny, maxx, maxy = home_points_gdf.total_bounds
extended_bounds = (minx - extension_distance, miny - extension_distance,
maxx + extension_distance, maxy + extension_distance)
# Create an extended bounding box around the home points
bbox = box(*extended_bounds)
# Reproject the bounding box to EPSG:4326 for OSMnx
bbox_reprojected = gpd.GeoSeries([bbox], crs=home_points_gdf.crs).to_crs('epsg:4326')
# Use OSMnx to download the street network
G = ox.graph_from_polygon(bbox_reprojected.iloc[0], network_type='drive')
# Convert the graph into GeoDataFrames
gdf_nodes, gdf_edges = ox.graph_to_gdfs(G)
# Check each row in each column and convert non-compatible types to strings, skip geometry column
for column in gdf_edges.columns:
if column != 'geometry': # Skip the geometry column
for idx, value in gdf_edges[column].items():
if isinstance(value, (list, dict, set)):
gdf_edges.at[idx, column] = ', '.join(map(str, value))
elif not isinstance(value, (int, float, str, type(None))):
gdf_edges.at[idx, column] = str(value)
print(f"Processed column: {column}")
# Reproject gdf_edges back to the original CRS of home points
original_crs = home_points_gdf.crs
gdf_edges = gdf_edges.to_crs(original_crs)
print("Normalizing geometries...")
# Normalize geometries and deduplicate
def normalize_geometry(geometry):
if isinstance(geometry, LineString):
return LineString(sorted(geometry.coords))
return geometry
gdf_edges['normalized_geom'] = gdf_edges['geometry'].apply(normalize_geometry)
gdf_edges_deduped = gdf_edges.drop_duplicates(subset=['osmid', 'normalized_geom'])
# Drop the temporary 'normalized_geom' column
gdf_edges_deduped = gdf_edges_deduped.drop(columns=['normalized_geom'])
# Reproject gdf_edges_deduped back to the original CRS of home points
original_crs = home_points_gdf.crs
gdf_edges_deduped = gdf_edges_deduped.to_crs(original_crs)
# Attempt to save the deduplicated DataFrame to a shapefile
try:
gdf_edges_deduped.to_file('road_centerlines.shp')
print(f"Road centerlines have been saved to road_centerlines.shp in CRS {original_crs}.")
except Exception as e:
print(f"Error encountered: {e}")
else:
print("No points found in the home_points.shp file.")
print("Done.")