lpcore.capture.methodtriggeredimagecapture のソースコード

import logging
import time
from typing import Any, Callable
from lpcore import TaskStatus
from lpcore.abc.process import Process
from lpcore.capture.triggerbasedimagecapture import TriggerBasedImageCapture

_logger = logging.getLogger(__name__)


[ドキュメント] class MethodTriggeredImageCapture(Process): child_tasks_required = { "capture": TriggerBasedImageCapture, } runtime_parameters_required = { **TriggerBasedImageCapture.runtime_parameters_required, "capture_stop_condition": Callable[..., bool], } def _execute(self, **kwargs: Any) -> tuple[TaskStatus, dict[str, Any]]: timeout_s = kwargs["timeout_s"] self.child_tasks["capture"].execute(timeout_s=timeout_s) start_time = time.time() while (not kwargs["capture_stop_condition"]()) or (time.time() - start_time < 0.1): current_time = time.time() if current_time - start_time > timeout_s: _logger.warning(f"Timeout reached after {timeout_s} s.") result_capture = self.child_tasks["capture"].stop_capture() assert result_capture["status"] == TaskStatus.SUCCESS, f"Failed to stop capture: {result_capture}" return TaskStatus.FAILED, { "error": "Timeout", "imgs": result_capture["imgs"], "timestamps_camera": result_capture["timestamps_camera"], "timestamps_captured": result_capture["timestamps_captured"], "timestamps_fetched": result_capture["timestamps_fetched"], } time.sleep(0.001) _logger.info(f"Capture stop condition met after {time.time() - start_time} s.") result_capture = self.child_tasks["capture"].stop_capture() assert result_capture["status"] == TaskStatus.SUCCESS, f"Failed to stop capture: {result_capture}" return TaskStatus.SUCCESS, { "imgs": result_capture["imgs"], "timestamps_camera": result_capture["timestamps_camera"], "timestamps_captured": result_capture["timestamps_captured"], "timestamps_fetched": result_capture["timestamps_fetched"], }