Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: refactor frame conversion and improve threading, buffers
  • Loading branch information
richiemcilroy committed Oct 7, 2025
commit df79062c3fa98c554da7e911b653d20cf72956e1
7 changes: 5 additions & 2 deletions crates/enc-ffmpeg/src/audio/aac.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{thread, time::Duration};

use cap_media_info::{AudioInfo, FFRational};
use ffmpeg::{
Expand Down Expand Up @@ -45,7 +45,10 @@ impl AACEncoder {
) -> Result<Self, AACEncoderError> {
let codec = encoder::find_by_name("aac").ok_or(AACEncoderError::CodecNotFound)?;
let mut encoder_ctx = context::Context::new_with_codec(codec);
encoder_ctx.set_threading(Config::count(4));
let thread_count = thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(1);
encoder_ctx.set_threading(Config::count(thread_count));
let mut encoder = encoder_ctx.encoder().audio()?;

let rate = {
Expand Down
7 changes: 5 additions & 2 deletions crates/enc-ffmpeg/src/audio/opus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{thread, time::Duration};

use cap_media_info::{AudioInfo, FFRational};
use ffmpeg::{
Expand Down Expand Up @@ -43,7 +43,10 @@ impl OpusEncoder {
) -> Result<Self, OpusEncoderError> {
let codec = encoder::find_by_name("libopus").ok_or(OpusEncoderError::CodecNotFound)?;
let mut encoder_ctx = context::Context::new_with_codec(codec);
encoder_ctx.set_threading(Config::count(4));
let thread_count = thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(1);
encoder_ctx.set_threading(Config::count(thread_count));
let mut encoder = encoder_ctx.encoder().audio()?;

let rate = {
Expand Down
7 changes: 5 additions & 2 deletions crates/enc-ffmpeg/src/video/h264.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{thread, time::Duration};

use cap_media_info::{Pixel, VideoInfo};
use ffmpeg::{
Expand Down Expand Up @@ -99,7 +99,10 @@

let mut encoder_ctx = context::Context::new_with_codec(codec);

encoder_ctx.set_threading(Config::count(4));
let thread_count = thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(1);
encoder_ctx.set_threading(Config::count(thread_count));
let mut encoder = encoder_ctx.encoder().video()?;

encoder.set_width(input_config.width);
Expand Down Expand Up @@ -139,7 +142,7 @@
pub struct H264Encoder {
base: EncoderBase,
encoder: encoder::Video,
config: VideoInfo,

Check warning on line 145 in crates/enc-ffmpeg/src/video/h264.rs

View workflow job for this annotation

GitHub Actions / Clippy

field `config` is never read

warning: field `config` is never read --> crates/enc-ffmpeg/src/video/h264.rs:145:5 | 142 | pub struct H264Encoder { | ----------- field in this struct ... 145 | config: VideoInfo, | ^^^^^^ | = note: `#[warn(dead_code)]` on by default
converter: Option<ffmpeg::software::scaling::Context>,
}

Expand Down
4 changes: 2 additions & 2 deletions crates/export/src/mp4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ impl Mp4ExportSettings {
info!("Exporting mp4 with settings: {:?}", &self);
info!("Expected to render {} frames", base.total_frames(self.fps));

let (tx_image_data, mut video_rx) = tokio::sync::mpsc::channel::<(RenderedFrame, u32)>(4);
let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::<MP4Input>(4);
let (tx_image_data, mut video_rx) = tokio::sync::mpsc::channel::<(RenderedFrame, u32)>(32);
let (frame_tx, frame_rx) = std::sync::mpsc::sync_channel::<MP4Input>(32);

Comment thread
Brendonovich marked this conversation as resolved.
Outdated
let fps = self.fps;

Expand Down
238 changes: 108 additions & 130 deletions crates/rendering/src/decoder/avassetreader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use ffmpeg::{Rational, format, frame};
use tokio::{runtime::Handle as TokioHandle, sync::oneshot};

use super::frame_converter::{FrameConverter, copy_rgba_plane};
use super::{FRAME_CACHE_SIZE, VideoDecoderMessage, pts_to_frame};

#[derive(Clone)]
Expand All @@ -30,150 +31,126 @@
Processed(ProcessedFrame),
}

impl CachedFrame {
fn process(&mut self) -> ProcessedFrame {
match self {
CachedFrame::Raw { image_buf, number } => {
let format = cap_video_decode::avassetreader::pixel_format_to_pixel(
image_buf.pixel_format(),
);

let data = if matches!(format, format::Pixel::RGBA) {
unsafe {
image_buf
.lock_base_addr(LockFlags::READ_ONLY)
.result()
.unwrap()
};
/// Reusable state for converting decoded `cv::ImageBuf`s into byte buffers.
///
/// The decoder loop creates one processor and passes it to every
/// `CachedFrame::process` call, so the scratch frame is only reallocated when
/// the incoming pixel format or dimensions change.
struct ImageBufProcessor {
/// Converter that `convert` runs over `scratch_frame` to produce the output bytes.
converter: FrameConverter,
/// Staging frame that the image buffer's planes are copied into before conversion.
scratch_frame: frame::Video,
/// `(format, width, height)` that `scratch_frame` was last allocated with;
/// `None` until the first allocation.
scratch_spec: Option<(format::Pixel, u32, u32)>,
}

let bytes_per_row = image_buf.plane_bytes_per_row(0);
let width = image_buf.width();
let height = image_buf.height();
impl ImageBufProcessor {
/// Builds a processor with a fresh converter, an empty scratch frame, and no
/// recorded scratch spec, so the first non-RGBA conversion allocates the frame.
fn new() -> Self {
    let converter = FrameConverter::new();
    let scratch_frame = frame::Video::empty();

    Self {
        converter,
        scratch_frame,
        scratch_spec: None,
    }
}

let slice = unsafe {
std::slice::from_raw_parts::<'static, _>(
image_buf.plane_base_address(0),
bytes_per_row * height,
)
};
fn convert(&mut self, image_buf: &mut R<cv::ImageBuf>) -> Vec<u8> {
let format =
cap_video_decode::avassetreader::pixel_format_to_pixel(image_buf.pixel_format());

let mut bytes = Vec::with_capacity(width * height * 4);
if matches!(format, format::Pixel::RGBA) {
return self.copy_rgba(image_buf);
}

let row_length = width * 4;
let width = image_buf.width() as u32;
let height = image_buf.height() as u32;
self.ensure_scratch(format, width, height);

for i in 0..height {
bytes.as_mut_slice()[i * row_length..((i + 1) * row_length)]
.copy_from_slice(
&slice[i * bytes_per_row..(i * bytes_per_row + row_length)],
)
}
unsafe {
image_buf
.lock_base_addr(LockFlags::READ_ONLY)
.result()
.unwrap();
}

unsafe { image_buf.unlock_lock_base_addr(LockFlags::READ_ONLY) };

bytes
} else {
let mut ffmpeg_frame = ffmpeg::frame::Video::new(
format,
image_buf.width() as u32,
image_buf.height() as u32,
);

unsafe {
image_buf
.lock_base_addr(LockFlags::READ_ONLY)
.result()
.unwrap()
};
self.copy_planes(image_buf);

match ffmpeg_frame.format() {
format::Pixel::NV12 => {
for plane_i in 0..image_buf.plane_count() {
let bytes_per_row = image_buf.plane_bytes_per_row(plane_i);
let height = image_buf.plane_height(plane_i);

let ffmpeg_stride = ffmpeg_frame.stride(plane_i);
let row_length = bytes_per_row.min(ffmpeg_stride);

let slice = unsafe {
std::slice::from_raw_parts::<'static, _>(
image_buf.plane_base_address(plane_i),
bytes_per_row * height,
)
};

for i in 0..height {
ffmpeg_frame.data_mut(plane_i)
[i * ffmpeg_stride..(i * ffmpeg_stride + row_length)]
.copy_from_slice(
&slice[i * bytes_per_row
..(i * bytes_per_row + row_length)],
)
}
}
}
format::Pixel::YUV420P => {
for plane_i in 0..image_buf.plane_count() {
let bytes_per_row = image_buf.plane_bytes_per_row(plane_i);
let height = image_buf.plane_height(plane_i);

let ffmpeg_stride = ffmpeg_frame.stride(plane_i);
let row_length = bytes_per_row.min(ffmpeg_stride);

let slice = unsafe {
std::slice::from_raw_parts::<'static, _>(
image_buf.plane_base_address(plane_i),
bytes_per_row * height,
)
};

for i in 0..height {
ffmpeg_frame.data_mut(plane_i)
[i * ffmpeg_stride..(i * ffmpeg_stride + row_length)]
.copy_from_slice(
&slice[i * bytes_per_row
..(i * bytes_per_row + row_length)],
)
}
}
}
format => todo!("implement {:?}", format),
}
unsafe { image_buf.unlock_lock_base_addr(LockFlags::READ_ONLY) };

self.converter.convert(&mut self.scratch_frame)
}

unsafe { image_buf.unlock_lock_base_addr(LockFlags::READ_ONLY) };
fn ensure_scratch(&mut self, format: format::Pixel, width: u32, height: u32) {
let needs_new =
self.scratch_spec
.map_or(true, |(current_format, current_width, current_height)| {
current_format != format || current_width != width || current_height != height
});

Check warning on line 80 in crates/rendering/src/decoder/avassetreader.rs

View workflow job for this annotation

GitHub Actions / Clippy

this `map_or` can be simplified

warning: this `map_or` can be simplified --> crates/rendering/src/decoder/avassetreader.rs:77:13 | 77 | / self.scratch_spec 78 | | .map_or(true, |(current_format, current_width, current_height)| { 79 | | current_format != format || current_width != width || current_height != height 80 | | }); | |__________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#unnecessary_map_or = note: `#[warn(clippy::unnecessary_map_or)]` on by default help: use is_none_or instead | 78 - .map_or(true, |(current_format, current_width, current_height)| { 78 + .is_none_or(|(current_format, current_width, current_height)| { |

let mut converter = ffmpeg::software::converter(
(ffmpeg_frame.width(), ffmpeg_frame.height()),
ffmpeg_frame.format(),
format::Pixel::RGBA,
)
.unwrap();
if needs_new {
self.scratch_frame = frame::Video::new(format, width, height);
self.scratch_spec = Some((format, width, height));
}
}

fn copy_rgba(&mut self, image_buf: &mut R<cv::ImageBuf>) -> Vec<u8> {
unsafe {
image_buf
.lock_base_addr(LockFlags::READ_ONLY)
.result()
.unwrap();
}

let bytes_per_row = image_buf.plane_bytes_per_row(0);
let width = image_buf.width() as usize;

Check warning on line 97 in crates/rendering/src/decoder/avassetreader.rs

View workflow job for this annotation

GitHub Actions / Clippy

casting to the same type is unnecessary (`usize` -> `usize`)

warning: casting to the same type is unnecessary (`usize` -> `usize`) --> crates/rendering/src/decoder/avassetreader.rs:97:21 | 97 | let width = image_buf.width() as usize; | ^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try: `image_buf.width()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#unnecessary_cast = note: `#[warn(clippy::unnecessary_cast)]` on by default
let height = image_buf.height() as usize;

Check warning on line 98 in crates/rendering/src/decoder/avassetreader.rs

View workflow job for this annotation

GitHub Actions / Clippy

casting to the same type is unnecessary (`usize` -> `usize`)

warning: casting to the same type is unnecessary (`usize` -> `usize`) --> crates/rendering/src/decoder/avassetreader.rs:98:22 | 98 | let height = image_buf.height() as usize; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try: `image_buf.height()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#unnecessary_cast

let slice = unsafe {
std::slice::from_raw_parts::<'static, _>(
image_buf.plane_base_address(0),
bytes_per_row * height,
)
};

let mut rgb_frame = frame::Video::empty();
converter.run(&ffmpeg_frame, &mut rgb_frame).unwrap();
let bytes = copy_rgba_plane(slice, bytes_per_row, width, height);

let slice = rgb_frame.data(0);
let width = rgb_frame.width();
let height = rgb_frame.height();
let bytes_per_row = rgb_frame.stride(0);
let row_length = width * 4;
unsafe { image_buf.unlock_lock_base_addr(LockFlags::READ_ONLY) };

let mut bytes = vec![0; (width * height * 4) as usize];
bytes
}

fn copy_planes(&mut self, image_buf: &mut R<cv::ImageBuf>) {
match self.scratch_frame.format() {
format::Pixel::NV12 | format::Pixel::YUV420P => {
let scratch = &mut self.scratch_frame;
for plane_i in 0..image_buf.plane_count() {
let bytes_per_row = image_buf.plane_bytes_per_row(plane_i);
let height = image_buf.plane_height(plane_i);

let ffmpeg_stride = scratch.stride(plane_i);
let row_length = bytes_per_row.min(ffmpeg_stride);

let slice = unsafe {
std::slice::from_raw_parts::<'static, _>(
image_buf.plane_base_address(plane_i),
bytes_per_row * height,
)
};

// TODO: allow for decoded frames to have stride, handle stride in shaders
for i in 0..height as usize {
bytes.as_mut_slice()[i * row_length as usize..(i + 1) * row_length as usize]
for i in 0..height {
scratch.data_mut(plane_i)
[i * ffmpeg_stride..(i * ffmpeg_stride + row_length)]
.copy_from_slice(
&slice
[(i * bytes_per_row)..i * bytes_per_row + row_length as usize],
)
&slice[i * bytes_per_row..(i * bytes_per_row + row_length)],
);
}
}
}
format => todo!("implement {:?}", format),
}
}
}

bytes
};

impl CachedFrame {
fn process(&mut self, processor: &mut ImageBufProcessor) -> ProcessedFrame {
match self {
CachedFrame::Raw { image_buf, number } => {
let frame_buffer = processor.convert(image_buf);
let data = ProcessedFrame {
number: *number,
data: Arc::new(data),
data: Arc::new(frame_buffer),
};
Comment thread
Brendonovich marked this conversation as resolved.

*self = Self::Processed(data.clone());
Expand Down Expand Up @@ -240,14 +217,15 @@
let last_sent_frame = Rc::new(RefCell::new(None::<ProcessedFrame>));

let mut frames = this.inner.frames();
let mut processor = ImageBufProcessor::new();

while let Ok(r) = rx.recv() {
match r {
VideoDecoderMessage::GetFrame(requested_time, sender) => {
let requested_frame = (requested_time * fps as f32).floor() as u32;

let mut sender = if let Some(cached) = cache.get_mut(&requested_frame) {
let data = cached.process();
let data = cached.process(&mut processor);

sender.send(data.data.clone()).ok();
*last_sent_frame.borrow_mut() = Some(data);
Expand Down Expand Up @@ -310,7 +288,7 @@
cache.iter_mut().rev().find(|v| *v.0 < requested_frame)
&& let Some(sender) = sender.take()
{
(sender)(most_recent_prev_frame.1.process());
(sender)(most_recent_prev_frame.1.process(&mut processor));
}

let exceeds_cache_bounds = current_frame > cache_max;
Expand All @@ -320,7 +298,7 @@
if current_frame == requested_frame
&& let Some(sender) = sender.take()
{
let data = cache_frame.process();
let data = cache_frame.process(&mut processor);
// info!("sending frame {requested_frame}");

(sender)(data);
Expand Down Expand Up @@ -368,7 +346,7 @@
// "sending forward frame {current_frame} for {requested_frame}",
// );

(sender)(cache_frame.process());
(sender)(cache_frame.process(&mut processor));
}
}

Expand Down
Loading
Loading