91 lines
3.6 KiB
Python
Executable File
91 lines
3.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import osmnx as ox
|
|
import geopandas as gpd
|
|
from shapely.geometry import LineString, box
|
|
|
|
print("Getting road centerlines...")
|
|
|
|
# Load home points from a shapefile
|
|
home_points_gdf = gpd.read_file('home_points.shp')
|
|
|
|
# Check if there's at least one feature in the DataFrame
|
|
if not home_points_gdf.empty:
|
|
# Check if CRS is projected, if not, prompt to check the CRS
|
|
if home_points_gdf.crs.is_geographic:
|
|
print("Warning: The CRS of the shapefile is geographic. Buffering might be inaccurate.")
|
|
else:
|
|
print(f"Using projected CRS {home_points_gdf.crs} for buffering.")
|
|
|
|
# Determine the units of the CRS
|
|
crs_units = home_points_gdf.crs.axis_info[0].unit_name
|
|
print(f"CRS units: {crs_units}")
|
|
|
|
# Calculate the total bounds of the home points and extend by 1000 feet on each side
|
|
extension_distance_feet = 1000 # Distance in feet
|
|
extension_distance = extension_distance_feet # Default in feet
|
|
|
|
# Convert the extension distance from feet to the CRS units if necessary
|
|
if crs_units == 'meter':
|
|
extension_distance = extension_distance_feet * 0.3048 # Convert feet to meters
|
|
|
|
minx, miny, maxx, maxy = home_points_gdf.total_bounds
|
|
extended_bounds = (minx - extension_distance, miny - extension_distance,
|
|
maxx + extension_distance, maxy + extension_distance)
|
|
|
|
# Create an extended bounding box around the home points
|
|
bbox = box(*extended_bounds)
|
|
|
|
# Reproject the bounding box to EPSG:4326 for OSMnx
|
|
bbox_reprojected = gpd.GeoSeries([bbox], crs=home_points_gdf.crs).to_crs('epsg:4326')
|
|
|
|
# Use OSMnx to download the street network
|
|
G = ox.graph_from_polygon(bbox_reprojected.iloc[0], network_type='drive')
|
|
|
|
# Convert the graph into GeoDataFrames
|
|
gdf_nodes, gdf_edges = ox.graph_to_gdfs(G)
|
|
|
|
# Check each row in each column and convert non-compatible types to strings, skip geometry column
|
|
for column in gdf_edges.columns:
|
|
if column != 'geometry': # Skip the geometry column
|
|
for idx, value in gdf_edges[column].items():
|
|
if isinstance(value, (list, dict, set)):
|
|
gdf_edges.at[idx, column] = ', '.join(map(str, value))
|
|
elif not isinstance(value, (int, float, str, type(None))):
|
|
gdf_edges.at[idx, column] = str(value)
|
|
print(f"Processed column: {column}")
|
|
|
|
# Reproject gdf_edges back to the original CRS of home points
|
|
original_crs = home_points_gdf.crs
|
|
gdf_edges = gdf_edges.to_crs(original_crs)
|
|
|
|
print("Normalizing geometries...")
|
|
|
|
# Normalize geometries and deduplicate
|
|
def normalize_geometry(geometry):
|
|
if isinstance(geometry, LineString):
|
|
return LineString(sorted(geometry.coords))
|
|
return geometry
|
|
|
|
gdf_edges['normalized_geom'] = gdf_edges['geometry'].apply(normalize_geometry)
|
|
gdf_edges_deduped = gdf_edges.drop_duplicates(subset=['osmid', 'normalized_geom'])
|
|
|
|
# Drop the temporary 'normalized_geom' column
|
|
gdf_edges_deduped = gdf_edges_deduped.drop(columns=['normalized_geom'])
|
|
|
|
# Reproject gdf_edges_deduped back to the original CRS of home points
|
|
original_crs = home_points_gdf.crs
|
|
gdf_edges_deduped = gdf_edges_deduped.to_crs(original_crs)
|
|
|
|
# Attempt to save the deduplicated DataFrame to a shapefile
|
|
try:
|
|
gdf_edges_deduped.to_file('road_centerlines.shp')
|
|
print(f"Road centerlines have been saved to road_centerlines.shp in CRS {original_crs}.")
|
|
except Exception as e:
|
|
print(f"Error encountered: {e}")
|
|
|
|
else:
|
|
print("No points found in the home_points.shp file.")
|
|
|
|
print("Done.")
|