Source code for tune.api.suggest

from typing import Any, List, Optional, Tuple

from fugue import FugueWorkflow
from fugue.exceptions import FugueDataFrameError
from triad import assert_or_throw
from tune._utils import from_base64
from tune.api.factory import TUNE_OBJECT_FACTORY, parse_logger
from tune.api.optimize import (
    optimize_by_continuous_asha,
    optimize_by_hyperband,
    optimize_by_sha,
    optimize_noniterative,
)
from tune.concepts.flow import TrialReport
from tune.concepts.logger import make_logger
from tune.concepts.space import Space
from tune.constants import TUNE_DATASET_DF_DEFAULT_NAME, TUNE_REPORT, TUNE_REPORT_METRIC
from tune.exceptions import TuneCompileError


def suggest_for_noniterative_objective(
    objective: Any,
    space: Space,
    df: Any = None,
    df_name: str = TUNE_DATASET_DF_DEFAULT_NAME,
    temp_path: str = "",
    partition_keys: Optional[List[str]] = None,
    top_n: int = 1,
    local_optimizer: Any = None,
    logger: Any = None,
    monitor: Any = None,
    stopper: Any = None,
    stop_check_interval: Any = None,
    distributed: Optional[bool] = None,
    shuffle_candidates: bool = True,
    execution_engine: Any = None,
    execution_engine_conf: Any = None,
) -> List[TrialReport]:
    """Suggest the best parameter combinations for a non-iterative
    ``objective``, given a ``space`` and an (optional) dataframe.

    .. important::

        Please read
        :ref:`Non-Iterative Tuning Guide </notebooks/noniterative.ipynb>`

    :param objective: |NonIterativeObjective|
    :param space: search space, please read |SpaceTutorial|
    :param df: |DataFrameLike|, defaults to None
    :param df_name: dataframe name, defaults to the value of
      ``TUNE_DATASET_DF_DEFAULT_NAME``
    :param temp_path: |TempPath|, defaults to ""
    :param partition_keys: partition keys for ``df``, defaults to None.
      For details, please read |DatasetTutorial|
    :param top_n: number of best results to return, defaults to 1.
      If `<=0` all results will be returned
    :param local_optimizer: |NonIterativeOptimizer|, defaults to None
    :param logger: |LoggerLikeObject|, defaults to None
    :param monitor: realtime monitor, defaults to None. Read
      :ref:`Monitoring Guide
      </notebooks/noniterative.ipynb#Realtime-Monitoring>`
    :param stopper: early stopper, defaults to None. Read
      :ref:`Early Stopping Guide
      </notebooks/noniterative.ipynb#Early-Stopping>`
    :param stop_check_interval: an object that can be converted to
      timedelta, defaults to None. For details, read
      :func:`~triad.utils.convert.to_timedelta`
    :param distributed: whether to use the execution engine to run
      different trials in a distributed manner, defaults to None.
      If None, it's equal to True.
    :param shuffle_candidates: whether to shuffle the candidate
      configurations, defaults to True. This has no effect on the final
      result.
    :param execution_engine: Fugue |ExecutionEngine| like object,
      defaults to None. If None,
      :class:`~fugue.execution.native_execution_engine.NativeExecutionEngine`
      will be used, and the task will run on the local machine.
    :param execution_engine_conf: |ParamsLikeObject|, defaults to None
    :return: a list of best results
    """
    # Normalize the logger once; the same object is handed to both the
    # optimizer and the final reporting step in ``_run``.
    parsed_logger = parse_logger(logger)

    workflow = FugueWorkflow()
    candidates = TUNE_OBJECT_FACTORY.make_dataset(
        workflow,
        space,
        df=df,
        df_name=df_name,
        partition_keys=partition_keys,
        temp_path=temp_path,
        shuffle=shuffle_candidates,
    )

    study = optimize_noniterative(
        objective=objective,
        dataset=candidates,
        optimizer=local_optimizer,
        distributed=distributed,
        monitor=monitor,
        logger=parsed_logger,
        stopper=stopper,
        stop_check_interval=stop_check_interval,
    )

    # ``_run`` reads the yielded dataframe back under the name "result".
    best = study.result(top_n)
    best.yield_dataframe_as("result")

    return _run(
        dag=workflow,
        execution_engine=execution_engine,
        execution_engine_conf=execution_engine_conf,
        logger=parsed_logger,
    )
def suggest_by_sha(
    objective: Any,
    space: Space,
    plan: List[Tuple[float, int]],
    train_df: Any = None,
    temp_path: str = "",
    partition_keys: Optional[List[str]] = None,
    top_n: int = 1,
    monitor: Any = None,
    distributed: Optional[bool] = None,
    execution_engine: Any = None,
    execution_engine_conf: Any = None,
) -> List[TrialReport]:
    """Suggest the best parameter combinations by running
    ``optimize_by_sha`` over the candidates generated from ``space``.

    :param objective: objective forwarded unchanged to ``optimize_by_sha``
    :param space: search space; it must not contain stochastic (random)
      parameters
    :param plan: forwarded unchanged to ``optimize_by_sha`` as ``plan``
      (see that function for the meaning of each ``(float, int)`` pair)
    :param train_df: dataframe handed to the dataset factory as ``df``,
      defaults to None
    :param temp_path: used both as the dataset ``temp_path`` and as the
      optimizer ``checkpoint_path``, defaults to ""
    :param partition_keys: partition keys handed to the dataset factory,
      defaults to None
    :param top_n: number of results requested from the study, defaults
      to 1
    :param monitor: forwarded to ``optimize_by_sha``, defaults to None
    :param distributed: forwarded to ``optimize_by_sha``, defaults to None
    :param execution_engine: engine the workflow is run with, defaults to
      None
    :param execution_engine_conf: configuration passed to the workflow
      run, defaults to None
    :raises TuneCompileError: if ``space.has_stochastic`` is true
    :return: the reports produced by ``_run`` for the "result" dataframe
    """
    assert_or_throw(
        not space.has_stochastic,
        TuneCompileError(
            "space can't contain random parameters, "
            "use sample method before calling this function"
        ),
    )

    workflow = FugueWorkflow()
    candidates = TUNE_OBJECT_FACTORY.make_dataset(
        workflow,
        space,
        df=train_df,
        partition_keys=partition_keys,
        temp_path=temp_path,
    )

    study = optimize_by_sha(
        objective=objective,
        dataset=candidates,
        plan=plan,
        checkpoint_path=temp_path,
        distributed=distributed,
        monitor=monitor,
    )

    # ``_run`` reads the yielded dataframe back under the name "result".
    best = study.result(top_n)
    best.yield_dataframe_as("result")

    return _run(
        dag=workflow,
        execution_engine=execution_engine,
        execution_engine_conf=execution_engine_conf,
    )
def suggest_by_hyperband(
    objective: Any,
    space: Space,
    plans: List[List[Tuple[float, int]]],
    train_df: Any = None,
    temp_path: str = "",
    partition_keys: Optional[List[str]] = None,
    top_n: int = 1,
    monitor: Any = None,
    distributed: Optional[bool] = None,
    execution_engine: Any = None,
    execution_engine_conf: Any = None,
) -> List[TrialReport]:
    """Suggest the best parameter combinations by running
    ``optimize_by_hyperband`` over the candidates generated from ``space``.

    :param objective: objective forwarded unchanged to
      ``optimize_by_hyperband``
    :param space: search space; it must not contain stochastic (random)
      parameters
    :param plans: forwarded unchanged to ``optimize_by_hyperband`` as
      ``plans`` (see that function for the meaning of each inner list of
      ``(float, int)`` pairs)
    :param train_df: dataframe handed to the dataset factory as ``df``,
      defaults to None
    :param temp_path: used both as the dataset ``temp_path`` and as the
      optimizer ``checkpoint_path``, defaults to ""
    :param partition_keys: partition keys handed to the dataset factory,
      defaults to None
    :param top_n: number of results requested from the study, defaults
      to 1
    :param monitor: forwarded to ``optimize_by_hyperband``, defaults to
      None
    :param distributed: forwarded to ``optimize_by_hyperband``, defaults
      to None
    :param execution_engine: engine the workflow is run with, defaults to
      None
    :param execution_engine_conf: configuration passed to the workflow
      run, defaults to None
    :raises TuneCompileError: if ``space.has_stochastic`` is true
    :return: the reports produced by ``_run`` for the "result" dataframe
    """
    assert_or_throw(
        not space.has_stochastic,
        TuneCompileError(
            "space can't contain random parameters, "
            "use sample method before calling this function"
        ),
    )

    workflow = FugueWorkflow()
    candidates = TUNE_OBJECT_FACTORY.make_dataset(
        workflow,
        space,
        df=train_df,
        partition_keys=partition_keys,
        temp_path=temp_path,
    )

    study = optimize_by_hyperband(
        objective=objective,
        dataset=candidates,
        plans=plans,
        checkpoint_path=temp_path,
        distributed=distributed,
        monitor=monitor,
    )

    # ``_run`` reads the yielded dataframe back under the name "result".
    best = study.result(top_n)
    best.yield_dataframe_as("result")

    return _run(
        dag=workflow,
        execution_engine=execution_engine,
        execution_engine_conf=execution_engine_conf,
    )
def suggest_by_continuous_asha(
    objective: Any,
    space: Space,
    plan: List[Tuple[float, int]],
    train_df: Any = None,
    temp_path: str = "",
    partition_keys: Optional[List[str]] = None,
    top_n: int = 1,
    monitor: Any = None,
    execution_engine: Any = None,
    execution_engine_conf: Any = None,
) -> List[TrialReport]:
    """Suggest the best parameter combinations by running
    ``optimize_by_continuous_asha`` over the candidates generated from
    ``space``.

    Unlike the other ``suggest_by_*`` functions in this module, this one
    exposes no ``distributed`` switch.

    :param objective: objective forwarded unchanged to
      ``optimize_by_continuous_asha``
    :param space: search space; it must not contain stochastic (random)
      parameters
    :param plan: forwarded unchanged to ``optimize_by_continuous_asha`` as
      ``plan`` (see that function for the meaning of each
      ``(float, int)`` pair)
    :param train_df: dataframe handed to the dataset factory as ``df``,
      defaults to None
    :param temp_path: used both as the dataset ``temp_path`` and as the
      optimizer ``checkpoint_path``, defaults to ""
    :param partition_keys: partition keys handed to the dataset factory,
      defaults to None
    :param top_n: number of results requested from the study, defaults
      to 1
    :param monitor: forwarded to ``optimize_by_continuous_asha``, defaults
      to None
    :param execution_engine: engine the workflow is run with, defaults to
      None
    :param execution_engine_conf: configuration passed to the workflow
      run, defaults to None
    :raises TuneCompileError: if ``space.has_stochastic`` is true
    :return: the reports produced by ``_run`` for the "result" dataframe
    """
    assert_or_throw(
        not space.has_stochastic,
        TuneCompileError(
            "space can't contain random parameters, "
            "use sample method before calling this function"
        ),
    )

    workflow = FugueWorkflow()
    candidates = TUNE_OBJECT_FACTORY.make_dataset(
        workflow,
        space,
        df=train_df,
        partition_keys=partition_keys,
        temp_path=temp_path,
    )

    study = optimize_by_continuous_asha(
        objective=objective,
        dataset=candidates,
        plan=plan,
        checkpoint_path=temp_path,
        monitor=monitor,
    )

    # ``_run`` reads the yielded dataframe back under the name "result".
    best = study.result(top_n)
    best.yield_dataframe_as("result")

    return _run(
        dag=workflow,
        execution_engine=execution_engine,
        execution_engine_conf=execution_engine_conf,
    )
def _run(
    dag: FugueWorkflow,
    execution_engine: Any,
    execution_engine_conf: Any,
    logger: Any = None,
) -> List[TrialReport]:
    """Run ``dag`` and decode its yielded "result" dataframe into reports.

    The rows are sorted by their ``TUNE_REPORT_METRIC`` value in ascending
    order (rows whose metric is None sort last), and each row's
    ``TUNE_REPORT`` payload is decoded with ``from_base64``.

    When ``logger`` is given and there is at least one report, the first
    (lowest-metric) report is also written to that logger. This reporting
    is best-effort: a failure there is logged as a warning and never
    prevents the results from being returned.

    :param dag: the workflow to run; it must yield a dataframe named
      "result"
    :param execution_engine: engine passed to ``dag.run``
    :param execution_engine_conf: passed to ``dag.run`` as ``conf``
    :param logger: object accepted by ``make_logger``, defaults to None
      (no reporting)
    :raises Exception: if the run fails with ``FugueDataFrameError``, its
      ``__cause__`` (or else ``__context__``) is raised in its place when
      available, otherwise the ``FugueDataFrameError`` itself
    :return: decoded reports, best (lowest metric) first
    """
    import logging

    def _sort_key(row: Any) -> float:
        # A missing metric is ranked behind every real value.
        metric = row[TUNE_REPORT_METRIC]
        return float("inf") if metric is None else float(metric)

    try:
        rows = dag.run(
            execution_engine,
            conf=execution_engine_conf,
        )["result"].as_dict_iterable()
        result = [
            from_base64(row[TUNE_REPORT]) for row in sorted(rows, key=_sort_key)
        ]
    except FugueDataFrameError as e:  # pragma: no cover
        # Surface the underlying error instead of the Fugue wrapper.
        raise e.__cause__ or e.__context__ or e

    if result and logger is not None:
        try:
            with make_logger(logger) as p_logger:
                p_logger.log_report(
                    result[0],
                    log_params=True,
                    extract_metrics=True,
                    log_metadata=True,
                )
        except Exception:
            # Reporting must not break tuning, but a silent failure is
            # hard to diagnose, so leave a trace with the traceback.
            logging.getLogger(__name__).warning(
                "Failed to log the best trial report", exc_info=True
            )
    return result