# dragndrop_hld/backend/verofy_uploader.py

"""
Verofy Shapefile Uploader
Reads shapefiles from temp folder and uploads them to Verofy API
"""
import os
import sys
import json
import requests
import geopandas as gpd
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Add verofy_api to path to reuse reference data
# VEROFY_API_PATH is the sibling "verofy_api" directory, two levels up from
# this file; the *_references.json lookup files are read from it.
VEROFY_API_PATH = Path(__file__).parent.parent / "verofy_api"
# Prepend (not append) so modules in verofy_api take precedence on import.
sys.path.insert(0, str(VEROFY_API_PATH))
# Base URL that every Verofy REST endpoint in this module is joined onto.
API_URL = "https://api.verofy.veronetworks.com/v1"
class VerofyUploader:
    """Handles uploading shapefiles to Verofy API.

    The uploader logs in with an email/password pair, loads the
    ``*_references.json`` name -> id lookup tables from ``VEROFY_API_PATH``
    and then posts one "create" request per shapefile record.

    Every ``_upload_*`` method returns ``(success_count, errors)`` where
    ``errors`` is a list of human-readable messages; row-level failures never
    abort the remaining rows.
    """

    # Seconds to wait for any single HTTP call; without a timeout a stalled
    # connection would block the whole upload forever.
    REQUEST_TIMEOUT = 30

    # When true, request payloads are echoed to stdout before being sent.
    DEBUG_PAYLOADS = True

    # (attribute name, reference file name, field used as the lookup key)
    _REFERENCE_FILES = (
        ("state_lookup", "State_references.json", "short_name"),
        ("segment_type_lookup", "MapSegmentType_references.json", "name"),
        ("access_point_type_lookup", "MapAccessPointType_references.json", "name"),
        ("icon_type_lookup", "MapIconType_references.json", "name"),
        ("element_type_lookup", "MapElementType_references.json", "name"),
        ("element_status_lookup", "MapElementStatus_references.json", "name"),
        ("splice_type_lookup", "MapSpliceType_references.json", "name"),
        ("splice_status_lookup", "MapSpliceStatus_references.json", "name"),
        ("drop_type_lookup", "MapDropType_references.json", "name"),
        ("drop_status_lookup", "MapDropStatus_references.json", "name"),
    )

    def __init__(self, email: str, password: str):
        """Store credentials and load the reference lookup tables."""
        self.email = email
        self.password = password
        self.access_token = None
        self.state_lookup = {}
        self.segment_type_lookup = {}
        self.access_point_type_lookup = {}
        self.icon_type_lookup = {}
        self.element_type_lookup = {}
        self.element_status_lookup = {}
        self.splice_type_lookup = {}
        self.splice_status_lookup = {}
        self.drop_type_lookup = {}
        self.drop_status_lookup = {}
        self._load_references()

    def _load_references(self):
        """Load reference data from JSON files.

        Each file is handled independently: a missing file is skipped and a
        malformed one only produces a warning, so one bad file no longer
        prevents the remaining lookups from being loaded.
        """
        for attr_name, filename, key_field in self._REFERENCE_FILES:
            ref_file = VEROFY_API_PATH / filename
            if not ref_file.exists():
                continue
            try:
                with open(ref_file, 'r', encoding='utf-8') as f:
                    records = json.load(f)
                lookup = {
                    item[key_field]: item['id']
                    for item in records.values()
                    if isinstance(item, dict) and key_field in item
                }
            except (OSError, ValueError, AttributeError, KeyError, TypeError) as e:
                print(f"Warning: Could not load reference data from {filename}: {e}")
                continue
            setattr(self, attr_name, lookup)

    # ------------------------------------------------------------------
    # Small row/geometry helpers shared by the upload methods
    # ------------------------------------------------------------------
    @staticmethod
    def _is_missing(value) -> bool:
        """Return True for None and for float NaN (the pandas null marker)."""
        return value is None or (isinstance(value, float) and value != value)

    @classmethod
    def _field(cls, row, column):
        """Return ``row[column]``, or None when absent, null, NaN or falsy."""
        value = row.get(column)
        if cls._is_missing(value) or not value:
            return None
        return value

    @classmethod
    def _copy_text_fields(cls, row, payload: Dict, mapping) -> None:
        """Copy present attributes into ``payload`` as strings.

        ``mapping`` is an iterable of ``(shapefile column, API key)`` pairs.
        """
        for column, api_key in mapping:
            value = cls._field(row, column)
            if value is not None:
                payload[api_key] = str(value)

    @classmethod
    def _point_coordinates(cls, row):
        """Return ``(lat, lon)`` for a point record, or None for unknown parts.

        The ``Latitude``/``Longitude`` attributes win when they hold a real
        value; otherwise the point geometry is used. The geometry is only
        touched when needed, so a non-point geometry cannot raise here.
        """
        geom = row.get('geometry')
        is_point = (
            geom is not None
            and getattr(geom, 'geom_type', None) == 'Point'
            and not geom.is_empty
        )
        lat = row.get('Latitude')
        if cls._is_missing(lat):
            lat = geom.y if is_point else None
        lon = row.get('Longitude')
        if cls._is_missing(lon):
            lon = geom.x if is_point else None
        return lat, lon

    @staticmethod
    def _uid_name(row, prefix: str, default: str) -> str:
        """Return ``"{prefix}-{UID}"`` when UID is an integer-like value."""
        uid = row.get('UID')
        if uid is None:
            return default
        try:
            return f'{prefix}-{int(uid)}'
        except (ValueError, TypeError, OverflowError):
            # NaN, infinities and non-numeric text fall back to the default.
            return default

    @staticmethod
    def _latlng_points(coords) -> List[Dict]:
        """Convert (x, y[, z]) tuples to the API's ``{"lat", "lng"}`` dicts."""
        return [{"lat": coord[1], "lng": coord[0]} for coord in coords]

    def _read_shapefile(self, shapefile_path: Path, limit: Optional[int]):
        """Read a shapefile, keeping only the first ``limit`` rows if given."""
        gdf = gpd.read_file(shapefile_path)
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")
        return gdf

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------
    def _calculate_line_metric(self, geometry) -> str:
        """
        Calculate metric string for a line geometry (cables)
        Format: "Mileage: {miles}; Footage: {feet}"

        Works for single and multi-part lines and ignores any Z value.
        """
        if geometry is None or geometry.is_empty:
            return "Mileage: 0.0000; Footage: 0"
        # Multi-part geometries expose their parts through ``geoms``.
        parts = getattr(geometry, 'geoms', None)
        lines = list(parts) if parts is not None else [geometry]
        # Approximate conversion for mid-latitudes (~40° N):
        # 1 degree longitude ≈ 53 miles, 1 degree latitude ≈ 69 miles
        total_miles = 0.0
        for line in lines:
            coords = list(line.coords)
            for start, end in zip(coords, coords[1:]):
                # Index instead of unpacking so (x, y, z) tuples are accepted.
                dlat = (end[1] - start[1]) * 69
                dlon = (end[0] - start[0]) * 53  # At 40° latitude
                total_miles += (dlat**2 + dlon**2)**0.5
        total_feet = total_miles * 5280
        return f"Mileage: {total_miles:.4f}; Footage: {total_feet:.0f}"

    def _calculate_polygon_metric(self, geometry) -> str:
        """
        Calculate metric string for a polygon geometry (boundaries, parcels)
        Format: "Square Miles: {sq_miles}"
        """
        if geometry is None or geometry.is_empty:
            return "Square Miles: 0.0000"
        # Approximate conversion for mid-latitudes (~40° N):
        # 1 degree² ≈ 3,657 square miles
        area_sq_miles = geometry.area * 3657
        return f"Square Miles: {area_sq_miles:.4f}"

    # ------------------------------------------------------------------
    # Authentication and orchestration
    # ------------------------------------------------------------------
    def authenticate(self) -> bool:
        """Get access token from Verofy API.

        Logs in for a refresh token, exchanges it for an access token and
        stores that in ``self.access_token``. Returns False (never raises) on
        any failure, including network errors and timeouts.
        """
        try:
            # Get refresh token
            payload = {"email": self.email, "password": self.password}
            response = requests.post(
                f"{API_URL}/login", json=payload, timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code != 200:
                print(f"Login failed: {response.status_code}")
                return False
            refresh_token = response.json().get("refresh-token")
            if not refresh_token:
                print("No refresh token received")
                return False
            # Exchange for access token
            headers = {"Authorization": f"Bearer {refresh_token}"}
            token_response = requests.get(
                f"{API_URL}/refresh-token",
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if token_response.status_code != 200:
                print(f"Token refresh failed: {token_response.status_code}")
                return False
            self.access_token = token_response.json().get("access-token")
            if not self.access_token:
                print("No access token received")
                return False
            print("✅ Authenticated with Verofy API")
            return True
        except Exception as e:
            # Boundary: callers rely on a boolean result, not on exceptions.
            print(f"Authentication error: {e}")
            return False

    def upload_all_shapefiles(self, temp_dir: Path, map_id: int, limit: Optional[int] = None) -> Dict:
        """
        Upload all shapefiles from temp directory to Verofy
        Args:
            temp_dir: Path to directory containing shapefiles
            map_id: Verofy map project ID
            limit: Optional limit on number of records per shapefile (for testing)
        Returns dict with success status and statistics: ``uploaded`` maps a
        shapefile name to its created-record count and ``errors`` collects
        every row-level and file-level message. ``success`` only turns False
        on authentication failure or when a whole shapefile raises.
        """
        if not self.authenticate():
            return {"success": False, "error": "Authentication failed"}
        results = {
            "success": True,
            "uploaded": {},
            "errors": []
        }
        # Upload in order: poles first, then segments, then sites, etc.
        upload_order = [
            ("poles.shp", self._upload_poles),
            ("segments.shp", self._upload_segments),
            ("sites.shp", self._upload_sites),
            ("access_points.shp", self._upload_access_points),
            ("network_elements.shp", self._upload_network_elements),
            ("splicing.shp", self._upload_splicing),
            ("cabinet_boundaries.shp", self._upload_cabinet_boundaries),
            ("cables.shp", self._upload_cables),
            ("parcels.shp", self._upload_parcels),
            ("permits.shp", self._upload_permits),
            ("drops.shp", self._upload_drops),
        ]
        for shapefile_name, upload_func in upload_order:
            shapefile_path = temp_dir / shapefile_name
            if not shapefile_path.exists():
                print(f"⚠️ Skipping {shapefile_name} (not found)")
                continue
            try:
                print(f"\n📤 Uploading {shapefile_name}...")
                count, errors = upload_func(shapefile_path, map_id, limit)
                results["uploaded"][shapefile_name] = count
                if errors:
                    results["errors"].extend(errors)
                print(f"✅ Uploaded {count} records from {shapefile_name}")
            except Exception as e:
                error_msg = f"Error uploading {shapefile_name}: {str(e)}"
                print(error_msg)
                results["errors"].append(error_msg)
                results["success"] = False
        return results

    # ------------------------------------------------------------------
    # Per-layer uploads
    # ------------------------------------------------------------------
    def _upload_poles(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload poles from shapefile"""
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []
        for idx, row in gdf.iterrows():
            try:
                lat, lon = self._point_coordinates(row)
                if lat is None or lon is None:
                    continue
                pole_data = {
                    "mapProjectId": int(map_id),
                    # "name" becomes "Pole ID" in Verofy (UID or row index)
                    "name": self._uid_name(row, 'Pole', f'Pole-{idx}'),
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "mrStateId": 11  # Default state
                }
                # "tags" becomes "Pole Tag" in Verofy; "Pole Heigh" is the
                # (truncated) column name as it appears in the shapefile.
                self._copy_text_fields(row, pole_data, (
                    ('Pole Tag', 'tags'),
                    ('Pole Owner', 'owner'),
                    ('Group 1', 'group1'),
                    ('Group 2', 'group2'),
                    ('Pole Heigh', 'poleHeight'),
                ))
                if self._create_pole(pole_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create pole at row {idx}")
            except Exception as e:
                errors.append(f"Error processing pole row {idx}: {str(e)}")
        return success_count, errors

    def _upload_segments(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload segments from shapefile"""
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []
        for idx, row in gdf.iterrows():
            try:
                geom = row.geometry
                if geom is None or geom.is_empty:
                    continue
                # Convert LineString to coordinate array
                poly = self._latlng_points(geom.coords)
                segment_type = row.get('Type', 'Underground')
                type_id = self.segment_type_lookup.get(segment_type, 5)  # Default to Underground
                segment_data = {
                    "mapProjectId": int(map_id),
                    "name": f"Segment-{idx}",
                    "typeId": type_id,
                    "statusId": 1,  # Default to Working
                    "poly": poly,
                    "color": "#ff2600",
                    "styleWidth": 5,
                    "exclude": 0,
                    "custom": 0
                }
                self._copy_text_fields(row, segment_data, (
                    ('Group 1', 'group1'),
                    ('Group 2', 'group2'),
                    ('Conduit', 'conduit'),
                ))
                if self._create_segment(segment_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create segment at row {idx}")
            except Exception as e:
                errors.append(f"Error processing segment row {idx}: {str(e)}")
        return success_count, errors

    def _upload_sites(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload sites from shapefile"""
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []
        for idx, row in gdf.iterrows():
            try:
                lat, lon = self._point_coordinates(row)
                if lat is None or lon is None:
                    continue
                # A null Address falls back to the generated name instead of
                # being uploaded as the text "None".
                site_name = self._field(row, 'Address') or f'Site-{idx}'
                # Map Type field to iconTypeId using reference lookup
                site_type = self._field(row, 'Type')
                icon_type_id = 1  # Default to Hub Site
                if site_type and site_type in self.icon_type_lookup:
                    icon_type_id = self.icon_type_lookup[site_type]
                else:
                    print(f"⚠️ Warning: Unknown site type '{site_type}' at row {idx}, using default")
                site_data = {
                    "mapProjectId": int(map_id),
                    "name": str(site_name),
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "iconTypeId": icon_type_id,
                    "statusId": 1  # Default to status 1
                }
                # Map BEN# to unitCount if provided
                ben = self._field(row, 'BEN#')
                if ben is not None:
                    try:
                        site_data['unitCount'] = int(ben)
                    except (ValueError, TypeError, OverflowError):
                        pass
                self._copy_text_fields(row, site_data, (
                    ('Group 1', 'group1'),
                    ('Group 2', 'group2'),
                    ('Address', 'address1'),
                    ('City', 'city'),
                ))
                state = self._field(row, 'State')
                if state is not None and str(state) in self.state_lookup:
                    site_data['stateId'] = self.state_lookup[str(state)]
                self._copy_text_fields(row, site_data, (('Zip', 'zip'),))
                if self._create_site(site_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create site at row {idx}")
            except Exception as e:
                errors.append(f"Error processing site row {idx}: {str(e)}")
        return success_count, errors

    def _upload_access_points(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload access points from shapefile"""
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []
        for idx, row in gdf.iterrows():
            try:
                lat, lon = self._point_coordinates(row)
                if lat is None or lon is None:
                    continue
                ap_type = row.get('Type', 'Handhole')
                type_id = self.access_point_type_lookup.get(ap_type, 1)  # Default to Handhole
                ap_data = {
                    "mapProjectId": int(map_id),
                    "name": f"AP-{idx}",
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "typeId": type_id,
                    "locked": 0  # Default to unlocked
                }
                self._copy_text_fields(row, ap_data, (
                    ('Group 1', 'group1'),
                    ('Group 2', 'group2'),
                ))
                if self._create_access_point(ap_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create access point at row {idx}")
            except Exception as e:
                errors.append(f"Error processing access point row {idx}: {str(e)}")
        return success_count, errors

    def _upload_network_elements(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload network elements from shapefile"""
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []
        for idx, row in gdf.iterrows():
            try:
                lat, lon = self._point_coordinates(row)
                if lat is None or lon is None:
                    continue
                element_type = row.get('Type', 'Anchor')  # Default to Anchor if not specified
                type_id = self.element_type_lookup.get(element_type, 35)  # Default to 35 (Anchor)
                # NOTE(review): UID-based names use the "E-" prefix while the
                # index fallback uses "NE-"; kept as-is, confirm intent.
                element_name = self._uid_name(row, 'E', f'NE-{idx}')
                ne_data = {
                    "mapProjectId": int(map_id),
                    "name": element_name,
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "typeId": type_id,  # Use typeId instead of type string
                    "statusId": 1,  # Default to Planned
                    "locked": 0,  # Default to unlocked
                    "custom": 0  # Default to not custom
                }
                self._copy_text_fields(row, ne_data, (
                    ('Group 1', 'group1'),
                    ('Group 2', 'group2'),
                ))
                if self._create_network_element(ne_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create network element at row {idx}")
            except Exception as e:
                errors.append(f"Error processing network element row {idx}: {str(e)}")
        return success_count, errors

    def _upload_splicing(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload splicing points from shapefile"""
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []
        for idx, row in gdf.iterrows():
            try:
                lat, lon = self._point_coordinates(row)
                if lat is None or lon is None:
                    continue
                splice_type = row.get('Type', 'Splice')  # Default to Splice if not specified
                type_id = self.splice_type_lookup.get(splice_type, 1)  # Default to 1 (Splice)
                # "name" is auto-generated by the API, so the label goes into
                # "aka": AKA field preferred, then UID, then the row index.
                aka_value = self._field(row, 'AKA')
                if aka_value is not None:
                    splice_aka = str(aka_value)
                else:
                    splice_aka = self._uid_name(row, 'Splice', f'Splice-{idx}')
                splicing_data = {
                    "mapProjectId": int(map_id),
                    "aka": splice_aka,  # Use "aka" not "name" - name is auto-generated
                    "latitude": str(lat),
                    "longitude": str(lon),
                    "typeId": type_id,  # Use typeId instead of type string
                    "statusId": 1,  # Default to Planned
                    "locked": 0  # Default to unlocked
                }
                self._copy_text_fields(row, splicing_data, (
                    ('Group 1', 'group1'),
                    ('Group 2', 'group2'),
                ))
                if self._create_splicing(splicing_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create splicing at row {idx}")
            except Exception as e:
                errors.append(f"Error processing splicing row {idx}: {str(e)}")
        return success_count, errors

    def _upload_info_polygons(self, shapefile_path: Path, map_id: int, limit: Optional[int],
                              name_prefix: str, label: str, copy_groups: bool) -> Tuple[int, List[str]]:
        """Upload a polygon layer to the info tab (shared by boundaries/parcels).

        Rows whose geometry is not a plain ``Polygon`` are reported in the
        error list and skipped, so they can never reuse a previous row's ring.
        """
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []
        for idx, row in gdf.iterrows():
            try:
                geom = row.geometry
                if geom is None or geom.is_empty:
                    continue
                if geom.geom_type != 'Polygon':
                    errors.append(
                        f"Skipped {label} at row {idx}: unsupported geometry type {geom.geom_type}"
                    )
                    continue
                # NOTE: polygons need a double-nested array [[...]]
                data = [self._latlng_points(geom.exterior.coords)]
                info_data = {
                    "mapProjectId": int(map_id),
                    "name": str(self._field(row, 'Name') or f'{name_prefix}-{idx}'),
                    "mapinfoobjecttypeId": 3,  # 3 = Polygon
                    "data": data,
                    "color": "#ffffff",
                    "alpha": "0.40",
                    "metric": self._calculate_polygon_metric(geom),
                    "objectgroup": None,
                    "objectgroup2": None
                }
                if copy_groups:
                    # Group 1/2 map to objectgroup/objectgroup2
                    self._copy_text_fields(row, info_data, (
                        ('Group 1', 'objectgroup'),
                        ('Group 2', 'objectgroup2'),
                    ))
                if self._create_info_object(info_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create {label} at row {idx}")
            except Exception as e:
                errors.append(f"Error processing {label} row {idx}: {str(e)}")
        return success_count, errors

    def _upload_cabinet_boundaries(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload cabinet boundaries to info tab"""
        return self._upload_info_polygons(
            shapefile_path, map_id, limit,
            name_prefix='Cabinet-Boundary', label='cabinet boundary', copy_groups=False,
        )

    def _upload_cables(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload cables to info tab"""
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []
        for idx, row in gdf.iterrows():
            try:
                geom = row.geometry
                if geom is None or geom.is_empty:
                    continue
                # Convert LineString to coordinate array (single array for lines)
                data = self._latlng_points(geom.coords)
                info_data = {
                    "mapProjectId": int(map_id),
                    "name": str(self._field(row, 'Name') or f'Cable-{idx}'),
                    "mapinfoobjecttypeId": 2,  # 2 = Line/Polyline
                    "data": data,
                    "color": "#ffffff",
                    "alpha": "1.00",
                    "metric": self._calculate_line_metric(geom),
                    "objectgroup": None,
                    "objectgroup2": None
                }
                if self._create_info_object(info_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create cable at row {idx}")
            except Exception as e:
                errors.append(f"Error processing cable row {idx}: {str(e)}")
        return success_count, errors

    def _upload_parcels(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload parcels to info tab"""
        return self._upload_info_polygons(
            shapefile_path, map_id, limit,
            name_prefix='Parcel', label='parcel', copy_groups=True,
        )

    def _upload_permits(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload permits from shapefile.

        Reads with fiona feature-by-feature. If reading fails part-way
        through, the permits created so far are still counted.
        """
        import fiona
        success_count = 0
        errors = []
        record_count = 0
        try:
            # Use fiona to read with better error tolerance
            with fiona.open(shapefile_path) as src:
                for idx, feature in enumerate(src):
                    if limit and idx >= limit:
                        break
                    record_count += 1
                    try:
                        geom = feature['geometry']
                        if not geom or geom['type'] != 'Polygon':
                            continue
                        coords = geom['coordinates'][0]  # Outer ring
                        if len(coords) < 4:
                            errors.append(f"Permit row {idx}: Invalid polygon (< 4 coordinates)")
                            continue
                        # NOTE: poly must be wrapped in extra array
                        poly = [self._latlng_points(coords)]
                        props = feature['properties']
                        # A null Name falls back to the generated name.
                        name = props.get('Name') or f'Permit-{idx}'
                        group1 = props.get('Group 1', None)
                        permit_data = {
                            "mapProjectId": int(map_id),
                            "name": str(name),
                            "poly": poly,
                            "mappermitstatusId": 1,  # Default to status 1
                            "mappermitentitytypeId": 6,  # Default to entity type 6
                            "mappermitulrtypeId": 3,  # Default to ULR type 3
                            "mappermitentitymeetId": 1,  # Default to meet 1
                            "mappermitrequirementsId": 1  # Default to requirements 1
                        }
                        # Add permitgroup field (not group1) for Group 1 mapping
                        if group1:
                            permit_data['permitgroup'] = str(group1)
                        if self._create_permit(permit_data):
                            success_count += 1
                        else:
                            errors.append(f"Failed to create permit at row {idx}")
                    except Exception as e:
                        errors.append(f"Error processing permit row {idx}: {str(e)}")
        except Exception as e:
            print(f"⚠️ Error reading permits.shp: {e}")
            # Keep the count and messages gathered before the read failure.
            errors.append(f"Permits shapefile error: {str(e)}")
            return success_count, errors
        if record_count > 0 and limit:
            print(f" (Limited to first {limit} records for testing)")
        return success_count, errors

    def _upload_drops(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload drops from shapefile"""
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []
        for idx, row in gdf.iterrows():
            try:
                geom = row.geometry
                if geom is None or geom.is_empty:
                    continue
                # Convert LineString to coordinate array
                poly = self._latlng_points(geom.coords)
                drop_type = row.get('Type', 'Buried')  # Default to Buried
                type_id = self.drop_type_lookup.get(drop_type, 1)  # Default to 1 (Buried)
                status_id = 2  # Default to Planning
                drop_status = self._field(row, 'Status')
                if drop_status is not None and drop_status in self.drop_status_lookup:
                    status_id = self.drop_status_lookup[drop_status]
                drop_data = {
                    "mapProjectId": int(map_id),
                    "name": str(self._field(row, 'Name') or f'Drop-{idx}'),
                    "typeId": type_id,
                    "statusId": status_id,
                    "poly": poly,
                    "width": 1,  # Default line width
                    "color": "#000"  # Default black color
                }
                self._copy_text_fields(row, drop_data, (
                    ('Group 1', 'group1'),
                    ('Group 2', 'group2'),
                    ('Cabinet', 'cabinet'),
                    ('Cabinet Port', 'cabinetPort'),
                ))
                if self._create_drop(drop_data):
                    success_count += 1
                else:
                    errors.append(f"Failed to create drop at row {idx}")
            except Exception as e:
                errors.append(f"Error processing drop row {idx}: {str(e)}")
        return success_count, errors

    # ------------------------------------------------------------------
    # API calls
    # ------------------------------------------------------------------
    def _post_create(self, endpoint: str, payload: Dict, label: str,
                     debug_label: Optional[str] = None) -> bool:
        """POST ``payload`` to ``{API_URL}/{endpoint}``; True only on HTTP 201.

        ``label`` prefixes the error line printed for any other status.
        ``debug_label``, when given and ``DEBUG_PAYLOADS`` is on, echoes the
        payload before sending. Network errors and timeouts propagate to the
        caller, which records them per row.
        """
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
        if debug_label and self.DEBUG_PAYLOADS:
            print(f"DEBUG: Sending {debug_label} data: {json.dumps(payload, indent=2)}")
        response = requests.post(
            f"{API_URL}/{endpoint}",
            headers=headers,
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 201:
            print(f"❌ {label} API Error {response.status_code}: {response.text[:200]}")
            return False
        return True

    def _create_pole(self, pole_data: Dict) -> bool:
        """Create a pole via Verofy API"""
        return self._post_create("map-pole/create", pole_data, "Pole")

    def _create_segment(self, segment_data: Dict) -> bool:
        """Create a segment via Verofy API"""
        return self._post_create("map-segment/create", segment_data, "Segment")

    def _create_site(self, site_data: Dict) -> bool:
        """Create a site via Verofy API"""
        return self._post_create("map-site/create", site_data, "Site", "site")

    def _create_access_point(self, ap_data: Dict) -> bool:
        """Create an access point via Verofy API"""
        return self._post_create(
            "map-access-point/create", ap_data, "Access Point", "access point"
        )

    def _create_network_element(self, ne_data: Dict) -> bool:
        """Create a network element via Verofy API"""
        return self._post_create(
            "map-element/create", ne_data, "Network Element", "network element"
        )

    def _create_splicing(self, splicing_data: Dict) -> bool:
        """Create a splicing point via Verofy API"""
        # Endpoint is map-splice, not map-splicing
        return self._post_create("map-splice/create", splicing_data, "Splicing", "splicing")

    def _create_info_object(self, info_data: Dict) -> bool:
        """Create an info object via Verofy API (for info tab items like boundaries, cables, parcels)"""
        # "data" is sent as a plain array - it must NOT be JSON-encoded first
        return self._post_create(
            "map-info-object/create", info_data, "Info Object", "info object"
        )

    def _create_permit(self, permit_data: Dict) -> bool:
        """Create a permit via Verofy API"""
        return self._post_create("map-permit/create", permit_data, "Permit", "permit")

    def _create_drop(self, drop_data: Dict) -> bool:
        """Create a drop via Verofy API"""
        return self._post_create("map-drop/create", drop_data, "Drop", "drop")
def upload_to_verofy(temp_dir: str, map_id: int, email: str, password: str, limit: int = None) -> Dict:
    """
    Upload every shapefile found in a directory to Verofy.

    Convenience entry point: builds a ``VerofyUploader`` from the given
    credentials and delegates to ``upload_all_shapefiles``.

    Args:
        temp_dir: Path to directory containing shapefiles
        map_id: Verofy map project ID
        email: Verofy user email
        password: Verofy user password
        limit: Optional limit on records per shapefile (for testing)
    Returns:
        Dict with success status and upload statistics
    """
    return VerofyUploader(email, password).upload_all_shapefiles(
        Path(temp_dir), map_id, limit
    )