e2evideo.video_preprocessing
Video pre-processing module that extracts frames from videos. Main functionality: * Create the output folder, if it does not exist already. * Read all videos from a given videos folder with a specified video extension (default mp4). * Save every frame, one frame per second, one frame per minute, or a fixed number of frames of each video, in an images sub-folder named after the video file.
"""
Video pre-processing module that extracts frames from videos.
Main functionality:
    * Create the output folder, if it does not exist already.
    * Read all videos from a given videos folder and a specified video extension
    (default mp4).
    * Save every frame, one frame per second, one frame per minute or a fixed
    number of frames of each video, in a sub-folder named after the video file.
"""
# pylint: disable=no-member
# import packages needed
import argparse
from dataclasses import dataclass
import os
import glob
import math
import cv2

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
OUTPUT_PATH = "../data/ucf_sports_actions/frames"


@dataclass
class VideoConfig:
    """Configuration class for video pre-processing."""

    # Folder that is searched recursively for videos.
    videos_folder: str
    # Extension of the video files to read.
    video_format: str = "mp4"
    # Extension used for the image files that are written.
    image_format: str = "jpg"
    # One of "every_frame", "per_second", "per_minute" or "fixed_frames".
    sampling_mode: str = "every_frame"
    # Number of frames requested in "fixed_frames" mode.
    num_frames: int = 10
    # Folder that receives one sub-folder of images per video.
    output_folder: str = OUTPUT_PATH
    # Key of back_sub_algo_mapping, or None for no background subtraction.
    back_sub: str = None
    # Selects the final status message printed by process_video.
    save_frames: bool = True


@dataclass
class FrameData:
    """Data class for the per-video state used while extracting frames."""

    # Frame most recently returned by cap.read().
    frame: object = None
    # Frames kept so far for the current video.
    frames: list = None
    # Frame indices selected for the "fixed_frames" sampling mode.
    frame_indices: list = None
    # Number of frames kept so far; also used to number the image files.
    count: int = 0
    # Folder the images of the current video are written to.
    frames_folder: str = None
    # Frame rate reported by cv2.CAP_PROP_FPS (a float).
    frame_rate: float = 0


# NOTE(review): these subtractor instances are created once at import time and
# reused for every video, so whatever state they keep is carried over from one
# video to the next -- confirm whether a fresh instance per video is wanted.
back_sub_algo_mapping = {
    "MOG2": cv2.createBackgroundSubtractorMOG2(),  # pylint: disable=E1101
    "KNN": cv2.createBackgroundSubtractorKNN(),  # pylint: disable=E1101
}


class VideoPreprocessor:
    """Video pre-processing class that extracts frames from videos."""

    def __init__(self, config: VideoConfig):
        """Store the configuration used by all processing steps."""
        self.config = config

    def create_folder(self, dir_):
        """
        Create a folder at dir_ (including parents) if it does not exist already.
        Input:
            - dir_: the folder directory
        Raises:
            OSError: if the folder cannot be created, e.g. because dir_ is an
                existing file.
        """
        # exist_ok tolerates an existing folder but still reports real failures,
        # which a blanket "except OSError: pass" would hide.
        os.makedirs(dir_, exist_ok=True)

    def get_video_files(self):
        """
        Get all video files from the given videos folder.

        The folder is searched recursively for files whose extension is
        config.video_format, and config.output_folder is created on the way.

        Returns:
            list: sorted paths of the matching video files.
        Raises:
            AssertionError: if the videos folder does not exist or holds no
                matching video.
        """
        videos_folder = self.config.videos_folder
        # Raised explicitly (not via "assert") so the checks survive "python -O"
        # while callers still see the same exception type.
        if not os.path.isdir(videos_folder):
            raise AssertionError("The given videos_folder does not exist")
        self.create_folder(self.config.output_folder)
        # Anchor the pattern on the dot so "mp4" does not match "notmp4";
        # lstrip lets ".mp4" and "mp4" behave the same.
        extension = self.config.video_format.lstrip(".")
        search_path = os.path.join(videos_folder, "**", "*." + extension)
        # Sorted so the processing order does not depend on the file system.
        video_files = sorted(glob.glob(search_path, recursive=True))
        if not video_files:
            # Message text kept verbatim; callers may match on it.
            raise AssertionError(
                "The given videos folder does not contain any vidoes"
            )
        return video_files

    def calculate_frame_indices(self, frame_rate, total_num_frames, video_file):
        """
        Calculate the frame indices for the fixed-frames mode.

        Input:
            - frame_rate: frames per second of the video
            - total_num_frames: number of frames in the video
            - video_file: path of the video, used in the warning message
        Returns:
            list: every index of the video when config.num_frames exceeds
            total_num_frames, otherwise round(frame_rate * i) for
            i in range(config.num_frames).
        """
        num_frames = self.config.num_frames
        if num_frames > total_num_frames:
            print(
                "Warning: the number of fixed-frames is larger than the total "
                f"number of frames in the video {video_file}"
            )
            return list(range(int(total_num_frames)))
        # NOTE(review): the indices are frame_rate apart and are not capped at
        # total_num_frames -- confirm this is the intended spacing.
        return [round(frame_rate * i) for i in range(num_frames)]

    def interpolate_missing_frames(self, frame_data: FrameData):
        """
        Pad frame_data.frames with blended frames up to config.num_frames.

        Each new frame is cv2.addWeighted of frame_data.frame and the last frame
        in frame_data.frames; it is appended to the list and written to
        frame_data.frames_folder as "frame<count>.<image_format>".

        Returns:
            int: the updated frame_data.count.
        """
        # Nothing to blend from: avoid dividing by zero or indexing an empty list.
        if (
            frame_data.frame is None
            or not frame_data.frames
            or not frame_data.frame_indices
        ):
            return frame_data.count
        target = self.config.num_frames
        while len(frame_data.frames) < target:
            # NOTE(review): alpha reaches 1 or more once len(frames) is at least
            # len(frame_indices), making the first weight zero or negative --
            # confirm this is intended.
            alpha = len(frame_data.frames) / len(frame_data.frame_indices)
            # pylint: disable=E1101
            interpolated_frame = cv2.addWeighted(
                frame_data.frame, 1 - alpha, frame_data.frames[-1], alpha, 0
            )
            frame_data.frames.append(interpolated_frame)
            file_name = f"frame{frame_data.count}.{self.config.image_format}"
            cv2.imwrite(
                os.path.join(frame_data.frames_folder, file_name),
                interpolated_frame,
            )
            frame_data.count += 1
        return frame_data.count

    def process_video(self):
        """
        Extract frames from every video returned by get_video_files.

        For each video a sub-folder named after the video file (without its
        extension) is created in config.output_folder and emptied of files left
        from a previous run. Frames are kept according to config.sampling_mode:
            - "every_frame": every frame
            - "per_second": indices that are multiples of the floored frame rate
            - "per_minute": indices that are multiples of 60 times that value
            - "fixed_frames": the indices from calculate_frame_indices, padded
              with interpolated frames up to config.num_frames
        Kept frames are written to the sub-folder; when config.back_sub is set a
        foreground-masked copy is written next to each of them.

        Returns:
            list: the frames kept from the last video processed.
        Raises:
            ValueError: if config.back_sub is set but is not a key of
                back_sub_algo_mapping.
        """
        config = self.config
        video_files = self.get_video_files()
        back_sub = config.back_sub
        if back_sub is not None and back_sub not in back_sub_algo_mapping:
            # Fail before any output folder is touched instead of hitting an
            # unbound local on the first frame.
            raise ValueError(f"Unknown background subtraction algorithm: {back_sub}")
        extension = config.image_format
        mode = config.sampling_mode
        frame_data = FrameData(frames=[])
        for video_file in video_files:
            print("\n\n")
            frame_data = FrameData(frames=[])
            # basename/splitext keep dots inside the name ("a.v2.mp4" -> "a.v2")
            # and never yield an empty name that would point at output_folder.
            video_name = os.path.splitext(os.path.basename(video_file))[0]
            folder = os.path.join(config.output_folder, video_name)
            frame_data.frames_folder = folder
            self.create_folder(folder)

            # if the frames are already extracted, remove them
            stale_names = os.listdir(folder)
            if stale_names:
                print(f"Warning: the frames folder {folder} is not empty")
                print("Removing existing frames...")
                for stale_name in stale_names:
                    stale_path = os.path.join(folder, stale_name)
                    # os.remove cannot delete directories, so leave them alone.
                    if not os.path.isdir(stale_path):
                        os.remove(stale_path)

            print(f"Extracting frames from {video_file}...")
            if back_sub is None:
                back_sub_algo = None
                print(f"No background subtraction algorithm provided : {back_sub}")
            else:
                back_sub_algo = back_sub_algo_mapping[back_sub]
            # capture the video from the video file
            cap = cv2.VideoCapture(video_file)  # pylint: disable=E1101
            try:
                frame_data.frame_rate = cap.get(cv2.CAP_PROP_FPS)
                total_num_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
                frame_data.frame_indices = self.calculate_frame_indices(
                    frame_data.frame_rate, total_num_frames, video_file
                )
                wanted_indices = set(frame_data.frame_indices)
                # A reported rate below 1 fps floors to 0 and would break the
                # modulo tests below.
                frames_per_second = max(1, math.floor(frame_data.frame_rate))
                frames_per_minute = frames_per_second * 60
                while cap.isOpened():
                    frame_id = cap.get(cv2.CAP_PROP_POS_FRAMES)
                    ret, frame = cap.read()
                    if not ret:
                        break
                    # Only overwritten on success, so the last good frame is
                    # still available for the padding step after the loop.
                    frame_data.frame = frame
                    fg_mask = None
                    if back_sub_algo is not None:
                        # The subtractor is fed every frame, kept or not.
                        fg_mask = back_sub_algo.apply(frame)
                    if mode == "every_frame":
                        should_save_frame = True
                    elif mode == "per_second":
                        should_save_frame = frame_id % frames_per_second == 0
                    elif mode == "per_minute":
                        should_save_frame = frame_id % frames_per_minute == 0
                    elif mode == "fixed_frames":
                        should_save_frame = frame_id in wanted_indices
                    else:
                        should_save_frame = False
                    if not should_save_frame:
                        continue
                    file_name = f"frame{frame_data.count}.{extension}"
                    cv2.imwrite(os.path.join(folder, file_name), frame)
                    if fg_mask is not None:
                        # Apply the mask to the original frame
                        fg_frame = cv2.bitwise_and(frame, frame, mask=fg_mask)
                        masked_name = f"fgMask_frame{frame_data.count}.{extension}"
                        cv2.imwrite(os.path.join(folder, masked_name), fg_frame)
                    frame_data.count += 1
                    frame_data.frames.append(frame)
            finally:
                cap.release()

            # Pad once reading has finished rather than when the frame index
            # equals the reported frame count minus one, so the padding does
            # not depend on that count being exact.
            if (
                mode == "fixed_frames"
                and frame_data.frames
                and len(frame_data.frames) < config.num_frames
            ):
                print(
                    f"Warning: the number of frames extracted from {video_file} "
                    "is less than the number of frames specified"
                )
                print("Interpolating missing frames...")
                frame_data.count = self.interpolate_missing_frames(frame_data)

            # NOTE(review): save_frames only selects this message; frames are
            # written above regardless -- confirm whether it should gate writing.
            if config.save_frames:
                print(
                    f"Done! {frame_data.count} images of format "
                    f"{extension.upper()} are saved in {folder}"
                )
            else:
                print("Done!")
        return frame_data.frames


def _str_to_bool(value):
    """Convert a command-line string such as "true" or "0" to a bool."""
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in {"true", "t", "yes", "y", "1"}:
        return True
    if normalized in {"false", "f", "no", "n", "0", ""}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def main():
    """Parse the command-line options and run the video pre-processor."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--videos_folder", required=True, help="Path to the videos folder."
    )
    parser.add_argument(
        "--video_format", default="mp4", help="choose the video format to read."
    )
    parser.add_argument(
        "--image_format", default="jpg", help="choose the format for the output images."
    )
    parser.add_argument(
        "--sampling_mode",
        default="every_frame",
        choices=["fixed_frames", "every_frame", "per_second", "per_minute"],
    )
    parser.add_argument("--num_frames", default=10, type=int)
    parser.add_argument("--output_folder", default=OUTPUT_PATH)
    parser.add_argument("--backsub", default=None, choices=["MOG2", "KNN"])
    # type=bool turned every non-empty string (including the old default
    # "False") into True; the default stays True to keep that effective
    # behaviour while "--save_frames false" now really yields False.
    parser.add_argument(
        "--save_frames",
        default=True,
        type=_str_to_bool,
        help="Save the frames locally.",
    )
    opts = parser.parse_args()
    videos_config = VideoConfig(
        videos_folder=opts.videos_folder,
        video_format=opts.video_format,
        image_format=opts.image_format,
        sampling_mode=opts.sampling_mode,
        num_frames=opts.num_frames,
        output_folder=opts.output_folder,
        back_sub=opts.backsub,
        save_frames=opts.save_frames,
    )
    processor = VideoPreprocessor(videos_config)
    processor.process_video()


if __name__ == "__main__":
    main()
@dataclass
class VideoConfig:
    """Settings that control how frames are extracted from videos."""

    # Folder that is searched recursively for videos.
    videos_folder: str
    # Extension of the video files to read.
    video_format: str = "mp4"
    # Extension used for the image files that are written.
    image_format: str = "jpg"
    # One of "every_frame", "per_second", "per_minute" or "fixed_frames".
    sampling_mode: str = "every_frame"
    # Number of frames requested in "fixed_frames" mode.
    num_frames: int = 10
    # Folder that receives one sub-folder of images per video.
    output_folder: str = OUTPUT_PATH
    # Key of back_sub_algo_mapping, or None for no background subtraction.
    back_sub: str = None
    # Selects the final status message printed by process_video.
    save_frames: bool = True
Configuration class for video pre-processing.
@dataclass
class FrameData:
    """Data class for the per-video state used while extracting frames."""

    # Frame most recently returned by cap.read(); annotated as object because
    # the builtin function `any` is not a type.
    frame: object = None
    # Frames kept so far for the current video.
    frames: list = None
    # Frame indices selected for the "fixed_frames" sampling mode.
    frame_indices: list = None
    # Number of frames kept so far; also used to number the image files.
    count: int = 0
    # Folder the images of the current video are written to.
    frames_folder: str = None
    # Frame rate as read from cv2.CAP_PROP_FPS, which is a float.
    frame_rate: float = 0
Data class for frame data.
class VideoPreprocessor:
    """Video pre-processing class that extracts frames from videos."""

    def __init__(self, config: "VideoConfig"):
        """Store the configuration used by all processing steps."""
        self.config = config

    def create_folder(self, dir_):
        """
        Create a folder at dir_ (including parents) if it does not exist already.
        Input:
            - dir_: the folder directory
        Raises:
            OSError: if the folder cannot be created, e.g. because dir_ is an
                existing file.
        """
        # exist_ok tolerates an existing folder but still reports real failures,
        # which a blanket "except OSError: pass" would hide.
        os.makedirs(dir_, exist_ok=True)

    def get_video_files(self):
        """
        Get all video files from the given videos folder.

        The folder is searched recursively for files whose extension is
        config.video_format, and config.output_folder is created on the way.

        Returns:
            list: sorted paths of the matching video files.
        Raises:
            AssertionError: if the videos folder does not exist or holds no
                matching video.
        """
        videos_folder = self.config.videos_folder
        # Raised explicitly (not via "assert") so the checks survive "python -O"
        # while callers still see the same exception type.
        if not os.path.isdir(videos_folder):
            raise AssertionError("The given videos_folder does not exist")
        self.create_folder(self.config.output_folder)
        # Anchor the pattern on the dot so "mp4" does not match "notmp4";
        # lstrip lets ".mp4" and "mp4" behave the same.
        extension = self.config.video_format.lstrip(".")
        search_path = os.path.join(videos_folder, "**", "*." + extension)
        # Sorted so the processing order does not depend on the file system.
        video_files = sorted(glob.glob(search_path, recursive=True))
        if not video_files:
            # Message text kept verbatim; callers may match on it.
            raise AssertionError(
                "The given videos folder does not contain any vidoes"
            )
        return video_files

    def calculate_frame_indices(self, frame_rate, total_num_frames, video_file):
        """
        Calculate the frame indices for the fixed-frames mode.

        Input:
            - frame_rate: frames per second of the video
            - total_num_frames: number of frames in the video
            - video_file: path of the video, used in the warning message
        Returns:
            list: every index of the video when config.num_frames exceeds
            total_num_frames, otherwise round(frame_rate * i) for
            i in range(config.num_frames).
        """
        num_frames = self.config.num_frames
        if num_frames > total_num_frames:
            print(
                "Warning: the number of fixed-frames is larger than the total "
                f"number of frames in the video {video_file}"
            )
            return list(range(int(total_num_frames)))
        # NOTE(review): the indices are frame_rate apart and are not capped at
        # total_num_frames -- confirm this is the intended spacing.
        return [round(frame_rate * i) for i in range(num_frames)]

    def interpolate_missing_frames(self, frame_data: "FrameData"):
        """
        Pad frame_data.frames with blended frames up to config.num_frames.

        Each new frame is cv2.addWeighted of frame_data.frame and the last frame
        in frame_data.frames; it is appended to the list and written to
        frame_data.frames_folder as "frame<count>.<image_format>".

        Returns:
            int: the updated frame_data.count.
        """
        # Nothing to blend from: avoid dividing by zero or indexing an empty list.
        if (
            frame_data.frame is None
            or not frame_data.frames
            or not frame_data.frame_indices
        ):
            return frame_data.count
        target = self.config.num_frames
        while len(frame_data.frames) < target:
            # NOTE(review): alpha reaches 1 or more once len(frames) is at least
            # len(frame_indices), making the first weight zero or negative --
            # confirm this is intended.
            alpha = len(frame_data.frames) / len(frame_data.frame_indices)
            # pylint: disable=E1101
            interpolated_frame = cv2.addWeighted(
                frame_data.frame, 1 - alpha, frame_data.frames[-1], alpha, 0
            )
            frame_data.frames.append(interpolated_frame)
            file_name = f"frame{frame_data.count}.{self.config.image_format}"
            cv2.imwrite(
                os.path.join(frame_data.frames_folder, file_name),
                interpolated_frame,
            )
            frame_data.count += 1
        return frame_data.count

    def process_video(self):
        """
        Extract frames from every video returned by get_video_files.

        For each video a sub-folder named after the video file (without its
        extension) is created in config.output_folder and emptied of files left
        from a previous run. Frames are kept according to config.sampling_mode:
            - "every_frame": every frame
            - "per_second": indices that are multiples of the floored frame rate
            - "per_minute": indices that are multiples of 60 times that value
            - "fixed_frames": the indices from calculate_frame_indices, padded
              with interpolated frames up to config.num_frames
        Kept frames are written to the sub-folder; when config.back_sub is set a
        foreground-masked copy is written next to each of them.

        Returns:
            list: the frames kept from the last video processed.
        Raises:
            ValueError: if config.back_sub is set but is not a key of
                back_sub_algo_mapping.
        """
        config = self.config
        video_files = self.get_video_files()
        back_sub = config.back_sub
        if back_sub is not None and back_sub not in back_sub_algo_mapping:
            # Fail before any output folder is touched instead of hitting an
            # unbound local on the first frame.
            raise ValueError(f"Unknown background subtraction algorithm: {back_sub}")
        extension = config.image_format
        mode = config.sampling_mode
        frame_data = FrameData(frames=[])
        for video_file in video_files:
            print("\n\n")
            frame_data = FrameData(frames=[])
            # basename/splitext keep dots inside the name ("a.v2.mp4" -> "a.v2")
            # and never yield an empty name that would point at output_folder.
            video_name = os.path.splitext(os.path.basename(video_file))[0]
            folder = os.path.join(config.output_folder, video_name)
            frame_data.frames_folder = folder
            self.create_folder(folder)

            # if the frames are already extracted, remove them
            stale_names = os.listdir(folder)
            if stale_names:
                print(f"Warning: the frames folder {folder} is not empty")
                print("Removing existing frames...")
                for stale_name in stale_names:
                    stale_path = os.path.join(folder, stale_name)
                    # os.remove cannot delete directories, so leave them alone.
                    if not os.path.isdir(stale_path):
                        os.remove(stale_path)

            print(f"Extracting frames from {video_file}...")
            if back_sub is None:
                back_sub_algo = None
                print(f"No background subtraction algorithm provided : {back_sub}")
            else:
                # NOTE(review): the mapping appears to hold shared instances, so
                # their state would carry over between videos -- confirm.
                back_sub_algo = back_sub_algo_mapping[back_sub]
            # capture the video from the video file
            cap = cv2.VideoCapture(video_file)  # pylint: disable=E1101
            try:
                frame_data.frame_rate = cap.get(cv2.CAP_PROP_FPS)
                total_num_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
                frame_data.frame_indices = self.calculate_frame_indices(
                    frame_data.frame_rate, total_num_frames, video_file
                )
                wanted_indices = set(frame_data.frame_indices)
                # A reported rate below 1 fps floors to 0 and would break the
                # modulo tests below.
                frames_per_second = max(1, math.floor(frame_data.frame_rate))
                frames_per_minute = frames_per_second * 60
                while cap.isOpened():
                    frame_id = cap.get(cv2.CAP_PROP_POS_FRAMES)
                    ret, frame = cap.read()
                    if not ret:
                        break
                    # Only overwritten on success, so the last good frame is
                    # still available for the padding step after the loop.
                    frame_data.frame = frame
                    fg_mask = None
                    if back_sub_algo is not None:
                        # The subtractor is fed every frame, kept or not.
                        fg_mask = back_sub_algo.apply(frame)
                    if mode == "every_frame":
                        should_save_frame = True
                    elif mode == "per_second":
                        should_save_frame = frame_id % frames_per_second == 0
                    elif mode == "per_minute":
                        should_save_frame = frame_id % frames_per_minute == 0
                    elif mode == "fixed_frames":
                        should_save_frame = frame_id in wanted_indices
                    else:
                        should_save_frame = False
                    if not should_save_frame:
                        continue
                    file_name = f"frame{frame_data.count}.{extension}"
                    cv2.imwrite(os.path.join(folder, file_name), frame)
                    if fg_mask is not None:
                        # Apply the mask to the original frame
                        fg_frame = cv2.bitwise_and(frame, frame, mask=fg_mask)
                        masked_name = f"fgMask_frame{frame_data.count}.{extension}"
                        cv2.imwrite(os.path.join(folder, masked_name), fg_frame)
                    frame_data.count += 1
                    frame_data.frames.append(frame)
            finally:
                cap.release()

            # Pad once reading has finished rather than when the frame index
            # equals the reported frame count minus one, so the padding does
            # not depend on that count being exact.
            if (
                mode == "fixed_frames"
                and frame_data.frames
                and len(frame_data.frames) < config.num_frames
            ):
                print(
                    f"Warning: the number of frames extracted from {video_file} "
                    "is less than the number of frames specified"
                )
                print("Interpolating missing frames...")
                frame_data.count = self.interpolate_missing_frames(frame_data)

            # NOTE(review): save_frames only selects this message; frames are
            # written above regardless -- confirm whether it should gate writing.
            if config.save_frames:
                print(
                    f"Done! {frame_data.count} images of format "
                    f"{extension.upper()} are saved in {folder}"
                )
            else:
                print("Done!")
        return frame_data.frames
Video pre-processing class that extracts frames from videos.
def create_folder(self, dir_):
    """
    Create a folder at dir_ (including parents) if it does not exist already.
    Input:
        - dir_: the folder directory
    Raises:
        OSError: if the folder cannot be created, e.g. because dir_ is an
            existing file.
    """
    # exist_ok tolerates an existing folder but still reports real failures,
    # which a blanket "except OSError: pass" would hide.
    os.makedirs(dir_, exist_ok=True)
Create a folder at dir_ if it does not exist already. Input: - dir_: the folder directory
def get_video_files(self):
    """
    Get all video files from the given videos folder.

    The folder is searched recursively for files whose extension is
    config.video_format, and config.output_folder is created on the way.

    Returns:
        list: sorted paths of the matching video files.
    Raises:
        AssertionError: if the videos folder does not exist or holds no
            matching video.
    """
    videos_folder = self.config.videos_folder
    # Raised explicitly (not via "assert") so the checks survive "python -O"
    # while callers still see the same exception type.
    if not os.path.isdir(videos_folder):
        raise AssertionError("The given videos_folder does not exist")
    self.create_folder(self.config.output_folder)
    # Anchor the pattern on the dot so "mp4" does not match "notmp4";
    # lstrip lets ".mp4" and "mp4" behave the same.
    extension = self.config.video_format.lstrip(".")
    search_path = os.path.join(videos_folder, "**", "*." + extension)
    # Sorted so the processing order does not depend on the file system.
    video_files = sorted(glob.glob(search_path, recursive=True))
    if not video_files:
        # Message text kept verbatim; callers may match on it.
        raise AssertionError("The given videos folder does not contain any vidoes")
    return video_files
Get all video files from the given videos folder.
def calculate_frame_indices(self, frame_rate, total_num_frames, video_file):
    """
    Calculate the frame indices for the fixed-frames mode.

    Input:
        - frame_rate: frames per second of the video
        - total_num_frames: number of frames in the video
        - video_file: path of the video, used in the warning message
    Returns:
        list: every index of the video when config.num_frames exceeds
        total_num_frames, otherwise round(frame_rate * i) for
        i in range(config.num_frames).
    """
    num_frames = self.config.num_frames
    if num_frames > total_num_frames:
        # Adjacent literals instead of a backslash continuation, which put the
        # source indentation into the message and dropped the space before "in".
        print(
            "Warning: the number of fixed-frames is larger than the total "
            f"number of frames in the video {video_file}"
        )
        return list(range(int(total_num_frames)))
    # NOTE(review): the indices are frame_rate apart and are not capped at
    # total_num_frames -- confirm this is the intended spacing.
    return [round(frame_rate * i) for i in range(num_frames)]
Calculate the frame indices for the fixed-frames mode.
def interpolate_missing_frames(self, frame_data: "FrameData"):
    """
    Pad frame_data.frames with blended frames up to config.num_frames.

    Each new frame is cv2.addWeighted of frame_data.frame and the last frame in
    frame_data.frames; it is appended to the list and written to
    frame_data.frames_folder as "frame<count>.<image_format>".

    Input:
        - frame_data: the FrameData of the video being processed
    Returns:
        int: the updated frame_data.count (unchanged when there is nothing to
        blend from or enough frames are already present).
    """
    # Nothing to blend from: avoid dividing by zero or indexing an empty list.
    if (
        frame_data.frame is None
        or not frame_data.frames
        or not frame_data.frame_indices
    ):
        return frame_data.count
    target = self.config.num_frames
    while len(frame_data.frames) < target:
        # NOTE(review): alpha reaches 1 or more once len(frames) is at least
        # len(frame_indices), making the first weight zero or negative --
        # confirm this is intended.
        alpha = len(frame_data.frames) / len(frame_data.frame_indices)
        # pylint: disable=E1101
        interpolated_frame = cv2.addWeighted(
            frame_data.frame, 1 - alpha, frame_data.frames[-1], alpha, 0
        )
        frame_data.frames.append(interpolated_frame)
        file_name = f"frame{frame_data.count}.{self.config.image_format}"
        cv2.imwrite(
            os.path.join(frame_data.frames_folder, file_name), interpolated_frame
        )
        frame_data.count += 1
    return frame_data.count
Interpolate missing frames if the number of extracted frames is less than the requested number of frames (num_frames).
def process_video(self):
    """
    Extract frames from every video returned by get_video_files.

    For each video a sub-folder named after the video file (without its
    extension) is created in config.output_folder and emptied of files left
    from a previous run. Frames are kept according to config.sampling_mode:
        - "every_frame": every frame
        - "per_second": indices that are multiples of the floored frame rate
        - "per_minute": indices that are multiples of 60 times that value
        - "fixed_frames": the indices from calculate_frame_indices, padded
          with interpolated frames up to config.num_frames
    Kept frames are written to the sub-folder; when config.back_sub is set a
    foreground-masked copy is written next to each of them.

    Returns:
        list: the frames kept from the last video processed.
    Raises:
        ValueError: if config.back_sub is set but is not a key of
            back_sub_algo_mapping.
    """
    config = self.config
    video_files = self.get_video_files()
    back_sub = config.back_sub
    if back_sub is not None and back_sub not in back_sub_algo_mapping:
        # Fail before any output folder is touched instead of hitting an
        # unbound local on the first frame.
        raise ValueError(f"Unknown background subtraction algorithm: {back_sub}")
    extension = config.image_format
    mode = config.sampling_mode
    frame_data = FrameData(frames=[])
    for video_file in video_files:
        print("\n\n")
        frame_data = FrameData(frames=[])
        # basename/splitext keep dots inside the name ("a.v2.mp4" -> "a.v2")
        # and never yield an empty name that would point at output_folder.
        video_name = os.path.splitext(os.path.basename(video_file))[0]
        folder = os.path.join(config.output_folder, video_name)
        frame_data.frames_folder = folder
        self.create_folder(folder)

        # if the frames are already extracted, remove them
        stale_names = os.listdir(folder)
        if stale_names:
            print(f"Warning: the frames folder {folder} is not empty")
            print("Removing existing frames...")
            for stale_name in stale_names:
                stale_path = os.path.join(folder, stale_name)
                # os.remove cannot delete directories, so leave them alone.
                if not os.path.isdir(stale_path):
                    os.remove(stale_path)

        print(f"Extracting frames from {video_file}...")
        if back_sub is None:
            back_sub_algo = None
            print(f"No background subtraction algorithm provided : {back_sub}")
        else:
            # NOTE(review): the mapping appears to hold shared instances, so
            # their state would carry over between videos -- confirm.
            back_sub_algo = back_sub_algo_mapping[back_sub]
        # capture the video from the video file
        cap = cv2.VideoCapture(video_file)  # pylint: disable=E1101
        try:
            frame_data.frame_rate = cap.get(cv2.CAP_PROP_FPS)
            total_num_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            frame_data.frame_indices = self.calculate_frame_indices(
                frame_data.frame_rate, total_num_frames, video_file
            )
            wanted_indices = set(frame_data.frame_indices)
            # A reported rate below 1 fps floors to 0 and would break the
            # modulo tests below.
            frames_per_second = max(1, math.floor(frame_data.frame_rate))
            frames_per_minute = frames_per_second * 60
            while cap.isOpened():
                frame_id = cap.get(cv2.CAP_PROP_POS_FRAMES)
                ret, frame = cap.read()
                if not ret:
                    break
                # Only overwritten on success, so the last good frame is
                # still available for the padding step after the loop.
                frame_data.frame = frame
                fg_mask = None
                if back_sub_algo is not None:
                    # The subtractor is fed every frame, kept or not.
                    fg_mask = back_sub_algo.apply(frame)
                if mode == "every_frame":
                    should_save_frame = True
                elif mode == "per_second":
                    should_save_frame = frame_id % frames_per_second == 0
                elif mode == "per_minute":
                    should_save_frame = frame_id % frames_per_minute == 0
                elif mode == "fixed_frames":
                    should_save_frame = frame_id in wanted_indices
                else:
                    should_save_frame = False
                if not should_save_frame:
                    continue
                file_name = f"frame{frame_data.count}.{extension}"
                cv2.imwrite(os.path.join(folder, file_name), frame)
                if fg_mask is not None:
                    # Apply the mask to the original frame
                    fg_frame = cv2.bitwise_and(frame, frame, mask=fg_mask)
                    masked_name = f"fgMask_frame{frame_data.count}.{extension}"
                    cv2.imwrite(os.path.join(folder, masked_name), fg_frame)
                frame_data.count += 1
                frame_data.frames.append(frame)
        finally:
            cap.release()

        # Pad once reading has finished rather than when the frame index
        # equals the reported frame count minus one, so the padding does not
        # depend on that count being exact.
        if (
            mode == "fixed_frames"
            and frame_data.frames
            and len(frame_data.frames) < config.num_frames
        ):
            print(
                f"Warning: the number of frames extracted from {video_file} "
                "is less than the number of frames specified"
            )
            print("Interpolating missing frames...")
            frame_data.count = self.interpolate_missing_frames(frame_data)

        # NOTE(review): save_frames only selects this message; frames are
        # written above regardless -- confirm whether it should gate writing.
        if config.save_frames:
            print(
                f"Done! {frame_data.count} images of format "
                f"{extension.upper()} are saved in {folder}"
            )
        else:
            print("Done!")
    return frame_data.frames
Extract frames from the videos in the configured videos folder; this is the method that main() runs when the Python file is called from the terminal.
def _str_to_bool(value):
    """Convert a command-line string such as "true" or "0" to a bool."""
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in {"true", "t", "yes", "y", "1"}:
        return True
    if normalized in {"false", "f", "no", "n", "0", ""}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def main():
    """Parse the command-line options and run the video pre-processor."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--videos_folder", required=True, help="Path to the videos folder."
    )
    parser.add_argument(
        "--video_format", default="mp4", help="choose the video format to read."
    )
    parser.add_argument(
        "--image_format", default="jpg", help="choose the format for the output images."
    )
    parser.add_argument(
        "--sampling_mode",
        default="every_frame",
        choices=["fixed_frames", "every_frame", "per_second", "per_minute"],
    )
    parser.add_argument("--num_frames", default=10, type=int)
    parser.add_argument("--output_folder", default=OUTPUT_PATH)
    parser.add_argument("--backsub", default=None, choices=["MOG2", "KNN"])
    # type=bool turned every non-empty string (including the old default
    # "False") into True; the default stays True to keep that effective
    # behaviour while "--save_frames false" now really yields False.
    parser.add_argument(
        "--save_frames",
        default=True,
        type=_str_to_bool,
        help="Save the frames locally.",
    )
    opts = parser.parse_args()
    videos_config = VideoConfig(
        videos_folder=opts.videos_folder,
        video_format=opts.video_format,
        image_format=opts.image_format,
        sampling_mode=opts.sampling_mode,
        num_frames=opts.num_frames,
        output_folder=opts.output_folder,
        back_sub=opts.backsub,
        save_frames=opts.save_frames,
    )
    processor = VideoPreprocessor(videos_config)
    processor.process_video()