lpcore.capture.triggerbasedimagecapture のソースコード
import logging
import threading
import time
from typing import Any
from lpcore import TaskStatus
from lpcore.capture.capturebase import CaptureBase
_logger = logging.getLogger(__name__)
[ドキュメント]
class TriggerBasedImageCapture(CaptureBase):
runtime_parameters_required = {"timeout_s": float}
[ドキュメント]
def initialize(self) -> None:
super().initialize()
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
def _capture_images(self, **kwargs: Any) -> None:
timeout_s = kwargs["timeout_s"]
start = time.time()
try:
while not self._stop_event.is_set():
self.devices["camera"].trigger()
if time.time() - start >= timeout_s:
_logger.warning("Timeout reached.")
raise TimeoutError(f"Timeout reached after {timeout_s} s.")
except Exception as e:
self._stop_event.set()
return
def _execute(self, **kwargs: Any) -> tuple[TaskStatus, dict[str, Any]]:
self._stop_event.clear()
self._thread = threading.Thread(target=self._capture_images, kwargs={"timeout_s": kwargs["timeout_s"]})
self._thread.start()
return TaskStatus.RUNNING, {}
def stop_capture(self, filenames: list[str] = []) -> dict[str, Any]:
self._stop_event.set()
if self._thread is not None:
self._thread.join()
self._thread = None
number_of_images = self._image_buffer.qsize()
datapackets, timestamps_fetched = self._fetch_images(number_of_images)
imgs = [dp.payload for dp in datapackets]
timestamps_captured = [dp.timestamp for dp in datapackets]
timestamps_camera: list[float] | None = None
if datapackets[0].metadata is not None:
print(f"+++ --- {datapackets[0]}")
metadata = datapackets[0].metadata
assert isinstance(metadata, dict)
if "tsSec" in metadata and "tsUSec" in metadata:
timestamps_camera = [
dp.metadata["tsSec"] + dp.metadata["tsUSec"] * 1e-6 for dp in datapackets if dp.metadata is not None
]
if "use_thread_on_save" in self.parameters and self.parameters["use_thread_on_save"]:
thread = threading.Thread(target=self._save_images,
args=(datapackets, timestamps_fetched),
kwargs={"filenames": filenames})
thread.start()
self._saving_threads.append(thread)
_logger.debug(f"Saving images in a separate thread {thread.name}.")
else:
self._save_images(datapackets, timestamps_fetched, filenames=filenames)
return {
"status": TaskStatus.SUCCESS,
"imgs": imgs,
"timestamps_camera": timestamps_camera,
"timestamps_captured": timestamps_captured,
"timestamps_fetched": timestamps_fetched,
}