lpcore.abc.task のソースコード

from abc import ABC
from collections.abc import Callable as CollectionsCallable
import logging
import traceback
from types import UnionType
from typing import Any, ClassVar, Final, Union, Literal, final, get_args, get_origin

from lpcore import TaskStatus

_logger = logging.getLogger(__name__)


[ドキュメント] class Task(ABC): """タスクを表す抽象基底クラス. Args: parameters (dict[str, Any], optional): タスクのパラメタ. パラメタは, :attr:`parameters_required` で指定されたパラメタと一致する必要がある. デフォルト値は None. child_tasks (dict[str, Any], optional): 子タスク. 子タスクは, :attr:`child_tasks_required` で指定されたタスクと一致する必要がある. デフォルト値は None. name (str, optional): タスクの名前. デフォルト値は None. """ parameters_required: ClassVar[dict[str, Any]] = {} """タスクが必要とするパラメタを指定する辞書型.""" parameters_optional: ClassVar[dict[str, Any]] = {} """タスクがオプションとして持つパラメタを指定する辞書型.""" child_tasks_required: ClassVar[dict[str, Any]] = {} """タスクが必要とする子タスクを指定する辞書型.""" child_tasks_optional: ClassVar[dict[str, Any]] = {} """タスクがオプションとして持つ子タスクを指定する辞書型.""" runtime_parameters_required: ClassVar[dict[str, Any]] = {} """タスクの実行に必要なパラメタを指定する辞書型.""" runtime_parameters_optional: ClassVar[dict[str, Any]] = {} """タスクの実行にオプションとして必要なパラメタを指定する辞書型.""" def __init__(self, parameters: dict[str, Any] | None = None, child_tasks: dict[str, Any] | None = None, name: str | None = None, *args: Any, **kwargs: Any) -> None: parameters = parameters or {} child_tasks = child_tasks or {} # 必須パラメタのチェック self.validate_parameters(parameters, self.parameters_required, "parameters") self.validate_parameters(child_tasks, self.child_tasks_required, "child_tasks") self._parameters: Final[dict[str, Any]] = { key: value for key, value in parameters.items() if key in self.parameters_required } self._child_tasks: Final[dict[str, Any]] = { key: value for key, value in child_tasks.items() if key in self.child_tasks_required } # オプションパラメタのチェック parameters_optional = { key: value for key, value in parameters.items() if key in self.parameters_optional } child_tasks_optional = { key: value for key, value in child_tasks.items() if key in self.child_tasks_optional } self.validate_optional_parameters(parameters_optional, self.parameters_optional, "parameters_optional") self.validate_optional_parameters(child_tasks_optional, self.child_tasks_optional, "child_tasks_optional") self._parameters_optional: Final[dict[str, Any]] = parameters_optional self._child_tasks_optional: Final[dict[str, Any]] = child_tasks_optional # タスク名の設定 self._name: Final[str] = name or f"id{hex(id(self))}" self.initialize(*args, **kwargs)
[ドキュメント] def validate_parameters(self, provided_items: dict[str, Any], required_items: dict[str, Any], group_name: str) -> None: """パラメータの型チェックを行うメソッド. コンストラクタの内部で呼び出される. Args: provided_items (dict[str, Any]): チェックされるべきパラメタ辞書. required_items (dict[str, Any]): パラメタに必要なキーと型の辞書. group_name (str): チェック対象のパラメタのグループ名. Raises: ValueError: パラメタが不足している場合. TypeError: パラメタの型が不正な場合. """ for key, expected_type in required_items.items(): if key not in provided_items: raise ValueError(f"Missing required {group_name}: '{key}'. " f"Provided keys: {list(provided_items.keys())}") value = provided_items[key] error = self._check_type(value, expected_type, path=f"{group_name}.{key}") if error is not None: # エラーがあれば TypeError raise TypeError(error)
[ドキュメント] def validate_optional_parameters(self, provided_items: dict[str, Any], optional_items: dict[str, Any], group_name: str) -> None: """オプションパラメタの型チェックを行うメソッド. Args: provided_items (dict[str, Any]): チェックされるべきパラメタ辞書. optional_items (dict[str, Any]): パラメタに必要なキーと型の辞書. group_name (str): チェック対象のパラメタのグループ名. Raises: TypeError: パラメタの型が不正な場合. """ for key, expected_type in optional_items.items(): if key not in provided_items: continue value = provided_items[key] error = self._check_type(value, expected_type, path=f"{group_name}.{key}") if error is not None: raise TypeError(error)
[ドキュメント] def _check_type(self, value: Any, expected_type: Any, path: str) -> str | None: """ `expected_type` に対して value が妥当なら None を返す. 不適合ならエラーメッセージ文字列を返す. Args: value (Any): チェック対象の値. expected_type (Any): 期待する型. path (str): 「parameters.param[0]」など「どこをチェックしているか」を示すための文字列. Returns: msg (str | None): エラーメッセージ. """ # 1. 通常のクラスオブジェクト (int, str, float 等) if isinstance(expected_type, type): if not isinstance(value, expected_type): return (f"Type mismatch at '{path}': " f"Expected '{expected_type.__name__}', but got '{type(value).__name__}'") return None # 2. Python 3.10+ UnionType (ex. int | str) if isinstance(expected_type, UnionType): union_args = get_args(expected_type) # (int, str) など # いずれかに合致すればOK sub_errors = [] for t in union_args: err = self._check_type(value, t, path) if err is None: # 合致 return None else: sub_errors.append(err) # すべてNG return (f"Type mismatch at '{path}': none of the union types matched.\n" + "\n".join(f" - {e}" for e in sub_errors)) # 3. 旧スタイル Union (typing.Union[int, str] など) origin = get_origin(expected_type) if origin is Union: union_args = get_args(expected_type) sub_errors = [] for t in union_args: err = self._check_type(value, t, path) if err is None: return None else: sub_errors.append(err) return (f"Type mismatch at '{path}': none of the union types matched.\n" + "\n".join(f" - {e}" for e in sub_errors)) # 4. Literal if origin is Literal: literal_values = get_args(expected_type) if value not in literal_values: return (f"Type mismatch at '{path}': " f"Expected one of {literal_values}, but got {value!r}") return None # 5. Callable if origin is CollectionsCallable: if not callable(value): return (f"Type mismatch at '{path}': " f"Expected 'Callable', but got '{type(value).__name__}'") return None # 6. ジェネリック (list, tuple, dict, set, 等) if origin in (list, tuple, dict, set): args = get_args(expected_type) # 6a. list[<T>] if origin is list: if not isinstance(value, list): return (f"Type mismatch at '{path}': " f"Expected 'list', but got '{type(value).__name__}'") if len(args) == 1: # 要素型を再帰的にチェック elem_type = args[0] for i, elem in enumerate(value): err = self._check_type(elem, elem_type, f"{path}[{i}]") if err is not None: return err # list の長さ固定や複数型パラメータは基本的に無い想定 return None # 6b. tuple[<T1>, <T2>, ...] if origin is tuple: if not isinstance(value, tuple): return (f"Type mismatch at '{path}': " f"Expected 'tuple', but got '{type(value).__name__}'") if len(args) != len(value): return (f"Length mismatch at '{path}': " f"Expected a tuple of length {len(args)}, but got {len(value)}") for i, (elem, elem_type) in enumerate(zip(value, args)): err = self._check_type(elem, elem_type, f"{path}[{i}]") if err is not None: return err return None # 6c. dict[<K>, <V>] if origin is dict: if not isinstance(value, dict): return (f"Type mismatch at '{path}': " f"Expected 'dict', but got '{type(value).__name__}'") if len(args) == 2: key_type, val_type = args for k, v in value.items(): # キーの型チェック err_k = self._check_type(k, key_type, f"{path} (key)") if err_k is not None: return err_k # 値の型チェック err_v = self._check_type(v, val_type, f"{path}[{repr(k)}]") if err_v is not None: return err_v return None # 6d. set[<T>] if origin is set: if not isinstance(value, set): return (f"Type mismatch at '{path}': " f"Expected 'set', but got '{type(value).__name__}'") if len(args) == 1: (elem_type,) = args for i, elem in enumerate(value): err = self._check_type(elem, elem_type, f"{path} (elem #{i})") if err is not None: return err return None # 7. それ以外 (Protocol, TypeVar など) # ここでは簡易的に `isinstance(value, expected_type)` を試みる try: if not isinstance(value, expected_type): return (f"Type mismatch at '{path}': " f"Expected '{expected_type}', but got '{type(value).__name__}'") return None except TypeError: # typing関連の型であって判定が難しい場合など return (f"Unrecognized type annotation '{expected_type}' at '{path}', " f"and isinstance check failed.")
[ドキュメント] def initialize(self, *args: Any, **kwargs: Any) -> None: """タスクの初期化メソッド. :class:`Task` クラスを継承したクラスで,このメソッドをオーバーライドして初期化処理を記述する. """ pass
@property def name(self) -> str: """タスクの名前(イミュータブル)""" return self._name @property def parameters(self) -> dict[str, Any]: """タスクのパラメタ(イミュータブル)""" return {**self._parameters, **self._parameters_optional} @property def child_tasks(self) -> dict[str, Any]: """子タスク(イミュータブル)""" return {**self._child_tasks, **self._child_tasks_optional} @property def task_hierarchy(self) -> dict[str, Any]: """タスクの階層構造(イミュータブル) このプロパティは,タスクの階層構造を辞書型で表す. この辞書は,タスクの名前をキーとし,そのタスクが持つ子タスクを値として持つ. See Also: :func:`show_hierarchy` :attr:`child_tasks` """ def _build_task_hierarchy(process: Task) -> dict[str, Any]: child_hierarchy = {} for child_task in process.child_tasks.values(): child_hierarchy[str(child_task)] = _build_task_hierarchy(child_task) return child_hierarchy return {str(self): _build_task_hierarchy(self)}
[ドキュメント] def show_hierarchy(self) -> None: """タスクの階層構造を表示するメソッド Args: level (int): 階層の深さ(内部的に用いる) See Also: :attr:`task_hierarchy` """ def print_hierarchy(hierarchy: dict[str, Any], indent_level: int) -> None: indent = " " * indent_level for process_name, children in hierarchy.items(): print(f"{indent}- {process_name}") if isinstance(children, dict): print_hierarchy(children, indent_level + 1) # process_hierarchy プロパティを使用して階層構造を取得 print("") print_hierarchy(self.task_hierarchy, 0)
def _execute(self, **kwargs: Any) -> tuple[TaskStatus, dict[str, Any]]: return TaskStatus.SUCCESS, {}
[ドキュメント] @final def execute(self, **kwargs: Any) -> dict[str, Any]: """タスクを実行するメソッド. Args: **kwargs: タスクの実行に必要なパラメタを受け取るための可変長引数. :attr:`runtime_parameters_required` で指定されたパラメタを指定する必要がある. Returns: dict: タスクの実行結果を返す辞書型. この辞書は,必ず :code:`status` キーを持ち, :class:`TaskStatus` の値を持つ. その他のキーは,タスクの実行結果を表し,継承したクラスが :func:`_execute` メソッド内で自由に定義できる. .. warning:: 継承したクラスは,このメソッドをオーバーライドすることは推奨されない. 各クラスが提供する機能は, :func:`_execute` メソッド内で実装することが推奨される. """ _logger.info(f"Executing task: {str(self)}") self.validate_parameters(kwargs, self.runtime_parameters_required, "runtime_parameters") self.validate_optional_parameters(kwargs, self.runtime_parameters_optional, "runtime_parameters_optional") try: status, result_data = self._execute(**kwargs) except Exception as e: status = TaskStatus.FAILED tb = traceback.extract_tb(e.__traceback__) last_call = tb[-1] filename = last_call.filename.split("/")[-1] lineno = last_call.lineno error_location = f"{filename}:{lineno}" _logger.error(f"Failed to execute task '{str(self)}': {error_location})") result_data = {"Error": str(e), "Traceback": last_call} return {"status": status, **result_data}
def __str__(self) -> str: return f"{self.__class__.__name__}({self.name})"