This commit is contained in:
Chris Bomar 2025-04-21 08:57:35 -05:00
parent 9fbdad45a8
commit bdbc8d61b4
8 changed files with 385 additions and 15 deletions

View File

@ -23,6 +23,8 @@ classifiers = [
dependencies = [
"click>=8.1.8",
"rich>=14.0.0",
"geopandas>=1.0.1",
"networkx>=3.4.2",
]
[project.scripts]

View File

@ -2,6 +2,6 @@
PyHLD - Python Fiber to the Home High Level Design Tool
"""
__version__ = "0.1.0"
__version__ = "0.2.0"
__author__ = "Chris Bomar"
__email__ = ""

View File

@ -1,8 +1,12 @@
"""Command line interface for PyHLD."""
import click
from rich.console import Console
from rich.panel import Panel
from . import __version__
import click
from pathlib import Path
from rich.console import Console
from .commands.design import run_design
from .commands.calculate import run_calculate
from .commands.validate import run_validate
from .commands.graph import run_graph
console = Console()
@ -17,27 +21,31 @@ def main():
@click.option('--output', '-o', type=click.Path(), help='Output file path')
def design(input_file, output):
"""Create a high-level FTTH design."""
console.print(Panel.fit("🌐 Creating FTTH Design", title="PyHLD"))
console.print(f"Input file: {input_file}")
console.print(f"Output will be saved to: {output}")
run_design(input_file, output)
@main.command()
@click.option('--homes', '-h', type=int, help='Number of homes to be served')
@click.option('--area', '-a', type=float, help='Service area in square kilometers')
def calculate(homes, area):
"""Calculate basic FTTH metrics."""
console.print(Panel.fit("📊 Calculating FTTH Metrics", title="PyHLD"))
if homes and area:
density = homes / area
console.print(f"Homes: {homes}")
console.print(f"Area: {area} km²")
console.print(f"Density: {density:.2f} homes/km²")
run_calculate(homes, area)
@main.command()
def validate():
"""Validate design parameters and constraints."""
console.print(Panel.fit("✅ Validating Design", title="PyHLD"))
# Add validation logic here
run_validate()
@main.command()
@click.option('--input-file', '-i',
type=click.Path(exists=True, path_type=Path),
required=True,
help='Input line shapefile path')
@click.option('--delete-duplicates', '-dd',
is_flag=True,
help='Delete duplicate geometries')
def graph(input_file, delete_duplicates):
"""Analyze network connectivity from line shapefile."""
run_graph(input_file, delete_duplicates)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,13 @@
"""Command implementations for PyHLD CLI tool."""
from .design import run_design
from .calculate import run_calculate
from .validate import run_validate
from .graph import run_graph
__all__ = [
'run_design',
'run_calculate',
'run_validate',
'run_graph',
]

View File

@ -0,0 +1,14 @@
"""Calculate command implementation for FTTH metrics."""
from rich.console import Console
from rich.panel import Panel
console = Console()
def run_calculate(homes: int, area: float) -> None:
"""Execute the calculate command logic."""
console.print(Panel.fit("📊 Calculating FTTH Metrics", title="PyHLD"))
if homes and area:
density = homes / area
console.print(f"Homes: {homes}")
console.print(f"Area: {area} km²")
console.print(f"Density: {density:.2f} homes/km²")

View File

@ -0,0 +1,11 @@
"""Design command implementation for FTTH high-level design."""
from rich.console import Console
from rich.panel import Panel
console = Console()
def run_design(input_file: str, output: str) -> None:
"""Execute the design command logic."""
console.print(Panel.fit("🌐 Creating FTTH Design", title="PyHLD"))
console.print(f"Input file: {input_file}")
console.print(f"Output will be saved to: {output}")

312
src/pyhld/commands/graph.py Normal file
View File

@ -0,0 +1,312 @@
"""Graph analysis command for FTTH network validation."""
from pathlib import Path
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.panel import Panel
from rich.table import Table
import geopandas as gpd
import networkx as nx
from shapely.geometry import LineString
console = Console()
def get_geometry_error(geom) -> str:
"""Analyze why a geometry is invalid."""
try:
if geom is None:
return "Null geometry"
if not isinstance(geom, LineString):
return f"Wrong geometry type: {geom.geom_type} (expected LineString)"
if geom.is_empty:
return "Empty geometry"
if not geom.is_valid:
return f"Invalid geometry: {geom.is_valid_reason}"
if geom.length == 0:
return "Zero-length line"
# Add detailed geometry info for debugging
return f"Geometry appears valid: Length={geom.length:.2f}, Points={len(geom.coords)}"
except Exception as e:
return f"Error analyzing geometry: {str(e)}"
def save_component_shapefile(gdf: gpd.GeoDataFrame, components: list, input_file: Path) -> Path:
"""Save component information back to input shapefile."""
# Create a mapping of coordinates to component ID
coord_to_component = {}
for i, component in enumerate(components):
for node in component:
coord_to_component[node] = i
# If comp_id exists, warn user it will be replaced
if 'comp_id' in gdf.columns:
console.print("[yellow]Warning: Existing 'comp_id' column will be overwritten[/yellow]")
# Drop existing column to avoid dtype conflicts
gdf = gdf.drop(columns=['comp_id'])
# Create a new column for component ID
def get_component_id(row):
start = row.geometry.coords[0]
return coord_to_component.get(start, -1)
gdf['comp_id'] = gdf.apply(get_component_id, axis=1)
# Save back to input shapefile
gdf.to_file(input_file)
return input_file
def find_overlapping_segments(gdf: gpd.GeoDataFrame) -> list:
"""Find overlapping line segments in the shapefile."""
overlap_pairs = set() # Store only the index pairs
overlap_info = {} # Store the additional info separately
sindex = gdf.sindex
for idx, row in gdf.iterrows():
possible_matches_idx = list(sindex.intersection(row.geometry.bounds))
possible_matches = gdf.iloc[possible_matches_idx]
possible_matches = possible_matches.drop(idx, errors='ignore')
for match_idx, match_row in possible_matches.iterrows():
pair = tuple(sorted([idx, match_idx]))
if pair not in overlap_pairs:
intersection = row.geometry.intersection(match_row.geometry)
is_overlap = False
if intersection.geom_type == 'LineString':
is_overlap = intersection.length > 0.001
elif intersection.geom_type == 'MultiLineString':
is_overlap = any(part.length > 0.001 for part in intersection.geoms)
elif intersection.geom_type == 'GeometryCollection':
line_parts = [geom for geom in intersection.geoms
if geom.geom_type in ('LineString', 'MultiLineString')]
is_overlap = any(part.length > 0.001 for part in line_parts)
if is_overlap:
overlap_pairs.add(pair)
overlap_info[pair] = {
'intersection_type': intersection.geom_type,
'intersection_length': intersection.length if hasattr(intersection, 'length') else 0,
'line1_length': row.geometry.length,
'line2_length': match_row.geometry.length
}
# Convert to list of tuples with info
return [(p[0], p[1], overlap_info[p]) for p in sorted(overlap_pairs)]
def delete_duplicate_geometries(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Remove duplicate geometries from GeoDataFrame while preserving LineStrings.
This mimics QGIS's Delete Duplicate Geometries tool behavior:
- Maintains single LineString geometries
- Preserves attributes of first occurrence
- Removes exact geometric duplicates
"""
# Create a string representation of coordinates for comparison
gdf['geom_coords'] = gdf.geometry.apply(lambda x: str(list(x.coords)))
# Keep first occurrence of each unique geometry
unique_gdf = gdf.drop_duplicates(subset=['geom_coords'])
# Drop the temporary column
unique_gdf = unique_gdf.drop(columns=['geom_coords'])
# Reset index to maintain continuous row numbers
return unique_gdf.reset_index(drop=True)
def run_graph(input_file: Path, delete_duplicates: bool = False) -> None:
"""Analyze shapefile and create network graph."""
console.print(Panel.fit("🗺️ Analyzing Network Graph", title="PyHLD"))
progress = Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
)
with progress:
if not input_file.exists():
console.print(f"[red]Error: File {input_file} not found[/red]")
return
# Create tasks with explicit IDs
read_task = progress.add_task("Reading shapefile", total=1)
validate_task = progress.add_task("Validating geometries", total=1, visible=False)
create_graph_task = progress.add_task("Creating network graph", total=1, visible=False)
check_connect_task = progress.add_task("Checking connectivity", total=1, visible=False)
try:
gdf = gpd.read_file(input_file)
progress.update(read_task, completed=True)
progress.refresh()
except Exception as e:
progress.update(read_task, completed=True)
progress.refresh()
console.print(f"\n[red]Error reading shapefile: {e}[/red]")
return
progress.update(validate_task, visible=True)
progress.refresh()
# Validate geometries
try:
gdf['geom_check'] = gdf.geometry.map(lambda g: get_geometry_error(g))
invalid_rows = gdf[gdf['geom_check'].str.contains('Error|Null|Wrong|Empty|Invalid|Zero')].copy()
if not invalid_rows.empty:
progress.update(validate_task, completed=True)
progress.refresh()
progress.stop()
console.print(f"\n[red]Error: Found {len(invalid_rows)} problematic geometries[/red]")
table = Table(title="Geometry Analysis Details")
table.add_column("DataFrame Row (0-based)", justify="right")
table.add_column("QGIS Row (1-based)", justify="right")
table.add_column("Attributes")
table.add_column("Diagnosis", style="red")
for idx, row in invalid_rows.iterrows():
attrs = {k:v for k,v in row.items() if k not in ['geometry', 'geom_check']}
qgis_row = idx + 1 # Convert to 1-based indexing
table.add_row(
str(idx),
str(qgis_row),
str(attrs),
row['geom_check']
)
console.print(table)
console.print("\n[yellow]Note about Row Numbers:[/yellow]")
console.print("• DataFrame Row: Python's 0-based index")
console.print("• QGIS Row: QGIS's 1-based row number (use this in QGIS expressions)")
console.print("\n[yellow]Troubleshooting Tips:[/yellow]")
qgis_rows = ", ".join(str(idx + 1) for idx in invalid_rows.index)
console.print("1. Open the shapefile in QGIS")
console.print("2. Find invalid features using this expression:")
console.print(f" $id IN ({qgis_rows})")
console.print("3. Use 'Vertex Tool' to inspect the feature's geometry")
return
if not all(geom.geom_type == 'LineString' for geom in gdf.geometry):
console.print("[red]Error: Shapefile must contain only LineString features[/red]")
return
except Exception as e:
console.print(f"[red]Error validating geometries: {e}[/red]")
return
progress.update(validate_task, completed=True)
progress.refresh()
if delete_duplicates:
progress.add_task("Removing duplicate geometries...", total=1)
try:
original_count = len(gdf)
gdf = delete_duplicate_geometries(gdf)
deleted_count = original_count - len(gdf)
if deleted_count > 0:
console.print(f"\n[green]Removed {deleted_count} duplicate geometries[/green]")
console.print("[yellow]Note:[/yellow] This operation preserves single LineString geometries")
# Save changes back to file
gdf.to_file(input_file)
console.print(f"[green]Updated shapefile saved: {input_file}[/green]\n")
else:
console.print("\n[green]No duplicate geometries found[/green]\n")
except Exception as e:
console.print(f"\n[red]Error removing duplicates: {e}[/red]\n")
return
# Overlap checking
progress.update(validate_task, description="Checking for overlaps", visible=True)
try:
overlaps = find_overlapping_segments(gdf)
if overlaps:
console.print(f"\n[yellow]Warning: Found {len(overlaps)} overlapping segment pairs[/yellow]")
table = Table(title="Overlapping Segments")
table.add_column("Segment 1 (QGIS Row)", justify="right")
table.add_column("Segment 2 (QGIS Row)", justify="right")
table.add_column("Overlap Type", justify="left")
table.add_column("Overlap Length", justify="right")
# Sort by overlap length for better readability
sorted_overlaps = sorted(overlaps, key=lambda x: x[2]['intersection_length'], reverse=True)
for idx1, idx2, info in sorted_overlaps:
# Convert to QGIS 1-based indexing
table.add_row(
str(idx1 + 1),
str(idx2 + 1),
info['intersection_type'],
f"{info['intersection_length']:.2f}"
)
console.print(table)
console.print("\n[yellow]Troubleshooting Tips:[/yellow]")
overlap_expr = " OR ".join([f"$id IN ({i1 + 1}, {i2 + 1})" for i1, i2, _ in overlaps])
console.print("1. Open the shapefile in QGIS")
console.print("2. Find overlapping segments using this expression:")
console.print(f" {overlap_expr}")
console.print("3. Consider merging or removing redundant segments")
console.print("4. Common causes of overlaps:")
console.print(" • Duplicated lines")
console.print(" • Lines drawn multiple times")
console.print(" • Segments that should be split at intersections\n")
except Exception as e:
console.print(f"[red]Error checking for overlaps: {e}[/red]")
# Create network graph
progress.update(create_graph_task, visible=True)
G = nx.Graph()
# Add edges from line geometries
for _, row in gdf.iterrows():
line = row.geometry
try:
start = line.coords[0]
end = line.coords[-1]
G.add_edge(start, end)
except Exception as e:
console.print(f"[red]Error processing line: {e}[/red]")
continue
# Check if graph has any edges
if len(G.edges) == 0:
console.print("[red]Error: No valid edges could be created from the shapefile[/red]")
return
progress.update(create_graph_task, completed=True)
progress.refresh()
# Check connectivity
progress.update(check_connect_task, visible=True)
is_connected = nx.is_connected(G)
progress.update(check_connect_task, completed=True)
progress.refresh()
# Stop progress before printing results
progress.stop()
# Print CRS information
crs = gdf.crs
if crs is None:
console.print("\n[yellow]Warning: No CRS defined in shapefile[/yellow]\n")
else:
console.print(f"\nCRS: {crs.name}")
console.print(f"Units: {crs.axis_info[0].unit_name}\n")
# Print results
if is_connected:
console.print("[green]✓ Network is fully connected[/green]")
console.print(f"Total nodes: {len(G.nodes)}")
console.print(f"Total edges: {len(G.edges)}\n")
else:
console.print("[red]✗ Network is not fully connected[/red]")
components = list(nx.connected_components(G))
console.print(f"Found {len(components)} disconnected components")
console.print(f"Largest component has {max(len(c) for c in components)} nodes")
try:
output_file = save_component_shapefile(gdf, components, input_file)
console.print(f"\n[green]Component IDs added to:[/green] {output_file}")
console.print("[yellow]Note:[/yellow] Open in QGIS and style by 'comp_id' to visualize\n")
except Exception as e:
console.print(f"\n[red]Error updating shapefile: {e}[/red]\n")

View File

@ -0,0 +1,10 @@
"""Validate command implementation for FTTH design parameters."""
from rich.console import Console
from rich.panel import Panel
console = Console()
def run_validate() -> None:
"""Execute the validate command logic."""
console.print(Panel.fit("✅ Validating Design", title="PyHLD"))
# Add validation logic here