from datetime import datetime
import logging
from os import PathLike
from queue import Queue
import threading
import time
from typing import Any, Literal, final
_logger = logging.getLogger(__name__)
from laserprocessing import DataPacket, SaveMode, TriggerMode, TriggerType
from laserprocessing.abc.camera import Camera, disabled_camera
from laserprocessing.storages import ImageSaver
from lpcore import TaskStatus
from lpcore.abc.process import Process
# [docs] -- Sphinx "view source" link label left over from HTML extraction; not part of the code.
class CaptureBase(Process):
    """Base process that triggers a camera, fetches the frames and optionally saves them.

    Subclasses implement :meth:`_custom_execute`, which performs the actual
    triggering and reports how many images to expect.  :meth:`_execute` then
    pulls that many ``DataPacket`` objects from the queue that was handed to
    the camera via ``set_client`` and, when the ``save_image`` parameter is
    truthy, writes them out through an ``ImageSaver``.
    """

    devices_required = {"camera": Camera}
    parameters_optional = {
        "save_image": bool,
        "folderpath": PathLike | str,
        "filetype": str,
        "save_mode": SaveMode,
        "basename": str,  # has no effect in manual mode
        "counter": int,
        "zeropadding": int,
        "timestamp_mode": Literal["camera", "captured", "fetched", "at-save"],
        "use_thread_on_save": bool,
    }
    runtime_parameters_optional = {"filenames": list[str]}

    def initialize(self) -> None:
        """Create the image queue and thread list, then set up the saver and the camera."""
        self._image_buffer = Queue[DataPacket]()
        self._saving_threads: list[threading.Thread] = []
        self._initialize_image_saver()
        self._initialize_camera()

    def _optional_parameter(self, key: str, default: Any) -> Any:
        """Return ``self.parameters[key]`` when the key is present, otherwise ``default``."""
        return self.parameters[key] if key in self.parameters else default

    @staticmethod
    def _normalize_capture_timestamp(timestamp: Any) -> tuple[datetime, float]:
        """Return ``timestamp`` as a ``(datetime, epoch seconds)`` pair.

        ``float`` values are interpreted with ``datetime.fromtimestamp`` and
        ``datetime`` values are converted with ``.timestamp()``.  Any other
        type is logged as an error and replaced by epoch 0.
        """
        if isinstance(timestamp, float):
            return datetime.fromtimestamp(timestamp), timestamp
        if isinstance(timestamp, datetime):
            return timestamp, timestamp.timestamp()
        _logger.error(
            f"Unknown timestamp type: {type(timestamp)} (value: {timestamp}), setting to 0."
        )
        return datetime.fromtimestamp(0), 0.0

    @staticmethod
    def _extract_camera_timestamp(data_packet: DataPacket) -> float | None:
        """Return the camera-side timestamp stored in the packet metadata, if any.

        The value is ``metadata["tsSec"] + metadata["tsUSec"] * 1e-6``
        (presumably seconds plus microseconds -- confirm against the camera
        driver).  ``None`` is returned when the metadata is not a dict or
        either key is missing.
        """
        metadata = data_packet.metadata
        if isinstance(metadata, dict) and "tsSec" in metadata and "tsUSec" in metadata:
            return metadata["tsSec"] + metadata["tsUSec"] * 1e-6
        return None

    def _initialize_image_saver(self) -> None:
        """Read the saving parameters and build the ``ImageSaver`` when saving is enabled.

        Raises:
            ValueError: ``save_image`` is truthy but ``folderpath``,
                ``filetype`` or ``save_mode`` is missing from the parameters.
        """
        # Image saving is an optional feature; it stays off unless requested.
        self._save_image = self._optional_parameter("save_image", False)
        # ``_img_saver`` is the attribute the rest of the class reads.  The
        # original only initialised the differently spelled ``_image_saver``,
        # so both names are kept and always refer to the same object.
        self._img_saver: ImageSaver | None = None
        self._image_saver: ImageSaver | None = None
        if not self._save_image:
            return

        missing = [
            key for key in ("folderpath", "filetype", "save_mode")
            if key not in self.parameters
        ]
        if missing:
            raise ValueError(f"Missing key(s) in parameters: {', '.join(missing)}")

        # NOTE(review): the default is stored in ``_parameters_optional`` as in
        # the original; whether ``self.parameters`` reflects that write depends
        # on the base class, so the effective value is also kept locally.
        if "timestamp_mode" not in self.parameters:
            self._parameters_optional["timestamp_mode"] = "at-save"
        timestamp_mode = self._optional_parameter("timestamp_mode", "at-save")

        # For these timestamp sources the file name part is built by
        # ``_save_images`` itself, so the saver is switched to manual mode.
        savemode = self.parameters["save_mode"]
        if savemode == SaveMode.TIMESTAMP and timestamp_mode in ("camera", "captured", "fetched"):
            savemode = SaveMode.MANUAL

        self._img_saver = ImageSaver(
            folderpath=self.parameters["folderpath"],
            filetype=self.parameters["filetype"],
            savemode=savemode,
            basename=self.parameters["basename"] if "basename" in self.parameters else self.name,
            counter=self._optional_parameter("counter", 0),
            zeropadding=self._optional_parameter("zeropadding", 4),
        )
        self._image_saver = self._img_saver

    def _initialize_camera(self) -> None:
        """Register the image queue with the camera and select a one-shot software trigger."""
        camera = self.devices["camera"]
        with disabled_camera(camera):
            camera.set_client(self._image_buffer)
            camera.set_trigger(TriggerType.SOFTWARE, TriggerMode.ONCE)
        # NOTE(review): the extracted source lost its indentation; ``enable()``
        # is assumed to run after the ``disabled_camera`` block -- confirm.
        camera.enable()

    def _fetch_image(self, timeout: int = 5) -> tuple[DataPacket, float]:
        """Take one packet from the image queue.

        Args:
            timeout: Seconds to wait for a packet.

        Returns:
            The packet and the ``time.time()`` value taken right after the
            fetch.  If the fetch raises (e.g. the queue stays empty until the
            timeout), a placeholder packet with ``payload=None`` and
            ``timestamp=0`` is returned instead.
        """
        try:
            data_packet: DataPacket = self._image_buffer.get(timeout=timeout)
        except Exception as e:
            # ``str(queue.Empty())`` is empty, so log the repr to keep the
            # message informative on a plain timeout.
            _logger.error(f"Exception while fetching image: {e!r}")
            return DataPacket(
                payload=None,
                timestamp=0,
            ), time.time()
        return data_packet, time.time()

    def _fetch_images(self,
                      num_images: int,
                      timeout: int = 5) -> tuple[list[DataPacket], list[float]]:
        """Fetch up to ``num_images`` packets, stopping at the first failure.

        Args:
            num_images: Number of packets to fetch.
            timeout: Per-packet timeout in seconds.

        Returns:
            The fetched packets and, index-aligned, the times at which each
            one was fetched.  Both lists are shorter than ``num_images`` when
            a fetch fails.
        """
        datapackets: list[DataPacket] = []
        timestamps_fetched: list[float] = []
        for i in range(num_images):
            data_packet, timestamp_fetched = self._fetch_image(timeout=timeout)
            # Check for the failure placeholder first so that its dummy
            # timestamp does not trigger an "Unknown timestamp type" error.
            if data_packet.payload is None:
                _logger.error(f"Failed to fetch image {i+1} of {num_images}")
                return datapackets, timestamps_fetched
            timestamp_captured_dt, timestamp_captured_f = self._normalize_capture_timestamp(
                data_packet.timestamp)
            datapackets.append(data_packet)
            timestamps_fetched.append(timestamp_fetched)
            _logger.debug(
                f"Fetched image {i+1:05d} / {num_images:05d}: "
                f"Fetched time at {datetime.fromtimestamp(timestamp_fetched):%Y-%m-%d %H:%M:%S.%f} ({timestamp_fetched:15.6f}), "
                f"Captured time at {timestamp_captured_dt:%Y-%m-%d %H:%M:%S.%f} ({timestamp_captured_f:15.6f})"
            )
        return datapackets, timestamps_fetched

    def _save_images(self,
                     data_packets: list[DataPacket],
                     timestamps_fetched: list[float],
                     filenames: list[str] | None = None) -> None:
        """Save the payload of every packet through the image saver.

        Does nothing when saving is disabled or ``data_packets`` is empty.

        Args:
            data_packets: Packets whose payloads are saved.
            timestamps_fetched: Fetch time of each packet, index-aligned with
                ``data_packets``.
            filenames: File name parts used in manual save mode when the
                timestamp mode is not ``"camera"``, ``"captured"`` or
                ``"fetched"``.  A missing entry is replaced by an
                ``IndexError_at_index=...`` marker name.
        """
        if not self._save_image or not data_packets:
            return
        saver = self._img_saver
        if saver is None:
            _logger.error("Image saving is enabled but no image saver is configured.")
            return
        names = filenames if filenames is not None else []

        # Using the camera's own timestamp requires metadata.  Without it,
        # fall back to the PC timestamp taken at capture time.
        timestamp_mode = self._optional_parameter("timestamp_mode", "at-save")
        if timestamp_mode == "camera":
            first_metadata = data_packets[0].metadata
            if first_metadata is None:
                _logger.warning("Timestamp mode is set to 'camera', but metadata is not available. "
                                "Switching to 'captured' mode.")
                self._parameters_optional["timestamp_mode"] = "captured"
                timestamp_mode = "captured"
            elif "tsSec" not in first_metadata or "tsUSec" not in first_metadata:
                _logger.warning(
                    "Timestamp mode is set to 'camera', but FPGA timestamp is not available. "
                    "Switching to 'captured' mode.")
                self._parameters_optional["timestamp_mode"] = "captured"
                timestamp_mode = "captured"

        for i, (data_packet, timestamp_fetched) in enumerate(zip(data_packets, timestamps_fetched)):
            timestamp_captured_dt, timestamp_captured_f = self._normalize_capture_timestamp(
                data_packet.timestamp)
            timestamp_fpga = self._extract_camera_timestamp(data_packet)

            if saver.savemode == SaveMode.MANUAL:
                if timestamp_mode == "camera":
                    filepart = f"{timestamp_fpga:.6f}" if timestamp_fpga is not None else f"MetadataError_at_index={i:06d}_{timestamp_captured_f:.6f}"
                elif timestamp_mode == "captured":
                    filepart = f"{timestamp_captured_dt:%Y%m%d_%H%M%S_%f}"
                elif timestamp_mode == "fetched":
                    filepart = f"{datetime.fromtimestamp(timestamp_fetched):%Y%m%d_%H%M%S_%f}"
                else:
                    try:
                        filepart = names[i]
                    except IndexError:
                        filepart = f"IndexError_at_index={i:06d}-{timestamp_captured_f:.6f}"
            else:
                # Non-manual modes let the saver pick the file name itself.
                filepart = None

            saver.save(data_packet.payload, filepart=filepart)
            _logger.info(
                f"Saved image {i+1:05d} / {len(data_packets):05d}: "
                f"Capture time at {timestamp_captured_dt:%Y-%m-%d %H:%M:%S.%f} ({timestamp_captured_f:15.6f}), "
                f"Timestamp of camera: {timestamp_fpga if timestamp_fpga is not None else -1:15.6f}"
            )

    def _custom_execute(self, **kwargs: Any) -> tuple[TaskStatus, int]:
        """Implement the logic that triggers the camera.

        Returns:
            status (TaskStatus): Result of the task execution.
            number_of_images (int): Number of images; needed later to fetch
                the images from the queue.
        """
        raise NotImplementedError

    @final
    def _execute(self, **kwargs: Any) -> tuple[TaskStatus, dict[str, Any]]:
        """Trigger the camera, fetch the resulting images and optionally save them.

        Args:
            **kwargs: Forwarded to :meth:`_custom_execute`.  The optional
                ``filenames`` entry is passed on to :meth:`_save_images`.

        Returns:
            The status from :meth:`_custom_execute` with an empty dict when it
            is not ``TaskStatus.SUCCESS``.  Otherwise ``TaskStatus.SUCCESS``
            and a dict with ``imgs``, ``timestamps_camera`` (``None`` when the
            first packet carries no camera timestamp), ``timestamps_captured``
            and ``timestamps_fetched``.  The lists are shorter than requested,
            possibly empty, when fetching failed part-way.
        """
        status, number_of_images = self._custom_execute(**kwargs)
        if status != TaskStatus.SUCCESS:
            return status, {}

        datapackets, timestamps_fetched = self._fetch_images(number_of_images)
        filenames = kwargs.get("filenames", [])
        imgs = [dp.payload for dp in datapackets]
        timestamps_captured = [dp.timestamp for dp in datapackets]

        # ``datapackets`` can be empty (first fetch failed, or zero images
        # requested), so the first packet must not be indexed unconditionally.
        timestamps_camera: list[float] | None = None
        if datapackets and self._extract_camera_timestamp(datapackets[0]) is not None:
            timestamps_camera = [
                ts for ts in map(self._extract_camera_timestamp, datapackets)
                if ts is not None
            ]

        if "use_thread_on_save" in self.parameters and self.parameters["use_thread_on_save"]:
            thread = threading.Thread(target=self._save_images,
                                      args=(datapackets, timestamps_fetched),
                                      kwargs={"filenames": filenames})
            # Non-daemon, so the interpreter waits for pending saves at exit.
            thread.daemon = False
            thread.start()
            self._saving_threads.append(thread)
            _logger.debug(f"Saving images in a separate thread {thread.name}.")
        else:
            self._save_images(datapackets, timestamps_fetched, filenames=filenames)

        return TaskStatus.SUCCESS, {
            "imgs": imgs,
            "timestamps_camera": timestamps_camera,
            "timestamps_captured": timestamps_captured,
            "timestamps_fetched": timestamps_fetched,
        }

    def check_threads(self) -> int:
        """Drop finished saving threads and return how many are still running."""
        still_alive: list[threading.Thread] = []
        for thread in self._saving_threads:
            if thread.is_alive():
                still_alive.append(thread)
            else:
                _logger.debug(f"Thread {thread.name} is not alive. Removing it.")
        # Rebuild instead of calling ``remove`` during iteration, which skips
        # the element after each removed thread.  Slice assignment keeps the
        # same list object.
        self._saving_threads[:] = still_alive

        if self._saving_threads:
            _logger.debug(
                f"Threads are still alive: {[thread.name for thread in self._saving_threads]}")
        else:
            _logger.debug("No threads are alive.")
        return len(self._saving_threads)