diff --git a/src/pyhld/__init__.py b/src/pyhld/__init__.py index c8bec53..2575616 100644 --- a/src/pyhld/__init__.py +++ b/src/pyhld/__init__.py @@ -2,6 +2,6 @@ PyHLD - Python Fiber to the Home High Level Design Tool """ -__version__ = "0.2.0" +__version__ = "0.4.0" __author__ = "Chris Bomar" __email__ = "" \ No newline at end of file diff --git a/src/pyhld/cli.py b/src/pyhld/cli.py index f2cad0f..c1fe9a3 100644 --- a/src/pyhld/cli.py +++ b/src/pyhld/cli.py @@ -7,6 +7,9 @@ from .commands.design import run_design from .commands.calculate import run_calculate from .commands.validate import run_validate from .commands.graph import run_graph +from .commands.multi2single import run_multi2single +from .commands.poles import run_poles +from .commands.access_points import run_access_points console = Console() @@ -47,5 +50,47 @@ def graph(input_file, delete_duplicates): """Analyze network connectivity from line shapefile.""" run_graph(input_file, delete_duplicates) +@main.command() +@click.option('--input-file', '-i', + type=click.Path(exists=True, path_type=Path), + required=True, + help='Input shapefile path') +@click.option('--output-file', '-o', + type=click.Path(path_type=Path), + help='Output shapefile path (optional)') +def multi2single(input_file, output_file): + """Convert multipart geometries to single part geometries.""" + run_multi2single(input_file, output_file) + +@main.command() +@click.option('--segments', '-s', + type=click.Path(exists=True, path_type=Path), + required=True, + help='Input segments shapefile path') +@click.option('--poles', '-p', + type=click.Path(exists=True, path_type=Path), + required=True, + help='Input poles shapefile path') +def poles(segments, poles): + """Validate pole placement for aerial segments.""" + run_poles(segments, poles) + +@main.command() +@click.option('--segments', '-s', + type=click.Path(exists=True, path_type=Path), + required=True, + help='Input segments shapefile path') +@click.option('--poles', '-p', + type=click.Path(exists=True, path_type=Path), + required=True, + help='Input poles shapefile path') +@click.option('--access-points', '-a', + type=click.Path(exists=True, path_type=Path), + required=True, + help='Input access points shapefile path') +def access_points(segments, poles, access_points): + """Validate access point placement for underground segments.""" + run_access_points(segments, poles, access_points) + if __name__ == '__main__': main() \ No newline at end of file diff --git a/src/pyhld/commands/access_points.py b/src/pyhld/commands/access_points.py new file mode 100644 index 0000000..4b0b598 --- /dev/null +++ b/src/pyhld/commands/access_points.py @@ -0,0 +1,170 @@ +"""Command to validate access point placement for underground segments.""" +from pathlib import Path +import geopandas as gpd +from shapely.geometry import Point +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.panel import Panel +from rich.table import Table + +console = Console() + +def check_access_points(segments: gpd.GeoDataFrame, + poles: gpd.GeoDataFrame, + access_points: gpd.GeoDataFrame, + snap_threshold: float = 1.0) -> tuple: + """Check access point and pole placement for underground segments.""" + # Create/overwrite columns + segments['access'] = False + access_points['isolated'] = True # Default to isolated until proven connected + + # Track issues + missing_endpoints = [] + duplicate_aps = [] + isolated_aps = [] + + # Process only underground segments + underground_segments = segments[segments['Type'] == 'Underground'] + + # Check for duplicate access points + for idx, ap in access_points.iterrows(): + nearby_aps = access_points[access_points.geometry.distance(ap.geometry) <= snap_threshold] + if len(nearby_aps) > 1: # More than itself + duplicate_aps.append({ + 'ap_id': idx, + 'count': len(nearby_aps) + }) + + # Check segment endpoints + for idx, row in underground_segments.iterrows(): + line = row.geometry + start_point = Point(line.coords[0]) + end_point = Point(line.coords[-1]) + + # Find poles and APs near endpoints + start_poles = poles[poles.geometry.distance(start_point) <= snap_threshold] + end_poles = poles[poles.geometry.distance(end_point) <= snap_threshold] + start_aps = access_points[access_points.geometry.distance(start_point) <= snap_threshold] + end_aps = access_points[access_points.geometry.distance(end_point) <= snap_threshold] + + # Mark APs as connected if found at endpoints + if not start_aps.empty: + access_points.loc[start_aps.index, 'isolated'] = False + if not end_aps.empty: + access_points.loc[end_aps.index, 'isolated'] = False + + # Check if endpoints have required connections + has_start = not (start_poles.empty and start_aps.empty) + has_end = not (end_poles.empty and end_aps.empty) + + if not has_start or not has_end: + missing_endpoints.append({ + 'segment_id': idx, + 'start_missing': not has_start, + 'end_missing': not has_end + }) + segments.loc[idx, 'access'] = False + else: + segments.loc[idx, 'access'] = True + + # Find isolated access points + isolated_aps = access_points[access_points['isolated']].index.tolist() + + return (segments, access_points, + missing_endpoints, duplicate_aps, isolated_aps) + +def run_access_points(segments_file: Path, poles_file: Path, + access_points_file: Path) -> None: + """Validate access point placement.""" + console.print(Panel.fit("🔌 Validating Access Points", title="PyHLD", width=30)) + + progress = Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) + + with progress: + # Create tasks + read_task = progress.add_task("Reading shapefiles", total=1) + check_task = progress.add_task("Checking access points", total=1, visible=False) + save_task = progress.add_task("Saving results", total=1, visible=False) + + try: + # Read input files + segments_gdf = gpd.read_file(segments_file) + poles_gdf = gpd.read_file(poles_file) + aps_gdf = gpd.read_file(access_points_file) + progress.update(read_task, completed=True) + progress.refresh() + + # Check CRS match + if not (segments_gdf.crs == poles_gdf.crs == aps_gdf.crs): + console.print("[red]Error: Coordinate systems don't match[/red]") + return + + # Validate access point placement + progress.update(check_task, visible=True) + segments_gdf, aps_gdf, missing, duplicates, isolated = check_access_points( + segments_gdf, poles_gdf, aps_gdf + ) + progress.update(check_task, completed=True) + progress.refresh() + + # Save results + progress.update(save_task, visible=True) + segments_gdf.to_file(segments_file) + aps_gdf.to_file(access_points_file) + progress.update(save_task, completed=True) + progress.stop() + + # Report results + if missing: + console.print(f"\n[red]Found {len(missing)} underground segments missing connections:[/red]") + table = Table(title="Missing Connections") + table.add_column("Segment ID", justify="right") + table.add_column("Missing At", justify="left") + + for m in missing: + missing_at = [] + if m['start_missing']: missing_at.append("Start") + if m['end_missing']: missing_at.append("End") + table.add_row(str(m['segment_id']), ", ".join(missing_at)) + + console.print(table) + + if duplicates: + console.print(f"\n[yellow]Warning: Found {len(duplicates)} duplicate access points:[/yellow]") + table = Table(title="Duplicate Access Points") + table.add_column("AP ID", justify="right") + table.add_column("Duplicates", justify="right") + + for d in duplicates: + table.add_row(str(d['ap_id']), str(d['count'])) + + console.print(table) + + if isolated: + console.print(f"\n[yellow]Warning: Found {len(isolated)} isolated access points:[/yellow]") + table = Table(title="Isolated Access Points") + table.add_column("AP ID", justify="right") + + for ap_id in isolated: + table.add_row(str(ap_id)) + + console.print(table) + + if not any([missing, duplicates, isolated]): + console.print("\n[green]✓ All underground segments properly connected[/green]") + + # Print summary + console.print("\n[yellow]Summary:[/yellow]") + console.print(f"• Total underground segments: {len(segments_gdf[segments_gdf['Type'] == 'Underground'])}") + console.print(f"• Segments with proper access: {len(segments_gdf[segments_gdf['access']])}") + console.print(f"• Total access points: {len(aps_gdf)}") + console.print(f"• Connected access points: {len(aps_gdf[~aps_gdf['isolated']])}") + + except Exception as e: + progress.stop() + console.print(f"\n[red]Error: {str(e)}[/red]") + return \ No newline at end of file diff --git a/src/pyhld/commands/graph.py b/src/pyhld/commands/graph.py index 3fe5700..5634355 100644 --- a/src/pyhld/commands/graph.py +++ b/src/pyhld/commands/graph.py @@ -91,6 +91,33 @@ def find_overlapping_segments(gdf: gpd.GeoDataFrame) -> list: # Convert to list of tuples with info return [(p[0], p[1], overlap_info[p]) for p in sorted(overlap_pairs)] +def find_matching_endpoints(gdf: gpd.GeoDataFrame) -> list: + """Find segments that share the same endpoints regardless of direction.""" + endpoint_pairs = set() # Store unique endpoint pairs + endpoint_info = {} # Store segment info for each endpoint pair + + for idx, row in gdf.iterrows(): + # Get endpoints as a frozenset to ignore direction + line = row.geometry + endpoints = frozenset([line.coords[0], line.coords[-1]]) + + # Check if we've seen these endpoints before + if endpoints in endpoint_pairs: + if endpoints not in endpoint_info: + # Find the first segment with these endpoints + for prev_idx, prev_row in gdf.iloc[:idx].iterrows(): + prev_endpoints = frozenset([prev_row.geometry.coords[0], + prev_row.geometry.coords[-1]]) + if prev_endpoints == endpoints: + endpoint_info[endpoints] = [(prev_idx, prev_row.geometry.length)] + break + endpoint_info[endpoints].append((idx, line.length)) + else: + endpoint_pairs.add(endpoints) + + # Return only the endpoints that have multiple segments + return [(pairs, infos) for pairs, infos in endpoint_info.items() if len(infos) > 1] + def delete_duplicate_geometries(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """Remove duplicate geometries from GeoDataFrame while preserving LineStrings. @@ -179,7 +206,7 @@ def run_graph(input_file: Path, delete_duplicates: bool = False) -> None: console.print("• QGIS Row: QGIS's 1-based row number (use this in QGIS expressions)") console.print("\n[yellow]Troubleshooting Tips:[/yellow]") - qgis_rows = ", ".join(str(idx + 1) for idx in invalid_rows.index) + qgis_rows = ", ".join(str(idx) for idx in invalid_rows.index) console.print("1. Open the shapefile in QGIS") console.print("2. Find invalid features using this expression:") console.print(f" $id IN ({qgis_rows})") @@ -242,7 +269,7 @@ def run_graph(input_file: Path, delete_duplicates: bool = False) -> None: console.print(table) console.print("\n[yellow]Troubleshooting Tips:[/yellow]") - overlap_expr = " OR ".join([f"$id IN ({i1 + 1}, {i2 + 1})" for i1, i2, _ in overlaps]) + overlap_expr = " OR ".join([f"$id IN ({i1}, {i2})" for i1, i2, _ in overlaps]) console.print("1. Open the shapefile in QGIS") console.print("2. Find overlapping segments using this expression:") console.print(f" {overlap_expr}") @@ -254,6 +281,46 @@ def run_graph(input_file: Path, delete_duplicates: bool = False) -> None: except Exception as e: console.print(f"[red]Error checking for overlaps: {e}[/red]") + try: + matching_endpoints = find_matching_endpoints(gdf) + if matching_endpoints: + console.print(f"\n[yellow]Warning: Found {len(matching_endpoints)} sets of segments sharing endpoints[/yellow]") + + table = Table(title="Segments Sharing Endpoints") + table.add_column("Set", justify="right") + table.add_column("QGIS Rows", justify="right") + table.add_column("Length Difference", justify="right") + table.add_column("Segment Lengths", justify="right") + + for i, (endpoints, segments) in enumerate(matching_endpoints, 1): + # Get row numbers and lengths + qgis_rows = [str(idx + 1) for idx, _ in segments] + lengths = [length for _, length in segments] + + # Calculate length difference + length_diff = max(lengths) - min(lengths) + + table.add_row( + str(i), + ", ".join(qgis_rows), + f"{length_diff:.2f}", + ", ".join(f"{length:.2f}" for length in lengths) + ) + + console.print(table) + console.print("\n[yellow]Troubleshooting Tips:[/yellow]") + endpoint_expr = " OR ".join([ + f"$id IN ({','.join(str(idx) for idx, _ in segments)})" + for _, segments in matching_endpoints + ]) + console.print("1. Open the shapefile in QGIS") + console.print("2. Find segments with matching endpoints using this expression:") + console.print(f" {endpoint_expr}") + console.print("3. Review segments that share endpoints but have different lengths") + console.print("4. Consider removing redundant segments or fixing geometry\n") + except Exception as e: + console.print(f"[red]Error checking for matching endpoints: {e}[/red]") + # Create network graph progress.update(create_graph_task, visible=True) G = nx.Graph() diff --git a/src/pyhld/commands/multi2single.py b/src/pyhld/commands/multi2single.py new file mode 100644 index 0000000..4714a61 --- /dev/null +++ b/src/pyhld/commands/multi2single.py @@ -0,0 +1,85 @@ +"""Command to convert multipart geometries to single part geometries.""" +from pathlib import Path +import pandas as pd +import geopandas as gpd +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.panel import Panel + +console = Console() + +def explode_multiparts(gdf: gpd.GeoDataFrame) -> tuple[gpd.GeoDataFrame, int]: + """Explode multipart geometries while preserving attributes.""" + # Keep track of original multipart features + multi_mask = gdf.geometry.apply(lambda geom: geom.geom_type.startswith('Multi')) + multi_count = multi_mask.sum() + + if multi_count == 0: + return gdf, 0 + + # Separate single and multipart features + singles = gdf[~multi_mask].copy() + multis = gdf[multi_mask].copy() + + # Explode multipart features + exploded = multis.explode(index_parts=True) + + # Reset index and drop the multiindex columns + exploded = exploded.reset_index(drop=True) + + # Combine back with single part features + result = gpd.GeoDataFrame( + pd.concat([singles, exploded], ignore_index=True), + crs=gdf.crs + ) + + return result, multi_count + +def run_multi2single(input_file: Path, output_file: Path = None) -> None: + """Convert multipart geometries to single part geometries.""" + console.print(Panel.fit("🔨 Converting Multipart to Singlepart", title="PyHLD")) + + progress = Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) + + with progress: + # Create tasks + read_task = progress.add_task("Reading shapefile", total=1) + convert_task = progress.add_task("Converting geometries", total=1, visible=False) + save_task = progress.add_task("Saving results", total=1, visible=False) + + try: + # Read input file + gdf = gpd.read_file(input_file) + progress.update(read_task, completed=True) + progress.refresh() + + # Convert multipart to singlepart + progress.update(convert_task, visible=True) + result_gdf, multi_count = explode_multiparts(gdf) + progress.update(convert_task, completed=True) + progress.refresh() + + if multi_count == 0: + progress.stop() + console.print("\n[green]No multipart geometries found[/green]") + return + + # Save results + progress.update(save_task, visible=True) + output_path = output_file if output_file else input_file + result_gdf.to_file(output_path) + progress.update(save_task, completed=True) + progress.stop() + + # Print results + console.print(f"\n[green]Converted {multi_count} multipart geometries to singlepart[/green]") + console.print(f"[green]Output saved to:[/green] {output_path}") + + except Exception as e: + progress.stop() + console.print(f"\n[red]Error: {str(e)}[/red]") + return \ No newline at end of file diff --git a/src/pyhld/commands/poles.py b/src/pyhld/commands/poles.py new file mode 100644 index 0000000..2a8a3f8 --- /dev/null +++ b/src/pyhld/commands/poles.py @@ -0,0 +1,155 @@ +"""Command to validate pole placement for aerial segments.""" +from pathlib import Path +import geopandas as gpd +import numpy as np +from shapely.geometry import Point +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.panel import Panel +from rich.table import Table + +console = Console() + +def check_pole_placement(segments: gpd.GeoDataFrame, poles: gpd.GeoDataFrame, + snap_threshold: float = 1.0) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: + """Check pole placement for aerial segments. + + Args: + segments: GeoDataFrame with line segments + poles: GeoDataFrame with pole points + snap_threshold: Maximum distance in CRS units to consider a pole snapped + + Returns: + Tuple of (modified segments GDF, modified poles GDF) + """ + # Create new columns, overwriting if they exist + segments['poles'] = False + poles['usage'] = '' + + # Track problem segments + missing_poles = [] + duplicate_poles = [] + + # Process only aerial segments + aerial_segments = segments[segments['Type'] == 'Aerial'] + + for idx, row in aerial_segments.iterrows(): + line = row.geometry + start_point = Point(line.coords[0]) + end_point = Point(line.coords[-1]) + + # Find poles near endpoints + start_poles = poles[poles.geometry.distance(start_point) <= snap_threshold] + end_poles = poles[poles.geometry.distance(end_point) <= snap_threshold] + + # Check for duplicate poles + if len(start_poles) > 1 or len(end_poles) > 1: + duplicate_poles.append({ + 'segment_id': idx, + 'start_count': len(start_poles), + 'end_count': len(end_poles) + }) + + # Check for missing poles + if len(start_poles) == 0 or len(end_poles) == 0: + missing_poles.append({ + 'segment_id': idx, + 'start_missing': len(start_poles) == 0, + 'end_missing': len(end_poles) == 0 + }) + segments.loc[idx, 'poles'] = False + else: + # Mark poles as used for mainline + poles.loc[start_poles.index, 'usage'] = 'mainline' + poles.loc[end_poles.index, 'usage'] = 'mainline' + segments.loc[idx, 'poles'] = True + + return segments, poles, missing_poles, duplicate_poles + +def run_poles(segments_file: Path, poles_file: Path) -> None: + """Validate pole placement for aerial segments.""" + console.print(Panel.fit("🏗️ Validating Pole Placement", title="PyHLD", width=30)) + + progress = Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) + + with progress: + # Create tasks + read_task = progress.add_task("Reading shapefiles", total=1) + check_task = progress.add_task("Checking pole placement", total=1, visible=False) + save_task = progress.add_task("Saving results", total=1, visible=False) + + try: + # Read input files + segments_gdf = gpd.read_file(segments_file) + poles_gdf = gpd.read_file(poles_file) + progress.update(read_task, completed=True) + progress.refresh() + + # Check CRS match + if segments_gdf.crs != poles_gdf.crs: + console.print("[red]Error: Coordinate systems don't match[/red]") + return + + # Validate pole placement + progress.update(check_task, visible=True) + segments_gdf, poles_gdf, missing, duplicates = check_pole_placement( + segments_gdf, poles_gdf + ) + progress.update(check_task, completed=True) + progress.refresh() + + # Save results + progress.update(save_task, visible=True) + segments_gdf.to_file(segments_file) + poles_gdf.to_file(poles_file) + progress.update(save_task, completed=True) + progress.stop() + + # Report results + if missing: + console.print(f"\n[red]Found {len(missing)} aerial segments missing poles:[/red]") + table = Table(title="Missing Poles") + table.add_column("Segment ID", justify="right") + table.add_column("Missing At", justify="left") + + for m in missing: + missing_at = [] + if m['start_missing']: missing_at.append("Start") + if m['end_missing']: missing_at.append("End") + table.add_row(str(m['segment_id']), ", ".join(missing_at)) + + console.print(table) + + if duplicates: + console.print(f"\n[yellow]Warning: Found {len(duplicates)} segments with duplicate poles:[/yellow]") + table = Table(title="Duplicate Poles") + table.add_column("Segment ID", justify="right") + table.add_column("Start Count", justify="right") + table.add_column("End Count", justify="right") + + for d in duplicates: + table.add_row( + str(d['segment_id']), + str(d['start_count']), + str(d['end_count']) + ) + + console.print(table) + + if not missing and not duplicates: + console.print("\n[green]✓ All aerial segments have correct pole placement[/green]") + + # Print summary + console.print("\n[yellow]Summary:[/yellow]") + console.print(f"• Total aerial segments: {len(segments_gdf[segments_gdf['Type'] == 'Aerial'])}") + console.print(f"• Segments with proper poles: {len(segments_gdf[segments_gdf['poles']])}") + console.print(f"• Poles used as mainline: {len(poles_gdf[poles_gdf['usage'] == 'mainline'])}") + + except Exception as e: + progress.stop() + console.print(f"\n[red]Error: {str(e)}[/red]") + return \ No newline at end of file