Data ingestion and processing

import itertools
import os 
import pathlib
from glob import glob
from re import search

from osgeo import gdal # type: ignore
import rasterio as rs
from rasterio.windows import Window
from rasterio.transform import xy
from rasterio.vrt import WarpedVRT
from rasterio.crs import CRS

from pyproj import Transformer
import pandas as pd
from pandas import DataFrame
import geopandas as gpd
import numpy as np
from numpy import array,meshgrid,arange,log
from shapely import box

from scalenav.rast_converter import rast_convert_core,check_crs,check_nodata,check_path

import ibis as ib
ib.options.interactive = True
# With dask

import dask as dk
import dask.array as da
import dask.dataframe as dd
# import dask_image as dki
import xarray as xr
import rioxarray as rx

from pyarrow import float16,float32,schema,field,uint16,table,Table
from pyarrow.parquet import ParquetWriter
import pyarrow as pya
import concurrent.futures
import threading
import scalenav.oop as snoo
conn = snoo.sn_connect()
# src_file = "/Users/cenv1069/Documents/data/datasets/mining/Global_mining/v2/global_miningarea_v2_30arcsecond.tif"

# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/S_10m/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19.tif"

# copernicus
src_file = "/Users/cenv1069/Documents/data/datasets/copernicus/raw/LandCover2018_100m_global_v3/PROBAV_LC100_global_v3.0.1_2018-conso_BuiltUp-CoverFraction-layer_EPSG-4326.tif"

Source preview

from scalenav.rast_converter import infer_dtype

with rs.open(src_file) as src:
    print(src.dtypes)
    print(infer_dtype(src))
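
A few more source attributes are worth a look before choosing a conversion strategy; a minimal sketch using only standard rasterio dataset attributes:

with rs.open(src_file) as src:
    print("CRS        : ", src.crs)
    print("Shape      : ", src.shape)
    print("Nodata     : ", src.nodatavals)
    print("Block sizes: ", src.block_shapes)  # native block windows used by the workflows below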
Mapspam source files
in_path="/Users/cenv1069/Documents/agriculture/mapspam/spam2017v2r1_ssa_yield"
if not in_path.endswith('/'):
    in_path = in_path + '/'
# keep only GeoTIFF and NetCDF files
in_paths = [x for x in glob(in_path + "**", recursive=True) if search(pattern=r"(\.tiff?$)|(\.nc$)", string=x)]
in_paths[0]

Working workflows below

# inspired by : https://rasterio.readthedocs.io/en/stable/topics/virtual-warping.html
dst_crs = CRS.from_epsg(4326)
print(dst_crs)
vrt_options = {
    # 'resampling': Resampling.cubic,
    'crs': dst_crs,
    # 'transform': dst_transform,
    # 'height': dst_height,
    # 'width': dst_width,
}

out_path_vrt = "test_med.parquet"

rast_schema = schema([('lon',float32())
                    ,('lat',float32())
                    ,('band_var',float32())
                    ])

rast_schema = rast_schema.with_metadata({
        "lon" : "Longitude coordinate",
        "lat" : "Latitude coordinate",
        "band_var" : "Value of the raster band",
                            })

Using a WarpedVRT and block windows

out_path_vrt
%%time

with ParquetWriter(out_path_vrt, rast_schema) as writer:
    with rs.open(src_file) as src:
        print(src.shape)
        # raise
        src_crs = src.crs
        # nodatavals holds one value per band; take the first band's value
        nodata = src.nodatavals[0]

        print("Detected source crs : ", src_crs)
        print("No data value : ", nodata)

        with WarpedVRT(src, **vrt_options) as vrt:
            # At this point 'vrt' is a full dataset with dimensions,
            # CRS, and spatial extent matching 'vrt_options'.
            # Read all data into memory.
            # data = vrt.read()
            # Process the dataset in chunks.  Likely not very efficient.
            
            win_transform = vrt.window_transform

            for _, window in vrt.block_windows():
                
                # print(window)
                
                band1 = vrt.read(window=window)

                height = band1.shape[1]
                width = band1.shape[2]
                cols, rows = meshgrid(arange(width), arange(height))

                xs, ys = xy(
                    transform=win_transform(window),
                    rows=rows,
                    cols=cols)

                lons = array(xs)
                lats = array(ys)
                
                out = DataFrame({"band_var" : array(band1).flatten()
                                        ,'lon': lons.flatten()
                                        ,'lat': lats.flatten()})
                
                out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)
                out.drop(index=out.loc[out.band_var<=0].index,inplace=True)
                # print(out.shape)
                # print(out.head())

                if out.shape[0]!=0:
                    writer.write_table(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))

            # # # Dump the aligned data into a new file.  A VRT representing
            # # # this transformation can also be produced by switching
            # # # to the VRT driver.
            # # directory, name = os.path.split(path)
            # # outfile = os.path.join(directory, 'aligned-{}'.format(name))
            # # rio_shutil.copy(vrt, outfile, driver='GTiff')

Using a classic window approach

# %%time

with ParquetWriter(out_path_vrt, rast_schema) as writer:
        with rs.open(src_file) as src:
            
            src_crs = src.crs
            win_transform = src.window_transform
            
            transformer = Transformer.from_crs(str(src_crs), 'EPSG:4326', always_xy=True)
            
            # nodatavals holds one value per band; take the first band's value
            nodata = src.nodatavals[0]

            print("No data value : ", nodata)
            print("Detected source crs : ", src_crs)
            
            # Process the dataset in chunks.  Likely not very efficient.
            for ij, window in src.block_windows():
                
                band1 = src.read(window=window)

                height = band1.shape[1]
                width = band1.shape[2]
                cols, rows = meshgrid(arange(width), arange(height))

                xs, ys = xy(
                    transform=win_transform(window),
                    rows=rows,
                    cols=cols)
                
                lons,lats = transformer.transform(array(xs),array(ys))
                
                out = DataFrame({'lon': lons.flatten(),
                                    'lat': lats.flatten(),
                                    "band_var" : array(band1[0,:,:]).flatten(),
                                    })
                
                out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)
                out.drop(index=out.loc[out.band_var<=0].index,inplace=True)

                if out.shape[0]!=0:
                        writer.write_table(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))
test_data = ib.read_parquet(out_path_vrt)
print(test_data.count())
test_data.head()
test_data.band_var.max()
# test_data.select("lon","lat")
# test_xy = test_data.select("lon","lat").to_pandas()
# gpd.GeoSeries(gpd.points_from_xy(test_xy["lon"],test_xy["lat"],crs="epsg:4326")).explore()

Custom window

in_path = src_file
include = False

out_path_cwin = "rast_convert_cwin.parquet"
%%time

with ParquetWriter(out_path_cwin, rast_schema) as writer:
    with rs.open(src_file) as src:
        
        src_crs = src.crs
        # nodatavals holds one value per band; take the first band's value
        nodata = src.nodatavals[0]

        print("No data value : ", nodata)
        print("Detected source crs : ", src_crs)

        with WarpedVRT(src, **vrt_options) as vrt:
            # At this point 'vrt' is a full dataset with dimensions,
            # CRS, and spatial extent matching 'vrt_options'.
            # Read all data into memory.
            # data = vrt.read()
            # Process the dataset in chunks.  Likely not very efficient.
            
            win_transform = vrt.window_transform

            print(vrt.shape)

            # select window size 
            win_ratio = vrt.shape[0]/vrt.shape[1]
            win_width = int(np.min([vrt.shape[1],3500]))
            win_height = int(win_width*win_ratio)

            win_w = int(vrt.shape[1]/win_width)
            win_h = int(vrt.shape[0]/win_height)

            for (i,j) in itertools.product(range(win_w+1),range(win_h+1)):
                
                if i==win_w:
                    width = vrt.shape[1]-win_w*win_width
                else : 
                    width = win_width
                
                if j==win_h:
                    height = vrt.shape[0]-win_h*win_height
                else : 
                    height = win_height
            
                window = Window(col_off=i*win_width,row_off=j*win_height,width=width,height=height)
                
                band1 = vrt.read(window=window)
                
                # height = band1.shape[1]
                # width = band1.shape[2]
                cols, rows = meshgrid(arange(width), arange(height))

                xs, ys = xy(
                    transform=win_transform(window),
                    rows=rows,
                    cols=cols)

                lons = array(xs)
                lats = array(ys)
                
                out = DataFrame({"band_var" : array(band1).flatten()
                                        ,'lon': lons.flatten()
                                        ,'lat': lats.flatten()})
                
                out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)
                out.drop(index=out.loc[out.band_var<=0].index,inplace=True)
                # print(out.shape)
                # print(out.head())

                if out.shape[0]!=0:
                    writer.write_table(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))
            
        # # # Dump the aligned data into a new file.  A VRT representing
        # # # this transformation can also be produced by switching
        # # # to the VRT driver.
        # # directory, name = os.path.split(path)
        # # outfile = os.path.join(directory, 'aligned-{}'.format(name))
        # # rio_shutil.copy(vrt, outfile, driver='GTiff')
test_vrt = ib.read_parquet(out_path_vrt)
test_cwin = ib.read_parquet(out_path_cwin)
test_vrt.describe()
test_cwin.describe()
ib.difference(test_vrt,test_cwin).count()

Processing a huge file

# # using the module from the script
# ! python ../../src/scalenav/rast_converter.py /Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif ../../test_big.parquet
# # using the module from the build package 
# ! python -m scalenav.rast_converter /Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif ../../test_big.parquet

Reading a huge processed file

# '../datasets/mapspam/processed/spam2020_global_production.parquet'
test_file = conn.read_parquet("/Users/cenv1069/Documents/data/rast_convert_result/*")
test_file.head()
┏━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ lon     ┃ lat          ┃ band_var ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ float32 │ float32      │ uint16   │
├─────────┼──────────────┼──────────┤
│  -350.0 │ 1.999650e+06 │     1404 │
│  -350.0 │ 1.999550e+06 │      167 │
│  -250.0 │ 1.999550e+06 │      390 │
│   -50.0 │ 1.999450e+06 │      275 │
│    50.0 │ 1.999450e+06 │       29 │
└─────────┴──────────────┴──────────┘
test_file.count()

┌───────┐
│ 14461 │
└───────┘
test_file.band_var.max()

┌──────┐
│ 9999 │
└──────┘
test_file.head()
test_file.count()
sample_df = test_file.sample(0.001).execute()
sample_df.describe()
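To eyeball the sample spatially, the same geopandas pattern sketched in the comments above can be applied to it (assuming the sample fits in memory and its coordinates are geographic EPSG:4326, which may not hold for projected sources):

sample_gdf = gpd.GeoDataFrame(
    sample_df,
    geometry=gpd.points_from_xy(sample_df["lon"], sample_df["lat"]),
    crs="EPSG:4326",
)
sample_gdf.explore(column="band_var")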
# with rs.open("/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R7_C22/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R7_C22.tif") as src:
#     # Process the dataset in chunks.  Likely not very efficient.
#     print(src.height)
#     print(src.width)
    
#     for ij, window in src.block_windows():
#         print((ij, window))
import time
from tqdm import tqdm
from datetime import datetime

def process_data():
    # Simulate data processing loop
    for _ in tqdm(range(100), desc="Processing data..."):
        time.sleep(0.1)  # Simulating some processing time for each iteration

def show_clock():
    with tqdm(total=0, bar_format="{desc}", dynamic_ncols=True) as pbar:
        while True:
            # Get current time
            now = datetime.now().strftime("%H:%M:%S")
            # Update the tqdm bar with the current time
            pbar.set_description(f"Clock: {now}")
            time.sleep(1)  # Update every second


# Run the clock in the background (daemon thread: it stops with the main process)
clock_thread = threading.Thread(target=show_clock, daemon=True)
clock_thread.start()

# Run the data processing function
process_data()

# The clock loop never ends, so joining would block forever; as a daemon
# thread it is shut down automatically when the process exits.
# clock_thread.join()

GDAL translate

Very slow in practice.

# src_file = "/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19.tif"
# # Open the GeoTIFF File:
# dataset = gdal.Open(src_file)
# # Convert GeoTIFF to XYZ Format:
# gdal.Translate('output.xyz', dataset, format='XYZ')
# # Read XYZ into Pandas DataFrame:
# df = pd.read_csv('output.xyz', sep=' ', header=None, names=['x', 'y', 'value'])

Parallel tests

include = False
num_workers = 7
out_fold = "results"
if not os.path.exists(out_fold):
    # os.rmdir(out_fold)
    os.mkdir(out_fold)
    
out_path = f"rast_convert_par_{num_workers}.parquet"
out_files = [out_fold + "/" + out_fold + "_" + str(i) + ".parquet" for i in range(num_workers)]
# out_files
%%time

with rs.open(src_file) as src:

      src_crs = check_crs(src)

      print("Source CRS : ", src_crs)

      nodata = check_nodata(src)
      
      print("No data value : ", nodata)  
      
      readers = [WarpedVRT(src, **vrt_options) for i in range(num_workers)]
      writers = [ParquetWriter(out_file, rast_schema) for out_file in out_files]

      windows = [window for _, window in readers[0].block_windows()]

      batch_size = int(np.ceil(len(windows)/(num_workers)))

      print("Batch size : ", batch_size)
      
      batches = itertools.batched(windows,batch_size)

      def process(writer,vrt,windows):

            win_transform = vrt.window_transform

            for window in windows:
                  
                  out = rast_convert_core(vrt,transform=win_transform,win=window)

                  out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)

                  if not include:
                        out.drop(index=out.loc[out.band_var<=0].index,inplace=True)
                  
                  if out.shape[0]!=0:
                        writer.write_table(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))

      # Map the process() function over the window batches, pairing each
      # writer and reader with one batch.
      with concurrent.futures.ThreadPoolExecutor(
            max_workers=num_workers
      ) as executor:
            for (w,v,win) in zip(writers,readers,batches):
                  # pass the callable and its arguments separately so the call runs in the pool
                  executor.submit(process, w, v, win)
      
      for w in writers:
            w.close()
      for vrt in readers:
            vrt.close()
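The per-worker files can be read back as one table with a glob, the same way the large processed file was read above (a sketch; assumes the connection's read_parquet accepts glob paths, as used earlier):

rast_convert_chunks = conn.read_parquet(out_fold + "/*.parquet")
print(rast_convert_chunks.count())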
rast_convert_1 = ib.read_parquet(out_path_vrt)
print(rast_convert_1.count())
rast_convert_1.head()
# out_files
ib.schema([("lon", "float16"),("lat","float16"),("band_var","float32")])
ib.dtype("float16")
rast_convert_par = conn.read_parquet(
    "/Users/cenv1069/Documents/data/S_NRES_10.parquet",
    # "../../results"
    table_name="s_nres",
    schema={"lon" : "", "lat" : "float32" ,"band_var" : "float32"},
    )
# print(rast_convert_par.count())
# rast_convert_par.cast({"lon" : "float16"
#                        ,"lat" : "float16"})
conn.list_tables()
print(ib.difference(rast_convert_1,rast_convert_par).count())
ib.difference(rast_convert_1,rast_convert_par)
diff = ib.difference(rast_convert_1,rast_convert_par).execute()
diff.shape
# gpd.GeoSeries.from_xy(diff["lon"],diff["lat"],crs="epsg:4326").explore()

Parallel with IPyparallel

# import time
# import ipyparallel as ipp

# task_durations = [1] * 25
# # request a cluster
# with ipp.Cluster() as rc:
#     # get a view on the cluster
#     view = rc.load_balanced_view()
#     # submit the tasks
#     asyncresult = view.map_async(time.sleep, task_durations)
#     # wait interactively for results
#     asyncresult.wait_interactive()
#     # retrieve actual results
#     result = asyncresult.get()
# # at this point, the cluster processes have been shutdown
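
A sketch of how the same batched-window conversion could run on an IPyParallel cluster instead of a thread pool. The convert_batch helper below is hypothetical (not part of scalenav), the window batches are rebuilt from the source, and the shared objects are pushed to the engines so the function can resolve them as globals; this is untested and only illustrates the pattern, assuming ipyparallel and scalenav are importable on the engines.

import ipyparallel as ipp

def convert_batch(batch_id, windows):
    # runs on an engine: open the source, warp it, convert each window, write one parquet file per batch
    import rasterio as rs
    from rasterio.vrt import WarpedVRT
    from pyarrow import Table
    from pyarrow.parquet import ParquetWriter
    from scalenav.rast_converter import rast_convert_core
    out_file = f"{out_fold}/ipp_batch_{batch_id}.parquet"
    with rs.open(src_file) as src, WarpedVRT(src, **vrt_options) as vrt, ParquetWriter(out_file, rast_schema) as writer:
        for window in windows:
            out = rast_convert_core(vrt, transform=vrt.window_transform, win=window)
            out = out.loc[(out.band_var != nodata) & (out.band_var > 0)]
            if out.shape[0] != 0:
                writer.write_table(Table.from_pandas(df=out, schema=rast_schema, preserve_index=False, safe=True))
    return out_file

# rebuild the window batches from the warped source
with rs.open(src_file) as src, WarpedVRT(src, **vrt_options) as vrt:
    windows = [window for _, window in vrt.block_windows()]
batches = list(itertools.batched(windows, int(np.ceil(len(windows) / num_workers))))

with ipp.Cluster(n=num_workers) as rc:
    # push the shared objects so the engines can resolve them as globals
    rc[:].push(dict(src_file=src_file, vrt_options=vrt_options, rast_schema=rast_schema,
                    nodata=nodata, out_fold=out_fold), block=True)
    view = rc.load_balanced_view()
    asyncresult = view.map_async(convert_batch, range(len(batches)), batches)
    asyncresult.wait_interactive()
    written_files = asyncresult.get()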