"udf": "#%%\n\nimport os\nimport sys\nimport zipfile\nimport requests\nimport tempfile\nimport shutil\nimport functools\n\nfrom openeo.udf import inspect\n\ndef download_file(url, path):\n \"\"\"\n Downloads a file from the given URL to the specified path.\n \"\"\"\n response = requests.get(url, stream=True)\n with open(path, \"wb\") as file:\n file.write(response.content)\n\ndef extract_zip_to_temp(zip_path):\n \"\"\"\n Extracts a zip file to a temporary directory.\n \"\"\"\n # Create a temporary directory\n temp_dir = tempfile.mkdtemp()\n\n # Extract the zip file to the temporary directory\n with zipfile.ZipFile(zip_path, \"r\") as zip_ref:\n zip_ref.extractall(temp_dir)\n\n return temp_dir\n\ndef move_top_level_folder_to_destination(temp_dir, destination_dir):\n \"\"\"\n Moves the first top-level folder from the temporary directory to the destination directory.\n Throws an error if the folder already exists at the destination.\n \"\"\"\n # Find the top-level folders inside the extracted zip\n for item in os.listdir(temp_dir):\n item_path = os.path.join(temp_dir, item)\n \n if os.path.isdir(item_path):\n # Check if the folder already exists at destination\n dest_path = os.path.join(destination_dir, item)\n\n if os.path.exists(dest_path):\n # Throw an error if the folder already exists\n raise FileExistsError(f\"Error: The folder '{item}' already exists in the destination directory: {dest_path}\")\n\n # Move the folder out of temp and into the destination directory\n shutil.move(item_path, dest_path)\n\n\ndef add_to_sys_path(folder_path):\n \"\"\"\n Adds the folder path to sys.path.\n \"\"\"\n if folder_path not in sys.path:\n sys.path.append(folder_path)\n\
[email protected]_cache(maxsize=5)\ndef setup_dependencies(dependencies_url):\n \"\"\"\n Main function to download, unzip, move the top-level folder, and add it to sys.path.\n \"\"\"\n # Create a temporary directory for extracted files\n temp_dir = tempfile.mkdtemp()\n \n # Step 1: Download the zip file\n zip_path = os.path.join(temp_dir, \"temp.zip\")\n download_file(dependencies_url, zip_path)\n\n inspect(message=\"Extract dependencies to temp\")\n # Step 2: Extract the zip file to the temporary directory\n extracted_dir = extract_zip_to_temp(zip_path)\n\n # Step 3: Move the first top-level folder (dynamically) to the destination\n destination_dir = os.getcwd() # Current working directory\n inspect(message=\"Move top-level folder to destination\")\n moved_folder = move_top_level_folder_to_destination(extracted_dir, destination_dir)\n\n # Step 4: Add the folder to sys.path\n add_to_sys_path(moved_folder)\n inspect(message=\"Added to the sys path\")\n\n # Clean up the temporary zip file\n os.remove(zip_path)\n shutil.rmtree(temp_dir) # Remove the temporary extraction folder \n\n\nsetup_dependencies(\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip\")\nimport os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in ['tmp/venv_static', 'tmp/venv']:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ['HOME'] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv('HOME')\n set_home('/tmp')\n user_file = Path.home() / '.config' / 'GPy' / 'user.cfg'\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config['plotting'] = {\n 'library': 'none'\n }\n with open(user_file, 'w') as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n dims = cube.get_array().dims\n result = mogpr(cube.get_array().to_dataset(dim=\"bands\"))\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n return Path(os.path.realpath(__file__)).read_text()\n"
0 commit comments