lpcore.move.singleaxismovement のソースコード
from collections import deque
import logging
import numbers
import threading
import time
from typing import Any, Literal
try:
from laserprocessing.abc import Stage, StageAxis
except ImportError:
print("ImportError: laserprocessing is not installed.")
from lpcore import TaskStatus
from lpcore.abc.process import Process
_logger = logging.getLogger(__name__)
[ドキュメント]
class SingleAxisMovement(Process):
    """Process that moves one axis of a stage.

    The axis is selected by the ``axis_name`` parameter and looked up in
    ``stage.axes``. Velocity and acceleration are applied to the axis before
    the move is issued. Optionally, a background thread samples the axis
    position into a caller-supplied deque while the move is in progress.
    """

    devices_required = {"stage": Stage}
    parameters_required = {"axis_name": str}
    runtime_parameters_required = {
        "position": numbers.Real,
        "move_mode": Literal["move_to", "move_by", "move_to_and_wait", "move_by_and_wait"],
        "velocity": numbers.Real,
        "acceleration": numbers.Real,
    }
    runtime_parameters_optional = {"log_queue": deque, "timeout_s": numbers.Real}
    # Position-logging threads that have been created and have not yet cleaned
    # up after themselves. NOTE: this list is defined on the class and only
    # ever mutated in place, so it is shared by every instance unless something
    # outside this block rebinds it.
    _active_log_thread: list[threading.Thread] = []

    @property
    def stage(self) -> Stage:
        """The stage used by this process."""
        stage = self.devices["stage"]
        assert isinstance(stage, Stage)
        return stage

    @property
    def axis(self) -> StageAxis:
        """The stage axis used by this process (``stage.axes[axis_name]``)."""
        stageaxis = self.stage.axes[self.parameters["axis_name"]]
        assert isinstance(stageaxis, StageAxis), f"Expected StageAxis, but got {type(stageaxis)}"
        return stageaxis

    @property
    def is_actively_moving(self) -> bool:
        """Whether the axis reports that it is currently moving.

        If the axis object has no ``is_actively_moving`` attribute, a warning
        is logged and ``False`` is returned.
        """
        # Resolve the axis once; every access to ``self.axis`` repeats the
        # device lookup and the isinstance check.
        axis = self.axis
        query = getattr(axis, "is_actively_moving", None)
        if query is None:
            _logger.warning(
                f"Axis {self.parameters['axis_name']} does not support is_actively_moving")
            return False
        return query()

    def _log_position(self, queue: deque[tuple[float, float]], timeout_s: float = 60.0) -> None:
        """Sample the axis position into ``queue`` until the axis stops.

        Intended to run as the target of a logging thread. Each sample is
        appended as a ``(time.time() timestamp, position)`` tuple. Sampling
        ends when the axis is no longer reported as moving (after a 10 ms
        grace period) or when ``timeout_s`` seconds have elapsed, in which
        case a warning is logged.

        Args:
            queue: Deque that receives the ``(timestamp, position)`` samples.
            timeout_s: Maximum sampling duration in seconds.
        """
        start_time = time.time()
        try:
            _logger.info(f"Logging position of axis {self.parameters['axis_name']}")
            while time.time() - start_time < timeout_s:
                now = time.time()
                pos = self.axis.position
                queue.append((now, pos))
                # Skip the string formatting entirely unless DEBUG output is
                # enabled; this loop polls roughly every 0.1 ms.
                if _logger.isEnabledFor(logging.DEBUG):
                    _logger.debug(
                        f"{now:15.6f}: Axis `{self.parameters['axis_name']}` "
                        f"at position {pos:12.4f} µm.")
                # Grace period first: the thread is started before the move
                # command is issued, so the axis may not report motion yet.
                # Checking the elapsed time first also avoids querying the
                # axis (and repeating the "does not support" warning) on
                # every poll during those first 10 ms.
                if now - start_time > 0.01 and not self.is_actively_moving:
                    return
                time.sleep(0.0001)
            _logger.warning(f"Logging position of axis {self.parameters['axis_name']} timed out")
        finally:
            # Always deregister this thread, including when reading the axis
            # raised, so it cannot linger in ``_active_log_thread`` forever.
            self._cleanup_log_thread()

    def _cleanup_log_thread(self) -> None:
        """Remove the calling thread from ``_active_log_thread`` if present."""
        current_thread = threading.current_thread()
        if current_thread in self._active_log_thread:
            self._active_log_thread.remove(current_thread)

    def _execute(self, *args: Any, **kwargs: Any) -> tuple[TaskStatus, dict[str, Any]]:
        """Move the axis with the given position, velocity and acceleration.

        The move mode is validated before anything is changed, so a rejected
        request neither alters the axis velocity/acceleration nor starts a
        position-logging thread.

        Args:
            position (numbers.Real): Target passed to the selected move method.
            move_mode (Literal["move_to", "move_by", "move_to_and_wait",
                "move_by_and_wait"]): Name of the axis method used to move.
            velocity (numbers.Real): Velocity assigned to the axis.
            acceleration (numbers.Real): Acceleration assigned to the axis.
            log_queue (deque, optional): Deque that receives position samples
                from a background thread. Defaults to None (no logging).
            timeout_s (numbers.Real, optional): Maximum duration of the
                position logging in seconds. Defaults to 60.0.

        Returns:
            status (TaskStatus): ``SUCCESS`` once the move call has returned,
                ``FAILED`` if the move mode is invalid or not supported by
                the axis.
            result (dict[str, Any]): Empty on success; on failure the
                "message" key holds the error message.
        """
        axis = self.axis
        move_mode = kwargs["move_mode"]

        # Validate up front so a failure has no side effects.
        if move_mode not in ("move_to", "move_by", "move_to_and_wait", "move_by_and_wait"):
            return TaskStatus.FAILED, {"message": f"Invalid move mode: {move_mode}"}
        # Only the *_and_wait variants are treated as optional axis features;
        # move_to / move_by are called unconditionally, as before.
        if move_mode in ("move_to_and_wait", "move_by_and_wait") and not hasattr(axis, move_mode):
            return TaskStatus.FAILED, {
                "message":
                    f"Axis {self.parameters['axis_name']} does not support {move_mode}"
            }

        axis.velocity = kwargs["velocity"]
        axis.acceleration = kwargs["acceleration"]

        log_queue = kwargs.get("log_queue", None)
        timeout = kwargs.get("timeout_s", 60.0)
        if log_queue is not None:
            log_thread = threading.Thread(target=self._log_position,
                                          args=(log_queue,),
                                          kwargs={"timeout_s": timeout})
            # Register before starting: the thread removes itself on exit.
            self._active_log_thread.append(log_thread)
            log_thread.start()

        # The mode name is also the name of the axis method to call.
        getattr(axis, move_mode)(kwargs["position"])
        return TaskStatus.SUCCESS, {}