11import os
22import zmq
3- from ctevents import ctevents
4- from ctevents .ctevents import send_terminating_plugin_fb_event
3+ from ctevents import MonitorPowerStartEvent , MonitorPowerStopEvent , PluginTerminateEvent
4+ from ctevents .ctevents import socket_message_to_typed_event , send_terminate_plugin_fb_event , send_monitor_power_start_fb_event
55from pyevents .events import get_plugin_socket , get_next_msg , send_quit_command
66import logging
7- from subprocess import run , PIPE , STDOUT
7+ from subprocess import Popen , run , PIPE , STDOUT
8+ from math import ceil
9+ import yaml
10+ import requests
811
912log_level = os .environ .get ("VIDEO_GENERATING_LOG_LEVEL" , "INFO" )
1013input_video_path = os .environ .get ("INPUT_VIDEO_PATH" , "/example_video.mp4" )
14+ use_ground_truth_url = os .environ .get ("USE_CUSTOM_GROUND_TRUTH_FILE_URL" , False )
15+ ground_truth_url = os .environ .get ("CUSTOM_GROUND_TRUTH_URL" )
1116ground_truth_file = os .environ .get ("GROUND_TRUTH_FILE" , "/ground_truth.yml" )
12- device = os .environ .get ("DEVICE" , "http://0.0.0.0/8090" )
17+ device = os .environ .get ("DEVICE" , "/dev/video0" )
18+ mode = os .environ .get ("MODE" , "device" )
1319
1420logger = logging .getLogger ("Image Generating Plugin" )
1521if log_level == "DEBUG" :
@@ -33,7 +39,7 @@ def get_socket():
3339 for the port configured for this plugin.
3440 """
3541 # get the port assigned to the Image Generating plugin
36- PORT = os .environ .get ('VIDEO_GENERATING_PLUGIN_PORT' , 6000 )
42+ PORT = os .environ .get ('VIDEO_GENERATING_PLUGIN_PORT' , 6003 )
3743 # create the zmq context object
3844 context = zmq .Context ()
3945 socket = get_plugin_socket (context , PORT )
@@ -42,18 +48,33 @@ def get_socket():
4248
4349def get_video_duration ():
4450 result = run (["ffprobe" , "-v" , "error" , "-show_entries" , "format=duration" , "-of" , "default=noprint_wrappers=1:nokey=1" , input_video_path ], stdout = PIPE , stderr = STDOUT )
45- return ceil (float (result .stdout ))
51+ duration = ceil (float (result .stdout ))
52+ logger .info (f'{ input_video_path } has a duration of { duration } seconds' )
53+ return duration
4654
4755
4856def load_ground_truth ():
49- return None
50- with open (ground_truth_file , 'r' ) as f :
51- video_info = yaml .safe_load (f )
57+ video_info = {}
58+ try :
59+ if use_ground_truth_url :
60+ logger .info (f"Retrieving custom ground truth file: { ground_truth_url } " )
61+ response = requests .get (ground_truth_url )
62+ if response .status_code == 200 :
63+ yml_content = response .content .decode ('utf-8' ).splitlines ()
64+ video_info = yaml .safe_load (yml_content )
65+ else :
66+ with open (ground_truth_file , 'r' ) as f :
67+ video_info = yaml .safe_load (f )
68+ except FileNotFoundError :
69+ logger .error (f"File not found: { ground_truth_file } " )
70+ except Exception as e :
71+ logger .error (f'An error occurred: { e } ' )
5272 video_info ['duration' ] = get_video_duration ()
53- OUTPUT_DIR = os .environ .get ('TRAPS_VIDEO_OUTPUT_PATH' , '/output ' )
73+ OUTPUT_DIR = os .environ .get ('TRAPS_VIDEO_OUTPUT_PATH' , '/video_info ' )
5474 video_info_file = os .path .join (OUTPUT_DIR , 'video_info.yaml' )
5575 with open (video_info_file , 'w' ) as f :
5676 yaml .dump (video_info , f )
77+ logger .info (f'Updating { OUTPUT_DIR } /video_info.yaml' )
5778
5879def monitor_generating_power ():
5980 """
@@ -64,7 +85,7 @@ def monitor_generating_power():
6485 monitor_type = [1 ]
6586 monitor_seconds = 0
6687 if monitor_flag :
67- ctevents . send_monitor_power_start_fb_event (socket , pid , monitor_type , monitor_seconds )
88+ send_monitor_power_start_fb_event (socket , pid , monitor_type , monitor_seconds )
6889 logger .info (f"Monitoring image generating power" )
6990
7091def is_v4l2loopback_available ():
@@ -76,42 +97,50 @@ def is_v4l2loopback_available():
7697
7798def stream_file_to_device (input_video_path ):
7899 logger .info (f'starting video device stream to { device } ' )
79- run (['ffmpeg' , '-re' , '-stream_loop' , '-1' , '-i' , input_video_path , '-f' , 'v4l2' , '-pix_fmt' , 'yuv420p' , device ])
80- logger .info ('Video stream finished' )
81-
82- def stream_file_to_netcam (input_video_path , device ):
83- logger .info (f'starting netcam stream at { netcam_url } ' )
84- run (['ffmpeg' , '-re' , '-i' , input_video_path , '-f' , 'mjpeg' , '-pix_fmt' , 'yuv420p' , '-listen' , '1' , netcam_url ])
85- logger .info ('Video stream finished' )
100+ return Popen (['ffmpeg' , '-re' , '-stream_loop' , '-1' , '-i' , input_video_path , '-f' , 'v4l2' , '-pix_fmt' , 'yuv420p' , device ])
86101
87- def get_device_type ():
88- if '/dev/video' in device :
89- return 'video_device'
90- elif 'http://' in device :
91- return 'netcam'
92102
93103def process_video (input_video_path , ground_truth ):
94104 """
95- Main function that starts a video stream, either on /dev/video or as a netcam
105+ Main function that starts a video stream on a /dev/video device
96106 """
97107 logger .info (f"The input video path specified by the user:{ input_video_path } " )
98- device_type = get_device_type ()
99- if device_type == 'video_device' :
100- if is_v4l2loopback_available ():
101- stream_file_to_device (input_video_path )
102- else :
103- logger .info ('v4l2loopback not available for {device}. Falling back to netcam stream.' )
104- stream_file_to_netcam (input_video_path , DEFAULT_NETCAM )
108+ if is_v4l2loopback_available ():
109+ return stream_file_to_device (input_video_path )
105110 else :
106- stream_file_to_netcam ( input_video_path )
111+ logger . warning ( 'v4l2loopback not available for {device}. Shutting down' )
107112
108113def main ():
109114 global socket
110- socket = get_socket ()
111115 ground_truth = load_ground_truth ()
112- monitor_generating_power ()
113- #motion_ready_signal(socket)
114- process_video (input_video_path , ground_truth )
116+ stream_proc = None
117+ if mode == 'device' :
118+ monitor_generating_power ()
119+ stream_proc = process_video (input_video_path , ground_truth )
120+ done = False
121+ while not done :
122+ socket = get_socket ()
123+ try :
124+ message = get_next_msg (socket )
125+ except zmq .error .Again :
126+ logger .debug (f"Got a zmq.error.Again; i.e., waited { SOCKET_TIMEOUT } ms without getting a message" )
127+ continue
128+ except Exception as e :
129+ logger .debug (f"Got exception from get_next_msg; type(e): { type (e )} ; e: { e } " )
130+ done = True
131+ logger .info ("Video generating plugin stopping due to timeout limit..." )
132+ continue
133+ if not message :
134+ logger .info ("No message found in get_next_msg" )
135+
136+ event = socket_message_to_typed_event (message )
137+ logger .info (f"Got a message from the event socket of type: { type (event )} " )
138+ if isinstance (event , PluginTerminateEvent ):
139+ logging .info ('received PluginTerminateEvent' )
140+ done = True
141+
142+ if stream_proc :
143+ stream_proc .kill ()
115144 send_quit_command (socket )
116145
117146if __name__ == '__main__' :
0 commit comments