avformat: Add ZeroMQ as a protocol

When ffmpeg was streaming, multiple clients were only supported by using a
multicast destination address. An alternative was to stream to a server which
re-distributes the content. This commit adds ZeroMQ as a protocol, which allows
multiple clients to connect to a single ffmpeg instance.

Signed-off-by: Marton Balint <cus@passwd.hu>
This commit is contained in:
Andriy Gelman 2019-07-30 14:39:32 -04:00 committed by Marton Balint
parent b022d9ba28
commit ef43a4d6b3
8 changed files with 254 additions and 2 deletions

View File

@ -8,6 +8,7 @@ version <next>:
- support for TrueHD in mp4
- Supoort AMD AMF encoder on Linux (via Vulkan)
- IMM5 video decoder
- ZeroMQ protocol
version 4.2:

4
configure vendored
View File

@ -3415,6 +3415,8 @@ libsrt_protocol_deps="libsrt"
libsrt_protocol_select="network"
libssh_protocol_deps="libssh"
libtls_conflict="openssl gnutls mbedtls"
libzmq_protocol_deps="libzmq"
libzmq_protocol_select="network"
# filters
afftdn_filter_deps="avcodec"
@ -6325,7 +6327,7 @@ enabled libxavs && require libxavs "stdint.h xavs.h" xavs_encoder_enco
enabled libxavs2 && require_pkg_config libxavs2 "xavs2 >= 1.3.0" "stdint.h xavs2.h" xavs2_api_get
enabled libxvid && require libxvid xvid.h xvid_global -lxvidcore
enabled libzimg && require_pkg_config libzimg "zimg >= 2.7.0" zimg.h zimg_get_api_version
enabled libzmq && require_pkg_config libzmq libzmq zmq.h zmq_ctx_new
enabled libzmq && require_pkg_config libzmq "libzmq >= 4.2.1" zmq.h zmq_ctx_new
enabled libzvbi && require_pkg_config libzvbi zvbi-0.2 libzvbi.h vbi_decoder_new &&
{ test_cpp_condition libzvbi.h "VBI_VERSION_MAJOR > 0 || VBI_VERSION_MINOR > 2 || VBI_VERSION_MINOR == 2 && VBI_VERSION_MICRO >= 28" ||
enabled gpl || die "ERROR: libzvbi requires version 0.2.28 or --enable-gpl."; }

View File

@ -1339,6 +1339,7 @@ performance on systems without hardware floating point support).
@item TCP @tab X
@item TLS @tab X
@item UDP @tab X
@item ZMQ @tab E
@end multitable
@code{X} means that the protocol is supported.

View File

@ -1728,4 +1728,51 @@ Timeout in ms.
Create the Unix socket in listening mode.
@end table
@section zmq
ZeroMQ asynchronous messaging using the libzmq library.
This library supports unicast streaming to multiple clients without relying on
an external server.
The required syntax for streaming or connecting to a stream is:
@example
zmq:tcp://ip-address:port
@end example
Example:
Create a localhost stream on port 5555:
@example
ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
@end example
Multiple clients may connect to the stream using:
@example
ffplay zmq:tcp://127.0.0.1:5555
@end example
Streaming to multiple clients is implemented using a ZeroMQ Pub-Sub pattern.
The server side binds to a port and publishes data. Clients connect to the
server (via IP address/port) and subscribe to the stream. The order in which
the server and client start generally does not matter.
ffmpeg must be compiled with the --enable-libzmq option to support
this protocol.
Options can be set on the @command{ffmpeg}/@command{ffplay} command
line. The following options are supported:
@table @option
@item pkt_size
Forces the maximum packet size for sending/receiving data. The default value is
32,768 bytes. On the server side, this sets the maximum size of sent packets
via ZeroMQ. On the clients, it sets an internal buffer size for receiving
packets. Note that pkt_size on the clients should be equal to or greater than
pkt_size on the server. Otherwise the received message may be truncated causing
decoding errors.
@end table
@c man end PROTOCOLS

View File

@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL) += librtmp.o
OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL) += libsmbclient.o
OBJS-$(CONFIG_LIBSRT_PROTOCOL) += libsrt.o
OBJS-$(CONFIG_LIBSSH_PROTOCOL) += libssh.o
OBJS-$(CONFIG_LIBZMQ_PROTOCOL) += libzmq.o
# libavdevice dependencies
OBJS-$(CONFIG_IEC61883_INDEV) += dv.o

199
libavformat/libzmq.c Normal file
View File

@ -0,0 +1,199 @@
/*
* ZeroMQ Protocol
* Copyright (c) 2019 Andriy Gelman
*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <zmq.h>
#include "url.h"
#include "network.h"
#include "libavutil/avstring.h"
#include "libavutil/opt.h"
#include "libavutil/time.h"
#define ZMQ_STRERROR zmq_strerror(zmq_errno())
typedef struct ZMQContext {
const AVClass *class;
void *context;
void *socket;
int pkt_size;
int pkt_size_overflow; /*keep track of the largest packet during overflow*/
} ZMQContext;
#define OFFSET(x) offsetof(ZMQContext, x)
#define D AV_OPT_FLAG_DECODING_PARAM
#define E AV_OPT_FLAG_ENCODING_PARAM
static const AVOption options[] = {
{ "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 32768 }, -1, INT_MAX, .flags = D | E },
{ NULL }
};
static int zmq_proto_wait(URLContext *h, void *socket, int write)
{
int ret;
int ev = write ? ZMQ_POLLOUT : ZMQ_POLLIN;
zmq_pollitem_t items = { .socket = socket, .fd = 0, .events = ev, .revents = 0 };
ret = zmq_poll(&items, 1, POLLING_TIME);
if (ret == -1) {
av_log(h, AV_LOG_ERROR, "Error occured during zmq_poll(): %s\n", ZMQ_STRERROR);
return AVERROR_EXTERNAL;
}
return items.revents & ev ? 0 : AVERROR(EAGAIN);
}
static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
{
int ret;
int64_t wait_start = 0;
while (1) {
if (ff_check_interrupt(int_cb))
return AVERROR_EXIT;
ret = zmq_proto_wait(h, socket, write);
if (ret != AVERROR(EAGAIN))
return ret;
if (timeout > 0) {
if (!wait_start)
wait_start = av_gettime_relative();
else if (av_gettime_relative() - wait_start > timeout)
return AVERROR(ETIMEDOUT);
}
}
}
static int zmq_proto_open(URLContext *h, const char *uri, int flags)
{
int ret;
ZMQContext *s = h->priv_data;
s->pkt_size_overflow = 0;
h->is_streamed = 1;
if (s->pkt_size > 0)
h->max_packet_size = s->pkt_size;
s->context = zmq_ctx_new();
if (!s->context) {
/*errno not set on failure during zmq_ctx_new()*/
av_log(h, AV_LOG_ERROR, "Error occured during zmq_ctx_new()\n");
return AVERROR_EXTERNAL;
}
av_strstart(uri, "zmq:", &uri);
/*publish during write*/
if (h->flags & AVIO_FLAG_WRITE) {
s->socket = zmq_socket(s->context, ZMQ_PUB);
if (!s->socket) {
av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
zmq_ctx_term(s->context);
return AVERROR_EXTERNAL;
}
ret = zmq_bind(s->socket, uri);
if (ret == -1) {
av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", ZMQ_STRERROR);
zmq_close(s->socket);
zmq_ctx_term(s->context);
return AVERROR_EXTERNAL;
}
}
/*subscribe for read*/
if (h->flags & AVIO_FLAG_READ) {
s->socket = zmq_socket(s->context, ZMQ_SUB);
if (!s->socket) {
av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
zmq_ctx_term(s->context);
return AVERROR_EXTERNAL;
}
zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
ret = zmq_connect(s->socket, uri);
if (ret == -1) {
av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", ZMQ_STRERROR);
zmq_close(s->socket);
zmq_ctx_term(s->context);
return AVERROR_EXTERNAL;
}
}
return 0;
}
static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
{
int ret;
ZMQContext *s = h->priv_data;
ret = zmq_proto_wait_timeout(h, s->socket, 1, h->rw_timeout, &h->interrupt_callback);
if (ret)
return ret;
ret = zmq_send(s->socket, buf, size, 0);
if (ret == -1) {
av_log(h, AV_LOG_ERROR, "Error occured during zmq_send(): %s\n", ZMQ_STRERROR);
return AVERROR_EXTERNAL;
}
return ret; /*number of bytes sent*/
}
static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
{
int ret;
ZMQContext *s = h->priv_data;
ret = zmq_proto_wait_timeout(h, s->socket, 0, h->rw_timeout, &h->interrupt_callback);
if (ret)
return ret;
ret = zmq_recv(s->socket, buf, size, 0);
if (ret == -1) {
av_log(h, AV_LOG_ERROR, "Error occured during zmq_recv(): %s\n", ZMQ_STRERROR);
return AVERROR_EXTERNAL;
}
if (ret > size) {
s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, ret);
av_log(h, AV_LOG_WARNING, "Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n", s->pkt_size_overflow);
ret = size;
}
return ret; /*number of bytes read*/
}
static int zmq_proto_close(URLContext *h)
{
ZMQContext *s = h->priv_data;
zmq_close(s->socket);
zmq_ctx_term(s->context);
return 0;
}
static const AVClass zmq_context_class = {
.class_name = "zmq",
.item_name = av_default_item_name,
.option = options,
.version = LIBAVUTIL_VERSION_INT,
};
const URLProtocol ff_libzmq_protocol = {
.name = "zmq",
.url_close = zmq_proto_close,
.url_open = zmq_proto_open,
.url_read = zmq_proto_read,
.url_write = zmq_proto_write,
.priv_data_size = sizeof(ZMQContext),
.priv_data_class = &zmq_context_class,
.flags = URL_PROTOCOL_FLAG_NETWORK,
};

View File

@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
extern const URLProtocol ff_libsrt_protocol;
extern const URLProtocol ff_libssh_protocol;
extern const URLProtocol ff_libsmbclient_protocol;
extern const URLProtocol ff_libzmq_protocol;
#include "libavformat/protocol_list.c"

View File

@ -32,7 +32,7 @@
// Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium)
// Also please add any ticket numbers that you believe might be affected here
#define LIBAVFORMAT_VERSION_MAJOR 58
#define LIBAVFORMAT_VERSION_MINOR 31
#define LIBAVFORMAT_VERSION_MINOR 32
#define LIBAVFORMAT_VERSION_MICRO 104
#define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \