lpcore.capture.triggerbasedimagecapture のソースコード

import logging
import threading
import time
from typing import Any
from lpcore import TaskStatus
from lpcore.capture.capturebase import CaptureBase

_logger = logging.getLogger(__name__)


class TriggerBasedImageCapture(CaptureBase):
    """Capture task that repeatedly triggers the camera on a worker thread.

    ``_execute`` starts a background thread that calls
    ``self.devices["camera"].trigger()`` in a loop until ``stop_capture`` is
    called, the ``timeout_s`` runtime parameter elapses, or triggering raises.
    ``stop_capture`` joins the worker, fetches every buffered image and saves
    the images (optionally on a separate thread).

    NOTE(review): ``devices``, ``parameters``, ``_image_buffer``,
    ``_fetch_images``, ``_save_images`` and ``_saving_threads`` are not
    defined in this class; presumably they are provided by ``CaptureBase``
    -- confirm.
    """

    # Runtime keyword arguments read by ``_execute`` and their expected types
    # (presumably checked by the base class -- confirm).
    runtime_parameters_required = {"timeout_s": float}

    def initialize(self) -> None:
        """Run the base-class initialization and create the worker state."""
        super().initialize()
        # Set to ask the trigger loop to stop; also set by the loop itself
        # when it terminates on its own.
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None

    def _capture_images(self, **kwargs: Any) -> None:
        """Trigger the camera until stopped, timed out, or an error occurs.

        Intended to run as the target of the worker thread started by
        ``_execute``. Errors are logged rather than propagated, and the stop
        event is always set when the loop exits.

        Args:
            **kwargs: Must contain ``timeout_s``, the maximum loop duration
                in seconds.
        """
        timeout_s = kwargs["timeout_s"]
        # Monotonic clock: elapsed time must not be affected by wall-clock
        # adjustments (NTP, DST, manual changes).
        start = time.monotonic()
        try:
            while not self._stop_event.is_set():
                self.devices["camera"].trigger()
                if time.monotonic() - start >= timeout_s:
                    _logger.warning("Timeout reached.")
                    break
        except Exception:
            # Nothing can catch an exception raised on this thread, so log it
            # with its traceback instead of dropping it silently.
            _logger.exception("Triggering the camera failed; stopping the capture loop.")
        finally:
            # Mark the loop as finished regardless of why it ended.
            self._stop_event.set()

    def _execute(self, **kwargs: Any) -> tuple[TaskStatus, dict[str, Any]]:
        """Start the trigger loop on a background thread.

        Args:
            **kwargs: Must contain ``timeout_s`` (seconds), forwarded to
                ``_capture_images``.

        Returns:
            ``(TaskStatus.RUNNING, {})``; the capture is finished later by
            calling ``stop_capture``.
        """
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._capture_images,
            kwargs={"timeout_s": kwargs["timeout_s"]},
        )
        self._thread.start()
        return TaskStatus.RUNNING, {}

    def stop_capture(self, filenames: list[str] | None = None) -> dict[str, Any]:
        """Stop the trigger loop, then fetch and save all buffered images.

        Args:
            filenames: Forwarded to ``_save_images`` as ``filenames``.
                ``None`` (the default) is treated as an empty list.

        Returns:
            A dict with the keys:

            * ``"status"``: ``TaskStatus.SUCCESS``.
            * ``"imgs"``: the ``payload`` of every fetched data packet.
            * ``"timestamps_camera"``: ``tsSec + tsUSec * 1e-6`` for each
              packet that has metadata, or ``None`` when no packet was
              fetched or the first packet's metadata lacks those keys.
            * ``"timestamps_captured"``: the ``timestamp`` of every packet.
            * ``"timestamps_fetched"``: second value returned by
              ``_fetch_images``.
        """
        # A fresh list per call; a mutable default would be shared between
        # calls.
        if filenames is None:
            filenames = []

        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None

        number_of_images = self._image_buffer.qsize()
        datapackets, timestamps_fetched = self._fetch_images(number_of_images)

        imgs = [dp.payload for dp in datapackets]
        timestamps_captured = [dp.timestamp for dp in datapackets]
        timestamps_camera: list[float] | None = None

        # The first packet decides whether camera timestamps are available;
        # guard against an empty fetch (stop requested before any image).
        if len(datapackets) > 0 and datapackets[0].metadata is not None:
            _logger.debug("First data packet: %s", datapackets[0])
            metadata = datapackets[0].metadata
            assert isinstance(metadata, dict)
            if "tsSec" in metadata and "tsUSec" in metadata:
                # Combine whole seconds and microseconds into float seconds.
                timestamps_camera = [
                    dp.metadata["tsSec"] + dp.metadata["tsUSec"] * 1e-6
                    for dp in datapackets
                    if dp.metadata is not None
                ]

        if "use_thread_on_save" in self.parameters and self.parameters["use_thread_on_save"]:
            # Save in the background; the thread is recorded in
            # ``_saving_threads`` (presumably joined elsewhere -- confirm).
            thread = threading.Thread(
                target=self._save_images,
                args=(datapackets, timestamps_fetched),
                kwargs={"filenames": filenames},
            )
            thread.start()
            self._saving_threads.append(thread)
            _logger.debug("Saving images in a separate thread %s.", thread.name)
        else:
            self._save_images(datapackets, timestamps_fetched, filenames=filenames)

        return {
            "status": TaskStatus.SUCCESS,
            "imgs": imgs,
            "timestamps_camera": timestamps_camera,
            "timestamps_captured": timestamps_captured,
            "timestamps_fetched": timestamps_fetched,
        }