Data ingestion and processing

import itertools
import os 
import pathlib
from glob import glob
from re import search

from osgeo import gdal # type: ignore
import rasterio as rs 
from rasterio.windows import Window
from rasterio.transform import xy
from rasterio.vrt import WarpedVRT
from rasterio.crs import CRS
from rasterio.windows import Window
from rasterio import open 

from pyproj import Transformer
import pandas as pd
from pandas import DataFrame
import geopandas as gpd
import numpy as np
from numpy import array,meshgrid,arange,log
from shapely import box


import ibis as ib
ib.options.interactive = True
# With dask

import dask as dk
import dask.array as da
import dask.dataframe as dd
# import dask_image as dki
import xarray as xr
import rioxarray as rx

from pyarrow import float16,float32,schema,field,uint16,table,Table
from pyarrow.parquet import ParquetWriter
import pyarrow as pya
import concurrent.futures
import threading
# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0.tif"

# big file
# src_file="/Users/cenv1069/Documents/data/datasets/JRC/GHS_BUILT_C_MSZ_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19/GHS_BUILT_C_MSZ_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19.tif"
# big file
# src_file = "/Users/cenv1069/Documents/data/datasets/mining/Global_mining/v2/global_miningarea_v2_30arcsecond.tif"

# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/S_10m/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19.tif"

# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif"
# 
# src_file = "Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R9_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R9_C19.tif"

# smaller size file
# src_file="/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19.tif"
in_path="/Users/cenv1069/Documents/agriculture/mapspam/spam2017v2r1_ssa_yield"
if in_path[len(in_path)-1]!= '/':
    in_path=in_path+'/'
in_paths = [x for x in glob(in_path + "**", recursive=True) if search(pattern = r"(.ti[f]{1,2}$)|(.nc$)", string = x)]
in_paths[0]
str

Working workflows below

# inspired by : https://rasterio.readthedocs.io/en/stable/topics/virtual-warping.html

dst_crs = CRS.from_epsg(4326)
print(dst_crs)
EPSG:4326
vrt_options = {
    # 'resampling': Resampling.cubic,
    'crs': dst_crs,
    # 'transform': dst_transform,
    # 'height': dst_height,
    # 'width': dst_width,
}

out_path_vrt = "test_med.parquet"

rast_schema = schema([('lon',float32())
                    ,('lat',float32())
                    ,('band_var',float32())
                    ])

rast_schema.with_metadata({
        "lon" : "Longitude coordinate",
        "lat" : "Latitude coordinate",
        "band_var" : "Value associated",
                            })
lon: float
lat: float
band_var: float
-- schema metadata --
lon: 'Longitude coordinate'
lat: 'Latitude coordinate'
band_var: 'Value associated'

Using a virtual Warper and windows

out_path_vrt
'test_med.parquet'
%%time

with ParquetWriter(out_path_vrt, rast_schema) as writer:
    with rs.open(src_file) as src:
        print(src.shape)
        # raise
        src_crs = src.crs
        if len(src.nodatavals)>1:
            nodata = src.nodatavals[0]
        else :
            nodata = src.nodatavals

        print("Detected source crs : ", src_crs)
        print("No data value : ", nodata)

        with WarpedVRT(src, **vrt_options) as vrt:
            # At this point 'vrt' is a full dataset with dimensions,
            # CRS, and spatial extent matching 'vrt_options'.
            # Read all data into memory.
            # data = vrt.read()
            # Process the dataset in chunks.  Likely not very efficient.
            
            win_transfrom = vrt.window_transform

            for _, window in vrt.block_windows():
                # print(src.crs)
                band1 = vrt.read(window=window)

                height = band1.shape[1]
                width = band1.shape[2]
                cols, rows = meshgrid(arange(width), arange(height))

                xs, ys = xy(
                    transform = win_transfrom(window),
                    rows=rows,
                    cols=cols)

                lons = array(xs)
                lats = array(ys)
                
                out = DataFrame({"band_var" : array(band1).flatten()
                                        ,'lon': lons.flatten()
                                        ,'lat': lats.flatten()})
                
                out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)
                out.drop(index=out.loc[out.band_var<=0].index,inplace=True)
                # print(out.shape)
                # print(out.head())

                if out.shape[0]!=0:
                    writer.write_table(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))

            # # # Dump the aligned data into a new file.  A VRT representing
            # # # this transformation can also be produced by switching
            # # # to the VRT driver.
            # # directory, name = os.path.split(path)
            # # outfile = os.path.join(directory, 'aligned-{}'.format(name))
            # # rio_shutil.copy(vrt, outfile, driver='GTiff')
(1800000, 3608200)
Detected source crs :  ESRI:54009
No data value :  (255.0,)
(1, 128, 512)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
File <timed exec>:27

RuntimeError: No active exception to reraise

Using a classic window approach

# %%time

with ParquetWriter(out_path_vrt, rast_schema) as writer:
        with rs.open(src_file) as src:
            
            src_crs = src.crs
            win_transfrom = src.window_transform
            
            transformer = Transformer.from_crs(str(src_crs), 'EPSG:4326', always_xy=True)
            
            if len(src.nodatavals)>1:
                nodata = src.nodatavals[0]
            else :
                nodata = src.nodatavals

            print("No data value : ", nodata)
            print("Detected source crs : ", src_crs)
            
            # Process the dataset in chunks.  Likely not very efficient.
            for ij, window in src.block_windows():
                # print(window)
                # print(src.crs)
                band1 = src.read(window=window)
                # print(band1[0])
                height = band1.shape[1]
                width = band1.shape[2]
                cols, rows = meshgrid(arange(width), arange(height))
                # print(win_transfrom(window))
                xs, ys = xy(
                    transform = win_transfrom(window),
                    rows=rows,
                    cols=cols)
                
                # print(xs,ys)
                
                lons,lats = transformer.transform(array(xs),array(ys))
                # print(lons.shape)
                # print(lats.shape)
                # print(len(array(band1).flatten()))
                # print(len(lons.flatten()))
                
                out = DataFrame({'lon': lons.flatten(),
                                    'lat': lats.flatten(),
                                    "band_var" : array(band1[0,:,:]).flatten(),
                                    })
                
                out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)
                out.drop(index=out.loc[out.band_var<=0].index,inplace=True)
                
                # print(out.shape)
                # print(out.head())
                
                if out.shape[0]!=0:
                        writer.write_table(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))
test_data = ib.read_parquet(out_path_vrt)
print(test_data.count())
test_data.head()
test_data.band_var.max()
# test_data.select("lon","lat")
# test_xy = test_data.select("lon","lat").to_pandas()
# gpd.GeoSeries(gpd.points_from_xy(test_xy["lon"],test_xy["lat"],crs="epsg:4326")).explore()

Custom window

in_path = src_file
include = False

out_path_cwin = f"rast_convert_cwin.parquet"
%%time

with ParquetWriter(out_path_cwin, rast_schema) as writer:
    with rs.open(src_file) as src:
        
        src_crs = src.crs
        if len(src.nodatavals)>1:
            nodata = src.nodatavals[0]
        else :
            nodata = src.nodatavals

        print("No data value : ", nodata)
        print("Detected source crs : ", src_crs)

        with WarpedVRT(src, **vrt_options) as vrt:
            # At this point 'vrt' is a full dataset with dimensions,
            # CRS, and spatial extent matching 'vrt_options'.
            # Read all data into memory.
            # data = vrt.read()
            # Process the dataset in chunks.  Likely not very efficient.
            
            win_transfrom = vrt.window_transform

            print(vrt.shape)

            # select window size 
            win_ratio = vrt.shape[0]/vrt.shape[1]
            win_width = int(np.min([vrt.shape[1],3500]))
            win_height = int(win_width*win_ratio)

            win_w = int(vrt.shape[1]/win_width)
            win_h = int(vrt.shape[0]/win_height)

            for (i,j) in itertools.product(range(win_w+1),range(win_h+1)):
                
                if i==win_w:
                    width = vrt.shape[1]-win_w*win_width
                else : 
                    width = win_width
                
                if j==win_h:
                    height = vrt.shape[0]-win_h*win_height
                else : 
                    height = win_height
            
                window = Window(col_off=i*win_width,row_off=j*win_height,width=width,height=height)
                
                band1 = vrt.read(window=window)
                
                # height = band1.shape[1]
                # width = band1.shape[2]
                cols, rows = meshgrid(arange(width), arange(height))

                xs, ys = xy(
                    transform = win_transfrom(window),
                    rows=rows,
                    cols=cols)

                lons = array(xs)
                lats = array(ys)
                
                out = DataFrame({"band_var" : array(band1).flatten()
                                        ,'lon': lons.flatten()
                                        ,'lat': lats.flatten()})
                
                out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)
                out.drop(index=out.loc[out.band_var<=0].index,inplace=True)
                # print(out.shape)
                # print(out.head())

                if out.shape[0]!=0:
                    writer.write(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))
            
        # # # Dump the aligned data into a new file.  A VRT representing
        # # # this transformation can also be produced by switching
        # # # to the VRT driver.
        # # directory, name = os.path.split(path)
        # # outfile = os.path.join(directory, 'aligned-{}'.format(name))
        # # rio_shutil.copy(vrt, outfile, driver='GTiff')
test_vrt = ib.read_parquet(out_path_vrt)
test_cwin = ib.read_parquet(out_path_cwin)
test_vrt.describe()
test_cwin.describe()
ib.difference(test_vrt,test_cwin).count()

Processing a huge file

# using the module from the script
! python ../../src/scalenav/rast_converter.py /Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif ../../test_big.parquet
# using the module from the build package 
! python -m scalenav.rast_converter /Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif ../../test_big.parquet

Reading a huge processed file

conn = ib.connect("duckdb://")
test_file = conn.read_parquet('/Users/cenv1069/Documents/data/datasets/JRC/H_AGBH_100.parquet')
test_file.band_var.min()
test_file.head()
test_file.count()
# with rs.open("/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R7_C22/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R7_C22.tif") as src:
#     # Process the dataset in chunks.  Likely not very efficient.
#     print(src.height)
#     print(src.width)
    
#     for ij, window in src.block_windows():
#         print((ij, window))
import time
from tqdm import tqdm
from datetime import datetime

def process_data():
    # Simulate data processing loop
    for _ in tqdm(range(100), desc="Processing data..."):
        time.sleep(0.1)  # Simulating some processing time for each iteration

def show_clock():
    with tqdm(total=0, bar_format="{desc}", dynamic_ncols=True) as pbar:
        while True:
            # Get current time
            now = datetime.now().strftime("%H:%M:%S")
            # Update the tqdm bar with the current time
            pbar.set_description(f"Clock: {now}")
            time.sleep(1)  # Update every second


# Run the clock in the background
import threading
clock_thread = threading.Thread(target=show_clock, daemon=True)
clock_thread.start()

# Run the data processing function
process_data()

# Optionally, wait for the clock thread to finish if needed
clock_thread.join()

GDAL translate

slow af

# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19.tif"
# # Open the GeoTIFF File:
# dataset = gdal.Open(src_file)
# # Convert GeoTIFF to XYZ Format:
# gdal.Translate('output.xyz', dataset, format='XYZ')
# # Read XYZ into Pandas DataFrame:
# df = pd.read_csv('output.xyz', sep=' ', header=None, names=['x', 'y', 'value'])

Parallel tests

include = False
num_workers = 3
out_fold = "results"
if not os.path.exists(out_fold):
    # os.rmdir(out_fold)
    os.mkdir(out_fold)
out_path = f"rast_convert_par_{num_workers}.parquet"
out_files = [out_fold + "/" + out_fold + "_" + str(i) + ".parquet" for i in range(num_workers)]
out_files
['results/results_0.parquet',
 'results/results_1.parquet',
 'results/results_2.parquet']
with open(src_file) as src:
      # print(type(src))
      if src.crs is not None:
            src_crs = src.crs
            print("Detected source crs : ", src_crs)
      else: 
            src_crs = 'EPSG:4326'
            print("Assuming source crs : ", src_crs)

      if len(src.nodatavals)>1:
            nodata = src.nodatavals[0]
      else :
            nodata = src.nodatavals

      print("No data value : ", nodata)      
            # At this point 'vrt' is a full dataset with dimensions,
            # CRS, and spatial extent matching 'vrt_options'.
            # Read all data into memory.
            # data = vrt.read()
            # Process the dataset in chunks.  Likely not very efficient.

      

      read_lock = threading.Lock() # not necessary ?
      # write_lock = threading.Lock()
      
      readers = [WarpedVRT(src, **vrt_options) for i in range(num_workers)]
      
      windows = [window for ij, window in readers[0].block_windows()]

      writers = [ParquetWriter(out_file, rast_schema) for out_file in out_files]

      # for w in writers:
      #       w.write(Table.from_pandas(df=pd.DataFrame(data={"lon" : [1.0,2.0,3.0],"lat" : [4.0,5.0,6.0],"band_var" : np.random.random(3)}),schema = rast_schema))
      # for w in writers:
      #       w.close()
      # raise Warning

      def process(writer,vrt,windows):
            print("Processing!")
            win_transfrom = vrt.window_transform
            for window in windows:

                  band1 = vrt.read(window=window)
                  
                  height = band1.shape[1]
                  width = band1.shape[2]
                  cols, rows = meshgrid(arange(width), arange(height))

                  xs, ys = xy(
                        transform = win_transfrom(window),
                        rows=rows,
                        cols=cols)

                  lons = array(xs)
                  lats = array(ys)
                  
                  out = DataFrame({"band_var" : array(band1).flatten()
                                          ,'lon': lons.flatten()
                                          ,'lat': lats.flatten()})
                  
                  out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)
                  if not include:
                        out.drop(index=out.loc[out.band_var<=0].index,inplace=True)
                  
                  if out.shape[0]!=0:
                        writer.write_table(Table.from_pandas(df=out,schema = rast_schema,nthreads=1,preserve_index=False,safe=True))

      # We map the process() function over the list of
      # windows.
      with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
      ) as executor:
            # executor.map(process, writers,readers,itertools.batched(windows,num_workers))
            for (w,v,win) in zip(writers,readers,itertools.batched(windows,num_workers)):
                  executor.submit(process(w,v,win))
                  
      for w in writers:
            w.close()
      for vrt in readers:
            vrt.close()
      # print(writers)
Detected source crs :  ESRI:54009
No data value :  (65535.0,)
Processing!
Processing!
Processing!
rast_convert_1 = ib.read_parquet(out_files)
print(rast_convert_1.count())
rast_convert_1

┌────┐
│ 12 │
└────┘
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━┓
┃ lon        lat        band_var ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━┩
│ float32float32float32  │
├───────────┼───────────┼──────────┤
│ -0.00352416.2559641404.0 │
│ -0.00077816.255051275.0 │
│  0.00013816.25505129.0 │
│ -0.00810016.251389836.0 │
│ -0.00718516.251389836.0 │
│ -0.00901516.250473531.0 │
│ -0.01908416.246813167.0 │
│ -0.01999916.245897323.0 │
│ -0.01999916.244982255.0 │
│ -0.01908416.244982242.0 │
│          │
└───────────┴───────────┴──────────┘
rast_convert_par = ib.read_parquet("rast_convert_par.parquet")
print(rast_convert_par.count())
rast_convert_par.head()
ib.difference(rast_convert_1,rast_convert_par)