ControllerInterface: DSU polish: avoid hanging host thread, add disconnection detection, ...

-Reworked thread waits to never hang the Host thread for more than a really small time
(e.g. when disabling DSU its thread now closes almost immediately)
-Improve robustness when a large amount of devices are connected
-Add devices disconnection detection (they'd stay there forever until manually refreshed)
This commit is contained in:
Filoppi 2021-05-15 12:28:52 +03:00
parent 83806462ec
commit 16e4dede72
2 changed files with 92 additions and 46 deletions

View File

@ -18,7 +18,6 @@
#include "Common/Flag.h"
#include "Common/Logging/Log.h"
#include "Common/MathUtil.h"
#include "Common/Matrix.h"
#include "Common/Random.h"
#include "Common/StringUtil.h"
#include "Common/Thread.h"
@ -157,6 +156,9 @@ constexpr auto SERVER_REREGISTER_INTERVAL = std::chrono::seconds{1};
constexpr auto SERVER_LISTPORTS_INTERVAL = std::chrono::seconds{1};
constexpr int TOUCH_X_AXIS_MAX = 1000;
constexpr int TOUCH_Y_AXIS_MAX = 500;
constexpr auto THREAD_MAX_WAIT_INTERVAL = std::chrono::milliseconds{250};
constexpr auto SERVER_UNRESPONSIVE_INTERVAL = std::chrono::seconds{1}; // Can be 0
constexpr u32 SERVER_ASKED_PADS = 4;
struct Server
{
@ -184,12 +186,13 @@ struct Server
std::mutex m_port_info_mutex;
std::array<Proto::MessageType::PortInfo, Proto::PORT_COUNT> m_port_info;
sf::UdpSocket m_socket;
SteadyClock::time_point m_disconnect_time = SteadyClock::now();
};
static bool s_servers_enabled;
static std::vector<Server> s_servers;
static u32 s_client_uid;
static SteadyClock::time_point s_next_listports;
static SteadyClock::time_point s_next_listports_time;
static std::thread s_hotplug_thread;
static Common::Flag s_hotplug_thread_running;
@ -206,26 +209,33 @@ static void HotplugThreadFunc()
Common::SetCurrentThreadName("DualShockUDPClient Hotplug Thread");
INFO_LOG_FMT(CONTROLLERINTERFACE, "DualShockUDPClient hotplug thread started");
std::vector<bool> timed_out_servers(s_servers.size(), false);
while (s_hotplug_thread_running.IsSet())
{
const auto now = SteadyClock::now();
if (now >= s_next_listports)
{
s_next_listports = now + SERVER_LISTPORTS_INTERVAL;
using namespace std::chrono;
using namespace std::chrono_literals;
for (auto& server : s_servers)
const auto now = SteadyClock::now();
if (now >= s_next_listports_time)
{
s_next_listports_time = now + SERVER_LISTPORTS_INTERVAL;
for (size_t i = 0; i < s_servers.size(); ++i)
{
// Request info on the four controller ports
auto& server = s_servers[i];
Proto::Message<Proto::MessageType::ListPorts> msg(s_client_uid);
auto& list_ports = msg.m_message;
list_ports.pad_request_count = 4;
list_ports.pad_id = {0, 1, 2, 3};
// We ask for x possible devices. We will receive a message for every connected device.
list_ports.pad_request_count = SERVER_ASKED_PADS;
list_ports.pad_ids = {0, 1, 2, 3};
msg.Finish();
if (server.m_socket.send(&list_ports, sizeof list_ports, server.m_address, server.m_port) !=
sf::Socket::Status::Done)
{
ERROR_LOG_FMT(CONTROLLERINTERFACE, "DualShockUDPClient HotplugThreadFunc send failed");
}
timed_out_servers[i] = true;
}
}
@ -235,44 +245,80 @@ static void HotplugThreadFunc()
selector.add(server.m_socket);
}
using namespace std::chrono;
using namespace std::chrono_literals;
const auto timeout = s_next_listports - SteadyClock::now();
auto timeout = duration_cast<milliseconds>(s_next_listports_time - SteadyClock::now());
// Selector's wait treats a timeout of zero as infinite timeout, which we don't want
const auto timeout_ms = std::max(duration_cast<milliseconds>(timeout), 1ms);
if (!selector.wait(sf::milliseconds(timeout_ms.count())))
// Receive controller port info within a time from our request.
// Run this even if we sent no new requests, to disconnect devices,
// sleep (wait) the thread and catch old responses.
do
{
continue;
}
for (auto& server : s_servers)
{
if (!selector.isReady(server.m_socket))
// Selector's wait treats a timeout of zero as infinite timeout, which we don't want,
// but we also don't want risk waiting for the whole SERVER_LISTPORTS_INTERVAL and hang
// the thead trying to close this one in case we received no answers.
const auto current_timeout = std::max(std::min(timeout, THREAD_MAX_WAIT_INTERVAL), 1ms);
timeout -= current_timeout;
// This will return at the first answer
if (selector.wait(sf::milliseconds(current_timeout.count())))
{
continue;
}
Proto::Message<Proto::MessageType::FromServer> msg;
std::size_t received_bytes;
sf::IpAddress sender;
u16 port;
if (server.m_socket.receive(&msg, sizeof(msg), received_bytes, sender, port) !=
sf::Socket::Status::Done)
{
continue;
}
if (auto port_info = msg.CheckAndCastTo<Proto::MessageType::PortInfo>())
{
const bool port_changed =
!IsSameController(*port_info, server.m_port_info[port_info->pad_id]);
// Now check all the servers because we don't know which one(s) sent a reply
for (size_t i = 0; i < s_servers.size(); ++i)
{
std::lock_guard lock{server.m_port_info_mutex};
server.m_port_info[port_info->pad_id] = *port_info;
auto& server = s_servers[i];
if (!selector.isReady(server.m_socket))
{
continue;
}
Proto::Message<Proto::MessageType::FromServer> msg;
std::size_t received_bytes;
sf::IpAddress sender;
u16 port;
if (server.m_socket.receive(&msg, sizeof(msg), received_bytes, sender, port) !=
sf::Socket::Status::Done)
{
continue;
}
if (auto port_info = msg.CheckAndCastTo<Proto::MessageType::PortInfo>())
{
server.m_disconnect_time = SteadyClock::now() + SERVER_UNRESPONSIVE_INTERVAL;
// We have receive at least one valid update, that's enough. This is needed to avoid
// false positive when checking for disconnection in case our thread waited too long
timed_out_servers[i] = false;
const bool port_changed =
!IsSameController(*port_info, server.m_port_info[port_info->pad_id]);
if (port_changed)
{
server.m_port_info[port_info->pad_id] = *port_info;
// Just remove and re-add all the devices for simplicity
g_controller_interface.PlatformPopulateDevices([] { PopulateDevices(); });
}
}
}
if (port_changed)
PopulateDevices();
}
if (!s_hotplug_thread_running.IsSet()) // Avoid hanging the thread for too long
return;
} while (timeout > 0ms);
// If we have failed to receive any information from the server (or even send it),
// disconnect all devices from it (after enough time has elapsed, to avoid false positives).
for (size_t i = 0; i < s_servers.size(); ++i)
{
auto& server = s_servers[i];
if (timed_out_servers[i] && SteadyClock::now() >= server.m_disconnect_time)
{
bool any_connected = false;
for (size_t port_index = 0; port_index < server.m_port_info.size(); port_index++)
{
any_connected = any_connected ||
server.m_port_info[port_index].pad_state == Proto::DsState::Connected;
server.m_port_info[port_index] = {};
server.m_port_info[port_index].pad_id = static_cast<u8>(port_index);
}
// We can't only remove devices added by this server as we wouldn't know which they are
if (any_connected)
g_controller_interface.PlatformPopulateDevices([] { PopulateDevices(); });
}
}
}
@ -313,7 +359,6 @@ static void Restart()
StopHotplugThread();
s_next_listports = std::chrono::steady_clock::time_point::min();
for (auto& server : s_servers)
{
for (size_t port_index = 0; port_index < server.m_port_info.size(); port_index++)
@ -326,6 +371,7 @@ static void Restart()
PopulateDevices(); // Only removes devices
s_client_uid = Common::Random::GenerateValue<u32>();
s_next_listports_time = SteadyClock::now();
if (s_servers_enabled && !s_servers.empty())
StartHotplugThread();

View File

@ -113,7 +113,7 @@ struct ListPorts
MessageHeader header;
u32 message_type;
u32 pad_request_count;
std::array<u8, 4> pad_id;
std::array<u8, 4> pad_ids;
};
struct PortInfo
@ -178,7 +178,7 @@ struct PadDataResponse
u8 trigger_l2;
Touch touch1;
Touch touch2;
u64 timestamp_us;
u64 accelerometer_timestamp_us;
float accelerometer_x_g;
float accelerometer_y_g;
float accelerometer_z_g;