-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththermal_camera.py
More file actions
88 lines (70 loc) · 2.39 KB
/
Copy paththermal_camera.py
File metadata and controls
88 lines (70 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
Thermal camera streaming to Plexus.
Reads from any supported thermal camera and sends frames to Plexus using
the SDK's send_thermal_frame() method. The gateway relays colorized JPEG
frames to the app, along with temperature range and (for small sensors)
per-pixel temperature data.
Supported hardware (--camera argument):
sim Simulated camera — no hardware needed (default)
mlx90640 MLX90640 32×24 I2C sensor (pip install adafruit-circuitpython-mlx90640)
mlx90641 MLX90641 16×12 I2C sensor (pip install adafruit-circuitpython-mlx90641)
usb USB thermal camera in Y16 format (InfiRay, Topdon, Seek, etc.)
Run:
export PLEXUS_API_KEY=plx_xxx
python thermal_camera.py
python thermal_camera.py --camera mlx90640
python thermal_camera.py --camera usb --device 2
"""
import argparse
import sys
import time
from plexus import Plexus
from plexus.cameras.thermal import NoCameraFound, ThermalSource
CAMERA_ID = "thermal"
FPS = 5
def main() -> None:
parser = argparse.ArgumentParser(description="Stream thermal camera to Plexus.")
parser.add_argument(
"--camera",
choices=["sim", "mlx90640", "mlx90641", "usb"],
default=None,
help="Camera driver (default: auto-detect)",
)
parser.add_argument(
"--device",
type=int,
default=0,
help="USB video device index (default: 0)",
)
args = parser.parse_args()
hint = args.device if args.camera == "usb" else args.camera
try:
cam = ThermalSource.open(hint)
except NoCameraFound as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
px = Plexus(transport="ws")
px.wait_connected()
interval = 1.0 / FPS
frame_count = 0
print(f"Streaming {cam.width}×{cam.height} thermal at {FPS} fps — Ctrl-C to stop")
try:
while True:
t0 = time.time()
temps = cam.read_frame()
px.send_thermal_frame(temps, camera_id=CAMERA_ID)
frame_count += 1
if frame_count % 50 == 0:
print(f" {frame_count} frames sent")
elapsed = time.time() - t0
wait = interval - elapsed
if wait > 0:
time.sleep(wait)
except KeyboardInterrupt:
pass
finally:
cam.close()
px.stop()
print(f"Done. {frame_count} frames sent.")
if __name__ == "__main__":
main()