lpcore.move.singleaxismovement のソースコード

from collections import deque
import logging
import numbers
import threading
import time
from typing import Any, Literal

# Stage and StageAxis are evaluated when the class body below is executed
# (class attributes and return annotations), so this module cannot work
# without them. Fail here with a clear, chained ImportError instead of
# printing a message and hitting a NameError further down.
try:
    from laserprocessing.abc import Stage, StageAxis
except ImportError as exc:
    raise ImportError(
        "Could not import Stage/StageAxis from laserprocessing.abc "
        "(is laserprocessing installed?)"
    ) from exc

from lpcore import TaskStatus
from lpcore.abc.process import Process

_logger = logging.getLogger(__name__)


class SingleAxisMovement(Process):
    """Process that moves a single stage axis, optionally logging its position.

    The axis is selected by the ``axis_name`` parameter; position, move mode,
    velocity and acceleration are supplied as runtime parameters.
    """

    devices_required = {"stage": Stage}
    parameters_required = {"axis_name": str}
    runtime_parameters_required = {
        "position": numbers.Real,
        "move_mode": Literal["move_to", "move_by", "move_to_and_wait", "move_by_and_wait"],
        "velocity": numbers.Real,
        "acceleration": numbers.Real,
    }
    runtime_parameters_optional = {"log_queue": deque, "timeout_s": numbers.Real}

    # Defined on the class, so this list is shared by every instance: it holds
    # all position-logging threads that have been started and not yet finished.
    _active_log_thread: list[threading.Thread] = []

    @property
    def stage(self) -> Stage:
        """Stage device used by this process."""
        stage = self.devices["stage"]
        assert isinstance(stage, Stage)
        return stage

    @property
    def axis(self) -> StageAxis:
        """Axis of the stage used by this process, selected by ``axis_name``."""
        stageaxis = self.stage.axes[self.parameters["axis_name"]]
        assert isinstance(stageaxis, StageAxis), f"Expected StageAxis, but got {type(stageaxis)}"
        return stageaxis

    @property
    def is_actively_moving(self) -> bool:
        """Whether the axis reports that it is currently moving.

        Returns ``False`` (and logs a warning) when the axis object has no
        ``is_actively_moving`` attribute.
        """
        axis = self.axis
        query = getattr(axis, "is_actively_moving", None)
        if query is None:
            _logger.warning(
                f"Axis {self.parameters['axis_name']} does not support is_actively_moving")
            return False
        return query()

    def _log_position(self, queue: deque[tuple[float, float]], timeout_s: float = 60.0) -> None:
        """Poll the axis position and append ``(timestamp, position)`` to ``queue``.

        Intended to run in its own thread. Polling stops once the axis no
        longer reports movement (after a short grace period) or when
        ``timeout_s`` seconds have elapsed, whichever comes first.

        Args:
            queue: Destination for ``(time.time(), position)`` samples.
            timeout_s: Maximum logging duration in seconds.
        """
        try:
            # Resolved once: the polling loop runs at a very high rate.
            axis_name = self.parameters["axis_name"]
            axis = self.axis
            start_time = time.time()
            _logger.info(f"Logging position of axis {axis_name}")
            while time.time() - start_time < timeout_s:
                now = time.time()
                pos = axis.position
                queue.append((now, pos))
                # Lazy %-formatting: nothing is formatted unless DEBUG is enabled.
                _logger.debug(
                    "%15.6f: Axis `%s` at position %12.4f µm.", now, axis_name, pos)
                # Never stop within the first 10 ms -- presumably because this
                # thread is started before the move command is issued. The
                # elapsed-time check comes first so the axis is not queried
                # during that grace period.
                if now - start_time > 0.01 and not self.is_actively_moving:
                    return
                time.sleep(0.0001)
            _logger.warning(f"Logging position of axis {axis_name} timed out")
        finally:
            # Always deregister, including when reading the axis raises.
            self._cleanup_log_thread()

    def _cleanup_log_thread(self) -> None:
        """Remove the calling thread from the list of active log threads."""
        try:
            self._active_log_thread.remove(threading.current_thread())
        except ValueError:
            # The calling thread was not registered; nothing to clean up.
            pass

    def _execute(self, *args: Any, **kwargs: Any) -> tuple[TaskStatus, dict[str, Any]]:
        """Move the axis with the given position, velocity and acceleration.

        The move mode is validated before the axis is touched, so a rejected
        request neither changes velocity/acceleration nor starts a logger.

        Args:
            position (numbers.Real): Target position (for ``move_to*``) or
                displacement (for ``move_by*``), passed to the axis as is.
            move_mode (Literal["move_to", "move_by", "move_to_and_wait",
                "move_by_and_wait"]): Name of the axis method used to move.
            velocity (numbers.Real): Velocity assigned to the axis.
            acceleration (numbers.Real): Acceleration assigned to the axis.
            log_queue (deque, optional): Queue receiving position samples.
                No logging is done when omitted or ``None``.
            timeout_s (numbers.Real, optional): Maximum logging duration in
                seconds. Defaults to 60.0.

        Returns:
            status (TaskStatus): Status of the task.
            result (dict[str, Any]): Result. On failure the ``"message"`` key
                holds the error message.
        """
        # Modes that are only available when the axis provides the method.
        optional_modes = ("move_to_and_wait", "move_by_and_wait")
        known_modes = ("move_to", "move_by") + optional_modes

        move_mode = kwargs["move_mode"]
        if move_mode not in known_modes:
            return TaskStatus.FAILED, {"message": f"Invalid move mode: {move_mode}"}

        axis = self.axis
        if move_mode in optional_modes and not hasattr(axis, move_mode):
            return TaskStatus.FAILED, {
                "message": f"Axis {self.parameters['axis_name']} does not support {move_mode}"
            }
        move = getattr(axis, move_mode)

        axis.velocity = kwargs["velocity"]
        axis.acceleration = kwargs["acceleration"]

        log_queue = kwargs.get("log_queue", None)
        timeout = kwargs.get("timeout_s", 60.0)
        if log_queue is not None:
            log_thread = threading.Thread(
                target=self._log_position, args=(log_queue,), kwargs={"timeout_s": timeout})
            # Register before start(): the thread deregisters itself on exit,
            # and a very short run could otherwise finish before the append.
            self._active_log_thread.append(log_thread)
            log_thread.start()

        move(kwargs["position"])
        return TaskStatus.SUCCESS, {}