Data ingestion and processing¶
import itertools
import os
import pathlib
from glob import glob
from re import search
from osgeo import gdal # type: ignore
import rasterio as rs
from rasterio.windows import Window
from rasterio.transform import xy
from rasterio.vrt import WarpedVRT
from rasterio.crs import CRS
from pyproj import Transformer
import pandas as pd
from pandas import DataFrame
import geopandas as gpd
import numpy as np
from numpy import array, meshgrid, arange, log
from shapely import box
import ibis as ib
ib.options.interactive = True
# With dask
import dask as dk
import dask.array as da
import dask.dataframe as dd
# import dask_image as dki
import xarray as xr
import rioxarray as rx
from pyarrow import float16, float32, schema, field, uint16, table, Table
from pyarrow.parquet import ParquetWriter
import pyarrow as pya
import concurrent.futures
import threading
# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0.tif"
# big file
# src_file="/Users/cenv1069/Documents/data/datasets/JRC/GHS_BUILT_C_MSZ_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19/GHS_BUILT_C_MSZ_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19.tif"
# big file
# src_file = "/Users/cenv1069/Documents/data/datasets/mining/Global_mining/v2/global_miningarea_v2_30arcsecond.tif"
# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/S_10m/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19.tif"
# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif"
#
# src_file = "Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R9_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R9_C19.tif"
# smaller size file
# src_file="/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19.tif"
# NB: uncomment one of the src_file paths above; the workflows below assume src_file is set.
in_path="/Users/cenv1069/Documents/agriculture/mapspam/spam2017v2r1_ssa_yield"
if in_path[len(in_path)-1]!= '/':
in_path=in_path+'/'
in_paths = [x for x in glob(in_path + "**", recursive=True) if search(pattern = r"(.ti[f]{1,2}$)|(.nc$)", string = x)]
in_paths[0]
str
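pathlib, imported above, gives an equivalent recursive discovery; a small sketch:
# same filter as the glob/regex version, using Path.rglob and suffixes
in_paths_alt = [str(p) for p in pathlib.Path(in_path).rglob("*") if p.suffix in (".tif", ".tiff", ".nc")]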
Working workflows below¶
# inspired by : https://rasterio.readthedocs.io/en/stable/topics/virtual-warping.html
dst_crs = CRS.from_epsg(4326)
print(dst_crs)
EPSG:4326
vrt_options = {
    # 'resampling': Resampling.cubic,
    'crs': dst_crs,
    # 'transform': dst_transform,
    # 'height': dst_height,
    # 'width': dst_width,
}
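Before committing to a full pass, it is worth checking what the warped grid will look like. A minimal sketch, assuming src_file points at one of the rasters listed above:
# inspect the CRS, shape and bounds of the virtual (warped) dataset
with rs.open(src_file) as src, WarpedVRT(src, **vrt_options) as vrt:
    print(vrt.crs, vrt.shape, vrt.bounds)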
out_path_vrt = "test_med.parquet"
rast_schema = schema([('lon', float32()),
                      ('lat', float32()),
                      ('band_var', float32()),
                      ])
# with_metadata returns a new schema rather than mutating in place, so reassign
rast_schema = rast_schema.with_metadata({
    "lon": "Longitude coordinate",
    "lat": "Latitude coordinate",
    "band_var": "Value associated",
})
rast_schema
lon: float
lat: float
band_var: float
-- schema metadata --
lon: 'Longitude coordinate'
lat: 'Latitude coordinate'
band_var: 'Value associated'
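The metadata travels with the reassigned schema and can be read back as a quick check (keys and values come back as bytes):
print(rast_schema.metadata)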
Using a virtual warper and windows¶
out_path_vrt
'test_med.parquet'
%%time
with ParquetWriter(out_path_vrt, rast_schema) as writer:
    with rs.open(src_file) as src:
        print(src.shape)
        src_crs = src.crs
        # rasterio reports one nodata value per band; take the first band's
        nodata = src.nodatavals[0]
        print("Detected source crs : ", src_crs)
        print("No data value : ", nodata)
        with WarpedVRT(src, **vrt_options) as vrt:
            # At this point 'vrt' is a full dataset with dimensions,
            # CRS, and spatial extent matching 'vrt_options'.
            # Process the dataset in chunks along its native block windows.
            win_transform = vrt.window_transform
            for _, window in vrt.block_windows():
                band1 = vrt.read(window=window)
                height = band1.shape[1]
                width = band1.shape[2]
                # pixel indices -> coordinates in the VRT's CRS (EPSG:4326)
                cols, rows = meshgrid(arange(width), arange(height))
                xs, ys = xy(transform=win_transform(window), rows=rows, cols=cols)
                lons = array(xs)
                lats = array(ys)
                out = DataFrame({"band_var": array(band1).flatten(),
                                 "lon": lons.flatten(),
                                 "lat": lats.flatten()})
                # drop nodata and non-positive cells before writing
                out.drop(index=out.loc[out.band_var == nodata].index, inplace=True)
                out.drop(index=out.loc[out.band_var <= 0].index, inplace=True)
                if out.shape[0] != 0:
                    writer.write_table(Table.from_pandas(df=out, schema=rast_schema,
                                                         preserve_index=False, safe=True))
        # A VRT representing this transformation can also be dumped to a new file
        # by copying 'vrt' with rio_shutil.copy(vrt, outfile, driver='GTiff').
(1800000, 3608200)
Detected source crs : ESRI:54009
No data value :  255.0
Using a classic window approach¶
# %%time
with ParquetWriter(out_path_vrt, rast_schema) as writer:
    with rs.open(src_file) as src:
        src_crs = src.crs
        win_transform = src.window_transform
        # reproject coordinates (not pixels) from the source CRS to EPSG:4326
        transformer = Transformer.from_crs(str(src_crs), 'EPSG:4326', always_xy=True)
        nodata = src.nodatavals[0]
        print("No data value : ", nodata)
        print("Detected source crs : ", src_crs)
        # Process the dataset in chunks along its native block windows.
        for ij, window in src.block_windows():
            band1 = src.read(window=window)
            height = band1.shape[1]
            width = band1.shape[2]
            cols, rows = meshgrid(arange(width), arange(height))
            xs, ys = xy(transform=win_transform(window), rows=rows, cols=cols)
            lons, lats = transformer.transform(array(xs), array(ys))
            out = DataFrame({'lon': lons.flatten(),
                             'lat': lats.flatten(),
                             "band_var": array(band1[0, :, :]).flatten()})
            out.drop(index=out.loc[out.band_var == nodata].index, inplace=True)
            out.drop(index=out.loc[out.band_var <= 0].index, inplace=True)
            if out.shape[0] != 0:
                writer.write_table(Table.from_pandas(df=out, schema=rast_schema,
                                                     preserve_index=False, safe=True))
test_data = ib.read_parquet(out_path_vrt)
print(test_data.count())
test_data.head()
test_data.band_var.max()
# test_data.select("lon","lat")
# test_xy = test_data.select("lon","lat").to_pandas()
# gpd.GeoSeries(gpd.points_from_xy(test_xy["lon"],test_xy["lat"],crs="epsg:4326")).explore()
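The commented map check above becomes practical when run on a small sample; a minimal sketch, assuming folium is installed for explore():
# sample a handful of points so the interactive map stays responsive
sample = test_data.limit(1000).to_pandas()
pts = gpd.GeoSeries(gpd.points_from_xy(sample["lon"], sample["lat"]), crs="EPSG:4326")
pts.explore()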
Custom window¶
in_path = src_file
include = False
out_path_cwin = "rast_convert_cwin.parquet"
%%time
with ParquetWriter(out_path_cwin, rast_schema) as writer:
    with rs.open(src_file) as src:
        src_crs = src.crs
        nodata = src.nodatavals[0]
        print("No data value : ", nodata)
        print("Detected source crs : ", src_crs)
        with WarpedVRT(src, **vrt_options) as vrt:
            win_transform = vrt.window_transform
            print(vrt.shape)
            # Choose a window size: at most 3500 columns wide, with the height
            # scaled so windows keep the raster's aspect ratio.
            win_ratio = vrt.shape[0] / vrt.shape[1]
            win_width = int(np.min([vrt.shape[1], 3500]))
            win_height = int(win_width * win_ratio)
            win_w = int(vrt.shape[1] / win_width)
            win_h = int(vrt.shape[0] / win_height)
            for (i, j) in itertools.product(range(win_w + 1), range(win_h + 1)):
                # the trailing window in each direction only covers the remainder
                width = vrt.shape[1] - win_w * win_width if i == win_w else win_width
                height = vrt.shape[0] - win_h * win_height if j == win_h else win_height
                if width == 0 or height == 0:
                    continue
                window = Window(col_off=i * win_width, row_off=j * win_height,
                                width=width, height=height)
                band1 = vrt.read(window=window)
                cols, rows = meshgrid(arange(width), arange(height))
                xs, ys = xy(transform=win_transform(window), rows=rows, cols=cols)
                lons = array(xs)
                lats = array(ys)
                out = DataFrame({"band_var": array(band1).flatten(),
                                 "lon": lons.flatten(),
                                 "lat": lats.flatten()})
                out.drop(index=out.loc[out.band_var == nodata].index, inplace=True)
                out.drop(index=out.loc[out.band_var <= 0].index, inplace=True)
                if out.shape[0] != 0:
                    writer.write_table(Table.from_pandas(df=out, schema=rast_schema,
                                                         preserve_index=False, safe=True))
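The remainder handling above is easy to sanity-check with hypothetical dimensions:
# e.g. a 3,608,200-column raster tiled into 3,500-column windows
total, win = 3_608_200, 3_500
full, rem = divmod(total, win)
print(full, rem)  # 1030 full windows, plus one final 3,200-column remainder window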
test_vrt = ib.read_parquet(out_path_vrt)
test_cwin = ib.read_parquet(out_path_cwin)
test_vrt.describe()
test_cwin.describe()
ib.difference(test_vrt,test_cwin).count()
Processing a huge file¶
# using the module from the script
! python ../../src/scalenav/rast_converter.py /Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif ../../test_big.parquet
# using the module from the build package
! python -m scalenav.rast_converter /Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif ../../test_big.parquet
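The script's output can be sanity-checked the same way as the in-notebook runs; a quick sketch against the path used above:
test_big = ib.read_parquet("../../test_big.parquet")
print(test_big.count())
test_big.head()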
Reading a huge processed file¶
conn = ib.connect("duckdb://")
test_file = conn.read_parquet('/Users/cenv1069/Documents/data/datasets/JRC/H_AGBH_100.parquet')
test_file.band_var.min()
test_file.head()
test_file.count()
# with rs.open("/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R7_C22/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R7_C22.tif") as src:
#     # inspect the native block layout of the raster
#     print(src.height)
#     print(src.width)
#     for ij, window in src.block_windows():
#         print((ij, window))
import time
from tqdm import tqdm
from datetime import datetime

def process_data():
    # Simulate a data processing loop.
    for _ in tqdm(range(100), desc="Processing data..."):
        time.sleep(0.1)  # simulate some processing time per iteration

def show_clock():
    # Render a live clock on its own tqdm bar.
    with tqdm(total=0, bar_format="{desc}", dynamic_ncols=True) as pbar:
        while True:
            now = datetime.now().strftime("%H:%M:%S")
            pbar.set_description(f"Clock: {now}")
            time.sleep(1)  # update every second

# Run the clock in the background (threading is imported above).
clock_thread = threading.Thread(target=show_clock, daemon=True)
clock_thread.start()
# Run the data processing function.
process_data()
# NB: don't join the clock thread: it loops forever, so join() would block;
# being a daemon thread, it is cleaned up when the interpreter exits.
GDAL translate¶
Very slow in practice.
# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19.tif"
# # Open the GeoTIFF File:
# dataset = gdal.Open(src_file)
# # Convert GeoTIFF to XYZ Format:
# gdal.Translate('output.xyz', dataset, format='XYZ')
# # Read XYZ into Pandas DataFrame:
# df = pd.read_csv('output.xyz', sep=' ', header=None, names=['x', 'y', 'value'])
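A runnable version of the commented sketch above (slow and disk-hungry on large rasters, as noted; 'output.xyz' is a scratch path):
# GeoTIFF -> XYZ text -> pandas DataFrame
dataset = gdal.Open(src_file)
gdal.Translate("output.xyz", dataset, format="XYZ")
df = pd.read_csv("output.xyz", sep=" ", header=None, names=["x", "y", "value"])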
Parallel tests¶
include = False
num_workers = 3
out_fold = "results"
if not os.path.exists(out_fold):
    # os.rmdir(out_fold)
    os.mkdir(out_fold)
out_path = f"rast_convert_par_{num_workers}.parquet"
out_files = [out_fold + "/" + out_fold + "_" + str(i) + ".parquet" for i in range(num_workers)]
out_files
['results/results_0.parquet',
'results/results_1.parquet',
'results/results_2.parquet']
with rs.open(src_file) as src:
    if src.crs is not None:
        src_crs = src.crs
        print("Detected source crs : ", src_crs)
    else:
        src_crs = 'EPSG:4326'
        print("Assuming source crs : ", src_crs)
    nodata = src.nodatavals[0]
    print("No data value : ", nodata)
    # Each worker gets its own WarpedVRT handle: rasterio dataset objects
    # should not be shared across threads, so per-thread readers replace locks.
    readers = [WarpedVRT(src, **vrt_options) for i in range(num_workers)]
    windows = [window for ij, window in readers[0].block_windows()]
    writers = [ParquetWriter(out_file, rast_schema) for out_file in out_files]

    def process(writer, vrt, windows):
        print("Processing!")
        win_transform = vrt.window_transform
        for window in windows:
            band1 = vrt.read(window=window)
            height = band1.shape[1]
            width = band1.shape[2]
            cols, rows = meshgrid(arange(width), arange(height))
            xs, ys = xy(transform=win_transform(window), rows=rows, cols=cols)
            lons = array(xs)
            lats = array(ys)
            out = DataFrame({"band_var": array(band1).flatten(),
                             "lon": lons.flatten(),
                             "lat": lats.flatten()})
            out.drop(index=out.loc[out.band_var == nodata].index, inplace=True)
            if not include:
                out.drop(index=out.loc[out.band_var <= 0].index, inplace=True)
            if out.shape[0] != 0:
                writer.write_table(Table.from_pandas(df=out, schema=rast_schema, nthreads=1,
                                                     preserve_index=False, safe=True))

    # Deal every num_workers-th window to each thread and map process() over the chunks.
    chunks = [windows[i::num_workers] for i in range(num_workers)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        for (w, v, win) in zip(writers, readers, chunks):
            # submit the callable and its arguments; writing process(w, v, win)
            # here would run it serially in the main thread instead
            executor.submit(process, w, v, win)

    for w in writers:
        w.close()
    for vrt in readers:
        vrt.close()
Detected source crs : ESRI:54009
No data value :  65535.0
Processing!
Processing!
Processing!
rast_convert_1 = ib.read_parquet(out_files)
print(rast_convert_1.count())
rast_convert_1
┌────┐
│ 12 │
└────┘
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━┓
┃ lon       ┃ lat       ┃ band_var ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━┩
│ float32   │ float32   │ float32  │
├───────────┼───────────┼──────────┤
│ -0.003524 │ 16.255964 │   1404.0 │
│ -0.000778 │ 16.255051 │    275.0 │
│  0.000138 │ 16.255051 │     29.0 │
│ -0.008100 │ 16.251389 │    836.0 │
│ -0.007185 │ 16.251389 │    836.0 │
│ -0.009015 │ 16.250473 │    531.0 │
│ -0.019084 │ 16.246813 │    167.0 │
│ -0.019999 │ 16.245897 │    323.0 │
│ -0.019999 │ 16.244982 │    255.0 │
│ -0.019084 │ 16.244982 │    242.0 │
│ …         │ …         │ …        │
└───────────┴───────────┴──────────┘
rast_convert_par = ib.read_parquet("rast_convert_par.parquet")
print(rast_convert_par.count())
rast_convert_par.head()
ib.difference(rast_convert_1,rast_convert_par)
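Finally, the per-worker parts can be consolidated into the single file named by out_path earlier (defined above but not yet used). A minimal sketch with pyarrow.dataset, assuming all parts share rast_schema:
import pyarrow.dataset as ds
# stream record batches from the worker outputs into one parquet file
dataset = ds.dataset(out_files, format="parquet")
with ParquetWriter(out_path, rast_schema) as writer:
    for batch in dataset.to_batches():
        writer.write_batch(batch)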