dragndrop_hld/backend/qc_validator.py
alex 12407b74e4 Initial commit - Stage 1 working version
Saving current working state before proceeding to Stage 2.
Includes:
- Backend: Python-based QC validator with shapefile processing
- Frontend: Drag-and-drop file upload interface
- Sample files for testing
- Documentation and revision history

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-04 13:43:57 -07:00

894 lines
35 KiB
Python

import shapefile
from pathlib import Path
import re
# Required shapefiles
# Base names every upload must provide (each as a .shp plus its .prj/.dbf
# side files, which are checked separately by the validators below).
REQUIRED_SHAPEFILES = [
    "poles",
    "network_elements",
    "splicing",
    "sites",
    "parcels",
    "permits",
    "cabinet_boundaries",
    "segments",
    "access_points",
    "cables"
]
# WGS 84 projection string (EPSG:4326)
# Reference ESRI WKT for WGS 84. NOTE(review): not referenced anywhere in this
# file — validate_projection() only does a substring check; confirm whether a
# caller elsewhere uses this constant before removing it.
WGS84_PROJ = 'GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]'
def validate_shapefiles(temp_dir: Path):
    """Run the full QC pipeline over an extracted upload directory.

    Returns a dict: {"passed": bool, "errors": [str, ...]}.
    """
    # Shapefiles may sit at the root of the ZIP or in a single wrapper folder.
    shapefile_dir = find_shapefile_directory(temp_dir)
    if shapefile_dir is None:
        return {"passed": False, "errors": ["No shapefiles found in the uploaded ZIP"]}

    # All required layers must be present before any per-file validation runs.
    missing = check_required_shapefiles(shapefile_dir)
    if missing:
        return {
            "passed": False,
            "errors": [f"Missing required shapefile: {name}" for name in missing],
        }

    errors = []
    # Per-file checks: projection, UID integrity, then type-specific attributes.
    for name in REQUIRED_SHAPEFILES:
        shp_path = shapefile_dir / f"{name}.shp"
        errors.extend(validate_projection(shp_path, name))
        errors.extend(validate_uid_field(shp_path, name))
        errors.extend(validate_attributes(shp_path, name))

    # Cross-file check: features must fall inside their zone's cabinet boundary.
    errors.extend(validate_spatial_containment(shapefile_dir))
    return {"passed": not errors, "errors": errors}
def find_shapefile_directory(temp_dir: Path):
    """Locate the folder that actually holds the .shp files.

    Looks in temp_dir itself first, then one level of subdirectories (ZIPs are
    often packaged with a single wrapper folder). Returns the matching
    directory, or None when no shapefile is found.
    """
    if any(temp_dir.glob("*.shp")):
        return temp_dir
    for candidate in temp_dir.iterdir():
        if candidate.is_dir() and any(candidate.glob("*.shp")):
            return candidate
    return None
def check_required_shapefiles(shapefile_dir: Path):
    """Return the REQUIRED_SHAPEFILES base names with no .shp file in shapefile_dir."""
    return [
        name
        for name in REQUIRED_SHAPEFILES
        if not (shapefile_dir / f"{name}.shp").exists()
    ]
def validate_projection(shp_path: Path, shapefile_name: str):
    """Check that the shapefile's .prj sidecar declares WGS 84.

    Returns a list of error strings (empty when the projection is acceptable).
    """
    prj_path = shp_path.with_suffix('.prj')
    if not prj_path.exists():
        return [f"{shapefile_name}: Missing .prj file"]
    proj_content = prj_path.read_text().strip()
    # A loose substring test is enough here: ESRI and OGC WKT spell the datum
    # differently ('WGS_1984' vs 'WGS84').
    has_wgs84 = ('WGS_1984' in proj_content) or ('WGS84' in proj_content)
    return [] if has_wgs84 else [f"{shapefile_name}: Not in WGS 84 projection"]
def validate_uid_field(shp_path: Path, shapefile_name: str):
    """Validate that the UID field exists and holds unique integer values.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        # fields[0] is the DBF deletion flag; skip it.
        field_names = [field[0] for field in sf.fields[1:]]
        if 'UID' not in field_names:
            errors.append(f"{shapefile_name}: Missing UID field")
            return errors
        uid_index = field_names.index('UID')
        uids = []
        non_integer_count = 0
        for idx, record in enumerate(sf.records()):
            uid = record[uid_index]
            # Accept values coercible to int (DBF numeric fields can come back
            # as strings depending on how the file was written).
            if not isinstance(uid, int):
                try:
                    uid = int(uid)
                except (ValueError, TypeError):
                    non_integer_count += 1
                    # Cap per-feature messages at 10 to keep reports readable.
                    if non_integer_count <= 10:
                        errors.append(f"{shapefile_name}: UID at feature index {idx} is not an integer")
                    continue
            uids.append(uid)
        if non_integer_count > 10:
            errors.append(f"{shapefile_name}: 10 or more features failed UID is not an integer")
        # Uniqueness across all integer-coercible UIDs.
        if len(uids) != len(set(uids)):
            duplicate_count = len(uids) - len(set(uids))
            if duplicate_count >= 10:
                errors.append(f"{shapefile_name}: 10 or more features failed UID is not unique")
            else:
                errors.append(f"{shapefile_name}: UID field contains {duplicate_count} duplicate values")
    except Exception as e:
        # FIX: previously only Reader() construction was guarded, so a corrupt
        # record mid-iteration crashed the entire QC run instead of being
        # reported like every other validator does.
        errors.append(f"{shapefile_name}: Error reading shapefile - {str(e)}")
    finally:
        # Release the file handles even on the exception path.
        if sf is not None:
            sf.close()
    return errors
def validate_attributes(shp_path: Path, shapefile_name: str):
    """Dispatch to the attribute validator for this shapefile type.

    Unknown names validate trivially (empty error list).
    """
    dispatch = {
        "segments": validate_segments,
        "access_points": validate_access_points,
        "cabinet_boundaries": validate_cabinet_boundaries,
        "permits": validate_permits,
        "cables": validate_cables,
        "parcels": validate_parcels,
        "sites": validate_sites,
        "splicing": validate_splicing,
        "network_elements": validate_network_elements,
        "poles": validate_poles,
    }
    handler = dispatch.get(shapefile_name)
    return handler(shp_path, shapefile_name) if handler else []
def validate_segments(shp_path: Path, shapefile_name: str):
    """Validate segments: Type enum, 'Group 1' zone format, Conduit spec.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        # The field really is 'Group 1' with a space, not 'Group_01'.
        # FIX: report every missing required field; the original returned after
        # the first one it found.
        required_fields = ['Type', 'Group 1', 'Conduit']
        missing = [f for f in required_fields if f not in field_names]
        if missing:
            for field in missing:
                errors.append(f"{shapefile_name}: Missing required field '{field}'")
            return errors
        type_idx = field_names.index('Type')
        group_idx = field_names.index('Group 1')
        conduit_idx = field_names.index('Conduit')
        valid_types = ['Aerial', '3rd Party Duct', 'Underground', 'Existing VERO', 'Drop Cable']
        failure_counts = {'type': 0, 'group': 0, 'conduit': 0}
        for idx, record in enumerate(sf.records()):
            # Type must be one of the known segment types.
            if record[type_idx] not in valid_types:
                failure_counts['type'] += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_counts['type'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Type value")
            # Group 1 must look like 'Zone XX' (exactly two digits).
            group_val = str(record[group_idx]) if record[group_idx] else ""
            if not re.match(r'^Zone \d{2}$', group_val):
                failure_counts['group'] += 1
                if failure_counts['group'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Group 1 format (should be 'Zone XX')")
            # Conduit spec is only enforced for Underground segments.
            if record[type_idx] == 'Underground':
                conduit_val = str(record[conduit_idx]).strip() if record[conduit_idx] else ""
                # Must start with "(1)-1.25" or "(3)-1.25" plus one more
                # character — the trailing '.' accepts any quote-like inch mark.
                if not re.match(r'^\([13]\)-1\.25.', conduit_val):
                    failure_counts['conduit'] += 1
                    if failure_counts['conduit'] <= 10:
                        errors.append(f"{shapefile_name}: Feature {idx} has invalid Conduit value for Underground type (must start with '(1)-1.25\"' or '(3)-1.25\"')")
        for key, count in failure_counts.items():
            if count > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {key} validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even when validation raises mid-loop; the
        # original leaked the SHP/DBF handles on that path.
        if sf is not None:
            sf.close()
    return errors
def validate_access_points(shp_path: Path, shapefile_name: str):
    """Validate access_points: Type enum, 'Group 1' format, numeric lat/lon.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        # FIX: report every missing required field; the original returned after
        # the first one it found.
        required_fields = ['Type', 'Group 1', 'Latitude', 'Longitude']
        missing = [f for f in required_fields if f not in field_names]
        if missing:
            for field in missing:
                errors.append(f"{shapefile_name}: Missing required field '{field}'")
            return errors
        type_idx = field_names.index('Type')
        group_idx = field_names.index('Group 1')
        lat_idx = field_names.index('Latitude')
        lon_idx = field_names.index('Longitude')
        valid_types = ['Handhole', 'Cabinet']
        failure_counts = {'type': 0, 'group': 0, 'lat': 0, 'lon': 0}
        for idx, record in enumerate(sf.records()):
            if record[type_idx] not in valid_types:
                failure_counts['type'] += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_counts['type'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Type")
            group_val = str(record[group_idx]) if record[group_idx] else ""
            if not re.match(r'^Zone \d{2}$', group_val):
                failure_counts['group'] += 1
                if failure_counts['group'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Group 1 format")
            try:
                float(record[lat_idx])
            except (ValueError, TypeError):
                failure_counts['lat'] += 1
                if failure_counts['lat'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Latitude is not a number")
            try:
                float(record[lon_idx])
            except (ValueError, TypeError):
                failure_counts['lon'] += 1
                if failure_counts['lon'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Longitude is not a number")
        for key, count in failure_counts.items():
            if count > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {key} validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even on the exception path (original leaked).
        if sf is not None:
            sf.close()
    return errors
def validate_cabinet_boundaries(shp_path: Path, shapefile_name: str):
    """Validate cabinet_boundaries: Name must read 'Zone XX Boundary'.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        if 'Name' not in field_names:
            errors.append(f"{shapefile_name}: Missing required field 'Name'")
            return errors
        name_idx = field_names.index('Name')
        failure_count = 0
        for idx, record in enumerate(sf.records()):
            name_val = str(record[name_idx]) if record[name_idx] else ""
            if not re.match(r'^Zone \d{2} Boundary$', name_val):
                failure_count += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_count <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Name format (should be 'Zone XX Boundary')")
        if failure_count > 10:
            errors.append(f"{shapefile_name}: 10 or more features failed Name validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even when validation raises mid-loop; the
        # original leaked the file handles on that path.
        if sf is not None:
            sf.close()
    return errors
def validate_permits(shp_path: Path, shapefile_name: str):
    """Validate permits: Name must start with ROW, ROE, or LLP.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        if 'Name' not in field_names:
            errors.append(f"{shapefile_name}: Missing required field 'Name'")
            return errors
        name_idx = field_names.index('Name')
        failure_count = 0
        for idx, record in enumerate(sf.records()):
            name_val = str(record[name_idx]) if record[name_idx] else ""
            # startswith with a tuple replaces the original or-chain.
            if not name_val.startswith(('ROW', 'ROE', 'LLP')):
                failure_count += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_count <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Name does not start with ROW, ROE, or LLP")
        if failure_count > 10:
            errors.append(f"{shapefile_name}: 10 or more features failed Name validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even on the exception path (original leaked).
        if sf is not None:
            sf.close()
    return errors
def validate_cables(shp_path: Path, shapefile_name: str):
    """Validate cables: Name must begin with three digits and a capital F ('XXXF').

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        if 'Name' not in field_names:
            errors.append(f"{shapefile_name}: Missing required field 'Name'")
            return errors
        name_idx = field_names.index('Name')
        failure_count = 0
        for idx, record in enumerate(sf.records()):
            name_val = str(record[name_idx]) if record[name_idx] else ""
            if not re.match(r'^\d{3}F', name_val):
                failure_count += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_count <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Name does not begin with XXXF format (three digits followed by capital F)")
        if failure_count > 10:
            errors.append(f"{shapefile_name}: 10 or more features failed Name validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even on the exception path (original leaked).
        if sf is not None:
            sf.close()
    return errors
def validate_parcels(shp_path: Path, shapefile_name: str):
    """Validate parcels: Name must be exactly 'Parcel', 'Group 1' must be 'Zone XX'.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        # FIX: report every missing required field; the original returned after
        # the first one it found.
        required_fields = ['Name', 'Group 1']
        missing = [f for f in required_fields if f not in field_names]
        if missing:
            for field in missing:
                errors.append(f"{shapefile_name}: Missing required field '{field}'")
            return errors
        name_idx = field_names.index('Name')
        group_idx = field_names.index('Group 1')
        failure_counts = {'name': 0, 'group': 0}
        for idx, record in enumerate(sf.records()):
            if record[name_idx] != 'Parcel':
                failure_counts['name'] += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_counts['name'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Name must be exactly 'Parcel'")
            group_val = str(record[group_idx]) if record[group_idx] else ""
            if not re.match(r'^Zone \d{2}$', group_val):
                failure_counts['group'] += 1
                if failure_counts['group'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Group 1 format")
        for key, count in failure_counts.items():
            if count > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {key} validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even on the exception path (original leaked).
        if sf is not None:
            sf.close()
    return errors
def validate_sites(shp_path: Path, shapefile_name: str):
    """Validate sites: Type enum, populated Address, 2-letter State, 5-digit Zip,
    integer BEN#, and numeric Latitude/Longitude.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        # FIX: report every missing required field; the original returned after
        # the first one it found.
        required_fields = ['Type', 'Address', 'State', 'Zip', 'BEN#', 'Latitude', 'Longitude']
        missing = [f for f in required_fields if f not in field_names]
        if missing:
            for field in missing:
                errors.append(f"{shapefile_name}: Missing required field '{field}'")
            return errors
        type_idx = field_names.index('Type')
        address_idx = field_names.index('Address')
        state_idx = field_names.index('State')
        zip_idx = field_names.index('Zip')
        ben_idx = field_names.index('BEN#')
        lat_idx = field_names.index('Latitude')
        lon_idx = field_names.index('Longitude')
        # NOTE(review): 'Hosptial' is kept as-is to match the source data's
        # spelling; looks like a typo — confirm with the data owner before fixing.
        valid_types = ['School', 'Hub Site', 'MDU', 'Administration', 'MTU', 'Dwelling Unit',
                       'Vendor Location', 'Cell Tower', 'Government', 'Data Center', 'Hosptial',
                       'Internet', 'Large Business', 'Library', 'Museum', 'Power Substation',
                       'Small Business', 'Small Cell', 'Stadium', 'University', 'Splice Point',
                       'ILA', 'SFR', 'Vacant Lot', 'Mobile Home', 'Meet Me']
        failure_counts = {'type': 0, 'address': 0, 'state': 0, 'zip': 0, 'ben': 0, 'lat': 0, 'lon': 0}
        for idx, record in enumerate(sf.records()):
            if record[type_idx] not in valid_types:
                failure_counts['type'] += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_counts['type'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Type")
            if not record[address_idx] or str(record[address_idx]).strip() == '':
                failure_counts['address'] += 1
                if failure_counts['address'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Address must be populated")
            state_val = str(record[state_idx]) if record[state_idx] else ""
            if not re.match(r'^[A-Z]{2}$', state_val):
                failure_counts['state'] += 1
                if failure_counts['state'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} State must be 2 letters")
            zip_val = str(record[zip_idx]) if record[zip_idx] else ""
            if not re.match(r'^\d{5}$', zip_val):
                failure_counts['zip'] += 1
                if failure_counts['zip'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Zip must be 5 digits")
            try:
                int(record[ben_idx])
            except (ValueError, TypeError):
                failure_counts['ben'] += 1
                if failure_counts['ben'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} BEN# must be an integer")
            try:
                float(record[lat_idx])
            except (ValueError, TypeError):
                failure_counts['lat'] += 1
                if failure_counts['lat'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Latitude is not a number")
            try:
                float(record[lon_idx])
            except (ValueError, TypeError):
                failure_counts['lon'] += 1
                if failure_counts['lon'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Longitude is not a number")
        for key, count in failure_counts.items():
            if count > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {key} validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even on the exception path (original leaked).
        if sf is not None:
            sf.close()
    return errors
def validate_splicing(shp_path: Path, shapefile_name: str):
    """Validate splicing: AKA prefix, Type enum, 'Group 1' format, numeric lat/lon.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        # FIX: report every missing required field; the original returned after
        # the first one it found.
        required_fields = ['AKA', 'Type', 'Group 1', 'Latitude', 'Longitude']
        missing = [f for f in required_fields if f not in field_names]
        if missing:
            for field in missing:
                errors.append(f"{shapefile_name}: Missing required field '{field}'")
            return errors
        aka_idx = field_names.index('AKA')
        type_idx = field_names.index('Type')
        group_idx = field_names.index('Group 1')
        lat_idx = field_names.index('Latitude')
        lon_idx = field_names.index('Longitude')
        valid_types = ['MST', 'Splice', 'FTP']
        failure_counts = {'aka': 0, 'type': 0, 'group': 0, 'lat': 0, 'lon': 0}
        for idx, record in enumerate(sf.records()):
            # AKA must start with three capitals, underscore, then a capital.
            aka_val = str(record[aka_idx]) if record[aka_idx] else ""
            if not re.match(r'^[A-Z]{3}_[A-Z]', aka_val):
                failure_counts['aka'] += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_counts['aka'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} AKA must begin with YYY_Y format")
            if record[type_idx] not in valid_types:
                failure_counts['type'] += 1
                if failure_counts['type'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Type")
            group_val = str(record[group_idx]) if record[group_idx] else ""
            if not re.match(r'^Zone \d{2}$', group_val):
                failure_counts['group'] += 1
                if failure_counts['group'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Group 1 format")
            try:
                float(record[lat_idx])
            except (ValueError, TypeError):
                failure_counts['lat'] += 1
                if failure_counts['lat'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Latitude is not a number")
            try:
                float(record[lon_idx])
            except (ValueError, TypeError):
                failure_counts['lon'] += 1
                if failure_counts['lon'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Longitude is not a number")
        for key, count in failure_counts.items():
            if count > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {key} validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even on the exception path (original leaked).
        if sf is not None:
            sf.close()
    return errors
def validate_network_elements(shp_path: Path, shapefile_name: str):
    """Validate network_elements: Type enum, 'Group 1' format, numeric lat/lon.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        # FIX: report every missing required field; the original returned after
        # the first one it found.
        required_fields = ['Type', 'Group 1', 'Latitude', 'Longitude']
        missing = [f for f in required_fields if f not in field_names]
        if missing:
            for field in missing:
                errors.append(f"{shapefile_name}: Missing required field '{field}'")
            return errors
        type_idx = field_names.index('Type')
        group_idx = field_names.index('Group 1')
        lat_idx = field_names.index('Latitude')
        lon_idx = field_names.index('Longitude')
        valid_types = ['Slack Coil', 'Anchor', 'Bore Pit', 'Riser']
        failure_counts = {'type': 0, 'group': 0, 'lat': 0, 'lon': 0}
        for idx, record in enumerate(sf.records()):
            if record[type_idx] not in valid_types:
                failure_counts['type'] += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_counts['type'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Type")
            group_val = str(record[group_idx]) if record[group_idx] else ""
            if not re.match(r'^Zone \d{2}$', group_val):
                failure_counts['group'] += 1
                if failure_counts['group'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Group 1 format")
            try:
                float(record[lat_idx])
            except (ValueError, TypeError):
                failure_counts['lat'] += 1
                if failure_counts['lat'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Latitude is not a number")
            try:
                float(record[lon_idx])
            except (ValueError, TypeError):
                failure_counts['lon'] += 1
                if failure_counts['lon'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Longitude is not a number")
        for key, count in failure_counts.items():
            if count > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {key} validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even on the exception path (original leaked).
        if sf is not None:
            sf.close()
    return errors
def validate_poles(shp_path: Path, shapefile_name: str):
    """Validate poles: populated 'Pole Tag'/'Pole Owner', 'Group 1' format,
    numeric Latitude/Longitude.

    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    sf = None
    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]
        # FIX: report every missing required field; the original returned after
        # the first one it found.
        required_fields = ['Pole Tag', 'Pole Owner', 'Group 1', 'Latitude', 'Longitude']
        missing = [f for f in required_fields if f not in field_names]
        if missing:
            for field in missing:
                errors.append(f"{shapefile_name}: Missing required field '{field}'")
            return errors
        tag_idx = field_names.index('Pole Tag')
        owner_idx = field_names.index('Pole Owner')
        group_idx = field_names.index('Group 1')
        lat_idx = field_names.index('Latitude')
        lon_idx = field_names.index('Longitude')
        failure_counts = {'tag': 0, 'owner': 0, 'group': 0, 'lat': 0, 'lon': 0}
        for idx, record in enumerate(sf.records()):
            if not record[tag_idx] or str(record[tag_idx]).strip() == '':
                failure_counts['tag'] += 1
                # Per-feature messages capped at 10; summary appended below.
                if failure_counts['tag'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} 'Pole Tag' must be populated")
            if not record[owner_idx] or str(record[owner_idx]).strip() == '':
                failure_counts['owner'] += 1
                if failure_counts['owner'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} 'Pole Owner' must be populated")
            group_val = str(record[group_idx]) if record[group_idx] else ""
            if not re.match(r'^Zone \d{2}$', group_val):
                failure_counts['group'] += 1
                if failure_counts['group'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Group 1 format")
            try:
                float(record[lat_idx])
            except (ValueError, TypeError):
                failure_counts['lat'] += 1
                if failure_counts['lat'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Latitude is not a number")
            try:
                float(record[lon_idx])
            except (ValueError, TypeError):
                failure_counts['lon'] += 1
                if failure_counts['lon'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Longitude is not a number")
        for key, count in failure_counts.items():
            if count > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {key} validation")
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")
    finally:
        # FIX: close the reader even on the exception path (original leaked).
        if sf is not None:
            sf.close()
    return errors
def point_in_polygon(point, polygon):
    """Return True if point (x, y) lies inside polygon, via ray casting.

    polygon is a sequence of (x, y) vertices. Points exactly on an edge may be
    classified either way (standard ray-casting behavior).
    """
    n = len(polygon)
    # FIX: the original indexed polygon[0] unconditionally and crashed on an
    # empty vertex list.
    if n == 0:
        return False
    x, y = point
    inside = False
    # Walk every edge once, starting with the closing edge (last -> first).
    # The original looped range(n + 1) with i % n, which compared vertex 0 to
    # itself on the first pass — a degenerate edge that could never toggle.
    p1x, p1y = polygon[-1]
    for p2x, p2y in polygon:
        if min(p1y, p2y) < y <= max(p1y, p2y) and x <= max(p1x, p2x):
            # Edge is not horizontal here (min < y <= max rules that out),
            # so the division is safe.
            xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
            if p1x == p2x or x <= xinters:
                inside = not inside
        p1x, p1y = p2x, p2y
    return inside
def line_crosses_polygon_boundary(line_points, polygon):
    """Return True when a polyline has vertices on both sides of the polygon edge.

    Used for the segments exception: a segment that legitimately crosses a
    cabinet boundary is exempt from the containment check.
    """
    inside = 0
    outside = 0
    for vertex in line_points:
        if point_in_polygon(vertex, polygon):
            inside += 1
        else:
            outside += 1
    return inside > 0 and outside > 0
def extract_zone_number(field_value):
    """Pull the 2-digit zone number out of values like 'Zone 07' or 'Zone 07 Boundary'.

    Returns the digits as a string (e.g. '07'), or None when no zone is present.
    """
    found = re.search(r'Zone (\d{2})', str(field_value))
    return found.group(1) if found else None
def validate_spatial_containment(shapefile_dir: Path):
    """Validate that features are within their correct cabinet boundaries.

    Loads cabinet_boundaries.shp, maps each boundary polygon to its 2-digit
    zone number, then checks every feature layer: a feature must fall inside
    some boundary, and (when the layer carries a 'Group 1' zone) inside the
    boundary whose zone matches. Segments that cross a boundary are exempt.
    Returns a list of error strings (empty when validation passes).
    """
    errors = []
    try:
        # Load cabinet boundaries
        cabinet_path = shapefile_dir / "cabinet_boundaries.shp"
        cabinet_sf = shapefile.Reader(str(cabinet_path))
        cabinet_records = cabinet_sf.records()
        cabinet_shapes = cabinet_sf.shapes()
        # fields[0] is the DBF deletion flag; skip it.
        cabinet_fields = [field[0] for field in cabinet_sf.fields[1:]]
        if 'Name' not in cabinet_fields:
            errors.append("cabinet_boundaries: Missing 'Name' field for spatial validation")
            return errors
        name_idx = cabinet_fields.index('Name')
        # Build cabinet boundary data structure: [{'zone': '07', 'polygon': [...]}, ...]
        cabinets = []
        for idx, (record, shape) in enumerate(zip(cabinet_records, cabinet_shapes)):
            zone_num = extract_zone_number(record[name_idx])
            if zone_num:
                # Handle polygon or multipolygon
                # NOTE(review): shape.points is pyshp's flat vertex list; ring
                # boundaries (shape.parts) are ignored, so multi-ring polygons
                # are treated as one ring — confirm this is acceptable.
                if shape.shapeType == 5:  # Polygon
                    polygon = shape.points
                    cabinets.append({'zone': zone_num, 'polygon': polygon})
                elif shape.shapeType == 15:  # PolygonZ
                    polygon = shape.points
                    cabinets.append({'zone': zone_num, 'polygon': polygon})
        if not cabinets:
            errors.append("cabinet_boundaries: No valid cabinet boundaries found with zone numbers")
            return errors
        # Validate each feature type
        feature_types = [
            ('sites', True, False),  # (shapefile_name, has_group1, is_line)
            ('access_points', True, False),
            ('permits', False, False),  # permits don't have Group 1, will skip zone matching
            ('splicing', True, False),
            ('network_elements', True, False),
            ('poles', True, False),
            ('segments', True, True),  # segments are lines
        ]
        for shapefile_name, has_group1, is_line in feature_types:
            shp_path = shapefile_dir / f"{shapefile_name}.shp"
            try:
                sf = shapefile.Reader(str(shp_path))
                records = sf.records()
                shapes = sf.shapes()
                field_names = [field[0] for field in sf.fields[1:]]
                # Get Group 1 field index if it exists
                group1_idx = None
                if has_group1 and 'Group 1' in field_names:
                    group1_idx = field_names.index('Group 1')
                # Get UID for error reporting (falls back to the feature index)
                uid_idx = field_names.index('UID') if 'UID' in field_names else None
                failure_counts = {'wrong_zone': 0, 'outside_all': 0}
                for idx, (record, shape) in enumerate(zip(records, shapes)):
                    uid = record[uid_idx] if uid_idx is not None else idx
                    # Get feature zone number
                    feature_zone = None
                    if group1_idx is not None:
                        feature_zone = extract_zone_number(record[group1_idx])
                    # Get feature geometry
                    if is_line:
                        # For segments, get all points
                        feature_points = shape.points
                    else:
                        # For points, get the first point
                        if len(shape.points) > 0:
                            feature_points = [shape.points[0]]
                        else:
                            # Geometry-less feature: silently skipped.
                            continue
                    # Check if feature is in any cabinet boundary
                    in_any_cabinet = False
                    in_correct_cabinet = False
                    crosses_boundary = False
                    for cabinet in cabinets:
                        if is_line:
                            # Check if line crosses this boundary; a crossing
                            # short-circuits all further cabinet checks.
                            if line_crosses_polygon_boundary(feature_points, cabinet['polygon']):
                                crosses_boundary = True
                                break
                            # Check if any point is in this cabinet (the inner
                            # break stops at the first contained vertex but the
                            # cabinet loop continues, so other zones still get
                            # a chance to match).
                            for point in feature_points:
                                if point_in_polygon(point, cabinet['polygon']):
                                    in_any_cabinet = True
                                    if feature_zone == cabinet['zone']:
                                        in_correct_cabinet = True
                                    break
                        else:
                            # For points, check if in this cabinet; stop at the
                            # first containing boundary.
                            if point_in_polygon(feature_points[0], cabinet['polygon']):
                                in_any_cabinet = True
                                if feature_zone == cabinet['zone']:
                                    in_correct_cabinet = True
                                break
                    # Exception for segments that cross boundaries
                    if shapefile_name == 'segments' and crosses_boundary:
                        continue
                    # Check if feature is outside all cabinets
                    if not in_any_cabinet:
                        failure_counts['outside_all'] += 1
                        # Per-feature messages capped at 10; summary added below.
                        if failure_counts['outside_all'] <= 10:
                            errors.append(f"{shapefile_name}: Feature UID {uid} is outside all cabinet boundaries")
                    # Check if feature is in wrong zone (only if has Group 1 field)
                    elif has_group1 and not in_correct_cabinet and feature_zone:
                        failure_counts['wrong_zone'] += 1
                        if failure_counts['wrong_zone'] <= 10:
                            errors.append(f"{shapefile_name}: Feature UID {uid} is in wrong cabinet boundary (expected Zone {feature_zone})")
                if failure_counts['outside_all'] > 10:
                    errors.append(f"{shapefile_name}: 10 or more features failed outside all cabinet boundaries validation")
                if failure_counts['wrong_zone'] > 10:
                    errors.append(f"{shapefile_name}: 10 or more features failed wrong cabinet boundary validation")
                sf.close()
            except Exception as e:
                # Per-layer failures are reported and the remaining layers
                # still get validated.
                errors.append(f"{shapefile_name}: Error during spatial validation - {str(e)}")
        cabinet_sf.close()
    except Exception as e:
        errors.append(f"Spatial validation error: {str(e)}")
    return errors