{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Data ingestion and processing" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import itertools\n", "import os \n", "import pathlib\n", "from glob import glob\n", "from re import search\n", "\n", "from osgeo import gdal # type: ignore\n", "import rasterio as rs \n", "from rasterio.windows import Window\n", "from rasterio.transform import xy\n", "from rasterio.vrt import WarpedVRT\n", "from rasterio.crs import CRS\n", "from rasterio.windows import Window\n", "from rasterio import open \n", "\n", "from pyproj import Transformer\n", "import pandas as pd\n", "from pandas import DataFrame\n", "import geopandas as gpd\n", "import numpy as np\n", "from numpy import array,meshgrid,arange,log\n", "from shapely import box\n", "\n", "from scalenav.rast_converter import rast_convert_core,check_crs,check_nodata,check_path\n", "\n", "import ibis as ib\n", "ib.options.interactive = True" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# With dask\n", "\n", "import dask as dk\n", "import dask.array as da\n", "import dask.dataframe as dd\n", "# import dask_image as dki\n", "import xarray as xr\n", "import rioxarray as rx\n", "\n", "from pyarrow import float16,float32,schema,field,uint16,table,Table\n", "from pyarrow.parquet import ParquetWriter\n", "import pyarrow as pya" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import concurrent.futures\n", "import threading" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "import scalenav.oop as snoo\n", "conn = snoo.sn_connect()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# src_file = \"/Users/cenv1069/Documents/data/datasets/mining/Global_mining/v2/global_miningarea_v2_30arcsecond.tif\"\n", "\n", "# src_file = \"/Users/cenv1069/Documents/data/datasets/JRC/S_10m/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19/GHS_BUILT_S_NRES_E2018_GLOBE_R2023A_54009_10_V1_0_R8_C19.tif\"\n", "\n", "# copernicus\n", "src_file = \"/Users/cenv1069/Documents/data/datasets/copernicus/raw/LandCover2018_100m_global_v3/PROBAV_LC100_global_v3.0.1_2018-conso_BuiltUp-CoverFraction-layer_EPSG-4326.tif\"\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Source preview" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from scalenav.rast_converter import infer_dtype\n", "\n", "with rs.open(src_file) as src:\n", " print(src.dtypes)\n", " print(infer_dtype(src))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## sm" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "in_path=\"/Users/cenv1069/Documents/agriculture/mapspam/spam2017v2r1_ssa_yield\"\n", "if in_path[len(in_path)-1]!= '/':\n", " in_path=in_path+'/'\n", "in_paths = [x for x in glob(in_path + \"**\", recursive=True) if search(pattern = r\"(.ti[f]{1,2}$)|(.nc$)\", string = x)]\n", "in_paths[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Working workflows below" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# inspired by : https://rasterio.readthedocs.io/en/stable/topics/virtual-warping.html\n", "dst_crs = CRS.from_epsg(4326)\n", "print(dst_crs)" ] }, { "cell_type": "code", "execution_count": null, 
"metadata": {}, "outputs": [], "source": [ "vrt_options = {\n", " # 'resampling': Resampling.cubic,\n", " 'crs': dst_crs,\n", " # 'transform': dst_transform,\n", " # 'height': dst_height,\n", " # 'width': dst_width,\n", "}\n", "\n", "out_path_vrt = \"test_med.parquet\"\n", "\n", "rast_schema = schema([('lon',float32())\n", " ,('lat',float32())\n", " ,('band_var',float32())\n", " ])\n", "\n", "rast_schema.with_metadata({\n", " \"lon\" : \"Longitude coordinate\",\n", " \"lat\" : \"Latitude coordinate\",\n", " \"band_var\" : \"Value associated\",\n", " })\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Using a virtual Warper and windows" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "out_path_vrt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "with ParquetWriter(out_path_vrt, rast_schema) as writer:\n", " with rs.open(src_file) as src:\n", " print(src.shape)\n", " # raise\n", " src_crs = src.crs\n", " if len(src.nodatavals)>1:\n", " nodata = src.nodatavals[0]\n", " else :\n", " nodata = src.nodatavals\n", "\n", " print(\"Detected source crs : \", src_crs)\n", " print(\"No data value : \", nodata)\n", "\n", " with WarpedVRT(src, **vrt_options) as vrt:\n", " # At this point 'vrt' is a full dataset with dimensions,\n", " # CRS, and spatial extent matching 'vrt_options'.\n", " # Read all data into memory.\n", " # data = vrt.read()\n", " # Process the dataset in chunks. Likely not very efficient.\n", " \n", " win_transfrom = vrt.window_transform\n", "\n", " for _, window in vrt.block_windows():\n", " \n", " # print(window)\n", " \n", " band1 = vrt.read(window=window)\n", "\n", " height = band1.shape[1]\n", " width = band1.shape[2]\n", " cols, rows = meshgrid(arange(width), arange(height))\n", "\n", " xs, ys = xy(\n", " transform = win_transfrom(window),\n", " rows=rows,\n", " cols=cols)\n", "\n", " lons = array(xs)\n", " lats = array(ys)\n", " \n", " out = DataFrame({\"band_var\" : array(band1).flatten()\n", " ,'lon': lons.flatten()\n", " ,'lat': lats.flatten()})\n", " \n", " out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)\n", " out.drop(index=out.loc[out.band_var<=0].index,inplace=True)\n", " # print(out.shape)\n", " # print(out.head())\n", "\n", " if out.shape[0]!=0:\n", " writer.write_table(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))\n", "\n", " # # # Dump the aligned data into a new file. A VRT representing\n", " # # # this transformation can also be produced by switching\n", " # # # to the VRT driver.\n", " # # directory, name = os.path.split(path)\n", " # # outfile = os.path.join(directory, 'aligned-{}'.format(name))\n", " # # rio_shutil.copy(vrt, outfile, driver='GTiff')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Using a classic window approach" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# %%time\n", "\n", "with ParquetWriter(out_path_vrt, rast_schema) as writer:\n", " with rs.open(src_file) as src:\n", " \n", " src_crs = src.crs\n", " win_transfrom = src.window_transform\n", " \n", " transformer = Transformer.from_crs(str(src_crs), 'EPSG:4326', always_xy=True)\n", " \n", " if len(src.nodatavals)>1:\n", " nodata = src.nodatavals[0]\n", " else :\n", " nodata = src.nodatavals\n", "\n", " print(\"No data value : \", nodata)\n", " print(\"Detected source crs : \", src_crs)\n", " \n", " # Process the dataset in chunks. 
Likely not very efficient.\n", " for ij, window in src.block_windows():\n", " \n", " band1 = src.read(window=window)\n", "\n", " height = band1.shape[1]\n", " width = band1.shape[2]\n", " cols, rows = meshgrid(arange(width), arange(height))\n", "\n", " xs, ys = xy(\n", " transform = win_transfrom(window),\n", " rows=rows,\n", " cols=cols)\n", " \n", " lons,lats = transformer.transform(array(xs),array(ys))\n", " \n", " out = DataFrame({'lon': lons.flatten(),\n", " 'lat': lats.flatten(),\n", " \"band_var\" : array(band1[0,:,:]).flatten(),\n", " })\n", " \n", " out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)\n", " out.drop(index=out.loc[out.band_var<=0].index,inplace=True)\n", "\n", " if out.shape[0]!=0:\n", " writer.write_table(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_data = ib.read_parquet(out_path_vrt)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(test_data.count())\n", "test_data.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_data.band_var.max()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# test_data.select(\"lon\",\"lat\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# test_xy = test_data.select(\"lon\",\"lat\").to_pandas()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# gpd.GeoSeries(gpd.points_from_xy(test_xy[\"lon\"],test_xy[\"lat\"],crs=\"epsg:4326\")).explore()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Custom window" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "in_path = src_file\n", "include = False\n", "\n", "out_path_cwin = f\"rast_convert_cwin.parquet\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "with ParquetWriter(out_path_cwin, rast_schema) as writer:\n", " with rs.open(src_file) as src:\n", " \n", " src_crs = src.crs\n", " if len(src.nodatavals)>1:\n", " nodata = src.nodatavals[0]\n", " else :\n", " nodata = src.nodatavals\n", "\n", " print(\"No data value : \", nodata)\n", " print(\"Detected source crs : \", src_crs)\n", "\n", " with WarpedVRT(src, **vrt_options) as vrt:\n", " # At this point 'vrt' is a full dataset with dimensions,\n", " # CRS, and spatial extent matching 'vrt_options'.\n", " # Read all data into memory.\n", " # data = vrt.read()\n", " # Process the dataset in chunks. 
Likely not very efficient.\n", " \n", " win_transfrom = vrt.window_transform\n", "\n", " print(vrt.shape)\n", "\n", " # select window size \n", " win_ratio = vrt.shape[0]/vrt.shape[1]\n", " win_width = int(np.min([vrt.shape[1],3500]))\n", " win_height = int(win_width*win_ratio)\n", "\n", " win_w = int(vrt.shape[1]/win_width)\n", " win_h = int(vrt.shape[0]/win_height)\n", "\n", " for (i,j) in itertools.product(range(win_w+1),range(win_h+1)):\n", " \n", " if i==win_w:\n", " width = vrt.shape[1]-win_w*win_width\n", " else : \n", " width = win_width\n", " \n", " if j==win_h:\n", " height = vrt.shape[0]-win_h*win_height\n", " else : \n", " height = win_height\n", " \n", " window = Window(col_off=i*win_width,row_off=j*win_height,width=width,height=height)\n", " \n", " band1 = vrt.read(window=window)\n", " \n", " # height = band1.shape[1]\n", " # width = band1.shape[2]\n", " cols, rows = meshgrid(arange(width), arange(height))\n", "\n", " xs, ys = xy(\n", " transform = win_transfrom(window),\n", " rows=rows,\n", " cols=cols)\n", "\n", " lons = array(xs)\n", " lats = array(ys)\n", " \n", " out = DataFrame({\"band_var\" : array(band1).flatten()\n", " ,'lon': lons.flatten()\n", " ,'lat': lats.flatten()})\n", " \n", " out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)\n", " out.drop(index=out.loc[out.band_var<=0].index,inplace=True)\n", " # print(out.shape)\n", " # print(out.head())\n", "\n", " if out.shape[0]!=0:\n", " writer.write(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))\n", " \n", " # # # Dump the aligned data into a new file. A VRT representing\n", " # # # this transformation can also be produced by switching\n", " # # # to the VRT driver.\n", " # # directory, name = os.path.split(path)\n", " # # outfile = os.path.join(directory, 'aligned-{}'.format(name))\n", " # # rio_shutil.copy(vrt, outfile, driver='GTiff')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_vrt = ib.read_parquet(out_path_vrt)\n", "test_cwin = ib.read_parquet(out_path_cwin)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_vrt.describe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_cwin.describe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ib.difference(test_vrt,test_cwin).count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Processing a huge file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # using the module from the script\n", "# ! python ../../src/scalenav/rast_converter.py /Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif ../../test_big.parquet" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # using the module from the build package \n", "# ! 
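{ "cell_type": "markdown", "metadata": {}, "source": [ "The edge-window arithmetic above is easy to get wrong, so here is a small self-contained sketch of the same tiling logic (a hypothetical `tile_windows` helper, not part of `scalenav`): it yields the `(col_off, row_off, width, height)` of every window covering a raster shape, including the ragged right and bottom edges." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import itertools\n", "\n", "# Hypothetical helper mirroring the tiling logic of the cell above.\n", "def tile_windows(shape, win_height, win_width):\n", "    \"\"\"Yield (col_off, row_off, width, height) tiles covering a raster of `shape`.\"\"\"\n", "    rows, cols = shape\n", "    n_w = cols // win_width   # whole windows that fit horizontally\n", "    n_h = rows // win_height  # whole windows that fit vertically\n", "    for i, j in itertools.product(range(n_w + 1), range(n_h + 1)):\n", "        width = cols - n_w * win_width if i == n_w else win_width\n", "        height = rows - n_h * win_height if j == n_h else win_height\n", "        if width == 0 or height == 0:  # raster divides evenly: no ragged edge\n", "            continue\n", "        yield (i * win_width, j * win_height, width, height)\n", "\n", "# A 7 x 10 raster tiled with 3 x 4 windows: expect 2-wide and 1-tall edge tiles.\n", "print(list(tile_windows((7, 10), 3, 4)))" ] },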
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_vrt = ib.read_parquet(out_path_vrt)\n", "test_cwin = ib.read_parquet(out_path_cwin)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_vrt.describe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_cwin.describe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ib.difference(test_vrt,test_cwin).count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Processing a huge file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # using the module from the script\n", "# ! python ../../src/scalenav/rast_converter.py /Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif ../../test_big.parquet" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # using the module from the built package\n", "# ! python -m scalenav.rast_converter /Users/cenv1069/Documents/data/datasets/JRC/S_100m/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19/GHS_BUILT_S_NRES_E2015_GLOBE_R2023A_54009_100_V1_0_R8_C19.tif ../../test_big.parquet" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reading a huge processed file" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# '../datasets/mapspam/processed/spam2020_global_production.parquet'\n", "test_file = conn.read_parquet(\"/Users/cenv1069/Documents/data/rast_convert_result/*\")" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "┏━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━┓\n", "┃ lon     ┃ lat          ┃ band_var ┃\n", "┡━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━┩\n", "│ float32 │ float32      │ uint16   │\n", "├─────────┼──────────────┼──────────┤\n", "│  -350.0 │ 1.999650e+06 │     1404 │\n", "│  -350.0 │ 1.999550e+06 │      167 │\n", "│  -250.0 │ 1.999550e+06 │      390 │\n", "│   -50.0 │ 1.999450e+06 │      275 │\n", "│    50.0 │ 1.999450e+06 │       29 │\n", "└─────────┴──────────────┴──────────┘" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "test_file.head()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [
    {
     "data": {
      "text/plain": [
       "┌───────┐\n",
       "│ \u001b[1;36m14461\u001b[0m │\n",
       "└───────┘"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "test_file.count()"
   ]
  },
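  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since ibis builds the query lazily and only the result is materialized, summary statistics over the whole file stay cheap. A small sketch:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Lazy aggregation: only the resulting scalars are pulled into memory.\n",
    "test_file.aggregate(\n",
    "    n=test_file.count(),\n",
    "    band_min=test_file.band_var.min(),\n",
    "    band_max=test_file.band_var.max(),\n",
    "    band_mean=test_file.band_var.mean(),\n",
    ")"
   ]
  },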
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "┌──────┐\n",
       "│ \u001b[1;36m9999\u001b[0m │\n",
       "└──────┘"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "test_file.band_var.max()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sample_df = test_file.sample(0.001).execute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sample_df.describe()"
   ]
  },
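  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To eyeball the sample spatially, the points can be wrapped in a `GeoSeries`, as in the commented `explore()` cells above. A sketch: the CRS below assumes EPSG:4326 lon/lat, but the coordinates in the preview look projected, so substitute the source CRS if needed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sketch: map the sampled points (adjust the CRS to match the source raster).\n",
    "pts = gpd.GeoSeries(gpd.points_from_xy(sample_df[\"lon\"], sample_df[\"lat\"]), crs=\"epsg:4326\")\n",
    "pts.explore()"
   ]
  },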
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# with rs.open(\"/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R7_C22/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R7_C22.tif\") as src:\n",
    "#     # Process the dataset in chunks.  Likely not very efficient.\n",
    "#     print(src.height)\n",
    "#     print(src.width)\n",
    "    \n",
    "#     for ij, window in src.block_windows():\n",
    "#         print((ij, window))\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "from tqdm import tqdm\n",
    "from datetime import datetime\n",
    "\n",
    "def process_data():\n",
    "    # Simulate data processing loop\n",
    "    for _ in tqdm(range(100), desc=\"Processing data...\"):\n",
    "        time.sleep(0.1)  # Simulating some processing time for each iteration\n",
    "\n",
    "def show_clock():\n",
    "    with tqdm(total=0, bar_format=\"{desc}\", dynamic_ncols=True) as pbar:\n",
    "        while True:\n",
    "            # Get current time\n",
    "            now = datetime.now().strftime(\"%H:%M:%S\")\n",
    "            # Update the tqdm bar with the current time\n",
    "            pbar.set_description(f\"Clock: {now}\")\n",
    "            time.sleep(1)  # Update every second\n",
    "\n",
    "\n",
    "# Run the clock in the background\n",
    "import threading\n",
    "clock_thread = threading.Thread(target=show_clock, daemon=True)\n",
    "clock_thread.start()\n",
    "\n",
    "# Run the data processing function\n",
    "process_data()\n",
    "\n",
    "# Optionally, wait for the clock thread to finish if needed\n",
    "clock_thread.join()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## GDAL translate\n",
    "slow af"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# src_file = \"/Users/cenv1069/Documents/data/datasets/JRC/S_1000m/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19/GHS_BUILT_S_NRES_E2020_GLOBE_R2023A_54009_1000_V1_0_R8_C19.tif\"\n",
    "# # Open the GeoTIFF File:\n",
    "# dataset = gdal.Open(src_file)\n",
    "# # Convert GeoTIFF to XYZ Format:\n",
    "# gdal.Translate('output.xyz', dataset, format='XYZ')\n",
    "# # Read XYZ into Pandas DataFrame:\n",
    "# df = pd.read_csv('output.xyz', sep=' ', header=None, names=['x', 'y', 'value'])\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parallel tests"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "include = False\n",
    "num_workers = 7\n",
    "out_fold = \"results\"\n",
    "if not os.path.exists(out_fold):\n",
    "    # os.rmdir(out_fold)\n",
    "    os.mkdir(out_fold)\n",
    "    \n",
    "out_path = f\"rast_convert_par_{num_workers}.parquet\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "out_files = [out_fold + \"/\" + out_fold + \"_\" + str(i) + \".parquet\" for i in range(num_workers)]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# out_files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "with open(src_file) as src:\n",
    "\n",
    "      src_crs = check_crs(src)\n",
    "\n",
    "      print(\"Source CRS : \", src_crs)\n",
    "\n",
    "      nodata = check_nodata(src)\n",
    "      \n",
    "      print(\"No data value : \", nodata)  \n",
    "      \n",
    "      readers = [WarpedVRT(src, **vrt_options) for i in range(num_workers)]\n",
    "      writers = [ParquetWriter(out_file, rast_schema) for out_file in out_files]\n",
    "\n",
    "      windows = [window for _, window in readers[0].block_windows()]\n",
    "\n",
    "      batch_size = int(np.ceil(len(windows)/(num_workers)))\n",
    "\n",
    "      print(\"Batch size : \", batch_size)\n",
    "      \n",
    "      batches = itertools.batched(windows,batch_size)\n",
    "\n",
    "      def process(writer,vrt,windows):\n",
    "\n",
    "            win_transfrom = vrt.window_transform\n",
    "\n",
    "            for window in windows:\n",
    "                  \n",
    "                  out = rast_convert_core(vrt,transform=win_transfrom,win=window)\n",
    "\n",
    "                  out.drop(index=out.loc[out.band_var==nodata].index,inplace=True)\n",
    "\n",
    "                  if not include:\n",
    "                        out.drop(index=out.loc[out.band_var<=0].index,inplace=True)\n",
    "                  \n",
    "                  if out.shape[0]!=0:\n",
    "                        writer.write_table(Table.from_pandas(df=out,schema = rast_schema,preserve_index=False,safe=True))\n",
    "\n",
    "      # We map the process() function over the list of\n",
    "      # windows.\n",
    "      with concurrent.futures.ThreadPoolExecutor(\n",
    "            max_workers=num_workers\n",
    "      ) as executor:\n",
    "            for (w,v,win) in zip(writers,readers,batches):\n",
    "                  executor.submit(process(w,v,win))\n",
    "      \n",
    "      for w in writers:\n",
    "            w.close()\n",
    "      for vrt in readers:\n",
    "            vrt.close()"
   ]
  },
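  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The parallel run leaves one parquet file per worker in the `results` folder. They can be read back as a single table by globbing the folder, the same pattern used for the converted file above (a sketch):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the per-worker parquet files back as one table via a glob.\n",
    "rast_convert_parts = ib.read_parquet(f\"{out_fold}/*.parquet\")\n",
    "print(rast_convert_parts.count())"
   ]
  },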
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "rast_convert_1 = ib.read_parquet(out_path_vrt)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(rast_convert_1.count())\n",
    "rast_convert_1.head()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# out_files\n",
    "ib.schema([(\"lon\", \"float16\"),(\"lat\",\"float16\"),(\"band_var\",\"float32\")])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.dtype(\"float16\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "rast_convert_par = conn.read_parquet(\n",
    "    \"/Users/cenv1069/Documents/data/S_NRES_10.parquet\",\n",
    "    # \"../../results\"\n",
    "    table_name=\"s_nres\",\n",
    "    schema={\"lon\" : \"\", \"lat\" : \"float32\" ,\"band_var\" : \"float32\"},\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# print(rast_convert_par.count())\n",
    "# rast_convert_par.cast({\"lon\" : \"float16\"\n",
    "#                        ,\"lat\" : \"float16\"})\n",
    "conn.list_tables()"
   ]
  },
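  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once registered under a `table_name`, the dataset can also be retrieved from the connection by name instead of re-reading the file (a sketch using the `s_nres` name given above):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Retrieve the registered table by name.\n",
    "s_nres = conn.table(\"s_nres\")\n",
    "s_nres.head()"
   ]
  },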
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(ib.difference(rast_convert_1,rast_convert_par).count())\n",
    "ib.difference(rast_convert_1,rast_convert_par)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "diff = ib.difference(rast_convert_1,rast_convert_par).execute()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "diff.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# gpd.GeoSeries.from_xy(diff[\"lon\"],diff[\"lat\"],crs=\"epsg:4326\").explore()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Parallel with IPyparallel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# import time\n",
    "# import ipyparallel as ipp\n",
    "\n",
    "# task_durations = [1] * 25\n",
    "# # request a cluster\n",
    "# with ipp.Cluster() as rc:\n",
    "#     # get a view on the cluster\n",
    "#     view = rc.load_balanced_view()\n",
    "#     # submit the tasks\n",
    "#     asyncresult = view.map_async(time.sleep, task_durations)\n",
    "#     # wait interactively for results\n",
    "#     asyncresult.wait_interactive()\n",
    "#     # retrieve actual results\n",
    "#     result = asyncresult.get()\n",
    "# # at this point, the cluster processes have been shutdown"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}