lpcore.capture.methodtriggeredimagecapture のソースコード
import logging
import time
from typing import Any, Callable
from lpcore import TaskStatus
from lpcore.abc.process import Process
from lpcore.capture.triggerbasedimagecapture import TriggerBasedImageCapture
_logger = logging.getLogger(__name__)
[ドキュメント]
class MethodTriggeredImageCapture(Process):
    """Capture images until a caller-supplied stop condition becomes true.

    The process starts its ``capture`` child task and then polls the
    ``capture_stop_condition`` callable. Capture is stopped either when the
    condition reports true *and* the minimum capture duration has elapsed,
    or when ``timeout_s`` is exceeded, whichever happens first.
    """

    child_tasks_required = {
        "capture": TriggerBasedImageCapture,
    }
    runtime_parameters_required = {
        **TriggerBasedImageCapture.runtime_parameters_required,
        "capture_stop_condition": Callable[..., bool],
    }

    # The polling loop keeps running for at least this long even when the
    # stop condition is already true.
    # NOTE(review): presumably this guarantees a minimum capture window --
    # confirm the intent with the original author.
    _MIN_CAPTURE_DURATION_S = 0.1
    # Sleep between two evaluations of the stop condition.
    _POLL_INTERVAL_S = 0.001
    # Entries of the stop_capture() result that are forwarded to the caller.
    _RESULT_KEYS = (
        "imgs",
        "timestamps_camera",
        "timestamps_captured",
        "timestamps_fetched",
    )

    def _execute(self, **kwargs: Any) -> tuple[TaskStatus, dict[str, Any]]:
        """Run the capture until the stop condition holds or the timeout hits.

        Args:
            **kwargs: Runtime parameters. This method reads ``timeout_s``
                (maximum polling time in seconds, also forwarded to the
                child task) and ``capture_stop_condition`` (a zero-argument
                callable returning a truthy value once capture should stop).

        Returns:
            ``(TaskStatus.SUCCESS, data)`` when the stop condition was met, or
            ``(TaskStatus.FAILED, data)`` on timeout. ``data`` holds ``imgs``,
            ``timestamps_camera``, ``timestamps_captured`` and
            ``timestamps_fetched`` taken from the child's ``stop_capture()``
            result; on timeout it additionally holds ``"error": "Timeout"``.

        Raises:
            KeyError: If a required runtime parameter is missing.
            AssertionError: If stopping the capture did not succeed.
        """
        timeout_s = kwargs["timeout_s"]
        # Look the callable up before starting the capture so that a missing
        # parameter fails fast instead of leaving a running capture behind.
        stop_condition = kwargs["capture_stop_condition"]

        self.child_tasks["capture"].execute(timeout_s=timeout_s)

        # Monotonic clock: elapsed-time measurement must not be affected by
        # wall-clock adjustments (NTP, manual changes).
        start_time = time.monotonic()

        # NOTE(review): if stop_condition() raises, the capture is not stopped
        # by this method -- confirm whether callers are expected to handle it.
        while (not stop_condition()) or (
            time.monotonic() - start_time < self._MIN_CAPTURE_DURATION_S
        ):
            if time.monotonic() - start_time > timeout_s:
                _logger.warning(f"Timeout reached after {timeout_s} s.")
                return TaskStatus.FAILED, {
                    "error": "Timeout",
                    **self._stop_capture_and_collect(),
                }
            time.sleep(self._POLL_INTERVAL_S)

        _logger.info(f"Capture stop condition met after {time.monotonic() - start_time} s.")
        return TaskStatus.SUCCESS, self._stop_capture_and_collect()

    def _stop_capture_and_collect(self) -> dict[str, Any]:
        """Stop the child capture task and return the forwarded result entries.

        Raises:
            AssertionError: If the child reports a status other than
                ``TaskStatus.SUCCESS``.
        """
        result_capture = self.child_tasks["capture"].stop_capture()
        # Raised explicitly rather than via ``assert`` so the check is not
        # stripped when Python runs with -O; the exception type is kept as
        # AssertionError for backward compatibility with existing callers.
        if result_capture["status"] != TaskStatus.SUCCESS:
            raise AssertionError(f"Failed to stop capture: {result_capture}")
        return {key: result_capture[key] for key in self._RESULT_KEYS}