Source code for snail.io
import importlib.util
import logging
from typing import List, Tuple
import geopandas
import numpy
import pandas
import rasterio
from snail.intersection import GridDefinition, get_raster_values_for_splits
[docs]
def associate_raster_files(splits, rasters):
"""Read values from a list of raster files for a set of indexed split geometries
Parameters
----------
splits: pandas.DataFrame
split geometries with raster indices in columns named "i_{grid_id}", "j_{grid_id}"
for each grid_id in `rasters`
rasters: pandas.DataFrame
table of raster metadata with columns: key, grid_id, path, bands
Returns
-------
pandas.DataFrame
split geometries with raster data values at indexed locations
"""
# to prevent a fragmented dataframe (and a memory explosion), add series to a dict
# and then concat afterwards -- do not append to an existing dataframe
raster_data: dict[str, pandas.Series] = {}
# associate values
for raster, band_number, band_data in read_rasters(rasters):
logging.info(
"Associating values from raster %s grid %s band %s",
raster.key,
raster.grid_id,
band_number,
)
raster_data[raster.key] = get_raster_values_for_splits(
splits,
band_data,
f"i_{raster.grid_id}",
f"j_{raster.grid_id}",
)
raster_data = pandas.DataFrame(raster_data)
splits = pandas.concat([splits, raster_data], axis="columns")
return splits
[docs]
def read_rasters(rasters):
for raster in rasters.itertuples():
for band_number in raster.bands:
yield raster, band_number, read_raster_band_data(
raster.path, band_number
)
[docs]
def read_raster_band_data(
fname: str,
band_number: int = 1,
) -> numpy.ndarray:
with rasterio.open(fname) as dataset:
band_data: numpy.ndarray = dataset.read(band_number)
return band_data
[docs]
def read_features(path, layer=None):
if path.suffix in (".parquet", ".geoparquet"):
features = geopandas.read_parquet(path)
else:
if importlib.util.find_spec("pyogrio"):
engine = "pyogrio"
else:
engine = "fiona"
if layer is not None:
features = geopandas.read_file(path, layer=layer, engine=engine)
else:
features = geopandas.read_file(path, engine=engine)
return features[~features.geometry.isna()]