"""
|
|
Verofy Shapefile Uploader
|
|
Reads shapefiles from temp folder and uploads them to Verofy API
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import requests
|
|
import geopandas as gpd
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
# Add verofy_api to path to reuse reference data
|
|
VEROFY_API_PATH = Path(__file__).parent.parent / "verofy_api"
|
|
sys.path.insert(0, str(VEROFY_API_PATH))
|
|
|
|
API_URL = "https://api.verofy.veronetworks.com/v1"
|
|
|
|
|
|
class VerofyUploader:
    """Handles uploading shapefiles to the Verofy API."""

    def __init__(self, email: str, password: str):
        self.email = email
        self.password = password
        self.access_token = None
        self.state_lookup = {}
        self.segment_type_lookup = {}
        self.access_point_type_lookup = {}
        self.icon_type_lookup = {}
        self.element_type_lookup = {}
        self.element_status_lookup = {}
        self.splice_type_lookup = {}
        self.splice_status_lookup = {}
        self.drop_type_lookup = {}
        self.drop_status_lookup = {}
        self._load_references()

    def _load_references(self):
        """Load reference lookup tables from the JSON files shipped with verofy_api."""
        # (attribute name, reference file, key field) for each lookup table.
        # States key on "short_name"; every other table keys on "name".
        reference_specs = [
            ("state_lookup", "State_references.json", "short_name"),
            ("segment_type_lookup", "MapSegmentType_references.json", "name"),
            ("access_point_type_lookup", "MapAccessPointType_references.json", "name"),
            ("icon_type_lookup", "MapIconType_references.json", "name"),
            ("element_type_lookup", "MapElementType_references.json", "name"),
            ("element_status_lookup", "MapElementStatus_references.json", "name"),
            ("splice_type_lookup", "MapSpliceType_references.json", "name"),
            ("splice_status_lookup", "MapSpliceStatus_references.json", "name"),
            ("drop_type_lookup", "MapDropType_references.json", "name"),
            ("drop_status_lookup", "MapDropStatus_references.json", "name"),
        ]
        try:
            for attr, filename, key_field in reference_specs:
                ref_file = VEROFY_API_PATH / filename
                if not ref_file.exists():
                    continue
                with open(ref_file, 'r') as f:
                    items = json.load(f)
                setattr(self, attr, {
                    item[key_field]: item['id']
                    for item in items.values()
                    if isinstance(item, dict) and key_field in item
                })
        except Exception as e:
            print(f"Warning: Could not load reference data: {e}")

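    # Expected shape of each reference file, inferred from the lookup code above
    # (the field values here are illustrative, not actual Verofy data):
    #   {"1": {"id": 1, "name": "Underground"}, "2": {"id": 2, "name": "Aerial"}}
    # State_references.json keys on "short_name" (e.g. "PA") instead of "name".
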
    def _calculate_line_metric(self, geometry) -> str:
        """
        Calculate the metric string for a line geometry (cables).
        Format: "Mileage: {miles}; Footage: {feet}"
        """
        if not geometry or geometry.is_empty:
            return "Mileage: 0.0000; Footage: 0"

        # Calculate length using an approximate conversion for mid-latitudes (~40° N):
        # at 40° latitude, 1 degree longitude ≈ 53 miles and 1 degree latitude ≈ 69 miles.
        coords = list(geometry.coords)
        total_miles = 0

        for i in range(len(coords) - 1):
            lon1, lat1 = coords[i]
            lon2, lat2 = coords[i + 1]

            # Approximate planar distance in miles
            dlat = (lat2 - lat1) * 69
            dlon = (lon2 - lon1) * 53  # At 40° latitude
            segment_miles = (dlat**2 + dlon**2)**0.5
            total_miles += segment_miles

        total_feet = total_miles * 5280
        return f"Mileage: {total_miles:.4f}; Footage: {total_feet:.0f}"

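    # A more exact alternative to the flat-earth constants above would be to
    # project the geometry into its local UTM zone and measure in meters.
    # A minimal sketch, assuming WGS84 (EPSG:4326) input and a geopandas
    # version that provides GeoSeries.estimate_utm_crs(); this helper is
    # illustrative only and is not called by the upload path.
    @staticmethod
    def _utm_line_miles(geometry) -> float:
        series = gpd.GeoSeries([geometry], crs="EPSG:4326")
        # Reproject to the best-fit UTM zone, then measure the length in meters
        meters = series.to_crs(series.estimate_utm_crs()).length.iloc[0]
        return meters / 1609.344  # meters per statute mile
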
    def _calculate_polygon_metric(self, geometry) -> str:
        """
        Calculate the metric string for a polygon geometry (boundaries, parcels).
        Format: "Square Miles: {sq_miles}"
        """
        if not geometry or geometry.is_empty:
            return "Square Miles: 0.0000"

        # Calculate area using an approximate conversion for mid-latitudes (~40° N):
        # 1 square degree ≈ 53 × 69 ≈ 3,657 square miles.
        area_sq_degrees = geometry.area
        area_sq_miles = area_sq_degrees * 3657

        return f"Square Miles: {area_sq_miles:.4f}"

    def authenticate(self) -> bool:
        """Get an access token from the Verofy API."""
        try:
            # Get a refresh token
            payload = {"email": self.email, "password": self.password}
            response = requests.post(f"{API_URL}/login", json=payload)

            if response.status_code != 200:
                print(f"Login failed: {response.status_code}")
                return False

            refresh_token = response.json().get("refresh-token")
            if not refresh_token:
                print("No refresh token received")
                return False

            # Exchange the refresh token for an access token
            headers = {"Authorization": f"Bearer {refresh_token}"}
            token_response = requests.get(f"{API_URL}/refresh-token", headers=headers)

            if token_response.status_code != 200:
                print(f"Token refresh failed: {token_response.status_code}")
                return False

            self.access_token = token_response.json().get("access-token")
            if not self.access_token:
                print("No access token received")
                return False

            print("✅ Authenticated with Verofy API")
            return True

        except Exception as e:
            print(f"Authentication error: {e}")
            return False

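    # Token flow, as implemented above: POST /login returns a "refresh-token",
    # which is then sent as a Bearer token to GET /refresh-token to obtain the
    # "access-token" used by all of the create calls below.
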
    def upload_all_shapefiles(self, temp_dir: Path, map_id: int, limit: Optional[int] = None) -> Dict:
        """
        Upload all shapefiles from the temp directory to Verofy.

        Args:
            temp_dir: Path to directory containing shapefiles
            map_id: Verofy map project ID
            limit: Optional limit on number of records per shapefile (for testing)

        Returns:
            Dict with success status and statistics
        """
        if not self.authenticate():
            return {"success": False, "error": "Authentication failed"}

        results = {
            "success": True,
            "uploaded": {},
            "errors": []
        }

        # Upload in order: poles first, then segments, then sites, etc.
        upload_order = [
            ("poles.shp", self._upload_poles),
            ("segments.shp", self._upload_segments),
            ("sites.shp", self._upload_sites),
            ("access_points.shp", self._upload_access_points),
            ("network_elements.shp", self._upload_network_elements),
            ("splicing.shp", self._upload_splicing),
            ("cabinet_boundaries.shp", self._upload_cabinet_boundaries),
            ("cables.shp", self._upload_cables),
            ("parcels.shp", self._upload_parcels),
            ("permits.shp", self._upload_permits),
            ("drops.shp", self._upload_drops),
        ]

        for shapefile_name, upload_func in upload_order:
            shapefile_path = temp_dir / shapefile_name
            if not shapefile_path.exists():
                print(f"⚠️ Skipping {shapefile_name} (not found)")
                continue

            try:
                print(f"\n📤 Uploading {shapefile_name}...")
                count, errors = upload_func(shapefile_path, map_id, limit)
                results["uploaded"][shapefile_name] = count
                if errors:
                    results["errors"].extend(errors)
                print(f"✅ Uploaded {count} records from {shapefile_name}")
            except Exception as e:
                error_msg = f"Error uploading {shapefile_name}: {str(e)}"
                print(f"❌ {error_msg}")
                results["errors"].append(error_msg)
                results["success"] = False

        return results

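    # Shape of the dict returned by upload_all_shapefiles (values illustrative):
    #   {
    #       "success": True,
    #       "uploaded": {"poles.shp": 120, "segments.shp": 48, ...},
    #       "errors": ["Failed to create pole at row 7", ...]
    #   }
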
    def _upload_poles(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload poles from shapefile"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Extract coordinates, preferring explicit fields over geometry
                lat = row.get('Latitude', row.geometry.y if row.geometry else None)
                lon = row.get('Longitude', row.geometry.x if row.geometry else None)

                if lat is None or lon is None:
                    continue

                # Generate the Pole ID (name field) from UID or the row index
                pole_id = f'Pole-{idx}'
                if 'UID' in row and row['UID'] is not None:
                    try:
                        pole_id = f'Pole-{int(row["UID"])}'
                    except (ValueError, TypeError):
                        pass

                # Map shapefile fields to API fields
                pole_data = {
                    "mapProjectId": int(map_id),
                    "name": pole_id,  # Becomes "Pole ID" in Verofy
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "mrStateId": 11  # Default state
                }

                # Map "Pole Tag" from the shapefile to the "tags" field (becomes "Pole Tag" in Verofy)
                if 'Pole Tag' in row and row['Pole Tag']:
                    pole_data['tags'] = str(row['Pole Tag'])

                # Add optional fields
                if 'Pole Owner' in row and row['Pole Owner']:
                    pole_data['owner'] = str(row['Pole Owner'])

                if 'Group 1' in row and row['Group 1']:
                    pole_data['group1'] = str(row['Group 1'])

                if 'Group 2' in row and row['Group 2']:
                    pole_data['group2'] = str(row['Group 2'])

                # Add pole height ("Pole Heigh" is the shapefile column name,
                # truncated to 10 characters by the DBF format)
                if 'Pole Heigh' in row and row['Pole Heigh']:
                    pole_data['poleHeight'] = str(row['Pole Heigh'])

                # Create pole via API
                if self._create_pole(pole_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create pole at row {idx}")

            except Exception as e:
                errors.append(f"Error processing pole row {idx}: {str(e)}")

        return success_count, errors

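    # Example pole payload built above (coordinates and IDs illustrative):
    #   {"mapProjectId": 42, "name": "Pole-1001", "latitude": "40.1234",
    #    "longitude": "-75.5678", "mrStateId": 11, "tags": "PT-17"}
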
    def _upload_segments(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload segments from shapefile"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Skip rows without line geometry
                if not row.geometry or row.geometry.is_empty:
                    continue

                # Convert the LineString to a coordinate array
                coords = list(row.geometry.coords)
                poly = [{"lat": coord[1], "lng": coord[0]} for coord in coords]

                # Map segment type
                segment_type = row.get('Type', 'Underground')
                type_id = self.segment_type_lookup.get(segment_type, 5)  # Default to Underground

                # Map shapefile fields to API fields
                segment_data = {
                    "mapProjectId": int(map_id),
                    "name": f"Segment-{idx}",
                    "typeId": type_id,
                    "statusId": 1,  # Default to Working
                    "poly": poly,
                    "color": "#ff2600",
                    "styleWidth": 5,
                    "exclude": 0,
                    "custom": 0
                }

                # Add optional fields
                if 'Group 1' in row and row['Group 1']:
                    segment_data['group1'] = str(row['Group 1'])

                if 'Group 2' in row and row['Group 2']:
                    segment_data['group2'] = str(row['Group 2'])

                if 'Conduit' in row and row['Conduit']:
                    segment_data['conduit'] = str(row['Conduit'])

                # Create segment via API
                if self._create_segment(segment_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create segment at row {idx}")

            except Exception as e:
                errors.append(f"Error processing segment row {idx}: {str(e)}")

        return success_count, errors

    def _upload_sites(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload sites from shapefile"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Extract coordinates
                lat = row.get('Latitude', row.geometry.y if row.geometry else None)
                lon = row.get('Longitude', row.geometry.x if row.geometry else None)

                if lat is None or lon is None:
                    continue

                # Generate the site name
                site_name = row.get('Address', f'Site-{idx}')

                # Map the Type field to iconTypeId using the reference lookup
                site_type = row.get('Type', None)
                icon_type_id = 1  # Default to Hub Site
                if site_type and site_type in self.icon_type_lookup:
                    icon_type_id = self.icon_type_lookup[site_type]
                else:
                    print(f"⚠️ Warning: Unknown site type '{site_type}' at row {idx}, using default")

                # Map shapefile fields to API fields
                site_data = {
                    "mapProjectId": int(map_id),
                    "name": str(site_name),
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "iconTypeId": icon_type_id,
                    "statusId": 1  # Default to status 1
                }

                # Map BEN# to unitCount if provided
                if 'BEN#' in row and row['BEN#']:
                    try:
                        site_data['unitCount'] = int(row['BEN#'])
                    except (ValueError, TypeError):
                        pass

                if 'Group 1' in row and row['Group 1']:
                    site_data['group1'] = str(row['Group 1'])

                if 'Group 2' in row and row['Group 2']:
                    site_data['group2'] = str(row['Group 2'])

                if 'Address' in row and row['Address']:
                    site_data['address1'] = str(row['Address'])

                if 'City' in row and row['City']:
                    site_data['city'] = str(row['City'])

                if 'State' in row and row['State']:
                    state_code = str(row['State'])
                    if state_code in self.state_lookup:
                        site_data['stateId'] = self.state_lookup[state_code]

                if 'Zip' in row and row['Zip']:
                    site_data['zip'] = str(row['Zip'])

                # Create site via API
                if self._create_site(site_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create site at row {idx}")

            except Exception as e:
                errors.append(f"Error processing site row {idx}: {str(e)}")

        return success_count, errors

    def _upload_access_points(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload access points from shapefile"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Extract coordinates
                lat = row.get('Latitude', row.geometry.y if row.geometry else None)
                lon = row.get('Longitude', row.geometry.x if row.geometry else None)

                if lat is None or lon is None:
                    continue

                # Map access point type using the lookup
                ap_type = row.get('Type', 'Handhole')
                type_id = self.access_point_type_lookup.get(ap_type, 1)  # Default to Handhole

                # Map shapefile fields to API fields
                ap_data = {
                    "mapProjectId": int(map_id),
                    "name": f"AP-{idx}",
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "typeId": type_id,
                    "locked": 0  # Default to unlocked
                }

                # Add optional fields
                if 'Group 1' in row and row['Group 1']:
                    ap_data['group1'] = str(row['Group 1'])

                if 'Group 2' in row and row['Group 2']:
                    ap_data['group2'] = str(row['Group 2'])

                # Create access point via API
                if self._create_access_point(ap_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create access point at row {idx}")

            except Exception as e:
                errors.append(f"Error processing access point row {idx}: {str(e)}")

        return success_count, errors

    def _upload_network_elements(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload network elements from shapefile"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Extract coordinates
                lat = row.get('Latitude', row.geometry.y if row.geometry else None)
                lon = row.get('Longitude', row.geometry.x if row.geometry else None)

                if lat is None or lon is None:
                    continue

                # Map the element type from the shapefile to a typeId
                element_type = row.get('Type', 'Anchor')  # Default to Anchor if not specified
                type_id = self.element_type_lookup.get(element_type, 35)  # Default to 35 (Anchor)

                # Generate the name from UID if available
                element_name = f'NE-{idx}'
                if 'UID' in row and row['UID'] is not None:
                    try:
                        element_name = f'E-{int(row["UID"])}'
                    except (ValueError, TypeError):
                        pass

                # Map shapefile fields to API fields
                ne_data = {
                    "mapProjectId": int(map_id),
                    "name": element_name,
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "typeId": type_id,  # Use typeId instead of a type string
                    "statusId": 1,  # Default to Planned
                    "locked": 0,  # Default to unlocked
                    "custom": 0  # Default to not custom
                }

                # Add optional fields
                if 'Group 1' in row and row['Group 1']:
                    ne_data['group1'] = str(row['Group 1'])

                if 'Group 2' in row and row['Group 2']:
                    ne_data['group2'] = str(row['Group 2'])

                # Create network element via API
                if self._create_network_element(ne_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create network element at row {idx}")

            except Exception as e:
                errors.append(f"Error processing network element row {idx}: {str(e)}")

        return success_count, errors

    def _upload_splicing(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload splicing points from shapefile"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Extract coordinates
                lat = row.get('Latitude', row.geometry.y if row.geometry else None)
                lon = row.get('Longitude', row.geometry.x if row.geometry else None)

                if lat is None or lon is None:
                    continue

                # Map the splice type from the shapefile to a typeId
                splice_type = row.get('Type', 'Splice')  # Default to Splice if not specified
                type_id = self.splice_type_lookup.get(splice_type, 1)  # Default to 1 (Splice)

                # Generate the aka from the AKA field (preferred) or UID as a fallback.
                # Note: "name" is auto-generated by the API, so use the "aka" field instead.
                splice_aka = f'Splice-{idx}'
                if 'AKA' in row and row['AKA']:
                    splice_aka = str(row['AKA'])
                elif 'UID' in row and row['UID'] is not None:
                    try:
                        splice_aka = f'Splice-{int(row["UID"])}'
                    except (ValueError, TypeError):
                        pass

                # Map shapefile fields to API fields
                splicing_data = {
                    "mapProjectId": int(map_id),
                    "aka": splice_aka,  # Use "aka", not "name" - name is auto-generated
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "typeId": type_id,  # Use typeId instead of a type string
                    "statusId": 1,  # Default to Planned
                    "locked": 0  # Default to unlocked
                }

                # Add optional fields
                if 'Group 1' in row and row['Group 1']:
                    splicing_data['group1'] = str(row['Group 1'])

                if 'Group 2' in row and row['Group 2']:
                    splicing_data['group2'] = str(row['Group 2'])

                # Create splicing via API
                if self._create_splicing(splicing_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create splicing at row {idx}")

            except Exception as e:
                errors.append(f"Error processing splicing row {idx}: {str(e)}")

        return success_count, errors

    def _upload_cabinet_boundaries(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload cabinet boundaries to the info tab"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Skip rows without polygon geometry (only simple Polygons are
                # handled; only the exterior ring is uploaded)
                if not row.geometry or row.geometry.is_empty:
                    continue
                if row.geometry.geom_type != 'Polygon':
                    continue

                # Convert the Polygon to a coordinate array.
                # NOTE: polygons must be double-nested: [[...]]
                coords = list(row.geometry.exterior.coords)
                data = [[{"lat": coord[1], "lng": coord[0]} for coord in coords]]

                # Calculate the area metric for the polygon
                metric = self._calculate_polygon_metric(row.geometry)

                # Map shapefile fields to API fields for the info object
                info_data = {
                    "mapProjectId": int(map_id),
                    "name": str(row.get('Name', f'Cabinet-Boundary-{idx}')),
                    "mapinfoobjecttypeId": 3,  # 3 = Polygon
                    "data": data,
                    "color": "#ffffff",
                    "alpha": "0.40",
                    "metric": metric,
                    "objectgroup": None,
                    "objectgroup2": None
                }

                # Create info object via API
                if self._create_info_object(info_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create cabinet boundary at row {idx}")

            except Exception as e:
                errors.append(f"Error processing cabinet boundary row {idx}: {str(e)}")

        return success_count, errors

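    # Example of the double-nested polygon "data" payload (coordinates illustrative):
    #   [[{"lat": 40.10, "lng": -75.20}, {"lat": 40.11, "lng": -75.20},
    #     {"lat": 40.11, "lng": -75.19}, {"lat": 40.10, "lng": -75.20}]]
    # Lines (cables) use a single-nested array instead; see _upload_cables below.
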
    def _upload_cables(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload cables to the info tab"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Skip rows without line geometry
                if not row.geometry or row.geometry.is_empty:
                    continue

                # Convert the LineString to a coordinate array (single-nested for lines)
                coords = list(row.geometry.coords)
                data = [{"lat": coord[1], "lng": coord[0]} for coord in coords]

                # Calculate the length metric for the line
                metric = self._calculate_line_metric(row.geometry)

                # Map shapefile fields to API fields for the info object
                info_data = {
                    "mapProjectId": int(map_id),
                    "name": str(row.get('Name', f'Cable-{idx}')),
                    "mapinfoobjecttypeId": 2,  # 2 = Line/Polyline
                    "data": data,
                    "color": "#ffffff",
                    "alpha": "1.00",
                    "metric": metric,
                    "objectgroup": None,
                    "objectgroup2": None
                }

                # Create info object via API
                if self._create_info_object(info_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create cable at row {idx}")

            except Exception as e:
                errors.append(f"Error processing cable row {idx}: {str(e)}")

        return success_count, errors

    def _upload_parcels(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload parcels to the info tab"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Skip rows without polygon geometry (only simple Polygons are handled)
                if not row.geometry or row.geometry.is_empty:
                    continue
                if row.geometry.geom_type != 'Polygon':
                    continue

                # Convert the Polygon to a coordinate array.
                # NOTE: polygons must be double-nested: [[...]]
                coords = list(row.geometry.exterior.coords)
                data = [[{"lat": coord[1], "lng": coord[0]} for coord in coords]]

                # Calculate the area metric for the polygon
                metric = self._calculate_polygon_metric(row.geometry)

                # Map shapefile fields to API fields for the info object
                info_data = {
                    "mapProjectId": int(map_id),
                    "name": str(row.get('Name', f'Parcel-{idx}')),
                    "mapinfoobjecttypeId": 3,  # 3 = Polygon
                    "data": data,
                    "color": "#ffffff",
                    "alpha": "0.40",
                    "metric": metric,
                    "objectgroup": None,
                    "objectgroup2": None
                }

                # Add optional fields (Group 1/2 map to objectgroup/objectgroup2)
                if 'Group 1' in row and row['Group 1']:
                    info_data['objectgroup'] = str(row['Group 1'])

                if 'Group 2' in row and row['Group 2']:
                    info_data['objectgroup2'] = str(row['Group 2'])

                # Create info object via API
                if self._create_info_object(info_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create parcel at row {idx}")

            except Exception as e:
                errors.append(f"Error processing parcel row {idx}: {str(e)}")

        return success_count, errors

    def _upload_permits(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload permits from shapefile"""
        import fiona

        success_count = 0
        errors = []
        record_count = 0

        try:
            # Use fiona to read with better error tolerance
            with fiona.open(shapefile_path) as src:
                for idx, feature in enumerate(src):
                    # Apply limit if specified
                    if limit and idx >= limit:
                        break

                    record_count += 1

                    try:
                        # Extract geometry; only polygons are supported
                        geom = feature['geometry']
                        if not geom or geom['type'] != 'Polygon':
                            continue

                        # Check that the polygon has enough coordinates
                        coords = geom['coordinates'][0]  # Outer ring
                        if len(coords) < 4:
                            errors.append(f"Permit row {idx}: Invalid polygon (< 4 coordinates)")
                            continue

                        # Convert to lat/lng format.
                        # NOTE: the poly must be wrapped in an extra array.
                        poly = [[{"lat": coord[1], "lng": coord[0]} for coord in coords]]

                        # Get properties from the shapefile
                        props = feature['properties']
                        name = props.get('Name', f'Permit-{idx}')
                        group1 = props.get('Group 1', None)

                        # Map shapefile fields to API fields
                        permit_data = {
                            "mapProjectId": int(map_id),
                            "name": str(name),
                            "poly": poly,
                            "mappermitstatusId": 1,  # Default to status 1
                            "mappermitentitytypeId": 6,  # Default to entity type 6
                            "mappermitulrtypeId": 3,  # Default to ULR type 3
                            "mappermitentitymeetId": 1,  # Default to meet 1
                            "mappermitrequirementsId": 1  # Default to requirements 1
                        }

                        # Group 1 maps to the "permitgroup" field (not "group1")
                        if group1:
                            permit_data['permitgroup'] = str(group1)

                        # Create permit via API
                        if self._create_permit(permit_data):
                            success_count += 1
                        else:
                            errors.append(f"Failed to create permit at row {idx}")

                    except Exception as e:
                        errors.append(f"Error processing permit row {idx}: {str(e)}")

        except Exception as e:
            print(f"⚠️ Error reading permits.shp: {e}")
            return 0, [f"Permits shapefile error: {str(e)}"]

        if record_count > 0 and limit:
            print(f" (Limited to first {limit} records for testing)")

        return success_count, errors

    def _upload_drops(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload drops from shapefile"""
        gdf = gpd.read_file(shapefile_path)
        success_count = 0
        errors = []

        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")

        for idx, row in gdf.iterrows():
            try:
                # Skip rows without line geometry
                if not row.geometry or row.geometry.is_empty:
                    continue

                # Convert the LineString to a coordinate array
                coords = list(row.geometry.coords)
                poly = [{"lat": coord[1], "lng": coord[0]} for coord in coords]

                # Map the drop type from the shapefile to a typeId
                drop_type = row.get('Type', 'Buried')  # Default to Buried
                type_id = self.drop_type_lookup.get(drop_type, 1)  # Default to 1 (Buried)

                # Map the drop status if provided
                status_id = 2  # Default to Planning
                if 'Status' in row and row['Status']:
                    drop_status = row.get('Status')
                    if drop_status in self.drop_status_lookup:
                        status_id = self.drop_status_lookup[drop_status]

                # Generate the name from the Name field or a default
                drop_name = row.get('Name', f'Drop-{idx}')

                # Map shapefile fields to API fields
                drop_data = {
                    "mapProjectId": int(map_id),
                    "name": str(drop_name),
                    "typeId": type_id,
                    "statusId": status_id,
                    "poly": poly,
                    "width": 1,  # Default line width
                    "color": "#000"  # Default black color
                }

                # Add optional fields
                if 'Group 1' in row and row['Group 1']:
                    drop_data['group1'] = str(row['Group 1'])

                if 'Group 2' in row and row['Group 2']:
                    drop_data['group2'] = str(row['Group 2'])

                if 'Cabinet' in row and row['Cabinet']:
                    drop_data['cabinet'] = str(row['Cabinet'])

                if 'Cabinet Port' in row and row['Cabinet Port']:
                    drop_data['cabinetPort'] = str(row['Cabinet Port'])

                # Create drop via API
                if self._create_drop(drop_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create drop at row {idx}")

            except Exception as e:
                errors.append(f"Error processing drop row {idx}: {str(e)}")

        return success_count, errors

    def _create_pole(self, pole_data: Dict) -> bool:
        """Create a pole via the Verofy API"""
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        response = requests.post(
            f"{API_URL}/map-pole/create",
            headers=headers,
            json=pole_data
        )

        if response.status_code != 201:
            print(f"❌ Pole API Error {response.status_code}: {response.text[:200]}")
        return response.status_code == 201

    def _create_segment(self, segment_data: Dict) -> bool:
        """Create a segment via the Verofy API"""
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        response = requests.post(
            f"{API_URL}/map-segment/create",
            headers=headers,
            json=segment_data
        )

        # Log failures for parity with the other create helpers
        if response.status_code != 201:
            print(f"❌ Segment API Error {response.status_code}: {response.text[:200]}")
        return response.status_code == 201

    def _create_site(self, site_data: Dict) -> bool:
        """Create a site via the Verofy API"""
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        # Debug: log each payload before sending
        print(f"DEBUG: Sending site data: {json.dumps(site_data, indent=2)}")

        response = requests.post(
            f"{API_URL}/map-site/create",
            headers=headers,
            json=site_data
        )

        if response.status_code != 201:
            print(f"❌ Site API Error {response.status_code}: {response.text[:200]}")
        return response.status_code == 201

    def _create_access_point(self, ap_data: Dict) -> bool:
        """Create an access point via the Verofy API"""
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        print(f"DEBUG: Sending access point data: {json.dumps(ap_data, indent=2)}")

        response = requests.post(
            f"{API_URL}/map-access-point/create",
            headers=headers,
            json=ap_data
        )

        if response.status_code != 201:
            print(f"❌ Access Point API Error {response.status_code}: {response.text[:200]}")
            return False

        return True

    def _create_network_element(self, ne_data: Dict) -> bool:
        """Create a network element via the Verofy API"""
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        print(f"DEBUG: Sending network element data: {json.dumps(ne_data, indent=2)}")

        response = requests.post(
            f"{API_URL}/map-element/create",
            headers=headers,
            json=ne_data
        )

        if response.status_code != 201:
            print(f"❌ Network Element API Error {response.status_code}: {response.text[:200]}")
            return False

        return True

    def _create_splicing(self, splicing_data: Dict) -> bool:
        """Create a splicing point via the Verofy API"""
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        print(f"DEBUG: Sending splicing data: {json.dumps(splicing_data, indent=2)}")

        response = requests.post(
            f"{API_URL}/map-splice/create",  # Corrected endpoint: map-splice, not map-splicing
            headers=headers,
            json=splicing_data
        )

        if response.status_code != 201:
            print(f"❌ Splicing API Error {response.status_code}: {response.text[:200]}")
            return False

        return True

    def _create_info_object(self, info_data: Dict) -> bool:
        """Create an info object via the Verofy API (info tab items: boundaries, cables, parcels)"""
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        # Send "data" as a plain array - do NOT JSON-encode it separately
        print(f"DEBUG: Sending info object data: {json.dumps(info_data, indent=2)}")

        response = requests.post(
            f"{API_URL}/map-info-object/create",
            headers=headers,
            json=info_data
        )

        if response.status_code != 201:
            print(f"❌ Info Object API Error {response.status_code}: {response.text[:200]}")
        return response.status_code == 201

    def _create_permit(self, permit_data: Dict) -> bool:
        """Create a permit via the Verofy API"""
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        print(f"DEBUG: Sending permit data: {json.dumps(permit_data, indent=2)}")

        response = requests.post(
            f"{API_URL}/map-permit/create",
            headers=headers,
            json=permit_data
        )

        if response.status_code != 201:
            print(f"❌ Permit API Error {response.status_code}: {response.text[:200]}")
            return False

        return True

    def _create_drop(self, drop_data: Dict) -> bool:
        """Create a drop via the Verofy API"""
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        print(f"DEBUG: Sending drop data: {json.dumps(drop_data, indent=2)}")

        response = requests.post(
            f"{API_URL}/map-drop/create",
            headers=headers,
            json=drop_data
        )

        if response.status_code != 201:
            print(f"❌ Drop API Error {response.status_code}: {response.text[:200]}")
            return False

        return True


def upload_to_verofy(temp_dir: str, map_id: int, email: str, password: str, limit: Optional[int] = None) -> Dict:
    """
    Main entry point: upload all shapefiles to Verofy.

    Args:
        temp_dir: Path to directory containing shapefiles
        map_id: Verofy map project ID
        email: Verofy user email
        password: Verofy user password
        limit: Optional limit on records per shapefile (for testing)

    Returns:
        Dict with success status and upload statistics
    """
    uploader = VerofyUploader(email, password)
    return uploader.upload_all_shapefiles(Path(temp_dir), map_id, limit)
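

# Minimal command-line usage sketch. The environment variable names and the
# fallback map ID below are assumptions for illustration, not part of the
# Verofy API or the surrounding pipeline.
if __name__ == "__main__":
    import os

    result = upload_to_verofy(
        temp_dir="temp",  # folder holding the generated .shp files
        map_id=int(os.environ.get("VEROFY_MAP_ID", "1")),
        email=os.environ["VEROFY_EMAIL"],
        password=os.environ["VEROFY_PASSWORD"],
        limit=5,  # small record limit while testing
    )
    print(json.dumps(result, indent=2))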