Basic route QC

This commit is contained in:
Chris Bomar 2025-04-25 07:38:37 -05:00
parent bdbc8d61b4
commit 4b3bbf4f16
6 changed files with 525 additions and 3 deletions

View File

@ -2,6 +2,6 @@
PyHLD - Python Fiber to the Home High Level Design Tool PyHLD - Python Fiber to the Home High Level Design Tool
""" """
__version__ = "0.2.0" __version__ = "0.4.0"
__author__ = "Chris Bomar" __author__ = "Chris Bomar"
__email__ = "" __email__ = ""

View File

@ -7,6 +7,9 @@ from .commands.design import run_design
from .commands.calculate import run_calculate from .commands.calculate import run_calculate
from .commands.validate import run_validate from .commands.validate import run_validate
from .commands.graph import run_graph from .commands.graph import run_graph
from .commands.multi2single import run_multi2single
from .commands.poles import run_poles
from .commands.access_points import run_access_points
console = Console() console = Console()
@ -47,5 +50,47 @@ def graph(input_file, delete_duplicates):
"""Analyze network connectivity from line shapefile.""" """Analyze network connectivity from line shapefile."""
run_graph(input_file, delete_duplicates) run_graph(input_file, delete_duplicates)
# CLI: `multi2single` sub-command of the main click group.
@main.command()
@click.option('--input-file', '-i',
              type=click.Path(exists=True, path_type=Path),
              required=True,
              help='Input shapefile path')
@click.option('--output-file', '-o',
              type=click.Path(path_type=Path),
              help='Output shapefile path (optional)')
def multi2single(input_file, output_file):
    """Convert multipart geometries to single part geometries."""
    # Delegates to run_multi2single; when --output-file is omitted the
    # converted data is written back over the input file.
    run_multi2single(input_file, output_file)
# CLI: `poles` sub-command — QC check that aerial segments end at poles.
@main.command()
@click.option('--segments', '-s',
              type=click.Path(exists=True, path_type=Path),
              required=True,
              help='Input segments shapefile path')
@click.option('--poles', '-p',
              type=click.Path(exists=True, path_type=Path),
              required=True,
              help='Input poles shapefile path')
def poles(segments, poles):
    """Validate pole placement for aerial segments."""
    # run_poles rewrites both shapefiles in place with added QC columns.
    run_poles(segments, poles)
# CLI: `access-points` sub-command — QC check that underground segments
# terminate at a pole or an access point.
@main.command()
@click.option('--segments', '-s',
              type=click.Path(exists=True, path_type=Path),
              required=True,
              help='Input segments shapefile path')
@click.option('--poles', '-p',
              type=click.Path(exists=True, path_type=Path),
              required=True,
              help='Input poles shapefile path')
@click.option('--access-points', '-a',
              type=click.Path(exists=True, path_type=Path),
              required=True,
              help='Input access points shapefile path')
def access_points(segments, poles, access_points):
    """Validate access point placement for underground segments."""
    # run_access_points rewrites the segments and access-points shapefiles
    # in place with added QC columns; the poles file is only read.
    run_access_points(segments, poles, access_points)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -0,0 +1,170 @@
"""Command to validate access point placement for underground segments."""
from pathlib import Path
import geopandas as gpd
from shapely.geometry import Point
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.panel import Panel
from rich.table import Table
console = Console()
def check_access_points(segments: gpd.GeoDataFrame,
                        poles: gpd.GeoDataFrame,
                        access_points: gpd.GeoDataFrame,
                        snap_threshold: float = 1.0) -> tuple:
    """Check access point and pole placement for underground segments.

    Both `segments` and `access_points` are mutated in place: a boolean
    'access' column is (re)created on segments and a boolean 'isolated'
    column on access_points.

    Args:
        segments: GeoDataFrame of line segments; only rows whose 'Type'
            column equals 'Underground' are checked.
        poles: GeoDataFrame of pole points.
        access_points: GeoDataFrame of access point locations.
        snap_threshold: Maximum distance (in CRS units) at which a point
            counts as sitting on a segment endpoint.

    Returns:
        Tuple of (segments, access_points, missing-endpoint records,
        duplicate-AP records, isolated-AP index list).
    """
    # Initialize QC columns; APs stay isolated until a segment claims them.
    segments['access'] = False
    access_points['isolated'] = True

    unterminated = []   # underground segments lacking a connection at an end
    clustered = []      # APs with another AP inside the snap threshold

    buried = segments[segments['Type'] == 'Underground']

    # Flag access points that sit on top of one another.
    for ap_idx, ap_row in access_points.iterrows():
        near = access_points[access_points.geometry.distance(ap_row.geometry) <= snap_threshold]
        if len(near) > 1:  # an AP always matches itself
            clustered.append({
                'ap_id': ap_idx,
                'count': len(near)
            })

    # Verify each underground segment terminates at a pole or an AP.
    for seg_idx, seg_row in buried.iterrows():
        geom = seg_row.geometry
        first = Point(geom.coords[0])
        last = Point(geom.coords[-1])

        poles_at_first = poles[poles.geometry.distance(first) <= snap_threshold]
        poles_at_last = poles[poles.geometry.distance(last) <= snap_threshold]
        aps_at_first = access_points[access_points.geometry.distance(first) <= snap_threshold]
        aps_at_last = access_points[access_points.geometry.distance(last) <= snap_threshold]

        # Any AP touching an endpoint is connected, hence not isolated.
        if not aps_at_first.empty:
            access_points.loc[aps_at_first.index, 'isolated'] = False
        if not aps_at_last.empty:
            access_points.loc[aps_at_last.index, 'isolated'] = False

        start_ok = not (poles_at_first.empty and aps_at_first.empty)
        end_ok = not (poles_at_last.empty and aps_at_last.empty)

        if start_ok and end_ok:
            segments.loc[seg_idx, 'access'] = True
        else:
            unterminated.append({
                'segment_id': seg_idx,
                'start_missing': not start_ok,
                'end_missing': not end_ok
            })
            segments.loc[seg_idx, 'access'] = False

    # APs never touched by an underground segment endpoint remain isolated.
    lonely = access_points[access_points['isolated']].index.tolist()
    return (segments, access_points,
            unterminated, clustered, lonely)
def run_access_points(segments_file: Path, poles_file: Path,
                      access_points_file: Path) -> None:
    """Validate access point placement.

    Reads the three shapefiles, runs check_access_points, writes the
    annotated segments and access-points frames back over their input
    files, then prints issue tables and a summary to the console.

    Args:
        segments_file: Segments shapefile path (rewritten in place).
        poles_file: Poles shapefile path (read only).
        access_points_file: Access points shapefile path (rewritten in place).
    """
    console.print(Panel.fit("🔌 Validating Access Points", title="PyHLD", width=30))
    progress = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with progress:
        # Create tasks; later stages stay hidden until reached
        read_task = progress.add_task("Reading shapefiles", total=1)
        check_task = progress.add_task("Checking access points", total=1, visible=False)
        save_task = progress.add_task("Saving results", total=1, visible=False)
        try:
            # Read input files
            segments_gdf = gpd.read_file(segments_file)
            poles_gdf = gpd.read_file(poles_file)
            aps_gdf = gpd.read_file(access_points_file)
            progress.update(read_task, completed=True)
            progress.refresh()
            # Check CRS match — distance comparisons are meaningless otherwise
            if not (segments_gdf.crs == poles_gdf.crs == aps_gdf.crs):
                console.print("[red]Error: Coordinate systems don't match[/red]")
                return
            # Validate access point placement (mutates segments_gdf/aps_gdf)
            progress.update(check_task, visible=True)
            segments_gdf, aps_gdf, missing, duplicates, isolated = check_access_points(
                segments_gdf, poles_gdf, aps_gdf
            )
            progress.update(check_task, completed=True)
            progress.refresh()
            # Save results — overwrites the input shapefiles with QC columns
            progress.update(save_task, visible=True)
            segments_gdf.to_file(segments_file)
            aps_gdf.to_file(access_points_file)
            progress.update(save_task, completed=True)
            # Stop the spinner before printing tables below it
            progress.stop()
            # Report results: segments missing a pole/AP at an endpoint
            if missing:
                console.print(f"\n[red]Found {len(missing)} underground segments missing connections:[/red]")
                table = Table(title="Missing Connections")
                table.add_column("Segment ID", justify="right")
                table.add_column("Missing At", justify="left")
                for m in missing:
                    missing_at = []
                    if m['start_missing']: missing_at.append("Start")
                    if m['end_missing']: missing_at.append("End")
                    table.add_row(str(m['segment_id']), ", ".join(missing_at))
                console.print(table)
            # APs that have another AP within the snap threshold
            if duplicates:
                console.print(f"\n[yellow]Warning: Found {len(duplicates)} duplicate access points:[/yellow]")
                table = Table(title="Duplicate Access Points")
                table.add_column("AP ID", justify="right")
                table.add_column("Duplicates", justify="right")
                for d in duplicates:
                    table.add_row(str(d['ap_id']), str(d['count']))
                console.print(table)
            # APs not touched by any underground segment endpoint
            if isolated:
                console.print(f"\n[yellow]Warning: Found {len(isolated)} isolated access points:[/yellow]")
                table = Table(title="Isolated Access Points")
                table.add_column("AP ID", justify="right")
                for ap_id in isolated:
                    table.add_row(str(ap_id))
                console.print(table)
            if not any([missing, duplicates, isolated]):
                console.print("\n[green]✓ All underground segments properly connected[/green]")
            # Print summary
            console.print("\n[yellow]Summary:[/yellow]")
            console.print(f"• Total underground segments: {len(segments_gdf[segments_gdf['Type'] == 'Underground'])}")
            console.print(f"• Segments with proper access: {len(segments_gdf[segments_gdf['access']])}")
            console.print(f"• Total access points: {len(aps_gdf)}")
            console.print(f"• Connected access points: {len(aps_gdf[~aps_gdf['isolated']])}")
        except Exception as e:
            # Best-effort CLI error reporting; spinner must stop first
            progress.stop()
            console.print(f"\n[red]Error: {str(e)}[/red]")
            return

View File

@ -91,6 +91,33 @@ def find_overlapping_segments(gdf: gpd.GeoDataFrame) -> list:
# Convert to list of tuples with info # Convert to list of tuples with info
return [(p[0], p[1], overlap_info[p]) for p in sorted(overlap_pairs)] return [(p[0], p[1], overlap_info[p]) for p in sorted(overlap_pairs)]
def find_matching_endpoints(gdf: "gpd.GeoDataFrame") -> list:
    """Find segments that share the same endpoints regardless of direction.

    Args:
        gdf: GeoDataFrame (or frame-like) of LineString rows; each row's
            geometry must expose `coords` and `length`.

    Returns:
        List of (endpoints, segments) tuples, where `endpoints` is a
        frozenset of the two shared endpoint coordinates and `segments`
        is a list of (index, length) tuples, in row order, for every
        segment joining those endpoints. Only endpoint pairs shared by
        more than one segment are returned.
    """
    # Single pass: bucket every segment under its direction-independent
    # endpoint key. Recording on first sight replaces the original O(n^2)
    # rescan, which also sliced with `iloc[:idx]` — treating an index
    # *label* from iterrows() as a *position*, wrong for any frame whose
    # index is not the default RangeIndex.
    segments_by_endpoints: dict = {}
    for idx, row in gdf.iterrows():
        line = row.geometry
        # frozenset makes (A, B) and (B, A) hash identically
        endpoints = frozenset([line.coords[0], line.coords[-1]])
        segments_by_endpoints.setdefault(endpoints, []).append((idx, line.length))
    # Keep only endpoint pairs joined by multiple segments
    return [(endpoints, infos)
            for endpoints, infos in segments_by_endpoints.items()
            if len(infos) > 1]
def delete_duplicate_geometries(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame: def delete_duplicate_geometries(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Remove duplicate geometries from GeoDataFrame while preserving LineStrings. """Remove duplicate geometries from GeoDataFrame while preserving LineStrings.
@ -179,7 +206,7 @@ def run_graph(input_file: Path, delete_duplicates: bool = False) -> None:
console.print("• QGIS Row: QGIS's 1-based row number (use this in QGIS expressions)") console.print("• QGIS Row: QGIS's 1-based row number (use this in QGIS expressions)")
console.print("\n[yellow]Troubleshooting Tips:[/yellow]") console.print("\n[yellow]Troubleshooting Tips:[/yellow]")
qgis_rows = ", ".join(str(idx + 1) for idx in invalid_rows.index) qgis_rows = ", ".join(str(idx) for idx in invalid_rows.index)
console.print("1. Open the shapefile in QGIS") console.print("1. Open the shapefile in QGIS")
console.print("2. Find invalid features using this expression:") console.print("2. Find invalid features using this expression:")
console.print(f" $id IN ({qgis_rows})") console.print(f" $id IN ({qgis_rows})")
@ -242,7 +269,7 @@ def run_graph(input_file: Path, delete_duplicates: bool = False) -> None:
console.print(table) console.print(table)
console.print("\n[yellow]Troubleshooting Tips:[/yellow]") console.print("\n[yellow]Troubleshooting Tips:[/yellow]")
overlap_expr = " OR ".join([f"$id IN ({i1 + 1}, {i2 + 1})" for i1, i2, _ in overlaps]) overlap_expr = " OR ".join([f"$id IN ({i1}, {i2})" for i1, i2, _ in overlaps])
console.print("1. Open the shapefile in QGIS") console.print("1. Open the shapefile in QGIS")
console.print("2. Find overlapping segments using this expression:") console.print("2. Find overlapping segments using this expression:")
console.print(f" {overlap_expr}") console.print(f" {overlap_expr}")
@ -254,6 +281,46 @@ def run_graph(input_file: Path, delete_duplicates: bool = False) -> None:
except Exception as e: except Exception as e:
console.print(f"[red]Error checking for overlaps: {e}[/red]") console.print(f"[red]Error checking for overlaps: {e}[/red]")
try:
matching_endpoints = find_matching_endpoints(gdf)
if matching_endpoints:
console.print(f"\n[yellow]Warning: Found {len(matching_endpoints)} sets of segments sharing endpoints[/yellow]")
table = Table(title="Segments Sharing Endpoints")
table.add_column("Set", justify="right")
table.add_column("QGIS Rows", justify="right")
table.add_column("Length Difference", justify="right")
table.add_column("Segment Lengths", justify="right")
for i, (endpoints, segments) in enumerate(matching_endpoints, 1):
# Get row numbers and lengths
qgis_rows = [str(idx + 1) for idx, _ in segments]
lengths = [length for _, length in segments]
# Calculate length difference
length_diff = max(lengths) - min(lengths)
table.add_row(
str(i),
", ".join(qgis_rows),
f"{length_diff:.2f}",
", ".join(f"{length:.2f}" for length in lengths)
)
console.print(table)
console.print("\n[yellow]Troubleshooting Tips:[/yellow]")
endpoint_expr = " OR ".join([
f"$id IN ({','.join(str(idx) for idx, _ in segments)})"
for _, segments in matching_endpoints
])
console.print("1. Open the shapefile in QGIS")
console.print("2. Find segments with matching endpoints using this expression:")
console.print(f" {endpoint_expr}")
console.print("3. Review segments that share endpoints but have different lengths")
console.print("4. Consider removing redundant segments or fixing geometry\n")
except Exception as e:
console.print(f"[red]Error checking for matching endpoints: {e}[/red]")
# Create network graph # Create network graph
progress.update(create_graph_task, visible=True) progress.update(create_graph_task, visible=True)
G = nx.Graph() G = nx.Graph()

View File

@ -0,0 +1,85 @@
"""Command to convert multipart geometries to single part geometries."""
from pathlib import Path
import pandas as pd
import geopandas as gpd
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.panel import Panel
console = Console()
def explode_multiparts(gdf: gpd.GeoDataFrame) -> tuple[gpd.GeoDataFrame, int]:
    """Split every multipart geometry into its parts, keeping attributes.

    Returns:
        Tuple of (resulting GeoDataFrame, number of multipart features
        found). The input frame is returned unchanged when it contains
        no multipart geometry.
    """
    # A geometry is multipart when its type name carries the 'Multi' prefix.
    is_multi = gdf.geometry.apply(lambda geom: geom.geom_type.startswith('Multi'))
    n_multi = is_multi.sum()
    if n_multi == 0:
        return gdf, 0
    # Partition the frame: untouched single-part rows vs rows to explode.
    single_part = gdf[~is_multi].copy()
    multi_part = gdf[is_multi].copy()
    # Explode, then flatten the resulting (orig, part) MultiIndex away.
    burst = multi_part.explode(index_parts=True).reset_index(drop=True)
    # Stitch both halves back together under the original CRS.
    combined = gpd.GeoDataFrame(
        pd.concat([single_part, burst], ignore_index=True),
        crs=gdf.crs
    )
    return combined, n_multi
def run_multi2single(input_file: Path, output_file: Path = None) -> None:
    """Convert multipart geometries to single part geometries.

    Reads the input shapefile, explodes any multipart features, and saves
    the result to `output_file` — or back over `input_file` when no output
    path is supplied.
    """
    console.print(Panel.fit("🔨 Converting Multipart to Singlepart", title="PyHLD"))
    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with spinner:
        # One task per stage; later stages are hidden until reached.
        task_read = spinner.add_task("Reading shapefile", total=1)
        task_convert = spinner.add_task("Converting geometries", total=1, visible=False)
        task_save = spinner.add_task("Saving results", total=1, visible=False)
        try:
            # Stage 1: load the shapefile.
            source = gpd.read_file(input_file)
            spinner.update(task_read, completed=True)
            spinner.refresh()

            # Stage 2: explode multipart geometries.
            spinner.update(task_convert, visible=True)
            converted, multi_count = explode_multiparts(source)
            spinner.update(task_convert, completed=True)
            spinner.refresh()

            # Nothing to do — leave the input file untouched.
            if multi_count == 0:
                spinner.stop()
                console.print("\n[green]No multipart geometries found[/green]")
                return

            # Stage 3: write the result (in place unless a path was given).
            spinner.update(task_save, visible=True)
            output_path = output_file if output_file else input_file
            converted.to_file(output_path)
            spinner.update(task_save, completed=True)
            spinner.stop()

            console.print(f"\n[green]Converted {multi_count} multipart geometries to singlepart[/green]")
            console.print(f"[green]Output saved to:[/green] {output_path}")
        except Exception as e:
            # Stop the spinner before reporting, then bail out.
            spinner.stop()
            console.print(f"\n[red]Error: {str(e)}[/red]")
            return

155
src/pyhld/commands/poles.py Normal file
View File

@ -0,0 +1,155 @@
"""Command to validate pole placement for aerial segments."""
from pathlib import Path
import geopandas as gpd
import numpy as np
from shapely.geometry import Point
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.panel import Panel
from rich.table import Table
console = Console()
def check_pole_placement(segments: gpd.GeoDataFrame, poles: gpd.GeoDataFrame,
                         snap_threshold: float = 1.0) -> tuple:
    """Check pole placement for aerial segments.

    Both input frames are modified in place: a boolean 'poles' column is
    (re)created on `segments` and a string 'usage' column on `poles`.

    Args:
        segments: GeoDataFrame with line segments; only rows whose 'Type'
            column equals 'Aerial' are checked
        poles: GeoDataFrame with pole points
        snap_threshold: Maximum distance in CRS units to consider a pole snapped

    Returns:
        Tuple of (modified segments GDF, modified poles GDF,
        missing-pole records, duplicate-pole records) — note: four items,
        not two. Each record is a dict keyed by 'segment_id' with
        start/end details.
    """
    # Create new columns, overwriting if they exist
    segments['poles'] = False
    poles['usage'] = ''
    # Track problem segments
    missing_poles = []
    duplicate_poles = []
    # Process only aerial segments
    aerial_segments = segments[segments['Type'] == 'Aerial']
    for idx, row in aerial_segments.iterrows():
        line = row.geometry
        start_point = Point(line.coords[0])
        end_point = Point(line.coords[-1])
        # Find poles near endpoints (within snap_threshold CRS units)
        start_poles = poles[poles.geometry.distance(start_point) <= snap_threshold]
        end_poles = poles[poles.geometry.distance(end_point) <= snap_threshold]
        # Check for duplicate poles (more than one pole snapped to an end)
        if len(start_poles) > 1 or len(end_poles) > 1:
            duplicate_poles.append({
                'segment_id': idx,
                'start_count': len(start_poles),
                'end_count': len(end_poles)
            })
        # Check for missing poles
        if len(start_poles) == 0 or len(end_poles) == 0:
            missing_poles.append({
                'segment_id': idx,
                'start_missing': len(start_poles) == 0,
                'end_missing': len(end_poles) == 0
            })
            segments.loc[idx, 'poles'] = False
        else:
            # Mark poles as used for mainline
            poles.loc[start_poles.index, 'usage'] = 'mainline'
            poles.loc[end_poles.index, 'usage'] = 'mainline'
            segments.loc[idx, 'poles'] = True
    return segments, poles, missing_poles, duplicate_poles
def run_poles(segments_file: Path, poles_file: Path) -> None:
    """Validate pole placement for aerial segments.

    Reads both shapefiles, runs check_pole_placement, writes the annotated
    frames back over their input files, then prints issue tables and a
    summary to the console.

    Args:
        segments_file: Segments shapefile path (rewritten in place).
        poles_file: Poles shapefile path (rewritten in place).
    """
    console.print(Panel.fit("🏗️ Validating Pole Placement", title="PyHLD", width=30))
    progress = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with progress:
        # Create tasks; later stages stay hidden until reached
        read_task = progress.add_task("Reading shapefiles", total=1)
        check_task = progress.add_task("Checking pole placement", total=1, visible=False)
        save_task = progress.add_task("Saving results", total=1, visible=False)
        try:
            # Read input files
            segments_gdf = gpd.read_file(segments_file)
            poles_gdf = gpd.read_file(poles_file)
            progress.update(read_task, completed=True)
            progress.refresh()
            # Check CRS match — distance comparisons are meaningless otherwise
            if segments_gdf.crs != poles_gdf.crs:
                console.print("[red]Error: Coordinate systems don't match[/red]")
                return
            # Validate pole placement (mutates segments_gdf/poles_gdf)
            progress.update(check_task, visible=True)
            segments_gdf, poles_gdf, missing, duplicates = check_pole_placement(
                segments_gdf, poles_gdf
            )
            progress.update(check_task, completed=True)
            progress.refresh()
            # Save results — overwrites the input shapefiles with QC columns
            progress.update(save_task, visible=True)
            segments_gdf.to_file(segments_file)
            poles_gdf.to_file(poles_file)
            progress.update(save_task, completed=True)
            # Stop the spinner before printing tables below it
            progress.stop()
            # Report results: aerial segments with no pole at an endpoint
            if missing:
                console.print(f"\n[red]Found {len(missing)} aerial segments missing poles:[/red]")
                table = Table(title="Missing Poles")
                table.add_column("Segment ID", justify="right")
                table.add_column("Missing At", justify="left")
                for m in missing:
                    missing_at = []
                    if m['start_missing']: missing_at.append("Start")
                    if m['end_missing']: missing_at.append("End")
                    table.add_row(str(m['segment_id']), ", ".join(missing_at))
                console.print(table)
            # Segments with more than one pole snapped to an endpoint
            if duplicates:
                console.print(f"\n[yellow]Warning: Found {len(duplicates)} segments with duplicate poles:[/yellow]")
                table = Table(title="Duplicate Poles")
                table.add_column("Segment ID", justify="right")
                table.add_column("Start Count", justify="right")
                table.add_column("End Count", justify="right")
                for d in duplicates:
                    table.add_row(
                        str(d['segment_id']),
                        str(d['start_count']),
                        str(d['end_count'])
                    )
                console.print(table)
            if not missing and not duplicates:
                console.print("\n[green]✓ All aerial segments have correct pole placement[/green]")
            # Print summary
            console.print("\n[yellow]Summary:[/yellow]")
            console.print(f"• Total aerial segments: {len(segments_gdf[segments_gdf['Type'] == 'Aerial'])}")
            console.print(f"• Segments with proper poles: {len(segments_gdf[segments_gdf['poles']])}")
            console.print(f"• Poles used as mainline: {len(poles_gdf[poles_gdf['usage'] == 'mainline'])}")
        except Exception as e:
            # Best-effort CLI error reporting; spinner must stop first
            progress.stop()
            console.print(f"\n[red]Error: {str(e)}[/red]")
            return