"""
Verofy Shapefile Uploader

Reads shapefiles from temp folder and uploads them to Verofy API
"""

import json
import math
import os
import sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

import geopandas as gpd
import requests

# Add verofy_api to path to reuse reference data
VEROFY_API_PATH = Path(__file__).parent.parent / "verofy_api"
sys.path.insert(0, str(VEROFY_API_PATH))

API_URL = "https://api.verofy.veronetworks.com/v1"

# Seconds to wait for any single HTTP call. `requests` has no default timeout,
# so without this a stalled connection would block the whole upload forever.
REQUEST_TIMEOUT = 30

# Approximate degree-to-mile conversion for mid-latitudes (~40° N):
# 1 degree latitude ≈ 69 miles, 1 degree longitude ≈ 53 miles.
MILES_PER_DEGREE_LAT = 69
MILES_PER_DEGREE_LON = 53


class VerofyUploader:
    """Handles uploading shapefiles to Verofy API"""

    # (attribute name, reference file name, JSON field used as lookup key).
    # Every lookup maps that field's value to the entry's 'id'.
    _REFERENCE_FILES = (
        ("state_lookup", "State_references.json", "short_name"),
        ("segment_type_lookup", "MapSegmentType_references.json", "name"),
        ("access_point_type_lookup", "MapAccessPointType_references.json", "name"),
        ("icon_type_lookup", "MapIconType_references.json", "name"),
        ("element_type_lookup", "MapElementType_references.json", "name"),
        ("element_status_lookup", "MapElementStatus_references.json", "name"),
        ("splice_type_lookup", "MapSpliceType_references.json", "name"),
        ("splice_status_lookup", "MapSpliceStatus_references.json", "name"),
        ("drop_type_lookup", "MapDropType_references.json", "name"),
        ("drop_status_lookup", "MapDropStatus_references.json", "name"),
    )

    def __init__(self, email: str, password: str):
        """
        Store the credentials and load the reference lookups from disk.

        Args:
            email: Verofy user email
            password: Verofy user password
        """
        self.email = email
        self.password = password
        self.access_token = None
        self.state_lookup = {}
        self.segment_type_lookup = {}
        self.access_point_type_lookup = {}
        self.icon_type_lookup = {}
        self.element_type_lookup = {}
        self.element_status_lookup = {}
        self.splice_type_lookup = {}
        self.splice_status_lookup = {}
        self.drop_type_lookup = {}
        self.drop_status_lookup = {}
        self._load_references()

    # ------------------------------------------------------------------
    # Reference data
    # ------------------------------------------------------------------

    def _load_references(self):
        """
        Load reference data from JSON files.

        Each file listed in ``_REFERENCE_FILES`` is loaded independently, so a
        missing or malformed file only leaves its own lookup empty instead of
        aborting the remaining ones. Failures are reported as warnings.
        """
        for attr_name, file_name, key_field in self._REFERENCE_FILES:
            ref_path = VEROFY_API_PATH / file_name
            if not ref_path.exists():
                continue
            try:
                with open(ref_path, 'r', encoding='utf-8') as f:
                    entries = json.load(f)
                lookup = {
                    item[key_field]: item['id']
                    for item in entries.values()
                    if isinstance(item, dict) and key_field in item
                }
            except (OSError, ValueError, AttributeError, KeyError, TypeError) as e:
                # Best effort: uploads fall back to hard-coded default ids.
                print(f"Warning: Could not load reference data: {e}")
                continue
            setattr(self, attr_name, lookup)

    # ------------------------------------------------------------------
    # Attribute helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True when *value* is None or a float NaN."""
        if value is None:
            return True
        return isinstance(value, float) and math.isnan(value)

    @classmethod
    def _has_value(cls, value: Any) -> bool:
        """Return True when *value* is present (not None/NaN) and truthy."""
        if cls._is_missing(value):
            return False
        try:
            return bool(value)
        except (TypeError, ValueError):
            # Values with no defined truthiness are treated as absent.
            return False

    def _field(self, row, name: str) -> Any:
        """Return ``row[name]`` when the field exists and has a value, else None."""
        if name in row:
            value = row[name]
            if self._has_value(value):
                return value
        return None

    def _field_or(self, row, name: str, default: Any) -> Any:
        """Return ``row[name]`` when it has a value, otherwise *default*."""
        value = self._field(row, name)
        return default if value is None else value

    def _copy_text_fields(self, row, target: Dict, mapping) -> None:
        """
        Copy optional attributes into *target* as strings.

        Args:
            row: Shapefile record
            target: API payload being built (modified in place)
            mapping: Iterable of (shapefile field, API field) pairs
        """
        for source_name, api_name in mapping:
            value = self._field(row, source_name)
            if value is not None:
                target[api_name] = str(value)

    @staticmethod
    def _name_from_uid(row, prefix: str, default: str) -> str:
        """Return ``prefix + int(UID)`` when the row has a usable UID, else *default*."""
        if 'UID' in row and row['UID'] is not None:
            try:
                return f'{prefix}{int(row["UID"])}'
            except (ValueError, TypeError, OverflowError):
                pass
        return default

    def _point_coordinates(self, row) -> Tuple[Any, Any]:
        """
        Return (lat, lon) for a point record.

        The 'Latitude'/'Longitude' attributes take priority; when one is
        absent or has no value (None/NaN) the geometry coordinate is used
        instead. A component that cannot be determined is returned as None.
        """
        geom = row.geometry
        geom_lat = geom.y if geom else None
        geom_lon = geom.x if geom else None

        lat = row.get('Latitude', geom_lat)
        lon = row.get('Longitude', geom_lon)
        if self._is_missing(lat):
            lat = geom_lat
        if self._is_missing(lon):
            lon = geom_lon

        return (
            None if self._is_missing(lat) else lat,
            None if self._is_missing(lon) else lon,
        )

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------

    def _calculate_line_metric(self, geometry) -> str:
        """
        Calculate metric string for a line geometry (cables)
        Format: "Mileage: {miles}; Footage: {feet}"

        Length uses the approximate mid-latitude (~40° N) conversion in
        MILES_PER_DEGREE_LAT / MILES_PER_DEGREE_LON.
        """
        if not geometry or geometry.is_empty:
            return "Mileage: 0.0000; Footage: 0"

        coords = list(geometry.coords)
        # Index instead of unpacking so (x, y, z) coordinates are accepted too.
        total_miles = sum(
            math.hypot(
                (end[1] - start[1]) * MILES_PER_DEGREE_LAT,
                (end[0] - start[0]) * MILES_PER_DEGREE_LON,
            )
            for start, end in zip(coords, coords[1:])
        )
        total_feet = total_miles * 5280

        return f"Mileage: {total_miles:.4f}; Footage: {total_feet:.0f}"

    def _calculate_polygon_metric(self, geometry) -> str:
        """
        Calculate metric string for a polygon geometry (boundaries, parcels)
        Format: "Square Miles: {sq_miles}"

        Area uses the approximate mid-latitude (~40° N) conversion:
        1 degree² ≈ 69 * 53 = 3,657 square miles.
        """
        if not geometry or geometry.is_empty:
            return "Square Miles: 0.0000"

        area_sq_degrees = geometry.area
        area_sq_miles = area_sq_degrees * (MILES_PER_DEGREE_LAT * MILES_PER_DEGREE_LON)

        return f"Square Miles: {area_sq_miles:.4f}"

    # ------------------------------------------------------------------
    # Authentication / orchestration
    # ------------------------------------------------------------------

    def authenticate(self) -> bool:
        """
        Get access token from Verofy API.

        Logs in to obtain a refresh token, then exchanges it for an access
        token stored in ``self.access_token``.

        Returns:
            True on success; False (after printing the reason) otherwise.
        """
        try:
            # Get refresh token
            payload = {"email": self.email, "password": self.password}
            response = requests.post(
                f"{API_URL}/login", json=payload, timeout=REQUEST_TIMEOUT
            )

            if response.status_code != 200:
                print(f"Login failed: {response.status_code}")
                return False

            refresh_token = response.json().get("refresh-token")
            if not refresh_token:
                print("No refresh token received")
                return False

            # Exchange for access token
            headers = {"Authorization": f"Bearer {refresh_token}"}
            token_response = requests.get(
                f"{API_URL}/refresh-token", headers=headers, timeout=REQUEST_TIMEOUT
            )

            if token_response.status_code != 200:
                print(f"Token refresh failed: {token_response.status_code}")
                return False

            self.access_token = token_response.json().get("access-token")
            if not self.access_token:
                print("No access token received")
                return False

            print("✅ Authenticated with Verofy API")
            return True

        except Exception as e:
            # Boundary: network/JSON failures are reported, never raised.
            print(f"Authentication error: {e}")
            return False

    def upload_all_shapefiles(self, temp_dir: Path, map_id: int, limit: Optional[int] = None) -> Dict:
        """
        Upload all shapefiles from temp directory to Verofy

        Args:
            temp_dir: Path to directory containing shapefiles
            map_id: Verofy map project ID
            limit: Optional limit on number of records per shapefile (for testing)

        Returns dict with success status and statistics
        """
        if not self.authenticate():
            return {"success": False, "error": "Authentication failed"}

        results = {
            "success": True,
            "uploaded": {},
            "errors": []
        }

        # Upload in order: poles first, then segments, then sites, etc.
        upload_order = [
            ("poles.shp", self._upload_poles),
            ("segments.shp", self._upload_segments),
            ("sites.shp", self._upload_sites),
            ("access_points.shp", self._upload_access_points),
            ("network_elements.shp", self._upload_network_elements),
            ("splicing.shp", self._upload_splicing),
            ("cabinet_boundaries.shp", self._upload_cabinet_boundaries),
            ("cables.shp", self._upload_cables),
            ("parcels.shp", self._upload_parcels),
            ("permits.shp", self._upload_permits),
            ("drops.shp", self._upload_drops),
        ]

        for shapefile_name, upload_func in upload_order:
            shapefile_path = temp_dir / shapefile_name
            if not shapefile_path.exists():
                print(f"⚠️ Skipping {shapefile_name} (not found)")
                continue

            try:
                print(f"\n📤 Uploading {shapefile_name}...")
                count, errors = upload_func(shapefile_path, map_id, limit)
                results["uploaded"][shapefile_name] = count
                if errors:
                    results["errors"].extend(errors)
                print(f"✅ Uploaded {count} records from {shapefile_name}")
            except Exception as e:
                # One unreadable shapefile must not stop the remaining ones.
                error_msg = f"Error uploading {shapefile_name}: {str(e)}"
                print(f"❌ {error_msg}")
                results["errors"].append(error_msg)
                results["success"] = False

        return results

    # ------------------------------------------------------------------
    # Shared upload machinery
    # ------------------------------------------------------------------

    @staticmethod
    def _read_shapefile(shapefile_path: Path, limit: Optional[int]):
        """Read a shapefile into a GeoDataFrame, truncated to *limit* rows if given."""
        gdf = gpd.read_file(shapefile_path)
        # Apply limit if specified
        if limit:
            gdf = gdf.head(limit)
            print(f" (Limited to first {limit} records for testing)")
        return gdf

    def _upload_features(
        self,
        shapefile_path: Path,
        limit: Optional[int],
        label: str,
        build_payload: Callable[[Any, Any], Optional[Dict]],
        create: Callable[[Dict], bool],
    ) -> Tuple[int, List[str]]:
        """
        Upload every record of a shapefile through one create call per row.

        Args:
            shapefile_path: Shapefile to read
            limit: Optional limit on number of records (for testing)
            label: Human-readable record kind used in error messages
            build_payload: Maps (row, idx) to an API payload, or None to skip
                the row silently
            create: Sends the payload; returns True when the record was created

        Returns:
            (number of records created, list of per-row error messages)
        """
        gdf = self._read_shapefile(shapefile_path, limit)
        success_count = 0
        errors = []

        for idx, row in gdf.iterrows():
            try:
                payload = build_payload(row, idx)
                if payload is None:
                    continue
                if create(payload):
                    success_count += 1
                else:
                    errors.append(f"Failed to create {label} at row {idx}")
            except Exception as e:
                # Row boundary: record the failure and keep going.
                errors.append(f"Error processing {label} row {idx}: {str(e)}")

        return success_count, errors

    @staticmethod
    def _line_points(geometry) -> List[Dict]:
        """Convert a LineString into the API's list of {"lat", "lng"} points."""
        return [{"lat": coord[1], "lng": coord[0]} for coord in geometry.coords]

    def _polygon_info_payload(self, row, idx, map_id: int, default_prefix: str, label: str) -> Optional[Dict]:
        """
        Build the info-object payload for a polygon record.

        Returns None (row skipped) when the geometry is missing, empty, or not
        a plain Polygon - only Polygon rows can supply the exterior ring the
        payload needs.
        """
        geom = row.geometry
        if not geom or geom.is_empty:
            return None
        if geom.geom_type != 'Polygon':
            print(f"⚠️ Skipping {label} row {idx}: unsupported geometry type {geom.geom_type}")
            return None

        # NOTE: polygons need double array [[...]]
        ring = [[{"lat": coord[1], "lng": coord[0]} for coord in geom.exterior.coords]]

        return {
            "mapProjectId": int(map_id),
            "name": str(self._field_or(row, 'Name', f'{default_prefix}-{idx}')),
            "mapinfoobjecttypeId": 3,  # 3 = Polygon
            "data": ring,
            "color": "#ffffff",
            "alpha": "0.40",
            "metric": self._calculate_polygon_metric(geom),
            "objectgroup": None,
            "objectgroup2": None
        }

    # ------------------------------------------------------------------
    # Per-shapefile uploads
    # ------------------------------------------------------------------

    def _upload_poles(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload poles from shapefile"""

        def build(row, idx) -> Optional[Dict]:
            lat, lon = self._point_coordinates(row)
            if lat is None or lon is None:
                return None

            pole_data = {
                "mapProjectId": int(map_id),
                # Generated from UID or index; becomes "Pole ID" in Verofy
                "name": self._name_from_uid(row, 'Pole-', f'Pole-{idx}'),
                "latitude": str(lat),
                "longitude": str(lon),
                "mrStateId": 11  # Default state
            }
            self._copy_text_fields(row, pole_data, (
                ('Pole Tag', 'tags'),  # becomes "Pole Tag" in Verofy
                ('Pole Owner', 'owner'),
                ('Group 1', 'group1'),
                ('Group 2', 'group2'),
                ('Pole Heigh', 'poleHeight'),  # "Pole Heigh" typo is in the shapefile
            ))
            return pole_data

        return self._upload_features(shapefile_path, limit, "pole", build, self._create_pole)

    def _upload_segments(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload segments from shapefile"""

        def build(row, idx) -> Optional[Dict]:
            geom = row.geometry
            if not geom or geom.is_empty:
                return None

            poly = self._line_points(geom)
            segment_type = row.get('Type', 'Underground')
            type_id = self.segment_type_lookup.get(segment_type, 5)  # Default to Underground

            segment_data = {
                "mapProjectId": int(map_id),
                "name": f"Segment-{idx}",
                "typeId": type_id,
                "statusId": 1,  # Default to Working
                "poly": poly,
                "color": "#ff2600",
                "styleWidth": 5,
                "exclude": 0,
                "custom": 0
            }
            self._copy_text_fields(row, segment_data, (
                ('Group 1', 'group1'),
                ('Group 2', 'group2'),
                ('Conduit', 'conduit'),
            ))
            return segment_data

        return self._upload_features(shapefile_path, limit, "segment", build, self._create_segment)

    def _upload_sites(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload sites from shapefile"""

        def build(row, idx) -> Optional[Dict]:
            lat, lon = self._point_coordinates(row)
            if lat is None or lon is None:
                return None

            site_name = self._field_or(row, 'Address', f'Site-{idx}')

            # Map Type field to iconTypeId using reference lookup
            site_type = row.get('Type', None)
            icon_type_id = 1  # Default to Hub Site
            if site_type and site_type in self.icon_type_lookup:
                icon_type_id = self.icon_type_lookup[site_type]
            else:
                print(f"⚠️ Warning: Unknown site type '{site_type}' at row {idx}, using default")

            site_data = {
                "mapProjectId": int(map_id),
                "name": str(site_name),
                "latitude": str(lat),
                "longitude": str(lon),
                "iconTypeId": icon_type_id,
                "statusId": 1  # Default to status 1
            }

            # Map BEN# to unitCount if provided
            ben_count = self._field(row, 'BEN#')
            if ben_count is not None:
                try:
                    site_data['unitCount'] = int(ben_count)
                except (ValueError, TypeError, OverflowError):
                    pass

            self._copy_text_fields(row, site_data, (
                ('Group 1', 'group1'),
                ('Group 2', 'group2'),
                ('Address', 'address1'),
                ('City', 'city'),
            ))

            state_value = self._field(row, 'State')
            if state_value is not None:
                state_code = str(state_value)
                if state_code in self.state_lookup:
                    site_data['stateId'] = self.state_lookup[state_code]

            self._copy_text_fields(row, site_data, (('Zip', 'zip'),))
            return site_data

        return self._upload_features(shapefile_path, limit, "site", build, self._create_site)

    def _upload_access_points(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload access points from shapefile"""

        def build(row, idx) -> Optional[Dict]:
            lat, lon = self._point_coordinates(row)
            if lat is None or lon is None:
                return None

            ap_type = row.get('Type', 'Handhole')
            type_id = self.access_point_type_lookup.get(ap_type, 1)  # Default to Handhole

            ap_data = {
                "mapProjectId": int(map_id),
                "name": f"AP-{idx}",
                "latitude": str(lat),
                "longitude": str(lon),
                "typeId": type_id,
                "locked": 0  # Default to unlocked
            }
            self._copy_text_fields(row, ap_data, (
                ('Group 1', 'group1'),
                ('Group 2', 'group2'),
            ))
            return ap_data

        return self._upload_features(
            shapefile_path, limit, "access point", build, self._create_access_point
        )

    def _upload_network_elements(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload network elements from shapefile"""

        def build(row, idx) -> Optional[Dict]:
            lat, lon = self._point_coordinates(row)
            if lat is None or lon is None:
                return None

            element_type = row.get('Type', 'Anchor')  # Default to Anchor if not specified
            type_id = self.element_type_lookup.get(element_type, 35)  # Default to 35 (Anchor)

            ne_data = {
                "mapProjectId": int(map_id),
                # Name from UID if available ("E-<uid>"), else "NE-<index>"
                "name": self._name_from_uid(row, 'E-', f'NE-{idx}'),
                "latitude": str(lat),
                "longitude": str(lon),
                "typeId": type_id,  # Use typeId instead of type string
                "statusId": 1,  # Default to Planned
                "locked": 0,  # Default to unlocked
                "custom": 0  # Default to not custom
            }
            self._copy_text_fields(row, ne_data, (
                ('Group 1', 'group1'),
                ('Group 2', 'group2'),
            ))
            return ne_data

        return self._upload_features(
            shapefile_path, limit, "network element", build, self._create_network_element
        )

    def _upload_splicing(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload splicing points from shapefile"""

        def build(row, idx) -> Optional[Dict]:
            lat, lon = self._point_coordinates(row)
            if lat is None or lon is None:
                return None

            splice_type = row.get('Type', 'Splice')  # Default to Splice if not specified
            type_id = self.splice_type_lookup.get(splice_type, 1)  # Default to 1 (Splice)

            # Generate aka from AKA field (preferred) or UID as fallback
            # Note: "name" is auto-generated by API, use "aka" field instead
            aka_value = self._field(row, 'AKA')
            if aka_value is not None:
                splice_aka = str(aka_value)
            else:
                splice_aka = self._name_from_uid(row, 'Splice-', f'Splice-{idx}')

            splicing_data = {
                "mapProjectId": int(map_id),
                "aka": splice_aka,  # Use "aka" not "name" - name is auto-generated
                "latitude": str(lat),
                "longitude": str(lon),
                "typeId": type_id,  # Use typeId instead of type string
                "statusId": 1,  # Default to Planned
                "locked": 0  # Default to unlocked
            }
            self._copy_text_fields(row, splicing_data, (
                ('Group 1', 'group1'),
                ('Group 2', 'group2'),
            ))
            return splicing_data

        return self._upload_features(shapefile_path, limit, "splicing", build, self._create_splicing)

    def _upload_cabinet_boundaries(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload cabinet boundaries to info tab"""

        def build(row, idx) -> Optional[Dict]:
            return self._polygon_info_payload(
                row, idx, map_id, 'Cabinet-Boundary', "cabinet boundary"
            )

        return self._upload_features(
            shapefile_path, limit, "cabinet boundary", build, self._create_info_object
        )

    def _upload_cables(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload cables to info tab"""

        def build(row, idx) -> Optional[Dict]:
            geom = row.geometry
            if not geom or geom.is_empty:
                return None

            # Single array for lines (polygons use a double array)
            data = self._line_points(geom)
            metric = self._calculate_line_metric(geom)

            return {
                "mapProjectId": int(map_id),
                "name": str(self._field_or(row, 'Name', f'Cable-{idx}')),
                "mapinfoobjecttypeId": 2,  # 2 = Line/Polyline
                "data": data,
                "color": "#ffffff",
                "alpha": "1.00",
                "metric": metric,
                "objectgroup": None,
                "objectgroup2": None
            }

        return self._upload_features(shapefile_path, limit, "cable", build, self._create_info_object)

    def _upload_parcels(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload parcels to info tab"""

        def build(row, idx) -> Optional[Dict]:
            info_data = self._polygon_info_payload(row, idx, map_id, 'Parcel', "parcel")
            if info_data is None:
                return None
            # Group 1/2 map to objectgroup/objectgroup2
            self._copy_text_fields(row, info_data, (
                ('Group 1', 'objectgroup'),
                ('Group 2', 'objectgroup2'),
            ))
            return info_data

        return self._upload_features(shapefile_path, limit, "parcel", build, self._create_info_object)

    def _upload_permits(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """
        Upload permits from shapefile.

        Reads with fiona (feature by feature) instead of geopandas. Only
        Polygon features are uploaded; other geometry types are skipped.

        Returns:
            (number of permits created, list of error messages). When reading
            the shapefile itself fails, the permits already created and the
            errors already collected are still reported.
        """
        import fiona

        success_count = 0
        errors = []
        record_count = 0

        try:
            # Use fiona to read with better error tolerance
            with fiona.open(shapefile_path) as src:
                for idx, feature in enumerate(src):
                    # Apply limit if specified
                    if limit and idx >= limit:
                        break

                    record_count += 1

                    try:
                        geom = feature['geometry']
                        if not geom or geom['type'] != 'Polygon':
                            continue

                        # Check if polygon has enough coordinates
                        coords = geom['coordinates'][0]  # Outer ring
                        if len(coords) < 4:
                            errors.append(f"Permit row {idx}: Invalid polygon (< 4 coordinates)")
                            continue

                        # NOTE: poly must be wrapped in extra array
                        poly = [[{"lat": coord[1], "lng": coord[0]} for coord in coords]]

                        props = feature['properties']
                        name = props.get('Name')
                        if not self._has_value(name):
                            name = f'Permit-{idx}'
                        group1 = props.get('Group 1', None)

                        permit_data = {
                            "mapProjectId": int(map_id),
                            "name": str(name),
                            "poly": poly,
                            "mappermitstatusId": 1,  # Default to status 1
                            "mappermitentitytypeId": 6,  # Default to entity type 6
                            "mappermitulrtypeId": 3,  # Default to ULR type 3
                            "mappermitentitymeetId": 1,  # Default to meet 1
                            "mappermitrequirementsId": 1  # Default to requirements 1
                        }

                        # Add permitgroup field (not group1) for Group 1 mapping
                        if self._has_value(group1):
                            permit_data['permitgroup'] = str(group1)

                        if self._create_permit(permit_data):
                            success_count += 1
                        else:
                            errors.append(f"Failed to create permit at row {idx}")

                    except Exception as e:
                        # Row boundary: record the failure and keep going.
                        errors.append(f"Error processing permit row {idx}: {str(e)}")

        except Exception as e:
            print(f"⚠️ Error reading permits.shp: {e}")
            # Keep what was already uploaded/recorded before the read failed.
            errors.append(f"Permits shapefile error: {str(e)}")
            return success_count, errors

        if record_count > 0 and limit:
            print(f" (Limited to first {limit} records for testing)")

        return success_count, errors

    def _upload_drops(self, shapefile_path: Path, map_id: int, limit: Optional[int] = None) -> Tuple[int, List[str]]:
        """Upload drops from shapefile"""

        def build(row, idx) -> Optional[Dict]:
            geom = row.geometry
            if not geom or geom.is_empty:
                return None

            poly = self._line_points(geom)

            drop_type = row.get('Type', 'Buried')  # Default to Buried
            type_id = self.drop_type_lookup.get(drop_type, 1)  # Default to 1 (Buried)

            # Map drop status if provided
            status_id = 2  # Default to Planning
            drop_status = self._field(row, 'Status')
            if drop_status is not None and drop_status in self.drop_status_lookup:
                status_id = self.drop_status_lookup[drop_status]

            drop_data = {
                "mapProjectId": int(map_id),
                "name": str(self._field_or(row, 'Name', f'Drop-{idx}')),
                "typeId": type_id,
                "statusId": status_id,
                "poly": poly,
                "width": 1,  # Default line width
                "color": "#000"  # Default black color
            }
            self._copy_text_fields(row, drop_data, (
                ('Group 1', 'group1'),
                ('Group 2', 'group2'),
                ('Cabinet', 'cabinet'),
                ('Cabinet Port', 'cabinetPort'),
            ))
            return drop_data

        return self._upload_features(shapefile_path, limit, "drop", build, self._create_drop)

    # ------------------------------------------------------------------
    # API calls
    # ------------------------------------------------------------------

    def _post(self, endpoint: str, payload: Dict, label: str, debug: bool = True) -> bool:
        """
        POST *payload* to ``{API_URL}/{endpoint}`` with the bearer token.

        Args:
            endpoint: Path below API_URL, e.g. "map-pole/create"
            payload: JSON-serialisable request body (sent as-is, not pre-encoded)
            label: Record kind used in the debug and error output
            debug: Print the outgoing payload before sending

        Returns:
            True when the API answers 201; otherwise prints the status and the
            first 200 characters of the response body and returns False.
        """
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        if debug:
            print(f"DEBUG: Sending {label.lower()} data: {json.dumps(payload, indent=2)}")

        response = requests.post(
            f"{API_URL}/{endpoint}",
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code != 201:
            print(f"❌ {label} API Error {response.status_code}: {response.text[:200]}")
            return False

        return True

    def _create_pole(self, pole_data: Dict) -> bool:
        """Create a pole via Verofy API"""
        return self._post("map-pole/create", pole_data, "Pole", debug=False)

    def _create_segment(self, segment_data: Dict) -> bool:
        """Create a segment via Verofy API"""
        return self._post("map-segment/create", segment_data, "Segment", debug=False)

    def _create_site(self, site_data: Dict) -> bool:
        """Create a site via Verofy API"""
        return self._post("map-site/create", site_data, "Site")

    def _create_access_point(self, ap_data: Dict) -> bool:
        """Create an access point via Verofy API"""
        return self._post("map-access-point/create", ap_data, "Access Point")

    def _create_network_element(self, ne_data: Dict) -> bool:
        """Create a network element via Verofy API"""
        return self._post("map-element/create", ne_data, "Network Element")

    def _create_splicing(self, splicing_data: Dict) -> bool:
        """Create a splicing point via Verofy API"""
        # Endpoint is map-splice, not map-splicing
        return self._post("map-splice/create", splicing_data, "Splicing")

    def _create_info_object(self, info_data: Dict) -> bool:
        """Create an info object via Verofy API (for info tab items like boundaries, cables, parcels)"""
        # "data" is sent as a plain array - DO NOT JSON-encode it separately
        return self._post("map-info-object/create", info_data, "Info Object")

    def _create_permit(self, permit_data: Dict) -> bool:
        """Create a permit via Verofy API"""
        return self._post("map-permit/create", permit_data, "Permit")

    def _create_drop(self, drop_data: Dict) -> bool:
        """Create a drop via Verofy API"""
        return self._post("map-drop/create", drop_data, "Drop")


def upload_to_verofy(temp_dir: str, map_id: int, email: str, password: str, limit: Optional[int] = None) -> Dict:
    """
    Main function to upload all shapefiles to Verofy

    Args:
        temp_dir: Path to directory containing shapefiles
        map_id: Verofy map project ID
        email: Verofy user email
        password: Verofy user password
        limit: Optional limit on records per shapefile (for testing)

    Returns:
        Dict with success status and upload statistics
    """
    uploader = VerofyUploader(email, password)
    return uploader.upload_all_shapefiles(Path(temp_dir), map_id, limit)