Skip to content

optimizer module

Module for parameter optimization

WOFOSTOptimizer

A generalized optimizer for WOFOST simulations using Optuna.

Features:

- Parallel Execution: Uses ProcessPoolExecutor to bypass the GIL and utilize all CPU cores.
- Memory Efficient: Loads simulation engines (Weather/Soil/Agro) into RAM once and reuses them.
- Multi-Objective: Supports optimizing multiple targets simultaneously (Pareto optimization).
- Agnostic: Works with both Single-Location Runners and Batch Runners.

Source code in cropengine/optimizer.py
class WOFOSTOptimizer:
    """
    A generalized optimizer for WOFOST simulations using Optuna.

    Features:
    - **Parallel Execution**: Runs one simulation per location in parallel
      worker processes (joblib 'loky' backend) to utilize all CPU cores.
    - **Memory Efficient**: Loads simulation engines (Weather/Soil/Agro) into RAM once and reuses them.
    - **Multi-Objective**: Supports optimizing multiple targets simultaneously (Pareto optimization).
    - **Agnostic**: Works with both Single-Location Runners and Batch Runners.
    """

    def __init__(self, runner, observed_data):
        """
        Instantiate WOFOSTOptimizer.

        Args:
            runner: An instance of WOFOSTCropSimulationRunner or
                WOFOSTCropSimulationBatchRunner.
            observed_data (pd.DataFrame): Ground truth data used by the loss function.
        """
        self.runner = runner
        self.observed_data = observed_data
        # Batch runners are detected by duck typing on `get_batch_rerunners`.
        self.is_batch = hasattr(runner, "get_batch_rerunners")
        # Lazily populated cache of {location_id: engine}; filled by optimize().
        self.engines = {}

    @staticmethod
    def _failure_penalty(directions: List[str]) -> List[float]:
        """
        Worst-possible objective values for a failed trial.

        Returns +inf for each minimized objective and -inf for each maximized
        one, so a failed trial can never be mistaken for a good solution.
        (Returning +inf for a "maximize" direction would rank a failed trial
        as the best one.)
        """
        return [
            float("-inf") if str(d).lower() == "maximize" else float("inf")
            for d in directions
        ]

    def _get_sampler(
        self, sampler_input: Union[str, optuna.samplers.BaseSampler, None]
    ) -> Optional[optuna.samplers.BaseSampler]:
        """
        Resolve the sampler from a string name or an existing sampler object.

        Args:
            sampler_input: A case-insensitive sampler name, a ready-made
                sampler instance, or None.

        Returns:
            An optuna sampler instance, or None to let Optuna pick its default.

        Raises:
            ValueError: If the string name is not recognized.
            ImportError: If the sampler's optional dependency is missing.
            TypeError: If the input is neither a string, a sampler, nor None.
        """
        if sampler_input is None:
            return None  # Let Optuna choose its default (usually TPESampler).

        if isinstance(sampler_input, optuna.samplers.BaseSampler):
            return sampler_input

        if isinstance(sampler_input, str):
            name = sampler_input.lower().strip()

            try:
                if name == "random":
                    return optuna.samplers.RandomSampler()

                if name == "tpe":
                    return optuna.samplers.TPESampler()

                if name == "cmaes":
                    # Requires the 'cma' package.
                    return optuna.samplers.CmaEsSampler()

                if name == "nsgaii":
                    return optuna.samplers.NSGAIISampler()

                if name == "nsgaiii":
                    return optuna.samplers.NSGAIIISampler()

                if name == "qmc":
                    # Quasi-Monte Carlo (requires scipy).
                    return optuna.samplers.QMCSampler()

                if name == "bruteforce":
                    return optuna.samplers.BruteForceSampler()

                if name == "grid":
                    # NOTE(review): GridSampler needs a concrete search space
                    # to be useful; an empty one is only a placeholder. Prefer
                    # passing a pre-configured GridSampler instance instead.
                    return optuna.samplers.GridSampler(search_space={})

                if name == "botorch":
                    # Requires the 'botorch' package.
                    from optuna.integration import BoTorchSampler

                    return BoTorchSampler()

                if name == "gp":
                    # Gaussian Process sampler (requires 'botorch' & 'scipy').
                    return optuna.samplers.GPSampler()

                raise ValueError(f"Unknown sampler name: '{sampler_input}'.")

            except ImportError as e:
                # Chain the original error so the missing package is visible
                # in the traceback.
                raise ImportError(
                    f"Could not initialize sampler '{name}'. Missing dependency: {e}. Please install the required package (e.g., 'pip install botorch' or 'pip install cma')."
                ) from e

        raise TypeError(
            "Sampler must be a string name or an optuna.samplers.BaseSampler object."
        )

    def get_best_params(self, study: optuna.Study, search_space: Callable) -> Dict:
        """
        Retrieves the optimized parameters from the study, reconstructing any
        complex structures (lists/tables) defined in the search space.

        Args:
            study (optuna.Study): The completed optimization study.
            search_space (callable): The original search space function used for optimization.
                                     Required to reconstruct complex parameters (lists/tables)
                                     from the scalar values stored in the study.

        Returns:
            dict: A dictionary of parameter overrides (e.g., {'crop_params': {...}})
                  containing the best values found during optimization.

        Note:
            `study.best_params` is only defined for single-objective studies;
            for multi-objective runs inspect `study.best_trials` instead.
        """
        # FixedTrial replays the stored best scalar values through the search
        # space so composite parameters are rebuilt exactly as during the run.
        best_overrides = search_space(FixedTrial(study.best_params))

        return best_overrides

    def optimize(
        self,
        search_space: Callable[[optuna.Trial], Dict],
        loss_func: Callable[[pd.DataFrame, pd.DataFrame], Union[float, List[float]]],
        n_trials: int = 100,
        n_workers: int = 4,
        sampler: Optional[optuna.samplers.BaseSampler] = None,
        directions: Optional[List[str]] = None,
        output_folder: Optional[str] = None,
    ) -> optuna.Study:
        """
        Runs the optimization loop.

        Args:
            search_space (callable): A function that takes an Optuna `trial` object
                                     and returns a dictionary of parameter overrides.
                                     Example structure:
                                     {'crop_params': {'TSUM1': 1000}, 'soil_params': {...}}

            loss_func (callable): A function that takes (df_simulated, df_observed).
                                  Returns a float (single-objective) or list of floats (multi-objective).

            n_trials (int): Number of optimization trials to run.

            n_workers (int): Number of parallel processes to spawn.

            sampler (str | optuna.samplers.BaseSampler | None):
                The optimization strategy. Supported strings:

                **Standard:**
                - "TPE": Tree-structured Parzen Estimator (Default, good general purpose).
                - "Random": Pure random search.

                **Advanced (May require extra packages):**
                - "GP": Gaussian Process Sampler. Excellent for expensive simulations. (Requires `botorch`).
                - "CmaEs": Covariance Matrix Adaptation. Good for continuous global optima. (Requires `cma`).
                - "BoTorch": Bayesian Optimization. (Requires `botorch`).

                **Multi-Objective:**
                - "NSGAII": Standard for Pareto optimization.
                - "NSGAIII": For many-objective problems (3+ targets).

                **Grid/Deterministic:**
                - "BruteForce": Tries ALL combinations.
                - "Grid": Tries specified grid points.
                - "QMC": Quasi-Monte Carlo.

            directions (list[str]): Optimization directions.
                                    Default is ["minimize"].
                                    For multi-objective, use e.g., ["minimize", "maximize"].

            output_folder (str, optional): Path to a folder where simulation results
                                           for EACH trial will be saved (e.g., 'trial_0.csv').
                                           If None, results are not saved to disk.

        Returns:
            optuna.Study: The completed study object containing best params and trials.
        """
        # 1. SETUP OUTPUT FOLDER
        if output_folder:
            os.makedirs(output_folder, exist_ok=True)
            print(f"[OPT] Saving all trial outputs to: {output_folder}")

        # 2. RESOLVE SAMPLER
        optuna_sampler = self._get_sampler(sampler)
        if optuna_sampler:
            print(f"[OPT] Using Sampler: {optuna_sampler.__class__.__name__}")

        # 3. NORMALIZE DIRECTIONS (early, so the objective can rely on them)
        if directions is None:
            directions = ["minimize"]
        is_multi = len(directions) > 1
        # Direction-aware penalty for failed trials (see _failure_penalty).
        penalty = self._failure_penalty(directions)

        # 4. PRE-LOADING PHASE
        print("[OPT] Loading simulation engines...")
        if not self.engines:
            if self.is_batch:
                self.engines = self.runner.get_batch_rerunners()
            else:
                self.engines = {0: self.runner.get_rerunner()}

        print(f"[OPT] Ready. Optimized execution for {len(self.engines)} locations.")

        # 5. DEFINE OBJECTIVE
        def objective(trial):
            # A. Get parameters from Optuna
            overrides = search_space(trial)

            # B. Prepare one task per location for the parallel workers
            tasks = [
                (loc_id, engine, overrides) for loc_id, engine in self.engines.items()
            ]

            # C. Execute in parallel using joblib ('loky' spawns real processes)
            try:
                results_raw = Parallel(n_jobs=n_workers, backend="loky")(
                    delayed(_global_worker_task)(task) for task in tasks
                )

                # Drop None entries (failed simulation runs).
                results = [res for res in results_raw if res is not None]

            except Exception as e:
                logging.error(f"[OPT] Parallel Execution Error: {e}")
                results = []

            # D. Validation: every run failed -> return the worst objective values
            if not results:
                return list(penalty) if is_multi else penalty[0]

            # E. Aggregation & loss calculation
            try:
                # Merge all location results into one DataFrame.
                df_sim_all = pd.concat(results, ignore_index=True)

                if output_folder:
                    file_path = os.path.join(output_folder, f"trial_{trial.number}.csv")
                    df_sim_all.to_csv(file_path, index=False)

                # Compute loss via the user-supplied function.
                loss = loss_func(df_sim_all, self.observed_data)
                return loss

            except Exception as e:
                logging.error(f"[OPT] Loss Calculation Error: {e}")
                return list(penalty) if is_multi else penalty[0]

        # 6. CREATE AND RUN STUDY
        study = optuna.create_study(directions=directions, sampler=optuna_sampler)

        print(
            f"[OPT] Starting {len(directions)}-objective optimization with {n_trials} trials..."
        )
        study.optimize(objective, n_trials=n_trials)

        print("[OPT] Optimization Finished.")

        if len(directions) == 1:
            print("Best params:", study.best_params)
        else:
            print(
                f"Pareto front found with {len(study.best_trials)} optimal solutions."
            )

        return study

__init__(self, runner, observed_data) special

Instantiate WOFOSTOptimizer.

runner: An instance of WOFOSTCropSimulationRunner or WOFOSTCropSimulationBatchRunner.

observed_data (pd.DataFrame): Ground truth data used by the loss function.

Source code in cropengine/optimizer.py
def __init__(self, runner, observed_data):
    """
    Instantiate WOFOSTOptimizer.

    Args:
        runner: An instance of WOFOSTCropSimulationRunner or
            WOFOSTCropSimulationBatchRunner.
        observed_data (pd.DataFrame): Ground truth data used by the loss function.
    """
    self.runner = runner
    self.observed_data = observed_data
    # Batch runners are detected by duck typing on `get_batch_rerunners`.
    self.is_batch = hasattr(runner, "get_batch_rerunners")
    # Lazily populated cache of {location_id: engine}; filled by optimize().
    self.engines = {}

get_best_params(self, study, search_space)

Retrieves the optimized parameters from the study, reconstructing any complex structures (lists/tables) defined in the search space.

Parameters:

Name Type Description Default
study optuna.Study

The completed optimization study.

required
search_space callable

The original search space function used for optimization. Required to reconstruct complex parameters (lists/tables) from the scalar values stored in the study.

required

Returns:

Type Description
dict

A dictionary of parameter overrides (e.g., {'crop_params': {...}}) containing the best values found during optimization.

Source code in cropengine/optimizer.py
def get_best_params(self, study: optuna.Study, search_space: Callable) -> Dict:
    """
    Rebuild the best parameter overrides found by an optimization study.

    Composite structures (lists/tables) produced by the search space are
    reconstructed by replaying the study's best scalar values through the
    original search-space function.

    Args:
        study (optuna.Study): The completed optimization study.
        search_space (callable): The original search space function used for
            optimization; required to reconstruct complex parameters
            (lists/tables) from the scalar values stored in the study.

    Returns:
        dict: Parameter overrides (e.g., {'crop_params': {...}}) containing
            the best values found during optimization.
    """
    # FixedTrial replays stored values instead of sampling new ones.
    return search_space(FixedTrial(study.best_params))

optimize(self, search_space, loss_func, n_trials=100, n_workers=4, sampler=None, directions=None, output_folder=None)

Runs the optimization loop.

Parameters:

Name Type Description Default
search_space callable

A function that takes an Optuna trial object and returns a dictionary of parameter overrides. Example structure: {'crop_params': {'TSUM1': 1000}, 'soil_params': {...}}

required
loss_func callable

A function that takes (df_simulated, df_observed). Returns a float (single-objective) or list of floats (multi-objective).

required
n_trials int

Number of optimization trials to run.

100
n_workers int

Number of parallel processes to spawn.

4
sampler str | optuna.samplers.BaseSampler | None

The optimization strategy. Supported strings:

Standard:

- "TPE": Tree-structured Parzen Estimator (Default, good general purpose).
- "Random": Pure random search.

Advanced (May require extra packages):

- "GP": Gaussian Process Sampler. Excellent for expensive simulations. (Requires botorch).
- "CmaEs": Covariance Matrix Adaptation. Good for continuous global optima. (Requires cma).
- "BoTorch": Bayesian Optimization. (Requires botorch).

Multi-Objective:

- "NSGAII": Standard for Pareto optimization.
- "NSGAIII": For many-objective problems (3+ targets).

Grid/Deterministic:

- "BruteForce": Tries ALL combinations.
- "Grid": Tries specified grid points.
- "QMC": Quasi-Monte Carlo.

None
directions list[str]

Optimization directions. Default is ["minimize"]. For multi-objective, use e.g., ["minimize", "maximize"].

None
output_folder str

Path to a folder where simulation results for EACH trial will be saved (e.g., 'trial_0.csv'). If None, results are not saved to disk.

None

Returns:

Type Description
optuna.Study

The completed study object containing best params and trials.

Source code in cropengine/optimizer.py
def optimize(
    self,
    search_space: Callable[[optuna.Trial], Dict],
    loss_func: Callable[[pd.DataFrame, pd.DataFrame], Union[float, List[float]]],
    n_trials: int = 100,
    n_workers: int = 4,
    sampler: Optional[optuna.samplers.BaseSampler] = None,
    directions: Optional[List[str]] = None,
    output_folder: Optional[str] = None,
) -> optuna.Study:
    """
    Runs the optimization loop.

    Args:
        search_space (callable): A function that takes an Optuna `trial` object
                                 and returns a dictionary of parameter overrides.
                                 Example structure:
                                 {'crop_params': {'TSUM1': 1000}, 'soil_params': {...}}

        loss_func (callable): A function that takes (df_simulated, df_observed).
                              Returns a float (single-objective) or list of floats (multi-objective).

        n_trials (int): Number of optimization trials to run.

        n_workers (int): Number of parallel processes to spawn.

        sampler (str | optuna.samplers.BaseSampler | None):
            The optimization strategy. Supported strings:

            **Standard:**
            - "TPE": Tree-structured Parzen Estimator (Default, good general purpose).
            - "Random": Pure random search.

            **Advanced (May require extra packages):**
            - "GP": Gaussian Process Sampler. Excellent for expensive simulations. (Requires `botorch`).
            - "CmaEs": Covariance Matrix Adaptation. Good for continuous global optima. (Requires `cma`).
            - "BoTorch": Bayesian Optimization. (Requires `botorch`).

            **Multi-Objective:**
            - "NSGAII": Standard for Pareto optimization.
            - "NSGAIII": For many-objective problems (3+ targets).

            **Grid/Deterministic:**
            - "BruteForce": Tries ALL combinations.
            - "Grid": Tries specified grid points.
            - "QMC": Quasi-Monte Carlo.

        directions (list[str]): Optimization directions.
                                Default is ["minimize"].
                                For multi-objective, use e.g., ["minimize", "maximize"].

        output_folder (str, optional): Path to a folder where simulation results
                                       for EACH trial will be saved (e.g., 'trial_0.csv').
                                       If None, results are not saved to disk.

    Returns:
        optuna.Study: The completed study object containing best params and trials.
    """
    # 1. SETUP OUTPUT FOLDER
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)
        print(f"[OPT] Saving all trial outputs to: {output_folder}")

    # 2. RESOLVE SAMPLER
    optuna_sampler = self._get_sampler(sampler)
    if optuna_sampler:
        print(f"[OPT] Using Sampler: {optuna_sampler.__class__.__name__}")

    # 3. NORMALIZE DIRECTIONS (early, so the objective can rely on them)
    if directions is None:
        directions = ["minimize"]
    is_multi = len(directions) > 1

    def _worst() -> Union[float, List[float]]:
        # Worst-possible objective values for a failed trial: +inf for
        # minimized targets, -inf for maximized ones. Returning +inf for a
        # "maximize" direction would make a failed trial look optimal.
        vals = [
            float("-inf") if str(d).lower() == "maximize" else float("inf")
            for d in directions
        ]
        return vals if is_multi else vals[0]

    # 4. PRE-LOADING PHASE
    print("[OPT] Loading simulation engines...")
    if not self.engines:
        if self.is_batch:
            self.engines = self.runner.get_batch_rerunners()
        else:
            self.engines = {0: self.runner.get_rerunner()}

    print(f"[OPT] Ready. Optimized execution for {len(self.engines)} locations.")

    # 5. DEFINE OBJECTIVE
    def objective(trial):
        # A. Get parameters from Optuna
        overrides = search_space(trial)

        # B. Prepare one task per location for the parallel workers
        tasks = [
            (loc_id, engine, overrides) for loc_id, engine in self.engines.items()
        ]

        # C. Execute in parallel using joblib ('loky' spawns real processes)
        try:
            results_raw = Parallel(n_jobs=n_workers, backend="loky")(
                delayed(_global_worker_task)(task) for task in tasks
            )

            # Drop None entries (failed simulation runs).
            results = [res for res in results_raw if res is not None]

        except Exception as e:
            logging.error(f"[OPT] Parallel Execution Error: {e}")
            results = []

        # D. Validation: every run failed -> return the worst objective values
        if not results:
            return _worst()

        # E. Aggregation & loss calculation
        try:
            # Merge all location results into one DataFrame.
            df_sim_all = pd.concat(results, ignore_index=True)

            if output_folder:
                file_path = os.path.join(output_folder, f"trial_{trial.number}.csv")
                df_sim_all.to_csv(file_path, index=False)

            # Compute loss via the user-supplied function.
            return loss_func(df_sim_all, self.observed_data)

        except Exception as e:
            logging.error(f"[OPT] Loss Calculation Error: {e}")
            return _worst()

    # 6. CREATE AND RUN STUDY
    study = optuna.create_study(directions=directions, sampler=optuna_sampler)

    print(
        f"[OPT] Starting {len(directions)}-objective optimization with {n_trials} trials..."
    )
    study.optimize(objective, n_trials=n_trials)

    print("[OPT] Optimization Finished.")

    if len(directions) == 1:
        print("Best params:", study.best_params)
    else:
        print(
            f"Pareto front found with {len(study.best_trials)} optimal solutions."
        )

    return study