Merge pull request #9970 from hasenbanck/packet-buffer

Implement the packet buffer for the FFMPEG core
This commit is contained in:
Twinaphex 2020-01-08 22:06:19 +01:00 committed by GitHub
commit e9db190baa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 381 additions and 68 deletions

View File

@ -1963,8 +1963,9 @@ endif
ifeq ($(HAVE_FFMPEG), 1)
OBJ += record/drivers/record_ffmpeg.o \
cores/libretro-ffmpeg/ffmpeg_core.o \
cores/libretro-ffmpeg/packet_buffer.o \
cores/libretro-ffmpeg/video_buffer.o \
$(LIBRETRO_COMM_DIR)/rthreads/tpool.o
$(LIBRETRO_COMM_DIR)/rthreads/tpool.o
LIBS += $(AVCODEC_LIBS) $(AVFORMAT_LIBS) $(AVUTIL_LIBS) $(SWSCALE_LIBS) $(SWRESAMPLE_LIBS) $(FFMPEG_LIBS)
DEFINES += -DHAVE_FFMPEG

View File

@ -19,6 +19,7 @@ SWRESAMPLE_DIR := $(BASE_DIR)/libswresample
INCFLAGS += -I$(BASE_DIR) -I$(CORE_DIR) -I$(LIBRETRO_COMM_DIR)/include -I$(LIBRETRO_COMM_DIR)/include/compat
LIBRETRO_SOURCE += $(CORE_DIR)/ffmpeg_core.c \
$(CORE_DIR)/packet_buffer.c \
$(CORE_DIR)/video_buffer.c \
$(LIBRETRO_COMM_DIR)/rthreads/tpool.c \
$(LIBRETRO_COMM_DIR)/queues/fifo_queue.c \

View File

@ -50,6 +50,7 @@ extern "C" {
#include <rthreads/tpool.h>
#include <queues/fifo_queue.h>
#include <string/stdstring.h>
#include "packet_buffer.h"
#include "video_buffer.h"
#include <libretro.h>
@ -1354,11 +1355,6 @@ static void decode_video(AVCodecContext *ctx, AVPacket *pkt, size_t frame_size)
/* Stop decoding thread until video_buffer is not full again */
while (!decode_thread_dead && !video_buffer_has_open_slot(video_buffer))
{
/* If we don't buffer enough video frames we can run into a deadlock.
* for now drop frames in this case. This could happen with MP4 files
* since the often save the audio frames into the stream.
* Longterm solution: audio and video decoding in their own threads
* with their own file handle. */
if (main_sleeping)
{
log_cb(RETRO_LOG_ERROR, "[FFMPEG] Thread: Video deadlock detected.\n");
@ -1534,14 +1530,22 @@ static void decode_thread_seek(double time)
#endif
}
static bool earlier_or_close_enough(double p1, double p2) {
return (p1 <= p2 || (p1-p2) <= 0.001);
}
static void decode_thread(void *data)
{
unsigned i;
bool eof = false;
struct SwrContext *swr[audio_streams_num];
AVFrame *aud_frame = NULL;
size_t frame_size = 0;
int16_t *audio_buffer = NULL;
size_t audio_buffer_cap = 0;
packet_buffer_t *audio_packet_buffer;
packet_buffer_t *video_packet_buffer;
double last_audio_end = 0;
(void)data;
@ -1559,11 +1563,13 @@ static void decode_thread(void *data)
}
aud_frame = av_frame_alloc();
audio_packet_buffer = packet_buffer_create();
video_packet_buffer = packet_buffer_create();
if (video_stream_index >= 0)
{
frame_size = avpicture_get_size(PIX_FMT_RGB32, media.width, media.height);
video_buffer = video_buffer_create(32, frame_size, media.width, media.height);
video_buffer = video_buffer_create(4, frame_size, media.width, media.height);
tpool = tpool_create(sw_sws_threads);
log_cb(RETRO_LOG_INFO, "[FFMPEG] Configured worker threads: %d\n", sw_sws_threads);
}
@ -1571,12 +1577,19 @@ static void decode_thread(void *data)
while (!decode_thread_dead)
{
bool seek;
AVPacket pkt;
int subtitle_stream;
double seek_time_thread;
int audio_stream, audio_stream_ptr;
int audio_stream_index, audio_stream_ptr;
double audio_timebase = 0.0;
double video_timebase = 0.0;
double next_video_end = 0.0;
double next_audio_start = 0.0;
AVPacket *pkt = av_packet_alloc();
AVCodecContext *actx_active = NULL;
AVCodecContext *sctx_active = NULL;
#ifdef HAVE_SSA
ASS_Track *ass_track_active = NULL;
#endif
@ -1591,22 +1604,25 @@ static void decode_thread(void *data)
decode_thread_seek(seek_time_thread);
slock_lock(fifo_lock);
do_seek = false;
seek_time = 0.0;
do_seek = false;
eof = false;
seek_time = 0.0;
next_video_end = 0.0;
next_audio_start = 0.0;
last_audio_end = 0.0;
if (audio_decode_fifo)
fifo_clear(audio_decode_fifo);
packet_buffer_clear(&audio_packet_buffer);
packet_buffer_clear(&video_packet_buffer);
scond_signal(fifo_cond);
slock_unlock(fifo_lock);
}
memset(&pkt, 0, sizeof(pkt));
if (av_read_frame(fctx, &pkt) < 0)
break;
slock_lock(decode_thread_lock);
audio_stream = audio_streams[audio_streams_ptr];
audio_stream_index = audio_streams[audio_streams_ptr];
audio_stream_ptr = audio_streams_ptr;
subtitle_stream = subtitle_streams[subtitle_streams_ptr];
actx_active = actx[audio_streams_ptr];
@ -1614,22 +1630,77 @@ static void decode_thread(void *data)
#ifdef HAVE_SSA
ass_track_active = ass_track[subtitle_streams_ptr];
#endif
audio_timebase = av_q2d(fctx->streams[audio_stream_index]->time_base);
if (video_stream_index >= 0)
video_timebase = av_q2d(fctx->streams[video_stream_index]->time_base);
slock_unlock(decode_thread_lock);
if (pkt.stream_index == video_stream_index)
#ifdef HAVE_SSA
decode_video(vctx, &pkt, frame_size, ass_track_active);
#else
decode_video(vctx, &pkt, frame_size);
#endif
else if (pkt.stream_index == audio_stream && actx_active)
if (!packet_buffer_empty(audio_packet_buffer))
next_audio_start = audio_timebase * packet_buffer_peek_start_pts(audio_packet_buffer);
if (!packet_buffer_empty(video_packet_buffer))
next_video_end = video_timebase * packet_buffer_peek_end_pts(video_packet_buffer);
/*
* Decode audio packet if:
* 1. there is a vido packet for in the buffer
* 2. it's the start of file
* 3. it's audio only media
* 4. EOF
**/
if (!packet_buffer_empty(audio_packet_buffer) &&
((!eof && earlier_or_close_enough(next_audio_start, next_video_end)) ||
next_video_end == 0.0 ||
eof))
{
audio_buffer = decode_audio(actx_active, &pkt, aud_frame,
audio_buffer, &audio_buffer_cap,
swr[audio_stream_ptr]);
packet_buffer_get_packet(audio_packet_buffer, pkt);
last_audio_end = audio_timebase * (pkt->pts + pkt->duration);
audio_buffer = decode_audio(actx_active, pkt, aud_frame,
audio_buffer, &audio_buffer_cap,
swr[audio_stream_ptr]);
av_packet_unref(pkt);
}
else if (pkt.stream_index == subtitle_stream && sctx_active)
/*
* Decode video packet if:
* 1. we already decoded an audio packet
* 2. there is no audio stream to play
* 3. EOF
**/
if (!packet_buffer_empty(video_packet_buffer) &&
((!eof && earlier_or_close_enough(next_video_end, last_audio_end)) || !actx_active || eof ))
{
packet_buffer_get_packet(video_packet_buffer, pkt);
#ifdef HAVE_SSA
decode_video(vctx, pkt, frame_size, ass_track_active);
#else
decode_video(vctx, pkt, frame_size);
#endif
av_packet_unref(pkt);
}
if (packet_buffer_empty(audio_packet_buffer) && packet_buffer_empty(video_packet_buffer) && eof)
{
av_packet_free(&pkt);
break;
}
// Read the next frame and stage it in case of audio or video frame.
if (av_read_frame(fctx, pkt) < 0)
eof = true;
else if (pkt->stream_index == audio_stream_index && actx_active)
packet_buffer_add_packet(audio_packet_buffer, pkt);
else if (pkt->stream_index == video_stream_index)
packet_buffer_add_packet(video_packet_buffer, pkt);
else if (pkt->stream_index == subtitle_stream && sctx_active)
{
/**
* Decode subtitle packets right away, since SSA/ASS can operate this way.
* If we ever support other subtitles, we need to handle this with a
* buffer too
**/
AVSubtitle sub;
int finished = 0;
@ -1637,13 +1708,12 @@ static void decode_thread(void *data)
while (!finished)
{
if (avcodec_decode_subtitle2(sctx_active, &sub, &finished, &pkt) < 0)
if (avcodec_decode_subtitle2(sctx_active, &sub, &finished, pkt) < 0)
{
log_cb(RETRO_LOG_ERROR, "[FFMPEG] Decode subtitles failed.\n");
break;
}
}
#ifdef HAVE_SSA
for (i = 0; i < sub.num_rects; i++)
{
@ -1654,11 +1724,10 @@ static void decode_thread(void *data)
slock_unlock(ass_lock);
}
#endif
avsubtitle_free(&sub);
av_packet_unref(pkt);
}
av_free_packet(&pkt);
av_packet_free(&pkt);
}
for (i = 0; (int)i < audio_streams_num; i++)
@ -1667,6 +1736,9 @@ static void decode_thread(void *data)
if (vctx && vctx->hw_device_ctx)
av_buffer_unref(&vctx->hw_device_ctx);
packet_buffer_destroy(audio_packet_buffer);
packet_buffer_destroy(video_packet_buffer);
av_frame_free(&aud_frame);
av_freep(&audio_buffer);
@ -1967,9 +2039,9 @@ bool CORE_PREFIX(retro_load_game)(const struct retro_game_info *info)
}
if (audio_streams_num > 0)
{
/* audio fifo is 4 seconds deep */
/* audio fifo is 2 seconds deep */
audio_decode_fifo = fifo_new(
media.sample_rate * sizeof(int16_t) * 2 * 4
media.sample_rate * sizeof(int16_t) * 2 * 2
);
}

View File

@ -0,0 +1,135 @@
#include "packet_buffer.h"
struct AVPacketNode {
AVPacket *data;
struct AVPacketNode *next;
struct AVPacketNode *previous;
};
typedef struct AVPacketNode AVPacketNode_t;
struct packet_buffer
{
AVPacketNode_t *head;
AVPacketNode_t *tail;
size_t size;
};
packet_buffer_t *packet_buffer_create()
{
packet_buffer_t *b = malloc(sizeof(packet_buffer_t));
if (!b)
return NULL;
memset(b, 0, sizeof(packet_buffer_t));
return b;
}
void packet_buffer_destroy(packet_buffer_t *packet_buffer)
{
AVPacketNode_t *node;
if (!packet_buffer)
return;
if (packet_buffer->head)
{
node = packet_buffer->head;
while (node)
{
AVPacketNode_t *next = node->next;
av_packet_free(&node->data);
free(node);
node = next;
}
}
free(packet_buffer);
}
void packet_buffer_clear(packet_buffer_t **packet_buffer)
{
if (!packet_buffer)
return;
packet_buffer_destroy(*packet_buffer);
*packet_buffer = packet_buffer_create();
}
bool packet_buffer_empty(packet_buffer_t *packet_buffer)
{
if (!packet_buffer)
return true;
return packet_buffer->size == 0;
}
size_t packet_buffer_size(packet_buffer_t *packet_buffer)
{
if (!packet_buffer)
return 0;
return packet_buffer->size;
}
void packet_buffer_add_packet(packet_buffer_t *packet_buffer, AVPacket *pkt)
{
AVPacketNode_t *new_head = (AVPacketNode_t *) malloc(sizeof(AVPacketNode_t));
new_head->data = av_packet_alloc();
av_packet_move_ref(new_head->data, pkt);
if (packet_buffer->head)
{
new_head->next = packet_buffer->head;
packet_buffer->head->previous = new_head;
}
else
{
new_head->next = NULL;
packet_buffer->tail = new_head;
}
packet_buffer->head = new_head;
packet_buffer->head->previous = NULL;
packet_buffer->size++;
}
void packet_buffer_get_packet(packet_buffer_t *packet_buffer, AVPacket *pkt)
{
AVPacketNode_t *new_tail = NULL;
if (packet_buffer->tail == NULL)
return;
av_packet_move_ref(pkt, packet_buffer->tail->data);
if (packet_buffer->tail->previous)
{
new_tail = packet_buffer->tail->previous;
new_tail->next = NULL;
}
else
packet_buffer->head = NULL;
av_packet_free(&packet_buffer->tail->data);
free(packet_buffer->tail);
packet_buffer->tail = new_tail;
packet_buffer->size--;
}
int64_t packet_buffer_peek_start_pts(packet_buffer_t *packet_buffer)
{
if (!packet_buffer->tail)
return 0;
return packet_buffer->tail->data->pts;
}
int64_t packet_buffer_peek_end_pts(packet_buffer_t *packet_buffer)
{
if (!packet_buffer->tail)
return 0;
return packet_buffer->tail->data->pts + packet_buffer->tail->data->duration;
}

View File

@ -0,0 +1,110 @@
#ifndef __LIBRETRO_SDK_PACKETBUFFER_H__
#define __LIBRETRO_SDK_PACKETBUFFER_H__
#include <retro_common_api.h>
#include <boolean.h>
#include <stdint.h>
#include <libavcodec/avcodec.h>
#include <retro_miscellaneous.h>
RETRO_BEGIN_DECLS
/**
* packet_buffer
*
* Just a simple double linked list for AVPackets.
*
*/
struct packet_buffer;
typedef struct packet_buffer packet_buffer_t;
/**
* packet_buffer_create:
*
* Create a packet_buffer.
*
* Returns: A packet buffer.
*/
packet_buffer_t *packet_buffer_create();
/**
* packet_buffer_destroy:
* @packet_buffer : packet buffer
*
* Destroys a packet buffer.
*
**/
void packet_buffer_destroy(packet_buffer_t *packet_buffer);
/**
* packet_buffer_clear:
* @packet_buffer : packet buffer
*
* Clears a packet buffer by re-creating it.
*
**/
void packet_buffer_clear(packet_buffer_t **packet_buffer);
/**
* packet_buffer_empty:
* @packet_buffer : packet buffer
*
* Return true if the buffer is empty;
*
**/
bool packet_buffer_empty(packet_buffer_t *packet_buffer);
/**
* packet_buffer_size:
* @packet_buffer : packet buffer
*
* Returns the number of AVPackets the buffer currently
* holds.
*
**/
size_t packet_buffer_size(packet_buffer_t *packet_buffer);
/**
* packet_buffer_add_packet:
* @packet_buffer : packet buffer
* @pkt : packet
*
* Copies the given packet into the selected buffer.
*
**/
void packet_buffer_add_packet(packet_buffer_t *packet_buffer, AVPacket *pkt);
/**
* packet_buffer_get_packet:
* @packet_buffer : packet buffer
* @pkt : packet
*
* Get the next packet. User needs to unref the packet with av_packet_unref().
*
**/
void packet_buffer_get_packet(packet_buffer_t *packet_buffer, AVPacket *pkt);
/**
* packet_buffer_peek_start_pts:
* @packet_buffer : packet buffer
*
* Returns the start pts of the next packet in the buffer.
*
**/
int64_t packet_buffer_peek_start_pts(packet_buffer_t *packet_buffer);
/**
* packet_buffer_peek_end_pts:
* @packet_buffer : packet buffer
*
* Returns the end pts of the next packet in the buffer.
*
**/
int64_t packet_buffer_peek_end_pts(packet_buffer_t *packet_buffer);
RETRO_END_DECLS
#endif

View File

@ -15,7 +15,7 @@ struct video_buffer
{
video_decoder_context_t *buffer;
enum kbStatus *status;
size_t size;
size_t capacity;
slock_t *lock;
scond_t *open_cond;
scond_t *finished_cond;
@ -23,24 +23,19 @@ struct video_buffer
int64_t tail;
};
video_buffer_t *video_buffer_create(size_t num, int frame_size, int width, int height)
video_buffer_t *video_buffer_create(size_t capacity, int frame_size, int width, int height)
{
video_buffer_t *b = malloc(sizeof (video_buffer_t));
video_buffer_t *b = malloc(sizeof(video_buffer_t));
if (!b)
return NULL;
b->lock = NULL;
b->open_cond = NULL;
b->finished_cond = NULL;
b->buffer = NULL;
b->size = num;
b->head = 0;
b->tail = 0;
memset(b, 0, sizeof(video_buffer_t));
b->capacity = capacity;
b->status = malloc(sizeof(enum kbStatus) * num);
b->status = malloc(sizeof(enum kbStatus) * capacity);
if (!b->status)
goto fail;
for (int i = 0; i < num; i++)
for (int i = 0; i < capacity; i++)
b->status[i] = KB_OPEN;
b->lock = slock_new();
@ -49,11 +44,11 @@ video_buffer_t *video_buffer_create(size_t num, int frame_size, int width, int h
if (!b->lock || !b->open_cond || !b->finished_cond)
goto fail;
b->buffer = malloc(sizeof(video_decoder_context_t) * num);
b->buffer = malloc(sizeof(video_decoder_context_t) * capacity);
if (!b->buffer)
goto fail;
for (int i = 0; i < num; i++)
for (int i = 0; i < capacity; i++)
{
b->buffer[i].index = i;
b->buffer[i].pts = 0;
@ -94,7 +89,7 @@ void video_buffer_destroy(video_buffer_t *video_buffer)
scond_free(video_buffer->finished_cond);
free(video_buffer->status);
if (video_buffer->buffer)
for (int i = 0; i < video_buffer->size; i++)
for (int i = 0; i < video_buffer->capacity; i++)
{
#if LIBAVUTIL_VERSION_MAJOR > 55
av_frame_free(&video_buffer->buffer[i].hw_source);
@ -120,7 +115,7 @@ void video_buffer_clear(video_buffer_t *video_buffer)
video_buffer->head = 0;
video_buffer->tail = 0;
for (int i = 0; i < video_buffer->size; i++)
for (int i = 0; i < video_buffer->capacity; i++)
video_buffer->status[i] = KB_OPEN;
slock_unlock(video_buffer->lock);
@ -135,7 +130,7 @@ void video_buffer_get_open_slot(video_buffer_t *video_buffer, video_decoder_cont
*context = &video_buffer->buffer[video_buffer->head];
video_buffer->status[video_buffer->head] = KB_IN_PROGRESS;
video_buffer->head++;
video_buffer->head %= video_buffer->size;
video_buffer->head %= video_buffer->capacity;
}
slock_unlock(video_buffer->lock);
@ -149,7 +144,7 @@ void video_buffer_return_open_slot(video_buffer_t *video_buffer, video_decoder_c
{
video_buffer->status[context->index] = KB_OPEN;
video_buffer->head--;
video_buffer->head %= video_buffer->size;
video_buffer->head %= video_buffer->capacity;
}
slock_unlock(video_buffer->lock);
@ -163,7 +158,7 @@ void video_buffer_open_slot(video_buffer_t *video_buffer, video_decoder_context_
{
video_buffer->status[context->index] = KB_OPEN;
video_buffer->tail++;
video_buffer->tail %= (video_buffer->size);
video_buffer->tail %= (video_buffer->capacity);
scond_signal(video_buffer->open_cond);
}

View File

@ -1,5 +1,5 @@
#ifndef __LIBRETRO_SDK_SWSBUFFER_H__
#define __LIBRETRO_SDK_SWSBUFFER_H__
#ifndef __LIBRETRO_SDK_VIDEOBUFFER_H__
#define __LIBRETRO_SDK_VIDEOBUFFER_H__
#include <retro_common_api.h>
@ -47,7 +47,7 @@ typedef struct video_decoder_context video_decoder_context_t;
/**
* video_buffer
*
* The video_buffer is a ring buffer, that can be used as a
* The video buffer is a ring buffer, that can be used as a
* buffer for many workers while keeping the order.
*
* It is thread safe in a sensem that it is designed to work
@ -61,26 +61,26 @@ typedef struct video_buffer video_buffer_t;
/**
* video_buffer_create:
* @num : Size of the buffer.
* @capacity : Size of the buffer.
* @frame_size : Size of the target frame.
* @width : Width of the target frame.
* @height : Height of the target frame.
*
* Create a video_buffer.
* Create a video buffer.
*
* Returns: A video buffer.
*/
video_buffer_t *video_buffer_create(size_t num, int frame_size, int width, int height);
video_buffer_t *video_buffer_create(size_t capacity, int frame_size, int width, int height);
/**
* video_buffer_destroy:
* @video_buffer : video buffer.
*
* Destory a video_buffer.
* Destroys a video buffer.
*
* Does also free the buffer allocated with video_buffer_create().
* User has to shut down any external worker threads that may have
* a reference to this video_buffer.
* a reference to this video buffer.
*
**/
void video_buffer_destroy(video_buffer_t *video_buffer);
@ -89,7 +89,7 @@ void video_buffer_destroy(video_buffer_t *video_buffer);
* video_buffer_clear:
* @video_buffer : video buffer.
*
* Clears a video_buffer.
* Clears a video buffer.
*
**/
void video_buffer_clear(video_buffer_t *video_buffer);
@ -97,7 +97,7 @@ void video_buffer_clear(video_buffer_t *video_buffer);
/**
* video_buffer_get_open_slot:
* @video_buffer : video buffer.
* @contex : sws context.
* @context : sws context.
*
* Returns the next open context inside the ring buffer
* and it's index. The status of the slot will be marked as
@ -110,7 +110,7 @@ void video_buffer_get_open_slot(video_buffer_t *video_buffer, video_decoder_cont
/**
* video_buffer_return_open_slot:
* @video_buffer : video buffer.
* @contex : sws context.
* @context : sws context.
*
* Marks the given sws context that is "in progress" as "open" again.
*
@ -120,7 +120,7 @@ void video_buffer_return_open_slot(video_buffer_t *video_buffer, video_decoder_c
/**
* video_buffer_open_slot:
* @video_buffer : video buffer.
* @context : sws context.
* @context : sws context.
*
* Sets the status of the given context from "finished" to "open".
* The slot is then available for producers to claim again with video_buffer_get_open_slot().
@ -130,21 +130,20 @@ void video_buffer_open_slot(video_buffer_t *video_buffer, video_decoder_context_
/**
* video_buffer_get_finished_slot:
* @video_buffer : video buffer.
* @context : sws context.
* @context : sws context.
*
* Returns a reference for the next context inside
* the ring buffer. User needs to use video_buffer_open_slot()
* to open the slot in the ringbuffer for the next
* work assignment. User is free to re-allocate or
* re-use the context.
*
*/
void video_buffer_get_finished_slot(video_buffer_t *video_buffer, video_decoder_context_t **context);
/**
* video_buffer_finish_slot:
* @video_buffer : video buffer.
* @context : sws context.
* @context : sws context.
*
* Sets the status of the given context from "in progress" to "finished".
* This is normally done by a producer. User can then retrieve the finished work