blob: 16eb2f5c46ed54e3afed27dee3a4ee8923888267 [file] [log] [blame]
# Copyright 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for processing video recordings.
"""
# Each item in this list corresponds to quality levels defined per
# CamcorderProfile. For Video ITS, we will currently test below qualities
# only if supported by the camera device.
import dataclasses
import logging
import math
import os.path
import re
import subprocess
import error_util
import image_processing_utils
COLORSPACE_HDR = 'bt2020'
HR_TO_SEC = 3600
INDEX_FIRST_SUBGROUP = 1
MIN_TO_SEC = 60
ITS_SUPPORTED_QUALITIES = (
'HIGH',
'2160P',
'1080P',
'720P',
'480P',
'CIF',
'QCIF',
'QVGA',
'LOW',
'VGA'
)
LOW_RESOLUTION_SIZES = (
'176x144',
'192x144',
'352x288',
'384x288',
'320x240',
)
LOWEST_RES_TESTED_AREA = 640*360
VIDEO_QUALITY_SIZE = {
# '480P', '1080P', HIGH' & 'LOW' are not included as they are DUT-dependent
'2160P': '3840x2160',
'720P': '1280x720',
'VGA': '640x480',
'CIF': '352x288',
'QVGA': '320x240',
'QCIF': '176x144',
}
@dataclasses.dataclass
class CommonPreviewSizeData:
"""Class to store smallest and largest common sizes of preview and video."""
smallest_size: str
smallest_quality: str
largest_size: str
largest_quality: str
def get_preview_video_sizes_union(cam, camera_id, min_area=0):
"""Returns largest and smallest common size and quality of preview and video.
Args:
cam: camera object.
camera_id: str; camera ID.
min_area: int; Optional filter to eliminate smaller sizes (ex. 640*480).
Returns:
common_size_quality, CommonPreviewSizeData class
"""
supported_preview_sizes = set(cam.get_all_supported_preview_sizes(camera_id))
supported_video_qualities = cam.get_supported_video_qualities(camera_id)
logging.debug('Supported video profiles & IDs: %s', supported_video_qualities)
# Make dictionary on video quality and size according to compatibility
supported_video_size_to_quality = {}
for quality in supported_video_qualities:
video_quality = quality.split(':')[0]
if video_quality in VIDEO_QUALITY_SIZE:
video_size = VIDEO_QUALITY_SIZE[video_quality]
supported_video_size_to_quality[video_size] = video_quality
logging.debug(
'Supported video size to quality: %s', supported_video_size_to_quality
)
# Find the intersection of supported preview sizes and video sizes
common_sizes = supported_preview_sizes.intersection(
supported_video_size_to_quality.keys()
)
if not common_sizes:
raise AssertionError('No common size between Preview and Video!')
# Filter common sizes based on min_area
size_to_area = lambda s: int(s.split('x')[0])*int(s.split('x')[1])
common_sizes = (
[size for size in common_sizes if size_to_area(size) >= min_area]
)
if not common_sizes:
raise AssertionError(
'No common size above min_area between Preview and Video!'
)
# Use areas of video sizes to find the smallest and largest common size
smallest_common_size = min(common_sizes, key=size_to_area)
largest_common_size = max(common_sizes, key=size_to_area)
logging.debug('Smallest common size: %s', smallest_common_size)
logging.debug('Largest common size: %s', largest_common_size)
# Find video quality of resolution with resolution as key
smallest_common_quality = (
supported_video_size_to_quality[smallest_common_size]
)
logging.debug('Smallest common quality: %s', smallest_common_quality)
largest_common_quality = supported_video_size_to_quality[largest_common_size]
logging.debug('Largest common quality: %s', largest_common_quality)
common_size_quality = CommonPreviewSizeData(
smallest_size=smallest_common_size,
smallest_quality=smallest_common_quality,
largest_size=largest_common_size,
largest_quality=largest_common_quality
)
return common_size_quality
def clamp_preview_sizes(preview_sizes, min_area=0, max_area=math.inf):
"""Returns a list of preview_sizes with areas between min/max_area.
Args:
preview_sizes: list; sizes to be filtered (ex. "1280x720")
min_area: int; optional filter to eliminate sizes <= to the specified
area (ex. 640*480).
max_area: int; optional filter to eliminate sizes >= to the specified
area (ex. 3840*2160).
Returns:
preview_sizes: list; filtered preview sizes clamped by min/max_area.
"""
size_to_area = lambda size: int(size.split('x')[0])*int(size.split('x')[1])
filtered_preview_sizes = [
size for size in preview_sizes
if max_area >= size_to_area(size) >= min_area]
logging.debug('Filtered preview sizes: %s', filtered_preview_sizes)
if not filtered_preview_sizes:
raise AssertionError(f'No preview sizes between {min_area} and {max_area}')
return filtered_preview_sizes
def log_ffmpeg_version():
"""Logs the ffmpeg version being used."""
ffmpeg_version_cmd = ('ffmpeg -version')
p = subprocess.Popen(ffmpeg_version_cmd, shell=True, stdout=subprocess.PIPE)
output, _ = p.communicate()
if p.poll() != 0:
raise error_util.CameraItsError('Error running ffmpeg version cmd.')
decoded_output = output.decode('utf-8')
logging.debug('ffmpeg version: %s', decoded_output.split(' ')[2])
def extract_key_frames_from_video(log_path, video_file_name):
"""Returns a list of extracted key frames.
Ffmpeg tool is used to extract key frames from the video at path
os.path.join(log_path, video_file_name).
The extracted key frames will have the name video_file_name with "_key_frame"
suffix to identify the frames for video of each quality. Since there can be
multiple key frames, each key frame image will be differentiated with its
frame index. All the extracted key frames will be available in jpeg format
at the same path as the video file.
The run time flag '-loglevel quiet' hides the information from terminal.
In order to see the detailed output of ffmpeg command change the loglevel
option to 'info'.
Args:
log_path: path for video file directory.
video_file_name: name of the video file.
Returns:
key_frame_files: a sorted list of files which contains a name per key
frame. Ex: VID_20220325_050918_0_preview_1920x1440_key_frame_0001.png
"""
ffmpeg_image_name = f'{os.path.splitext(video_file_name)[0]}_key_frame'
ffmpeg_image_file_path = os.path.join(
log_path, ffmpeg_image_name + '_%04d.png')
cmd = ['ffmpeg',
'-skip_frame',
'nokey',
'-i',
os.path.join(log_path, video_file_name),
'-vsync',
'vfr',
'-frame_pts',
'true',
ffmpeg_image_file_path,
'-loglevel',
'quiet',
]
logging.debug('Extracting key frames from: %s', video_file_name)
_ = subprocess.call(cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
arr = os.listdir(os.path.join(log_path))
key_frame_files = []
for file in arr:
if '.png' in file and not os.path.isdir(file) and ffmpeg_image_name in file:
key_frame_files.append(file)
key_frame_files.sort()
logging.debug('Extracted key frames: %s', key_frame_files)
logging.debug('Length of key_frame_files: %d', len(key_frame_files))
if not key_frame_files:
raise AssertionError('No key frames extracted. Check source video.')
return key_frame_files
def get_key_frame_to_process(key_frame_files):
"""Returns the key frame file from the list of key_frame_files.
If the size of the list is 1 then the file in the list will be returned else
the file with highest frame_index will be returned for further processing.
Args:
key_frame_files: A list of key frame files.
Returns:
key_frame_file to be used for further processing.
"""
if not key_frame_files:
raise AssertionError('key_frame_files list is empty.')
key_frame_files.sort()
return key_frame_files[-1]
def extract_all_frames_from_video(
log_path, video_file_name, img_format, video_fps=None):
"""Extracts and returns a list of frames from a video using FFmpeg.
Extract all frames from the video at path <log_path>/<video_file_name>.
The extracted frames will have the name video_file_name with "_frame"
suffix to identify the frames for video of each size. Each frame image
will be differentiated with its frame index. All extracted rames will be
available in the provided img_format format at the same path as the video.
The run time flag '-loglevel quiet' hides the information from terminal.
In order to see the detailed output of ffmpeg command change the loglevel
option to 'info'.
Args:
log_path: str; directory containing video file.
video_file_name: str; name of the video file.
img_format: str; desired image format for export frames. ex. 'png'
video_fps: str; fps of imported video.
Returns:
an ordered list of paths to the extracted frame images.
"""
logging.debug('Extracting all frames')
ffmpeg_image_name = f"{video_file_name.split('.')[0]}_frame"
logging.debug('ffmpeg_image_name: %s', ffmpeg_image_name)
ffmpeg_image_file_names = (
f'{os.path.join(log_path, ffmpeg_image_name)}_%04d.{img_format}')
if video_fps:
cmd = [
'ffmpeg', '-i', os.path.join(log_path, video_file_name),
'-r', video_fps, # force a constant frame rate for reliability
ffmpeg_image_file_names, '-loglevel', 'quiet'
]
else:
cmd = [
'ffmpeg', '-i', os.path.join(log_path, video_file_name),
'-vsync', 'passthrough', # prevents frame drops during decoding
ffmpeg_image_file_names, '-loglevel', 'quiet'
]
subprocess.call(cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
files = sorted(
[file for file in os.listdir(log_path) if
(file.endswith(img_format) and ffmpeg_image_name in file)])
if not files:
raise AssertionError('No frames extracted. Check source video.')
return files
def extract_last_key_frame_from_recording(log_path, file_name):
"""Extract last key frame from recordings.
Args:
log_path: str; file location
file_name: str file name for saved video
Returns:
numpy image of last key frame
"""
key_frame_files = extract_key_frames_from_video(log_path, file_name)
logging.debug('key_frame_files: %s', key_frame_files)
# Get the last_key_frame file to process.
last_key_frame_file = get_key_frame_to_process(key_frame_files)
logging.debug('last_key_frame: %s', last_key_frame_file)
# Convert last_key_frame to numpy array
np_image = image_processing_utils.convert_image_to_numpy_array(
os.path.join(log_path, last_key_frame_file))
logging.debug('last key frame image shape: %s', np_image.shape)
return np_image
def get_avg_frame_rate(video_file_name_with_path):
"""Get average frame rate assuming variable frame rate video.
Args:
video_file_name_with_path: path to the video to be analyzed
Returns:
Float. average frames per second.
"""
cmd = ['ffprobe',
'-v',
'quiet',
'-show_streams',
'-select_streams',
'v:0', # first video stream
video_file_name_with_path
]
logging.debug('Getting frame rate')
raw_output = ''
try:
raw_output = subprocess.check_output(cmd,
stdin=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise AssertionError(str(e.output)) from e
if raw_output:
output = str(raw_output.decode('utf-8')).strip()
logging.debug('ffprobe command %s output: %s', ' '.join(cmd), output)
average_frame_rate_data = (
re.search(r'avg_frame_rate=*([0-9]+/[0-9]+)', output)
.group(INDEX_FIRST_SUBGROUP)
)
average_frame_rate = (int(average_frame_rate_data.split('/')[0]) /
int(average_frame_rate_data.split('/')[1]))
logging.debug('Average FPS: %.4f', average_frame_rate)
return average_frame_rate
else:
raise AssertionError('ffprobe failed to provide frame rate data')
def get_frame_deltas(video_file_name_with_path, timestamp_type='pts'):
"""Get list of time diffs between frames.
Args:
video_file_name_with_path: path to the video to be analyzed
timestamp_type: 'pts' or 'pkt_dts'
Returns:
List of floats. Time diffs between frames in seconds.
"""
cmd = [
'ffprobe', '-show_entries', f'frame={timestamp_type}_time',
'-select_streams', 'v', video_file_name_with_path
]
logging.debug('Getting frame deltas')
raw_output = ''
try:
raw_output = subprocess.check_output(cmd,
stdin=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise AssertionError(str(e.output)) from e
if raw_output:
output = str(raw_output.decode('utf-8')).strip().split('\n')
deltas = []
prev_time = None
for line in output:
if timestamp_type not in line:
continue
curr_time = float(re.search(r'time= *([0-9][0-9\.]*)', line)
.group(INDEX_FIRST_SUBGROUP))
if prev_time is not None:
deltas.append(curr_time - prev_time)
prev_time = curr_time
logging.debug('Frame deltas: %s', deltas)
return deltas
else:
raise AssertionError('ffprobe failed to provide frame delta data')
def get_video_colorspace(log_path, video_file_name):
"""Get the video colorspace.
Args:
log_path: path for video file directory
video_file_name: name of the video file
Returns:
video colorspace, e.g. BT.2020 or BT.709
"""
cmd = ['ffprobe',
'-show_streams',
'-select_streams',
'v:0',
'-of',
'json',
'-i',
os.path.join(log_path, video_file_name)
]
logging.debug('Get the video colorspace')
raw_output = ''
try:
raw_output = subprocess.check_output(cmd,
stdin=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise AssertionError(str(e.output)) from e
logging.debug('raw_output: %s', raw_output)
if raw_output:
colorspace = ''
output = str(raw_output.decode('utf-8')).strip().split('\n')
logging.debug('output: %s', output)
for line in output:
logging.debug('line: %s', line)
metadata = re.search(r'"color_space": ("[a-z0-9]*")', line)
if metadata:
colorspace = metadata.group(INDEX_FIRST_SUBGROUP)
logging.debug('Colorspace: %s', colorspace)
return colorspace
else:
raise AssertionError('ffprobe failed to provide color space')