# Source code for lpcore.capture.capturebase

from datetime import datetime
import logging
from os import PathLike
from queue import Queue
import threading
import time
from typing import Any, Literal, final

_logger = logging.getLogger(__name__)

from laserprocessing import DataPacket, SaveMode, TriggerMode, TriggerType
from laserprocessing.abc.camera import Camera, disabled_camera
from laserprocessing.storages import ImageSaver

from lpcore import TaskStatus
from lpcore.abc.process import Process


class CaptureBase(Process):
    """Base process that triggers a camera, fetches the frames and optionally saves them.

    The camera delivers :class:`DataPacket` objects into an internal queue
    (registered through ``set_client``). Subclasses implement
    :meth:`_custom_execute` to fire the trigger(s); :meth:`_execute` then pulls
    the announced number of packets from the queue and, if ``save_image`` is
    enabled, writes them to disk through an :class:`ImageSaver`.
    """

    devices_required = {"camera": Camera}
    parameters_optional = {
        "save_image": bool,
        "folderpath": PathLike | str,
        "filetype": str,
        "save_mode": SaveMode,
        "basename": str,  # has no effect in Manual mode
        "counter": int,
        "zeropadding": int,
        "timestamp_mode": Literal["camera", "captured", "fetched", "at-save"],
        "use_thread_on_save": bool,
    }
    runtime_parameters_optional = {"filenames": list[str]}

    def initialize(self) -> None:
        """Create the image queue and thread registry, then set up saver and camera."""
        self._image_buffer = Queue[DataPacket]()
        self._saving_threads: list[threading.Thread] = []
        self._initialize_image_saver()
        self._initialize_camera()

    def _initialize_image_saver(self) -> None:
        """Configure optional image saving from ``self.parameters``.

        Raises:
            ValueError: If ``save_image`` is truthy but any of ``folderpath``,
                ``filetype`` or ``save_mode`` is missing from the parameters.
        """
        # Image saving is an optional feature; it is off unless requested.
        self._save_image = self.parameters["save_image"] if "save_image" in self.parameters else False
        # Always define the attribute so later code can rely on its existence.
        self._img_saver: ImageSaver | None = None
        if not self._save_image:
            return

        missing = [key for key in ("folderpath", "filetype", "save_mode") if key not in self.parameters]
        if missing:
            raise ValueError(f"Missing key(s) in parameters: {', '.join(missing)}")

        if "timestamp_mode" not in self.parameters:
            # NOTE(review): the default is written to ``_parameters_optional`` but
            # read back through ``self.parameters``; presumably the base class
            # exposes one through the other -- confirm against ``Process``.
            self._parameters_optional["timestamp_mode"] = "at-save"

        savemode = self.parameters["save_mode"]
        # For every timestamp source other than "at-save" the file name part is
        # built in ``_save_images``, so the saver itself runs in manual mode.
        if savemode == SaveMode.TIMESTAMP and self.parameters["timestamp_mode"] in (
            "camera",
            "captured",
            "fetched",
        ):
            savemode = SaveMode.MANUAL

        self._img_saver = ImageSaver(
            folderpath=self.parameters["folderpath"],
            filetype=self.parameters["filetype"],
            savemode=savemode,
            basename=self.parameters["basename"] if "basename" in self.parameters else self.name,
            counter=self.parameters["counter"] if "counter" in self.parameters else 0,
            zeropadding=self.parameters["zeropadding"] if "zeropadding" in self.parameters else 4,
        )

    def _initialize_camera(self) -> None:
        """Register the image queue with the camera and select a one-shot software trigger."""
        camera = self.devices["camera"]
        with disabled_camera(camera):
            camera.set_client(self._image_buffer)
            camera.set_trigger(TriggerType.SOFTWARE, TriggerMode.ONCE)
        # NOTE(review): the original formatting was lost; ``enable()`` is assumed
        # to run after the ``disabled_camera`` block has exited -- confirm.
        camera.enable()

    def _fetch_image(self, timeout: int = 5) -> tuple[DataPacket, float]:
        """Take one packet from the image queue.

        Args:
            timeout: Seconds to wait for a packet.

        Returns:
            The packet and the ``time.time()`` value taken right after it was
            obtained. On any failure a placeholder packet with ``payload=None``
            and ``timestamp=0`` is returned instead of raising.
        """
        try:
            data_packet: DataPacket = self._image_buffer.get(timeout=timeout)
            te = time.time()
        except Exception as e:
            # ``repr`` is used because a queue timeout (``queue.Empty``) has an
            # empty ``str()`` and would otherwise leave the message blank.
            _logger.error(f"Exception while fetching image: {e!r}")
            return DataPacket(
                payload=None,
                timestamp=0,
            ), time.time()
        return data_packet, te

    @staticmethod
    def _normalize_timestamp(timestamp: Any) -> tuple[datetime, float]:
        """Return ``timestamp`` as a ``(datetime, epoch seconds)`` pair.

        ``float`` and ``datetime`` inputs are converted; anything else is logged
        as an error and replaced by the epoch (``0.0``).
        """
        if isinstance(timestamp, float):
            return datetime.fromtimestamp(timestamp), timestamp
        if isinstance(timestamp, datetime):
            return timestamp, timestamp.timestamp()
        _logger.error(
            f"Unknown timestamp type: {type(timestamp)} (value: {timestamp}), setting to 0."
        )
        return datetime.fromtimestamp(0), 0.0

    def _fetch_images(self, num_images: int, timeout: int = 5) -> tuple[list[DataPacket], list[float]]:
        """Fetch up to ``num_images`` packets from the queue.

        Fetching stops at the first failed fetch, so the returned lists may be
        shorter than ``num_images`` (possibly empty).

        Args:
            num_images: Number of packets to fetch.
            timeout: Per-packet timeout in seconds.

        Returns:
            The fetched packets and, index-aligned, the times they were fetched.
        """
        datapackets: list[DataPacket] = []
        timestamps_fetched: list[float] = []
        for i in range(num_images):
            data_packet, timestamp_fetched = self._fetch_image(timeout=timeout)
            # Check for failure first: the placeholder packet carries an integer
            # timestamp that would otherwise trigger a misleading type error log.
            if data_packet.payload is None:
                _logger.error(f"Failed to fetch image {i+1} of {num_images}")
                return datapackets, timestamps_fetched

            timestamp_captured_dt, timestamp_captured_f = self._normalize_timestamp(data_packet.timestamp)
            datapackets.append(data_packet)
            timestamps_fetched.append(timestamp_fetched)
            _logger.debug(
                f"Fetched image {i+1:05d} / {num_images:05d}: "
                f"Fetched time at {datetime.fromtimestamp(timestamp_fetched):%Y-%m-%d %H:%M:%S.%f} ({timestamp_fetched:15.6f}), "
                f"Captured time at {timestamp_captured_dt:%Y-%m-%d %H:%M:%S.%f} ({timestamp_captured_f:15.6f})"
            )
        return datapackets, timestamps_fetched

    def _save_images(
        self,
        data_packets: list[DataPacket],
        timestamps_fetched: list[float],
        filenames: list[str] | None = None,
    ) -> None:
        """Save the payload of every packet through the image saver.

        Does nothing when saving is disabled or ``data_packets`` is empty.

        Args:
            data_packets: Packets whose payloads are saved.
            timestamps_fetched: Fetch times, index-aligned with ``data_packets``.
            filenames: File name parts used in manual mode when
                ``timestamp_mode`` is not one of the timestamp-based modes.
        """
        if not self._save_image or len(data_packets) == 0:
            return
        if filenames is None:
            filenames = []

        # Using the camera timestamp requires metadata; when it is missing,
        # fall back to the PC-side capture timestamp.
        if self.parameters["timestamp_mode"] == "camera":
            first_metadata = data_packets[0].metadata
            if first_metadata is None:
                _logger.warning("Timestamp mode is set to 'camera', but metadata is not available. "
                                "Switching to 'captured' mode.")
                # NOTE(review): same ``_parameters_optional`` / ``parameters``
                # coupling as in ``_initialize_image_saver`` -- confirm.
                self._parameters_optional["timestamp_mode"] = "captured"
            elif "tsSec" not in first_metadata or "tsUSec" not in first_metadata:
                _logger.warning(
                    "Timestamp mode is set to 'camera', but FPGA timestamp is not available. "
                    "Switching to 'captured' mode.")
                self._parameters_optional["timestamp_mode"] = "captured"

        for i, (data_packet, timestamp_fetched) in enumerate(zip(data_packets, timestamps_fetched)):
            img = data_packet.payload
            timestamp_captured_dt, timestamp_captured_f = self._normalize_timestamp(data_packet.timestamp)

            metadata = data_packet.metadata
            if isinstance(metadata, dict) and "tsSec" in metadata and "tsUSec" in metadata:
                # Seconds plus microseconds combined into fractional seconds.
                timestamp_fpga = metadata["tsSec"] + metadata["tsUSec"] * 1e-6
            else:
                timestamp_fpga = None

            if self._img_saver.savemode == SaveMode.MANUAL:
                timestamp_mode = self.parameters["timestamp_mode"]
                if timestamp_mode == "camera":
                    filepart = (
                        f"{timestamp_fpga:.6f}"
                        if timestamp_fpga is not None
                        else f"MetadataError_at_index={i:06d}_{timestamp_captured_f:.6f}"
                    )
                elif timestamp_mode == "captured":
                    filepart = f"{timestamp_captured_dt:%Y%m%d_%H%M%S_%f}"
                elif timestamp_mode == "fetched":
                    filepart = f"{datetime.fromtimestamp(timestamp_fetched):%Y%m%d_%H%M%S_%f}"
                else:
                    try:
                        filepart = filenames[i]
                    except IndexError:
                        filepart = f"IndexError_at_index={i:06d}-{timestamp_captured_f:.6f}"
            else:
                filepart = None

            self._img_saver.save(img, filepart=filepart)
            _logger.info(
                f"Saved image {i+1:05d} / {len(data_packets):05d}: "
                f"Capture time at {timestamp_captured_dt:%Y-%m-%d %H:%M:%S.%f} ({timestamp_captured_f:15.6f}), "
                f"Timestamp of camera: {timestamp_fpga if timestamp_fpga is not None else -1:15.6f}"
            )

    def _custom_execute(self, **kwargs: Any) -> tuple[TaskStatus, int]:
        """Implement the logic that triggers the camera.

        Returns:
            status (TaskStatus): Result of the task.
            number_of_images (int): Number of images; needed afterwards to know
                how many images to fetch.
        """
        raise NotImplementedError

    @final
    def _execute(self, **kwargs: Any) -> tuple[TaskStatus, dict[str, Any]]:
        """Trigger the camera, fetch the resulting images and save them if enabled.

        Args:
            **kwargs: Forwarded to :meth:`_custom_execute`; an optional
                ``filenames`` entry is passed on to :meth:`_save_images`.

        Returns:
            The status from :meth:`_custom_execute` with an empty dict when it is
            not ``TaskStatus.SUCCESS``; otherwise ``TaskStatus.SUCCESS`` and a
            dict with the keys ``imgs``, ``timestamps_camera`` (``None`` when the
            first packet carries no ``tsSec``/``tsUSec`` metadata),
            ``timestamps_captured`` and ``timestamps_fetched``.
        """
        status, number_of_images = self._custom_execute(**kwargs)
        if status != TaskStatus.SUCCESS:
            return status, {}

        datapackets, timestamps_fetched = self._fetch_images(number_of_images)
        filenames = kwargs.get("filenames", [])

        imgs = [dp.payload for dp in datapackets]
        timestamps_captured = [dp.timestamp for dp in datapackets]

        timestamps_camera: list[float] | None = None
        if not datapackets:
            # Fetching can return nothing (timeout on the first image, or zero
            # images requested); indexing the first packet would raise.
            _logger.warning(f"No images were fetched (expected {number_of_images}).")
        else:
            metadata = datapackets[0].metadata
            if isinstance(metadata, dict) and "tsSec" in metadata and "tsUSec" in metadata:
                timestamps_camera = [
                    dp.metadata["tsSec"] + dp.metadata["tsUSec"] * 1e-6
                    for dp in datapackets
                    if dp.metadata is not None
                ]

        if "use_thread_on_save" in self.parameters and self.parameters["use_thread_on_save"]:
            thread = threading.Thread(
                target=self._save_images,
                args=(datapackets, timestamps_fetched),
                kwargs={"filenames": filenames},
                daemon=False,
            )
            thread.start()
            self._saving_threads.append(thread)
            _logger.debug(f"Saving images in a separate thread {thread.name}.")
        else:
            self._save_images(datapackets, timestamps_fetched, filenames=filenames)

        return TaskStatus.SUCCESS, {
            "imgs": imgs,
            "timestamps_camera": timestamps_camera,
            "timestamps_captured": timestamps_captured,
            "timestamps_fetched": timestamps_fetched,
        }

    def check_threads(self) -> int:
        """Drop finished saving threads from the registry.

        Returns:
            The number of saving threads that are still alive.
        """
        still_alive: list[threading.Thread] = []
        for thread in self._saving_threads:
            if thread.is_alive():
                still_alive.append(thread)
            else:
                _logger.debug(f"Thread {thread.name} is not alive. Removing it.")
        # Replace the contents in place: removing items from the list while
        # iterating over it would skip the element after each removal.
        self._saving_threads[:] = still_alive

        if len(self._saving_threads) > 0:
            _logger.debug(
                f"Threads are still alive: {[thread.name for thread in self._saving_threads]}")
        else:
            _logger.debug("No threads are alive.")
        return len(self._saving_threads)