# Saved working state before Stage 2.  Includes:
#   - Backend: Python-based QC validator with shapefile processing
#   - Frontend: drag-and-drop file upload interface
#   - Sample files for testing
#   - Documentation and revision history
import shapefile
|
|
from pathlib import Path
|
|
import re
|
|
|
|
# Shapefile layers that every submission must contain.
REQUIRED_SHAPEFILES = [
    "poles",
    "network_elements",
    "splicing",
    "sites",
    "parcels",
    "permits",
    "cabinet_boundaries",
    "segments",
    "access_points",
    "cables",
]

# WGS 84 projection string (EPSG:4326), in ESRI WKT form.
WGS84_PROJ = (
    'GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",'
    'SPHEROID["WGS_1984",6378137.0,298.257223563]],'
    'PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]'
)
|
def validate_shapefiles(temp_dir: Path):
    """Run the full QC pass over an extracted upload and report the verdict.

    Returns a dict with "passed" (bool) and "errors" (list of strings).
    """
    errors = []

    # Locate the folder that actually holds the .shp files (ZIPs often nest
    # everything one directory deep).
    shapefile_dir = find_shapefile_directory(temp_dir)
    if shapefile_dir is None:
        errors.append("No shapefiles found in the uploaded ZIP")
        return {"passed": False, "errors": errors}

    # All required layers must be present before any per-layer checks run.
    missing = check_required_shapefiles(shapefile_dir)
    if missing:
        errors = [f"Missing required shapefile: {name}" for name in missing]
        return {"passed": False, "errors": errors}

    # Per-layer checks: projection, UID integrity, layer-specific attributes.
    for name in REQUIRED_SHAPEFILES:
        shp_path = shapefile_dir / f"{name}.shp"
        for check in (validate_projection, validate_uid_field, validate_attributes):
            errors.extend(check(shp_path, name))

    # Cross-layer check: features must sit inside their matching cabinet boundary.
    errors.extend(validate_spatial_containment(shapefile_dir))

    return {"passed": not errors, "errors": errors}
|
def find_shapefile_directory(temp_dir: Path):
    """Return the directory that holds .shp files, or None when there are none.

    The root wins if it contains any .shp; otherwise the first immediate
    subdirectory containing one is used.
    """
    candidates = [temp_dir]
    candidates.extend(p for p in temp_dir.iterdir() if p.is_dir())

    for candidate in candidates:
        if any(candidate.glob("*.shp")):
            return candidate

    return None
|
def check_required_shapefiles(shapefile_dir: Path):
    """Return the names of required shapefiles absent from *shapefile_dir*.

    Order follows REQUIRED_SHAPEFILES so reports are stable.
    """
    return [
        name
        for name in REQUIRED_SHAPEFILES
        if not (shapefile_dir / f"{name}.shp").exists()
    ]
|
def validate_projection(shp_path: Path, shapefile_name: str):
    """Validate that a shapefile's .prj declares a WGS 84 coordinate system.

    Args:
        shp_path: Path to the .shp file; the sibling .prj is looked up by suffix.
        shapefile_name: Layer name used to prefix error messages.

    Returns:
        List of error strings; empty when the projection check passes.
    """
    errors = []
    prj_path = shp_path.with_suffix('.prj')

    if not prj_path.exists():
        errors.append(f"{shapefile_name}: Missing .prj file")
        return errors

    # .prj files are plain WKT text.  Read with an explicit encoding and
    # replace undecodable bytes: the original relied on the locale default,
    # so an oddly-encoded .prj could raise UnicodeDecodeError and abort the
    # whole validation run (there is no try/except around this function).
    with open(prj_path, 'r', encoding='utf-8', errors='replace') as f:
        proj_content = f.read().strip()

    # Accept either common WKT spelling of the WGS 84 datum.
    if 'WGS_1984' not in proj_content and 'WGS84' not in proj_content:
        errors.append(f"{shapefile_name}: Not in WGS 84 projection")

    return errors
|
def validate_uid_field(shp_path: Path, shapefile_name: str):
    """Validate that the UID field exists and holds unique integer values.

    Args:
        shp_path: Path to the .shp file.
        shapefile_name: Layer name used to prefix error messages.

    Returns:
        List of error strings; empty when the UID check passes.  Detailed
        per-feature messages are capped at 10, with a summary line when more
        features fail.
    """
    errors = []

    try:
        sf = shapefile.Reader(str(shp_path))
    except Exception as e:
        errors.append(f"{shapefile_name}: Error reading shapefile - {str(e)}")
        return errors

    # try/finally guarantees the reader is closed on every exit path; the
    # original leaked the handle when returning early on a missing UID field.
    try:
        # fields[0] is the DBF deletion-flag pseudo-field; skip it.
        field_names = [field[0] for field in sf.fields[1:]]
        if 'UID' not in field_names:
            errors.append(f"{shapefile_name}: Missing UID field")
            return errors

        uid_index = field_names.index('UID')

        uids = []
        non_integer_count = 0

        for idx, record in enumerate(sf.records()):
            uid = record[uid_index]

            # Coerce string/float UIDs to int; anything non-coercible is a
            # failure and is excluded from the uniqueness check below.
            if not isinstance(uid, int):
                try:
                    uid = int(uid)
                except (ValueError, TypeError):
                    non_integer_count += 1
                    if non_integer_count <= 10:
                        errors.append(f"{shapefile_name}: UID at feature index {idx} is not an integer")
                    continue

            uids.append(uid)

        if non_integer_count > 10:
            errors.append(f"{shapefile_name}: 10 or more features failed UID is not an integer")

        # Uniqueness over the successfully coerced UIDs only.
        if len(uids) != len(set(uids)):
            duplicate_count = len(uids) - len(set(uids))
            if duplicate_count >= 10:
                errors.append(f"{shapefile_name}: 10 or more features failed UID is not unique")
            else:
                errors.append(f"{shapefile_name}: UID field contains {duplicate_count} duplicate values")

        return errors
    finally:
        sf.close()
|
def validate_attributes(shp_path: Path, shapefile_name: str):
    """Dispatch to the layer-specific attribute validator for *shapefile_name*.

    Layers without a dedicated validator pass trivially (empty error list).
    """
    dispatch = {
        "segments": validate_segments,
        "access_points": validate_access_points,
        "cabinet_boundaries": validate_cabinet_boundaries,
        "permits": validate_permits,
        "cables": validate_cables,
        "parcels": validate_parcels,
        "sites": validate_sites,
        "splicing": validate_splicing,
        "network_elements": validate_network_elements,
        "poles": validate_poles,
    }

    try:
        validator = dispatch[shapefile_name]
    except KeyError:
        return []
    return validator(shp_path, shapefile_name)
|
def validate_segments(shp_path: Path, shapefile_name: str):
    """Validate segments shapefile attributes.

    Per-feature checks:
      * Type is one of the allowed segment types.
      * Group 1 matches 'Zone XX' (two digits).
      * For Underground segments only, Conduit begins '(1)-1.25' or '(3)-1.25'
        followed by one more character (intended to be an inch mark, but any
        quote-like character is accepted — see regex note below).

    Returns a list of error strings; detailed per-feature messages are capped
    at 10 per check, with a summary line appended when more than 10 fail.
    """
    errors = []

    try:
        sf = shapefile.Reader(str(shp_path))
        # fields[0] is the DBF deletion-flag pseudo-field; skip it.
        field_names = [field[0] for field in sf.fields[1:]]

        # Check required fields (Group 1 with space, not Group_01)
        required_fields = ['Type', 'Group 1', 'Conduit']
        for field in required_fields:
            if field not in field_names:
                # Bail out on the first missing field; index lookups below need it.
                errors.append(f"{shapefile_name}: Missing required field '{field}'")
                return errors

        type_idx = field_names.index('Type')
        group_idx = field_names.index('Group 1')
        conduit_idx = field_names.index('Conduit')

        valid_types = ['Aerial', '3rd Party Duct', 'Underground', 'Existing VERO', 'Drop Cable']
        failure_counts = {'type': 0, 'group': 0, 'conduit': 0}

        for idx, record in enumerate(sf.records()):
            # Validate Type
            if record[type_idx] not in valid_types:
                failure_counts['type'] += 1
                if failure_counts['type'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Type value")

            # Validate Group 1 format (Zone XX)
            group_val = str(record[group_idx]) if record[group_idx] else ""
            if not re.match(r'^Zone \d{2}$', group_val):
                failure_counts['group'] += 1
                if failure_counts['group'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Group 1 format (should be 'Zone XX')")

            # Validate Conduit (only for Underground)
            if record[type_idx] == 'Underground':
                conduit_val = str(record[conduit_idx]).strip() if record[conduit_idx] else ""
                # Check if first 8 characters match "(1)-1.25" or "(3)-1.25"
                # Using regex to handle any quote-like character: the trailing
                # '.' matches ANY single character after "1.25", so curly and
                # straight inch marks (and, in fact, anything else) pass.
                if not re.match(r'^\([13]\)-1\.25.', conduit_val):
                    failure_counts['conduit'] += 1
                    if failure_counts['conduit'] <= 10:
                        errors.append(f"{shapefile_name}: Feature {idx} has invalid Conduit value for Underground type (must start with '(1)-1.25\"' or '(3)-1.25\"')")

        for key, count in failure_counts.items():
            if count > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {key} validation")

        sf.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def validate_access_points(shp_path: Path, shapefile_name: str):
    """Validate access_points: Type domain, Group 1 naming, numeric coordinates."""
    errors = []

    try:
        reader = shapefile.Reader(str(shp_path))
        fields = [f[0] for f in reader.fields[1:]]

        for needed in ('Type', 'Group 1', 'Latitude', 'Longitude'):
            if needed not in fields:
                errors.append(f"{shapefile_name}: Missing required field '{needed}'")
                return errors

        type_pos = fields.index('Type')
        group_pos = fields.index('Group 1')
        lat_pos = fields.index('Latitude')
        lon_pos = fields.index('Longitude')

        allowed = ('Handhole', 'Cabinet')
        zone_pat = re.compile(r'^Zone \d{2}$')
        fails = {'type': 0, 'group': 0, 'lat': 0, 'lon': 0}

        def flag(kind, message):
            # Count every failure but cap detailed messages at 10 per check.
            fails[kind] += 1
            if fails[kind] <= 10:
                errors.append(message)

        for idx, rec in enumerate(reader.records()):
            if rec[type_pos] not in allowed:
                flag('type', f"{shapefile_name}: Feature {idx} has invalid Type")

            group_text = str(rec[group_pos]) if rec[group_pos] else ""
            if not zone_pat.match(group_text):
                flag('group', f"{shapefile_name}: Feature {idx} has invalid Group 1 format")

            try:
                float(rec[lat_pos])
            except (ValueError, TypeError):
                flag('lat', f"{shapefile_name}: Feature {idx} Latitude is not a number")

            try:
                float(rec[lon_pos])
            except (ValueError, TypeError):
                flag('lon', f"{shapefile_name}: Feature {idx} Longitude is not a number")

        for kind, total in fails.items():
            if total > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {kind} validation")

        reader.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def validate_cabinet_boundaries(shp_path: Path, shapefile_name: str):
    """Validate cabinet_boundaries: every Name must read 'Zone XX Boundary'."""
    errors = []

    try:
        reader = shapefile.Reader(str(shp_path))
        fields = [f[0] for f in reader.fields[1:]]

        if 'Name' not in fields:
            errors.append(f"{shapefile_name}: Missing required field 'Name'")
            return errors

        name_pos = fields.index('Name')
        pattern = re.compile(r'^Zone \d{2} Boundary$')
        bad = 0

        for idx, rec in enumerate(reader.records()):
            text = str(rec[name_pos]) if rec[name_pos] else ""
            if pattern.match(text):
                continue
            bad += 1
            # Cap detailed messages at 10; summarize beyond that.
            if bad <= 10:
                errors.append(f"{shapefile_name}: Feature {idx} has invalid Name format (should be 'Zone XX Boundary')")

        if bad > 10:
            errors.append(f"{shapefile_name}: 10 or more features failed Name validation")

        reader.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def validate_permits(shp_path: Path, shapefile_name: str):
    """Validate permits shapefile attributes.

    Every permit Name must begin with one of the permit-type prefixes
    ROW, ROE, or LLP.

    Returns:
        List of error strings; detailed per-feature messages are capped at
        10, with a summary line when more features fail.
    """
    errors = []

    try:
        sf = shapefile.Reader(str(shp_path))
        field_names = [field[0] for field in sf.fields[1:]]

        if 'Name' not in field_names:
            errors.append(f"{shapefile_name}: Missing required field 'Name'")
            return errors

        name_idx = field_names.index('Name')
        failure_count = 0

        for idx, record in enumerate(sf.records()):
            name_val = str(record[name_idx]) if record[name_idx] else ""
            # str.startswith accepts a tuple of prefixes — one call instead of
            # the original three-way or-chain.
            if not name_val.startswith(('ROW', 'ROE', 'LLP')):
                failure_count += 1
                if failure_count <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Name does not start with ROW, ROE, or LLP")

        if failure_count > 10:
            errors.append(f"{shapefile_name}: 10 or more features failed Name validation")

        sf.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def validate_cables(shp_path: Path, shapefile_name: str):
    """Validate cables: each Name must begin with three digits and a capital F."""
    errors = []

    try:
        reader = shapefile.Reader(str(shp_path))
        fields = [f[0] for f in reader.fields[1:]]

        if 'Name' not in fields:
            errors.append(f"{shapefile_name}: Missing required field 'Name'")
            return errors

        name_pos = fields.index('Name')
        pattern = re.compile(r'^\d{3}F')
        bad = 0

        for idx, rec in enumerate(reader.records()):
            text = str(rec[name_pos]) if rec[name_pos] else ""
            if pattern.match(text):
                continue
            bad += 1
            # Cap detailed messages at 10; summarize beyond that.
            if bad <= 10:
                errors.append(f"{shapefile_name}: Feature {idx} Name does not begin with XXXF format (three digits followed by capital F)")

        if bad > 10:
            errors.append(f"{shapefile_name}: 10 or more features failed Name validation")

        reader.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def validate_parcels(shp_path: Path, shapefile_name: str):
    """Validate parcels: Name must be literally 'Parcel', Group 1 must be 'Zone XX'."""
    errors = []

    try:
        reader = shapefile.Reader(str(shp_path))
        fields = [f[0] for f in reader.fields[1:]]

        for needed in ('Name', 'Group 1'):
            if needed not in fields:
                errors.append(f"{shapefile_name}: Missing required field '{needed}'")
                return errors

        name_pos = fields.index('Name')
        group_pos = fields.index('Group 1')
        zone_pat = re.compile(r'^Zone \d{2}$')
        fails = {'name': 0, 'group': 0}

        def flag(kind, message):
            # Count every failure but cap detailed messages at 10 per check.
            fails[kind] += 1
            if fails[kind] <= 10:
                errors.append(message)

        for idx, rec in enumerate(reader.records()):
            if rec[name_pos] != 'Parcel':
                flag('name', f"{shapefile_name}: Feature {idx} Name must be exactly 'Parcel'")

            group_text = str(rec[group_pos]) if rec[group_pos] else ""
            if not zone_pat.match(group_text):
                flag('group', f"{shapefile_name}: Feature {idx} has invalid Group 1 format")

        for kind, total in fails.items():
            if total > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {kind} validation")

        reader.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def validate_sites(shp_path: Path, shapefile_name: str):
    """Validate sites shapefile attributes.

    Per-feature checks: Type is in the allowed site-type list, Address is
    populated, State is exactly two capital letters, Zip is five digits,
    BEN# coerces to an integer, and Latitude/Longitude are numeric.

    Returns a list of error strings; detailed per-feature messages are
    capped at 10 per check, with a summary line when more than 10 fail.
    """
    errors = []

    try:
        sf = shapefile.Reader(str(shp_path))
        # fields[0] is the DBF deletion-flag pseudo-field; skip it.
        field_names = [field[0] for field in sf.fields[1:]]

        required_fields = ['Type', 'Address', 'State', 'Zip', 'BEN#', 'Latitude', 'Longitude']
        for field in required_fields:
            if field not in field_names:
                # Bail out on the first missing field; index lookups below need it.
                errors.append(f"{shapefile_name}: Missing required field '{field}'")
                return errors

        type_idx = field_names.index('Type')
        address_idx = field_names.index('Address')
        state_idx = field_names.index('State')
        zip_idx = field_names.index('Zip')
        ben_idx = field_names.index('BEN#')
        lat_idx = field_names.index('Latitude')
        lon_idx = field_names.index('Longitude')

        # NOTE(review): 'Hosptial' is misspelled — presumably matching the
        # same spelling in the source data; confirm before "fixing" it here.
        valid_types = ['School', 'Hub Site', 'MDU', 'Administration', 'MTU', 'Dwelling Unit',
                       'Vendor Location', 'Cell Tower', 'Government', 'Data Center', 'Hosptial',
                       'Internet', 'Large Business', 'Library', 'Museum', 'Power Substation',
                       'Small Business', 'Small Cell', 'Stadium', 'University', 'Splice Point',
                       'ILA', 'SFR', 'Vacant Lot', 'Mobile Home', 'Meet Me']

        failure_counts = {'type': 0, 'address': 0, 'state': 0, 'zip': 0, 'ben': 0, 'lat': 0, 'lon': 0}

        for idx, record in enumerate(sf.records()):
            # Type must come from the closed list above.
            if record[type_idx] not in valid_types:
                failure_counts['type'] += 1
                if failure_counts['type'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} has invalid Type")

            # Address: any non-blank value is acceptable.
            if not record[address_idx] or str(record[address_idx]).strip() == '':
                failure_counts['address'] += 1
                if failure_counts['address'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Address must be populated")

            # State: exactly two uppercase letters.
            state_val = str(record[state_idx]) if record[state_idx] else ""
            if not re.match(r'^[A-Z]{2}$', state_val):
                failure_counts['state'] += 1
                if failure_counts['state'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} State must be 2 letters")

            # Zip: exactly five digits (no ZIP+4).
            zip_val = str(record[zip_idx]) if record[zip_idx] else ""
            if not re.match(r'^\d{5}$', zip_val):
                failure_counts['zip'] += 1
                if failure_counts['zip'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Zip must be 5 digits")

            # BEN#: must coerce to int (string digits pass too).
            try:
                int(record[ben_idx])
            except (ValueError, TypeError):
                failure_counts['ben'] += 1
                if failure_counts['ben'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} BEN# must be an integer")

            # Latitude / Longitude: must coerce to float.
            try:
                float(record[lat_idx])
            except (ValueError, TypeError):
                failure_counts['lat'] += 1
                if failure_counts['lat'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Latitude is not a number")

            try:
                float(record[lon_idx])
            except (ValueError, TypeError):
                failure_counts['lon'] += 1
                if failure_counts['lon'] <= 10:
                    errors.append(f"{shapefile_name}: Feature {idx} Longitude is not a number")

        for key, count in failure_counts.items():
            if count > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {key} validation")

        sf.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def validate_splicing(shp_path: Path, shapefile_name: str):
    """Validate splicing: AKA prefix, Type domain, Group 1 naming, numeric coords."""
    errors = []

    try:
        reader = shapefile.Reader(str(shp_path))
        fields = [f[0] for f in reader.fields[1:]]

        for needed in ('AKA', 'Type', 'Group 1', 'Latitude', 'Longitude'):
            if needed not in fields:
                errors.append(f"{shapefile_name}: Missing required field '{needed}'")
                return errors

        aka_pos = fields.index('AKA')
        type_pos = fields.index('Type')
        group_pos = fields.index('Group 1')
        lat_pos = fields.index('Latitude')
        lon_pos = fields.index('Longitude')

        allowed = ('MST', 'Splice', 'FTP')
        aka_pat = re.compile(r'^[A-Z]{3}_[A-Z]')
        zone_pat = re.compile(r'^Zone \d{2}$')
        fails = {'aka': 0, 'type': 0, 'group': 0, 'lat': 0, 'lon': 0}

        def flag(kind, message):
            # Count every failure but cap detailed messages at 10 per check.
            fails[kind] += 1
            if fails[kind] <= 10:
                errors.append(message)

        for idx, rec in enumerate(reader.records()):
            aka_text = str(rec[aka_pos]) if rec[aka_pos] else ""
            if not aka_pat.match(aka_text):
                flag('aka', f"{shapefile_name}: Feature {idx} AKA must begin with YYY_Y format")

            if rec[type_pos] not in allowed:
                flag('type', f"{shapefile_name}: Feature {idx} has invalid Type")

            group_text = str(rec[group_pos]) if rec[group_pos] else ""
            if not zone_pat.match(group_text):
                flag('group', f"{shapefile_name}: Feature {idx} has invalid Group 1 format")

            try:
                float(rec[lat_pos])
            except (ValueError, TypeError):
                flag('lat', f"{shapefile_name}: Feature {idx} Latitude is not a number")

            try:
                float(rec[lon_pos])
            except (ValueError, TypeError):
                flag('lon', f"{shapefile_name}: Feature {idx} Longitude is not a number")

        for kind, total in fails.items():
            if total > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {kind} validation")

        reader.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def validate_network_elements(shp_path: Path, shapefile_name: str):
    """Validate network_elements: Type domain, Group 1 naming, numeric coords."""
    errors = []

    try:
        reader = shapefile.Reader(str(shp_path))
        fields = [f[0] for f in reader.fields[1:]]

        for needed in ('Type', 'Group 1', 'Latitude', 'Longitude'):
            if needed not in fields:
                errors.append(f"{shapefile_name}: Missing required field '{needed}'")
                return errors

        type_pos = fields.index('Type')
        group_pos = fields.index('Group 1')
        lat_pos = fields.index('Latitude')
        lon_pos = fields.index('Longitude')

        allowed = ('Slack Coil', 'Anchor', 'Bore Pit', 'Riser')
        zone_pat = re.compile(r'^Zone \d{2}$')
        fails = {'type': 0, 'group': 0, 'lat': 0, 'lon': 0}

        def flag(kind, message):
            # Count every failure but cap detailed messages at 10 per check.
            fails[kind] += 1
            if fails[kind] <= 10:
                errors.append(message)

        for idx, rec in enumerate(reader.records()):
            if rec[type_pos] not in allowed:
                flag('type', f"{shapefile_name}: Feature {idx} has invalid Type")

            group_text = str(rec[group_pos]) if rec[group_pos] else ""
            if not zone_pat.match(group_text):
                flag('group', f"{shapefile_name}: Feature {idx} has invalid Group 1 format")

            try:
                float(rec[lat_pos])
            except (ValueError, TypeError):
                flag('lat', f"{shapefile_name}: Feature {idx} Latitude is not a number")

            try:
                float(rec[lon_pos])
            except (ValueError, TypeError):
                flag('lon', f"{shapefile_name}: Feature {idx} Longitude is not a number")

        for kind, total in fails.items():
            if total > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {kind} validation")

        reader.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def validate_poles(shp_path: Path, shapefile_name: str):
    """Validate poles: tag/owner populated, Group 1 naming, numeric coords."""
    errors = []

    try:
        reader = shapefile.Reader(str(shp_path))
        fields = [f[0] for f in reader.fields[1:]]

        for needed in ('Pole Tag', 'Pole Owner', 'Group 1', 'Latitude', 'Longitude'):
            if needed not in fields:
                errors.append(f"{shapefile_name}: Missing required field '{needed}'")
                return errors

        tag_pos = fields.index('Pole Tag')
        owner_pos = fields.index('Pole Owner')
        group_pos = fields.index('Group 1')
        lat_pos = fields.index('Latitude')
        lon_pos = fields.index('Longitude')

        zone_pat = re.compile(r'^Zone \d{2}$')
        fails = {'tag': 0, 'owner': 0, 'group': 0, 'lat': 0, 'lon': 0}

        def flag(kind, message):
            # Count every failure but cap detailed messages at 10 per check.
            fails[kind] += 1
            if fails[kind] <= 10:
                errors.append(message)

        def blank(value):
            # Treat None/empty/whitespace-only values as unpopulated.
            return not value or str(value).strip() == ''

        for idx, rec in enumerate(reader.records()):
            if blank(rec[tag_pos]):
                flag('tag', f"{shapefile_name}: Feature {idx} 'Pole Tag' must be populated")

            if blank(rec[owner_pos]):
                flag('owner', f"{shapefile_name}: Feature {idx} 'Pole Owner' must be populated")

            group_text = str(rec[group_pos]) if rec[group_pos] else ""
            if not zone_pat.match(group_text):
                flag('group', f"{shapefile_name}: Feature {idx} has invalid Group 1 format")

            try:
                float(rec[lat_pos])
            except (ValueError, TypeError):
                flag('lat', f"{shapefile_name}: Feature {idx} Latitude is not a number")

            try:
                float(rec[lon_pos])
            except (ValueError, TypeError):
                flag('lon', f"{shapefile_name}: Feature {idx} Longitude is not a number")

        for kind, total in fails.items():
            if total > 10:
                errors.append(f"{shapefile_name}: 10 or more features failed {kind} validation")

        reader.close()
    except Exception as e:
        errors.append(f"{shapefile_name}: Error validating attributes - {str(e)}")

    return errors
|
def point_in_polygon(point, polygon):
    """Check if a point is inside a polygon using the ray casting algorithm.

    Args:
        point: (x, y) pair.
        polygon: Sequence of (x, y) vertices; a closing duplicate vertex is
            not required (the closing edge is walked implicitly).

    Returns:
        True when the point is inside; points exactly on the boundary give
        edge-dependent results, as is usual for ray casting.
    """
    x, y = point
    inside = False

    # Walk every edge (previous vertex -> current vertex), starting from the
    # closing edge.  The original looped n + 1 times over polygon[i % n],
    # which produced a degenerate zero-length first edge; that edge can never
    # toggle the flag (its y-range is empty), so the n real edges suffice.
    p1x, p1y = polygon[-1]
    for p2x, p2y in polygon:
        if min(p1y, p2y) < y <= max(p1y, p2y) and x <= max(p1x, p2x):
            # Horizontal edges never reach here (their y-range is empty),
            # so the division below cannot be by zero.
            xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
            if p1x == p2x or x <= xinters:
                inside = not inside
        p1x, p1y = p2x, p2y

    return inside
|
def line_crosses_polygon_boundary(line_points, polygon):
    """Check if a line crosses a polygon boundary (for segment exception).

    A polyline "crosses" when its vertices fall on both sides of the
    boundary: at least one inside and at least one outside.
    """
    verdicts = [point_in_polygon(vertex, polygon) for vertex in line_points]
    return any(verdicts) and not all(verdicts)
|
def extract_zone_number(field_value):
    """Pull the 2-digit zone number from text like 'Zone 07' or 'Zone 07 Boundary'.

    Returns the two-digit string, or None when no 'Zone NN' token is present.
    """
    found = re.search(r'Zone (\d{2})', str(field_value))
    return found.group(1) if found else None
|
def validate_spatial_containment(shapefile_dir: Path):
    """Validate that features are within their correct cabinet boundaries.

    Builds a list of (zone, polygon) pairs from cabinet_boundaries, then for
    each point layer (and the segments line layer) checks that every feature
    lies inside some cabinet polygon, and — when the layer carries a
    'Group 1' zone — inside the cabinet whose zone number matches.  Segments
    that legitimately cross a boundary are exempted.  Detailed messages are
    capped at 10 per failure kind with a summary line beyond that.
    """
    errors = []

    try:
        # Load cabinet boundaries
        cabinet_path = shapefile_dir / "cabinet_boundaries.shp"
        cabinet_sf = shapefile.Reader(str(cabinet_path))
        cabinet_records = cabinet_sf.records()
        cabinet_shapes = cabinet_sf.shapes()
        # fields[0] is the DBF deletion-flag pseudo-field; skip it.
        cabinet_fields = [field[0] for field in cabinet_sf.fields[1:]]

        if 'Name' not in cabinet_fields:
            errors.append("cabinet_boundaries: Missing 'Name' field for spatial validation")
            return errors

        name_idx = cabinet_fields.index('Name')

        # Build cabinet boundary data structure: one {'zone', 'polygon'} dict
        # per boundary whose Name yields a 2-digit zone number.
        cabinets = []
        for idx, (record, shape) in enumerate(zip(cabinet_records, cabinet_shapes)):
            zone_num = extract_zone_number(record[name_idx])
            if zone_num:
                # Handle polygon or multipolygon.  NOTE(review): shape.points
                # concatenates all rings/parts; multi-ring boundaries are
                # treated as one vertex list — confirm boundaries are single-ring.
                if shape.shapeType == 5:  # Polygon
                    polygon = shape.points
                    cabinets.append({'zone': zone_num, 'polygon': polygon})
                elif shape.shapeType == 15:  # PolygonZ
                    polygon = shape.points
                    cabinets.append({'zone': zone_num, 'polygon': polygon})

        if not cabinets:
            errors.append("cabinet_boundaries: No valid cabinet boundaries found with zone numbers")
            return errors

        # Validate each feature type
        feature_types = [
            ('sites', True, False),  # (shapefile_name, has_group1, is_line)
            ('access_points', True, False),
            ('permits', False, False),  # permits don't have Group 1, will skip zone matching
            ('splicing', True, False),
            ('network_elements', True, False),
            ('poles', True, False),
            ('segments', True, True),  # segments are lines
        ]

        for shapefile_name, has_group1, is_line in feature_types:
            shp_path = shapefile_dir / f"{shapefile_name}.shp"

            try:
                sf = shapefile.Reader(str(shp_path))
                records = sf.records()
                shapes = sf.shapes()
                field_names = [field[0] for field in sf.fields[1:]]

                # Get Group 1 field index if it exists
                group1_idx = None
                if has_group1 and 'Group 1' in field_names:
                    group1_idx = field_names.index('Group 1')

                # Get UID for error reporting (fall back to feature index).
                uid_idx = field_names.index('UID') if 'UID' in field_names else None

                failure_counts = {'wrong_zone': 0, 'outside_all': 0}

                for idx, (record, shape) in enumerate(zip(records, shapes)):
                    uid = record[uid_idx] if uid_idx is not None else idx

                    # Get feature zone number
                    feature_zone = None
                    if group1_idx is not None:
                        feature_zone = extract_zone_number(record[group1_idx])

                    # Get feature geometry
                    if is_line:
                        # For segments, get all points
                        feature_points = shape.points
                    else:
                        # For points, get the first point
                        if len(shape.points) > 0:
                            feature_points = [shape.points[0]]
                        else:
                            # Geometry-less feature: nothing to test spatially.
                            continue

                    # Check if feature is in any cabinet boundary
                    in_any_cabinet = False
                    in_correct_cabinet = False
                    crosses_boundary = False

                    for cabinet in cabinets:
                        if is_line:
                            # Check if line crosses this boundary; crossing
                            # segments are exempt from containment checks below.
                            if line_crosses_polygon_boundary(feature_points, cabinet['polygon']):
                                crosses_boundary = True
                                break
                            # Check if any point is in this cabinet
                            for point in feature_points:
                                if point_in_polygon(point, cabinet['polygon']):
                                    in_any_cabinet = True
                                    if feature_zone == cabinet['zone']:
                                        in_correct_cabinet = True
                                    break
                        else:
                            # For points, check if in this cabinet
                            if point_in_polygon(feature_points[0], cabinet['polygon']):
                                in_any_cabinet = True
                                if feature_zone == cabinet['zone']:
                                    in_correct_cabinet = True
                                    break

                    # Exception for segments that cross boundaries
                    if shapefile_name == 'segments' and crosses_boundary:
                        continue

                    # Check if feature is outside all cabinets
                    if not in_any_cabinet:
                        failure_counts['outside_all'] += 1
                        if failure_counts['outside_all'] <= 10:
                            errors.append(f"{shapefile_name}: Feature UID {uid} is outside all cabinet boundaries")

                    # Check if feature is in wrong zone (only if has Group 1 field)
                    elif has_group1 and not in_correct_cabinet and feature_zone:
                        failure_counts['wrong_zone'] += 1
                        if failure_counts['wrong_zone'] <= 10:
                            errors.append(f"{shapefile_name}: Feature UID {uid} is in wrong cabinet boundary (expected Zone {feature_zone})")

                if failure_counts['outside_all'] > 10:
                    errors.append(f"{shapefile_name}: 10 or more features failed outside all cabinet boundaries validation")

                if failure_counts['wrong_zone'] > 10:
                    errors.append(f"{shapefile_name}: 10 or more features failed wrong cabinet boundary validation")

                sf.close()

            except Exception as e:
                # One broken layer should not abort spatial checks for the rest.
                errors.append(f"{shapefile_name}: Error during spatial validation - {str(e)}")

        cabinet_sf.close()

    except Exception as e:
        errors.append(f"Spatial validation error: {str(e)}")

    return errors