Merge pull request #1880 from nghttp2/nghttpx-tweak-worker-process-handling

Nghttpx tweak worker process handling
This commit is contained in:
Tatsuhiro Tsujikawa 2023-03-11 10:02:09 +09:00 committed by GitHub
commit b8cb6efb37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 217 additions and 72 deletions

View File

@ -218,23 +218,6 @@ struct WorkerProcess {
cid_prefixes(cid_prefixes)
#endif // ENABLE_HTTP3
{
ev_signal_init(&reopen_log_signalev, signal_cb, REOPEN_LOG_SIGNAL);
reopen_log_signalev.data = this;
ev_signal_start(loop, &reopen_log_signalev);
ev_signal_init(&exec_binary_signalev, signal_cb, EXEC_BINARY_SIGNAL);
exec_binary_signalev.data = this;
ev_signal_start(loop, &exec_binary_signalev);
ev_signal_init(&graceful_shutdown_signalev, signal_cb,
GRACEFUL_SHUTDOWN_SIGNAL);
graceful_shutdown_signalev.data = this;
ev_signal_start(loop, &graceful_shutdown_signalev);
ev_signal_init(&reload_signalev, signal_cb, RELOAD_SIGNAL);
reload_signalev.data = this;
ev_signal_start(loop, &reload_signalev);
ev_child_init(&worker_process_childev, worker_process_child_cb, worker_pid,
0);
worker_process_childev.data = this;
@ -242,8 +225,6 @@ struct WorkerProcess {
}
~WorkerProcess() {
shutdown_signal_watchers();
ev_child_stop(loop, &worker_process_childev);
#ifdef ENABLE_HTTP3
@ -258,17 +239,6 @@ struct WorkerProcess {
}
}
void shutdown_signal_watchers() {
ev_signal_stop(loop, &reopen_log_signalev);
ev_signal_stop(loop, &exec_binary_signalev);
ev_signal_stop(loop, &graceful_shutdown_signalev);
ev_signal_stop(loop, &reload_signalev);
}
ev_signal reopen_log_signalev;
ev_signal exec_binary_signalev;
ev_signal graceful_shutdown_signalev;
ev_signal reload_signalev;
ev_child worker_process_childev;
struct ev_loop *loop;
pid_t worker_pid;
@ -281,7 +251,7 @@ struct WorkerProcess {
};
namespace {
void reload_config(WorkerProcess *wp);
void reload_config();
} // namespace
namespace {
@ -416,18 +386,6 @@ void worker_process_kill(int signum, struct ev_loop *loop) {
}
} // namespace
namespace {
// Returns the last PID of worker process. Returns -1 if there is no
// worker process at the moment.
int worker_process_last_pid() {
if (worker_processes.empty()) {
return -1;
}
return worker_processes.back()->worker_pid;
}
} // namespace
namespace {
int save_pid() {
std::array<char, STRERROR_BUFSIZE> errbuf;
@ -712,15 +670,12 @@ void reopen_log(WorkerProcess *wp) {
namespace {
void signal_cb(struct ev_loop *loop, ev_signal *w, int revents) {
auto wp = static_cast<WorkerProcess *>(w->data);
if (wp->worker_pid == -1) {
ev_break(loop);
return;
}
switch (w->signum) {
case REOPEN_LOG_SIGNAL:
reopen_log(wp);
for (auto &wp : worker_processes) {
reopen_log(wp.get());
}
return;
case EXEC_BINARY_SIGNAL:
exec_binary();
@ -730,12 +685,17 @@ void signal_cb(struct ev_loop *loop, ev_signal *w, int revents) {
for (auto &addr : listenerconf.addrs) {
close(addr.fd);
}
ipc_send(wp, SHRPX_IPC_GRACEFUL_SHUTDOWN);
worker_process_set_termination_deadline(wp, loop);
for (auto &wp : worker_processes) {
ipc_send(wp.get(), SHRPX_IPC_GRACEFUL_SHUTDOWN);
worker_process_set_termination_deadline(wp.get(), loop);
}
return;
}
case RELOAD_SIGNAL:
reload_config(wp);
reload_config();
return;
default:
worker_process_kill(w->signum, loop);
@ -751,11 +711,9 @@ void worker_process_child_cb(struct ev_loop *loop, ev_child *w, int revents) {
log_chld(w->rpid, w->rstatus, "Worker process");
auto pid = wp->worker_pid;
worker_process_remove(wp, loop);
if (worker_process_last_pid() == pid) {
if (worker_processes.empty()) {
ev_break(loop);
}
}
@ -1412,6 +1370,30 @@ int create_ipc_socket(std::array<int, 2> &ipc_fd) {
}
} // namespace
namespace {
int create_worker_process_ready_ipc_socket(std::array<int, 2> &ipc_fd) {
std::array<char, STRERROR_BUFSIZE> errbuf;
int rv;
rv = socketpair(AF_UNIX, SOCK_DGRAM, 0, ipc_fd.data());
if (rv == -1) {
auto error = errno;
LOG(WARN) << "Failed to create socket pair to communicate worker process "
"readiness: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
return -1;
}
for (auto fd : ipc_fd) {
util::make_socket_closeonexec(fd);
}
util::make_socket_nonblocking(ipc_fd[0]);
return 0;
}
} // namespace
#ifdef ENABLE_HTTP3
namespace {
int create_quic_ipc_socket(std::array<int, 2> &quic_ipc_fd) {
@ -1483,6 +1465,130 @@ collect_quic_lingering_worker_processes() {
} // namespace
#endif // ENABLE_HTTP3
namespace {
ev_signal reopen_log_signalev;
ev_signal exec_binary_signalev;
ev_signal graceful_shutdown_signalev;
ev_signal reload_signalev;
} // namespace
namespace {
void start_signal_watchers(struct ev_loop *loop) {
ev_signal_init(&reopen_log_signalev, signal_cb, REOPEN_LOG_SIGNAL);
ev_signal_start(loop, &reopen_log_signalev);
ev_signal_init(&exec_binary_signalev, signal_cb, EXEC_BINARY_SIGNAL);
ev_signal_start(loop, &exec_binary_signalev);
ev_signal_init(&graceful_shutdown_signalev, signal_cb,
GRACEFUL_SHUTDOWN_SIGNAL);
ev_signal_start(loop, &graceful_shutdown_signalev);
ev_signal_init(&reload_signalev, signal_cb, RELOAD_SIGNAL);
ev_signal_start(loop, &reload_signalev);
}
} // namespace
namespace {
void shutdown_signal_watchers(struct ev_loop *loop) {
ev_signal_stop(loop, &reload_signalev);
ev_signal_stop(loop, &graceful_shutdown_signalev);
ev_signal_stop(loop, &exec_binary_signalev);
ev_signal_stop(loop, &reopen_log_signalev);
}
} // namespace
namespace {
// A pair of connected socket with which a worker process tells main
// process that it is ready for service. A worker process writes its
// PID to worker_process_ready_ipc_fd[1] and main process reads it
// from worker_process_ready_ipc_fd[0].
std::array<int, 2> worker_process_ready_ipc_fd;
} // namespace
namespace {
ev_io worker_process_ready_ipcev;
} // namespace
namespace {
// PID received via NGHTTPX_ORIG_PID environment variable.
pid_t orig_pid = -1;
} // namespace
namespace {
void worker_process_ready_ipc_readcb(struct ev_loop *loop, ev_io *w,
int revents) {
std::array<uint8_t, 8> buf;
ssize_t nread;
while ((nread = read(w->fd, buf.data(), buf.size())) == -1 && errno == EINTR)
;
if (nread == -1) {
std::array<char, STRERROR_BUFSIZE> errbuf;
auto error = errno;
LOG(ERROR) << "Failed to read data from worker process ready IPC channel: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
return;
}
if (nread == 0) {
return;
}
if (nread != sizeof(pid_t)) {
LOG(ERROR) << "Read " << nread
<< " bytes from worker process ready IPC channel";
return;
}
pid_t pid;
memcpy(&pid, buf.data(), sizeof(pid));
LOG(NOTICE) << "Worker process pid=" << pid << " is ready";
for (auto &wp : worker_processes) {
// Send graceful shutdown signal to all worker processes prior to
// pid.
if (wp->worker_pid == pid) {
break;
}
LOG(INFO) << "Sending graceful shutdown event to worker process pid="
<< wp->worker_pid;
ipc_send(wp.get(), SHRPX_IPC_GRACEFUL_SHUTDOWN);
worker_process_set_termination_deadline(wp.get(), loop);
}
if (orig_pid != -1) {
LOG(NOTICE) << "Send QUIT signal to the original main process to tell "
"that we are ready to serve requests.";
kill(orig_pid, SIGQUIT);
orig_pid = -1;
}
}
} // namespace
namespace {
void start_worker_process_ready_ipc_watcher(struct ev_loop *loop) {
ev_io_init(&worker_process_ready_ipcev, worker_process_ready_ipc_readcb,
worker_process_ready_ipc_fd[0], EV_READ);
ev_io_start(loop, &worker_process_ready_ipcev);
}
} // namespace
namespace {
void shutdown_worker_process_ready_ipc_watcher(struct ev_loop *loop) {
ev_io_stop(loop, &worker_process_ready_ipcev);
}
} // namespace
namespace {
// Creates worker process, and returns PID of worker process. On
// success, file descriptor for IPC (send only) is assigned to
@ -1545,6 +1651,9 @@ pid_t fork_worker_process(
}
if (pid == 0) {
// We are in new process now, update pid for logger.
log_config()->pid = getpid();
ev_loop_fork(EV_DEFAULT);
for (auto &addr : config->conn.listener.addrs) {
@ -1565,6 +1674,13 @@ pid_t fork_worker_process(
}
#endif // ENABLE_HTTP3
close(worker_process_ready_ipc_fd[0]);
shutdown_worker_process_ready_ipc_watcher(EV_DEFAULT);
if (!config->single_process) {
shutdown_signal_watchers(EV_DEFAULT);
}
// Remove all WorkerProcesses to stop any registered watcher on
// default loop.
worker_process_remove_all(EV_DEFAULT);
@ -1595,6 +1711,7 @@ pid_t fork_worker_process(
WorkerProcessConfig wpconf{
.ipc_fd = ipc_fd[0],
.ready_ipc_fd = worker_process_ready_ipc_fd[1],
#ifdef ENABLE_HTTP3
.cid_prefixes = cid_prefixes,
.quic_ipc_fd = quic_ipc_fd[0],
@ -1702,7 +1819,7 @@ int event_loop() {
close_unused_inherited_addr(iaddrs);
}
auto orig_pid = get_orig_pid_from_env();
orig_pid = get_orig_pid_from_env();
#ifdef ENABLE_HTTP3
inherited_quic_lingering_worker_processes =
@ -1724,6 +1841,13 @@ int event_loop() {
}
#endif // ENABLE_HTTP3
if (!config->single_process) {
start_signal_watchers(loop);
}
create_worker_process_ready_ipc_socket(worker_process_ready_ipc_fd);
start_worker_process_ready_ipc_watcher(loop);
auto pid = fork_worker_process(ipc_fd
#ifdef ENABLE_HTTP3
,
@ -1759,19 +1883,18 @@ int event_loop() {
save_pid();
}
// ready to serve requests
shrpx_sd_notifyf(0, "READY=1");
if (orig_pid != -1) {
LOG(NOTICE) << "Send QUIT signal to the original main process to tell "
"that we are ready to serve requests.";
kill(orig_pid, SIGQUIT);
}
ev_run(loop, 0);
ev_timer_stop(loop, &worker_process_grace_period_timer);
shutdown_worker_process_ready_ipc_watcher(loop);
if (!config->single_process) {
shutdown_signal_watchers(loop);
}
return 0;
}
} // namespace
@ -3837,7 +3960,7 @@ void close_not_inherited_fd(Config *config,
} // namespace
namespace {
void reload_config(WorkerProcess *wp) {
void reload_config() {
int rv;
LOG(NOTICE) << "Reloading configuration";
@ -3916,13 +4039,6 @@ void reload_config(WorkerProcess *wp) {
close_unused_inherited_addr(iaddrs);
// Send last worker process a graceful shutdown notice
auto &last_wp = worker_processes.back();
ipc_send(last_wp.get(), SHRPX_IPC_GRACEFUL_SHUTDOWN);
worker_process_set_termination_deadline(last_wp.get(), loop);
// We no longer use signals for this worker.
last_wp->shutdown_signal_watchers();
worker_process_add(std::make_unique<WorkerProcess>(loop, pid, ipc_fd
#ifdef ENABLE_HTTP3
,

View File

@ -405,6 +405,29 @@ void nb_child_cb(struct ev_loop *loop, ev_child *w, int revents) {
} // namespace
#endif // HAVE_NEVERBLEED
namespace {
int send_ready_event(int ready_ipc_fd) {
std::array<char, STRERROR_BUFSIZE> errbuf;
auto pid = getpid();
ssize_t nwrite;
while ((nwrite = write(ready_ipc_fd, &pid, sizeof(pid))) == -1 &&
errno == EINTR)
;
if (nwrite < 0) {
auto error = errno;
LOG(ERROR) << "Writing PID to ready IPC channel failed: "
<< xsi_strerror(error, errbuf.data(), errbuf.size());
return -1;
}
return 0;
}
} // namespace
int worker_process_event_loop(WorkerProcessConfig *wpconf) {
int rv;
std::array<char, STRERROR_BUFSIZE> errbuf;
@ -638,6 +661,10 @@ int worker_process_event_loop(WorkerProcessConfig *wpconf) {
LOG(INFO) << "Entering event loop";
}
if (send_ready_event(wpconf->ready_ipc_fd) != 0) {
return -1;
}
ev_run(loop, 0);
conn_handler->cancel_ocsp_update();

View File

@ -42,6 +42,8 @@ class ConnectionHandler;
struct WorkerProcessConfig {
// IPC socket to read event from main process
int ipc_fd;
// IPC socket to tell that a worker process is ready for service.
int ready_ipc_fd;
// IPv4 or UNIX domain socket, or -1 if not used
int server_fd;
// IPv6 socket, or -1 if not used