Country ISO
LGA
/ 2
#!/usr/bin/env python3
import argparse
import os
import requests
import shutil
import zipfile
import glob
import geopandas as gpd
import rasterio
from rasterio.mask import mask
from shapely.geometry import shape
import pandas as pd
import matplotlib.pyplot as plt
# --------------------------------------------------------------------
# 1) Parse CLI Arguments
# --------------------------------------------------------------------
def parse_cli_args():
    """Build and run the CLI argument parser.

    Returns:
        argparse.Namespace with ``iso_code`` (str) and ``adm_level`` (int).
    """
    arg_parser = argparse.ArgumentParser(
        description="Download boundaries from geoBoundaries and a WorldPop raster, then aggregate population."
    )
    # Positional: 3-letter country code, kept as-is (upper-casing happens at the call site).
    arg_parser.add_argument(
        "iso_code",
        type=str,
        help="3-letter ISO-3166 country code (e.g. KEN, NGA, ETH).",
    )
    # Positional: admin level, restricted to 0..5.
    arg_parser.add_argument(
        "adm_level",
        type=int,
        choices=range(0, 6),
        help="Admin level (0=country boundary, 1=first subnational level, etc.).",
    )
    return arg_parser.parse_args()
# --------------------------------------------------------------------
# 2) Fetch GeoBoundaries Shapefile
# --------------------------------------------------------------------
def fetch_geoboundaries_shapefile(iso_code, adm_level, release_type="gbOpen"):
    """
    Uses the geoBoundaries v4 API to download an admin-level boundary shapefile (or GeoJSON)
    for the specified ISO code and ADM level. Returns the path to the main vector file.

    Args:
        iso_code: 3-letter ISO-3166 country code (case-insensitive).
        adm_level: admin level integer (0 = national boundary).
        release_type: geoBoundaries release channel (default "gbOpen").

    Returns:
        Path to the first .shp or .geojson found inside the extracted archive.

    Raises:
        RuntimeError: if the geoBoundaries API request fails.
        ValueError: if the API response is empty or lacks a download link.
        FileNotFoundError: if no vector file is found after extraction.
    """
    iso_code = iso_code.upper()
    adm_label = f"ADM{adm_level}"
    base_url = f"https://www.geoboundaries.org/api/current/{release_type}/{iso_code}/{adm_label}/"
    print(f"[INFO] Querying geoBoundaries at: {base_url}")
    resp = requests.get(base_url, timeout=30)
    if resp.status_code != 200:
        raise RuntimeError(
            f"Failed to reach geoBoundaries API.\nURL: {base_url}\nHTTP {resp.status_code}\nResponse: {resp.text}"
        )
    data = resp.json()
    # The API may return a single dict or a list of dicts
    if isinstance(data, dict):
        data = [data]
    if not data:
        raise ValueError(f"No data returned for {iso_code} - {adm_label} in {release_type}.")
    # We'll just take the first record if multiple
    boundary_info = data[0]
    zip_url = boundary_info.get("staticDownloadLink")
    if not zip_url:
        raise ValueError(f"No 'staticDownloadLink' found in the boundary info:\n{boundary_info}")
    # Create an output folder for the boundary
    boundary_folder = f"{iso_code}_{adm_label}_boundary"
    os.makedirs(boundary_folder, exist_ok=True)
    # Download ZIP.
    # FIX: the original wrote r.raw via shutil.copyfileobj, which bypasses requests'
    # transparent content-decoding (a gzip/deflate-encoded response would be saved as a
    # corrupt zip). iter_content decodes correctly and matches fetch_worldpop_raster's
    # pattern. A timeout is also added so a stalled connection cannot hang forever.
    zip_path = os.path.join(boundary_folder, "boundary.zip")
    print(f"[INFO] Downloading boundary zip from: {zip_url}")
    with requests.get(zip_url, stream=True, timeout=300) as r:
        r.raise_for_status()
        with open(zip_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    # Unzip
    with zipfile.ZipFile(zip_path, "r") as zf:
        zf.extractall(boundary_folder)
    print(f"[INFO] Extracted boundary files to: {boundary_folder}")
    # Find a .shp or .geojson to return (search recursively; archives often nest folders)
    shp_files = glob.glob(os.path.join(boundary_folder, "**/*.shp"), recursive=True)
    geojson_files = glob.glob(os.path.join(boundary_folder, "**/*.geojson"), recursive=True)
    vector_files = shp_files + geojson_files
    if not vector_files:
        raise FileNotFoundError(
            "No shapefile or geojson found after extracting the boundary zip."
        )
    # Return the first shapefile or geojson we find
    return vector_files[0]
# --------------------------------------------------------------------
# 3) Download WorldPop Raster (2020)
# --------------------------------------------------------------------
def fetch_worldpop_raster(iso_code, year=2020):
    """
    Downloads the WorldPop UN-adjusted constrained population raster TIF for the given
    ISO code and year. Returns the path to the local TIF file.

    Args:
        iso_code: 3-letter ISO-3166 country code (any case).
        year: dataset year (default 2020; the hosting path covers 2000-2020).

    Returns:
        Local filename of the downloaded (or previously cached) GeoTIFF.

    Raises:
        requests.HTTPError: if the download URL returns an error status.
    """
    iso_code_lower = iso_code.lower()
    # Following the typical pattern for the WorldPop 2020 constrained data.
    # NOTE(review): the example URL in the original comment used 'maxar_v1' while this
    # code uses 'BSGM' — which source directory is correct may vary per country; verify
    # against data.worldpop.org if a 404 occurs.
    base_url = (
        "https://data.worldpop.org/GIS/Population/Global_2000_2020_Constrained/"
        f"{year}/BSGM/{iso_code}/{iso_code_lower}_ppp_{year}_UNadj_constrained.tif"
    )
    tif_filename = f"{iso_code_lower}_ppp_{year}_UNadj_constrained.tif"
    if os.path.exists(tif_filename):
        print(f"[INFO] Population raster already exists locally: {tif_filename}")
        return tif_filename
    print(f"[INFO] Downloading WorldPop raster from: {base_url}")
    # FIX: stream to a temporary '.part' file and rename on success. The original wrote
    # directly to tif_filename, so an interrupted download left a truncated file that the
    # os.path.exists() fast-path above would forever treat as complete. A timeout is also
    # added so a stalled connection cannot hang indefinitely.
    partial_path = tif_filename + ".part"
    with requests.get(base_url, stream=True, timeout=600) as resp:
        resp.raise_for_status()
        with open(partial_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                f.write(chunk)
    os.replace(partial_path, tif_filename)  # atomic on the same filesystem
    print(f"[INFO] Saved raster to: {tif_filename}")
    return tif_filename
# --------------------------------------------------------------------
# 4) Aggregate Population by Boundary
# --------------------------------------------------------------------
def aggregate_population_by_shapefile(shapefile_path, raster_path, id_column=None):
    """
    Sum raster population values within each polygon of a boundary file.

    Args:
        shapefile_path: path to a vector file readable by geopandas.
        raster_path: path to a single-band population GeoTIFF.
        id_column: attribute column to use as the region identifier. If None, a
            column containing NAME/CODE/ID is auto-selected; failing that, the
            row index is used.

    Returns:
        pandas.DataFrame with columns 'region_id' and 'population'.
    """
    print(f"[INFO] Loading boundary: {shapefile_path}")
    gdf = gpd.read_file(shapefile_path)
    # If no ID column was provided, attempt to pick one by name heuristic.
    if not id_column:
        potential_cols = [c for c in gdf.columns if any(k in c.upper() for k in ["NAME", "CODE", "ID"])]
        if potential_cols:
            id_column = potential_cols[0]
            print(f"[INFO] Auto-selected ID column: {id_column}")
        else:
            # FIX: the original assigned gdf.index (an Index object) here, which the
            # isinstance(id_column, str) check below then ignored anyway. Use None
            # explicitly to signal "fall back to the row index".
            id_column = None
    print(f"[INFO] Using '{id_column}' as the region ID.")
    print(f"[INFO] Opening population raster: {raster_path}")
    results = []
    with rasterio.open(raster_path) as src:
        # FIX: reproject the boundaries to the raster's CRS when they differ.
        # Without this, mask() silently clips with misaligned coordinates and
        # produces garbage (often zero) population sums.
        if gdf.crs is not None and src.crs is not None and gdf.crs != src.crs:
            gdf = gdf.to_crs(src.crs)
        nodata = src.nodata
        for idx, row in gdf.iterrows():
            # Mask the raster by this geometry
            out_image, _ = mask(src, [row.geometry], crop=True)
            # Single-band raster: out_image has shape (1, height, width).
            data_array = out_image[0]
            # Sum valid positive pixels. Excluding the declared nodata value is
            # explicit; the > 0 filter also drops negative nodata sentinels and
            # the zero-fill introduced by crop=True.
            if nodata is not None:
                valid = (data_array != nodata) & (data_array > 0)
            else:
                valid = data_array > 0
            population_sum = data_array[valid].sum()
            region_id_val = row[id_column] if isinstance(id_column, str) else idx
            results.append({
                "region_id": region_id_val,
                "population": float(population_sum)
            })
    return pd.DataFrame(results)
# --------------------------------------------------------------------
# 5) Visualize
# --------------------------------------------------------------------
def visualize_population(shapefile_path, df_pop, id_column=None):
    """
    Join the aggregated population DataFrame back onto the boundary layer and
    render a simple choropleth.

    Args:
        shapefile_path: path to the boundary vector file.
        df_pop: DataFrame with 'region_id' and 'population' columns.
        id_column: boundary attribute to join on; auto-detected when None.
    """
    print(f"[INFO] Visualizing aggregated results...")
    regions = gpd.read_file(shapefile_path)
    if not id_column:
        # Same NAME/CODE/ID heuristic used during aggregation, so the join key matches.
        candidates = [col for col in regions.columns if any(tag in col.upper() for tag in ["NAME", "CODE", "ID"])]
        if not candidates:
            # Without a join key a merge would be meaningless — bail out quietly.
            print("No ID column found for the shapefile. Skipping plot.")
            return
        id_column = candidates[0]
    # Left-join keeps every region even if it has no population row.
    joined = regions.merge(df_pop, left_on=id_column, right_on="region_id", how="left")
    # Basic choropleth of the population column.
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    joined.plot(column="population", cmap="OrRd", legend=True, ax=ax)
    plt.title("Population by Region")
    plt.tight_layout()
    plt.show()
# --------------------------------------------------------------------
# Main
# --------------------------------------------------------------------
# Normally you'd want to use argparse, but here we want to pass values from deepnote widgets
def main():
    """Run the full pipeline: fetch boundaries and raster, aggregate, save CSV, plot.

    Inputs come from Deepnote notebook widgets (globals ``country_iso`` and ``lga``)
    when those exist; otherwise we fall back to CLI arguments so the script also
    runs standalone. The original raised NameError outside the notebook.
    """
    try:
        # Notebook path: values injected by the text-input and slider widgets.
        iso_code = country_iso
        adm_level = int(lga)
    except NameError:
        # Standalone path: parse from the command line instead.
        args = parse_cli_args()
        iso_code = args.iso_code.upper()
        adm_level = args.adm_level
    print(f"{iso_code=}")
    print(f"{adm_level=}")
    # 1) Get boundary vector
    shapefile_path = fetch_geoboundaries_shapefile(iso_code, adm_level)
    # 2) Get worldpop raster
    raster_path = fetch_worldpop_raster(iso_code, year=2020)  # or param if needed
    # 3) Aggregate population
    df_pop = aggregate_population_by_shapefile(shapefile_path, raster_path)
    # 4) Save to CSV
    csv_filename = f"{iso_code.lower()}_adm{adm_level}_population.csv"
    df_pop.to_csv(csv_filename, index=False)
    print(f"[INFO] Population data saved to {csv_filename}")
    # 5) (Optional) Visualize — re-runs the same ID-column heuristic internally.
    visualize_population(shapefile_path, df_pop)


# Guarded entry point: notebooks execute cells with __name__ == "__main__",
# so this still runs in Deepnote while allowing the module to be imported safely.
if __name__ == "__main__":
    main()
Run to view results