allow changing packet sharing behavior in old p2p networking via ini config

This commit is contained in:
a
2025-11-24 21:34:13 +02:00
parent e88797f733
commit 75cd76e69f
6 changed files with 260 additions and 95 deletions

View File

@@ -18,6 +18,7 @@
#pragma once
#include "base.h"
#include <tuple>
class P2p_Manager
@@ -31,8 +32,15 @@ private:
struct Packet_t
{
private:
std::chrono::high_resolution_clock::time_point time_created = std::chrono::high_resolution_clock::now();
public:
bool is_processed = false;
std::vector<char> data{};
EP2PSend send_type = EP2PSend::k_EP2PSendUnreliable;
const std::chrono::high_resolution_clock::time_point& get_time_created() const;
};
struct Channel_t
@@ -69,7 +77,17 @@ private:
// true if the connection was already accepted,
// false otherwise.
bool store_packet(CSteamID my_id, CSteamID steamIDRemote, const void *pubData, uint32 cubData, int nChannel);
bool store_packet(
CSteamID my_id, CSteamID steamIDRemote,
const void *pubData, uint32 cubData, int nChannel,
EP2PSend send_type
);
std::optional<std::tuple<
decltype(connections)::iterator,
decltype(Connection_t::channels)::iterator,
decltype(Channel_t::packets)::iterator
>> get_next_packet(CSteamID my_id, int nChannel);
void periodic_handle_connections(const std::chrono::high_resolution_clock::time_point &now);
void periodic_handle_channels(const std::chrono::high_resolution_clock::time_point &now);

View File

@@ -200,6 +200,32 @@ struct Branch_Info {
bool active = false;
};
struct OldP2pBehavior {
enum class EPacketShareMode {
// if the sending type is unreliable (UDP), share packets between gameserver and client
// otherwise, don't share packets
DEFAULT,
// always share packets between gameserver and client
ALWAYS_SHARE,
// never share packets between gameserver and client
NEVER_SHARE,
_LAST,
};
static EPacketShareMode to_share_mode(int val) {
if (val < 0 || val >= (unsigned)EPacketShareMode::_LAST) {
return EPacketShareMode::DEFAULT;
}
return (EPacketShareMode)val;
}
EPacketShareMode mode = EPacketShareMode::DEFAULT;
};
class Settings {
private:
CSteamID steam_id{}; // user id
@@ -373,6 +399,9 @@ public:
// free weekend
bool free_weekend = false;
// old P2P (ISteamNetworking) behavior
OldP2pBehavior old_p2p_behavior{};
// voice chat
bool enable_voice_chat = false;

View File

@@ -81,15 +81,15 @@ message Low_Level {
}
message Network_pb {
int32 channel = 1;
bytes data = 2;
enum Types {
DATA = 0;
FAILED_CONNECT = 1;
}
Types type = 3;
Types type = 1;
int32 channel = 2;
bytes data = 3;
uint32 send_type = 4; // EP2PSend
}
message Network_Old {

View File

@@ -28,6 +28,14 @@ constexpr static double SESSION_REQUEST_DELAY = 2.0;
// "if we can't get through to the user after a timeout of 20 seconds, then an error will be posted"
constexpr static double SESSION_REQUEST_TIMEOUT = 20.0;
constexpr static double PACKET_MAX_TIME_TO_LIVE = 20.0;
const std::chrono::high_resolution_clock::time_point& P2p_Manager::Packet_t::get_time_created() const
{
return time_created;
}
void P2p_Manager::steam_networking_callback(void *object, Common_Message *msg)
@@ -196,7 +204,11 @@ P2p_Manager::~P2p_Manager()
}
bool P2p_Manager::store_packet(CSteamID my_id, CSteamID steamIDRemote, const void *pubData, uint32 cubData, int nChannel)
bool P2p_Manager::store_packet(
CSteamID my_id, CSteamID steamIDRemote,
const void *pubData, uint32 cubData, int nChannel,
EP2PSend send_type
)
{
std::lock_guard lock(p2p_mtx);
@@ -205,6 +217,7 @@ bool P2p_Manager::store_packet(CSteamID my_id, CSteamID steamIDRemote, const voi
{
Packet_t channel_msg{};
channel_msg.is_processed = conn->is_accepted;
channel_msg.send_type = send_type;
if (pubData && cubData > 0) {
channel_msg.data.assign((const char *)pubData, (const char *)pubData + cubData);
}
@@ -221,11 +234,91 @@ bool P2p_Manager::store_packet(CSteamID my_id, CSteamID steamIDRemote, const voi
return conn->is_accepted;
}
std::optional<std::tuple<
decltype(P2p_Manager::connections)::iterator,
decltype(P2p_Manager::Connection_t::channels)::iterator,
decltype(P2p_Manager::Channel_t::packets)::iterator
>> P2p_Manager::get_next_packet(CSteamID my_id, int nChannel)
{
const auto my_settings = is_same_peer(my_id, settings_client->get_local_steam_id())
? settings_client
: settings_server;
for (auto conn_it = connections.begin(); connections.end() != conn_it; ++conn_it) {
if (!conn_it->is_accepted) {
continue;
}
auto ch_it = conn_it->channels.find(nChannel);
// channel doesn't exist for this connection
if (conn_it->channels.end() == ch_it) {
continue;
}
auto &packets = ch_it->second.packets;
auto packet_it = packets.begin();
// channel has no packets
if (packets.end() == packet_it) {
continue;
}
// no need to check next packets in this channel if the first one isn't processed yet
// once the connection is accepted, all messages in all channels will be marked as processed
if (!packet_it->is_processed) {
continue;
}
const bool is_packet_for_me = is_same_peer(my_id, conn_it->peer_conn.my_dest_id);
if (!is_packet_for_me) {
bool can_share_packet = false;
switch (my_settings->old_p2p_behavior.mode) {
default:
case OldP2pBehavior::EPacketShareMode::DEFAULT: {
// appids (353090, 301300) do this:
// - send packet from client >>> to gameserver
// - use the **client** to check for these packets
// it should use the gameserver instead
// but the client is expected to return these packets to the game
// this seems to be an old behavior
//
// on the contrary appids (701160, 248390) use k_EP2PSendReliable
// and they do not need the gameserver to share its packets with the client
// even multiplayer in appid 248390 won't work if packets were shared
can_share_packet =
packet_it->send_type == EP2PSend::k_EP2PSendUnreliable ||
packet_it->send_type == EP2PSend::k_EP2PSendUnreliableNoDelay;
}
break;
case OldP2pBehavior::EPacketShareMode::ALWAYS_SHARE: {
can_share_packet = true;
}
break;
case OldP2pBehavior::EPacketShareMode::NEVER_SHARE: {
can_share_packet = false;
}
break;
}
// avoid sharing packets between client and gameserver
if (!can_share_packet) {
continue;
}
}
return std::make_tuple(conn_it, ch_it, packet_it);
}
return {};
}
bool P2p_Manager::send_packet(CSteamID my_id, CSteamID steamIDRemote, const void *pubData, uint32 cubData, EP2PSend eP2PSendType, int nChannel)
{
bool reliable = false;
if (eP2PSendType == k_EP2PSendReliable || eP2PSendType == k_EP2PSendReliableWithBuffering) {
if (eP2PSendType == EP2PSend::k_EP2PSendReliable || eP2PSendType == EP2PSend::k_EP2PSendReliableWithBuffering) {
reliable = true;
}
@@ -257,6 +350,7 @@ bool P2p_Manager::send_packet(CSteamID my_id, CSteamID steamIDRemote, const void
msg.mutable_network()->set_type(Network_pb::DATA);
msg.mutable_network()->set_channel(nChannel);
msg.mutable_network()->set_data(pubData, cubData);
msg.mutable_network()->set_send_type(eP2PSendType);
bool ret = false;
{
@@ -281,31 +375,22 @@ bool P2p_Manager::is_packet_available(CSteamID my_id, uint32 *pcubMsgSize, int n
if (pcubMsgSize) *pcubMsgSize = 0;
for (const auto &conn : connections) {
if (!conn.is_accepted) {
continue;
}
auto ch_it = conn.channels.find(nChannel);
if (conn.channels.end() == ch_it) {
continue;
}
auto msg_it = ch_it->second.packets.begin();
if (ch_it->second.packets.end() != msg_it) {
if (msg_it->is_processed) {
uint32 size = static_cast<uint32>(msg_it->data.size());
if (pcubMsgSize) {
*pcubMsgSize = size;
}
PRINT_DEBUG(" available message from=[%llu], size=[%u]", conn.peer_conn.remote_id.ConvertToUint64(), size);
return true;
}
}
auto packet_opt = get_next_packet(my_id, nChannel);
if (!packet_opt) {
return false;
}
return false;
auto &[conn_it, ch_it, packet_it] = packet_opt.value();
uint32 size = static_cast<uint32>(packet_it->data.size());
if (pcubMsgSize) {
*pcubMsgSize = size;
}
PRINT_DEBUG(
" available message from=[%llu], size=[%u]",
conn_it->peer_conn.remote_id.ConvertToUint64(), size
);
return true;
}
bool P2p_Manager::read_packet(CSteamID my_id, void *pubDest, uint32 cubDest, uint32 *pcubMsgSize, CSteamID *psteamIDRemote, int nChannel)
@@ -315,49 +400,39 @@ bool P2p_Manager::read_packet(CSteamID my_id, void *pubDest, uint32 cubDest, uin
if (pcubMsgSize) *pcubMsgSize = 0;
if (psteamIDRemote) *psteamIDRemote = k_steamIDNil;
bool read = false;
for (auto &conn : connections) {
if (!conn.is_accepted) {
continue;
}
auto ch_it = conn.channels.find(nChannel);
if (conn.channels.end() == ch_it) {
continue;
}
auto msg_it = ch_it->second.packets.begin();
if (ch_it->second.packets.end() != msg_it) {
if (msg_it->is_processed) {
if (psteamIDRemote) {
*psteamIDRemote = conn.peer_conn.remote_id;
}
uint32 size = static_cast<uint32>(msg_it->data.size());
if (cubDest < size) {
// https://partner.steamgames.com/doc/api/ISteamNetworking#ReadP2PPacket
// "If the cubDest buffer is too small for the packet, then the message will be truncated"
size = cubDest;
}
if (pcubMsgSize) {
*pcubMsgSize = size;
}
if (pubDest) {
memcpy(pubDest, msg_it->data.data(), size);
}
ch_it->second.packets.erase(msg_it);
PRINT_DEBUG(" copied message from=[%llu], size=[%u]", conn.peer_conn.remote_id.ConvertToUint64(), size);
return true;
}
}
auto packet_opt = get_next_packet(my_id, nChannel);
if (!packet_opt) {
return false;
}
return false;
auto &[conn_it, ch_it, packet_it] = packet_opt.value();
if (psteamIDRemote) {
*psteamIDRemote = conn_it->peer_conn.remote_id;
}
uint32 size = static_cast<uint32>(packet_it->data.size());
if (cubDest < size) {
// https://partner.steamgames.com/doc/api/ISteamNetworking#ReadP2PPacket
// "If the cubDest buffer is too small for the packet, then the message will be truncated"
size = cubDest;
}
if (pcubMsgSize) {
*pcubMsgSize = size;
}
if (pubDest) {
memcpy(pubDest, packet_it->data.data(), size);
}
ch_it->second.packets.erase(packet_it);
PRINT_DEBUG(
" copied message from=[%llu], size=[%u]",
conn_it->peer_conn.remote_id.ConvertToUint64(), size
);
return true;
}
bool P2p_Manager::close_channel(CSteamID my_id, CSteamID steamIDRemote, int nChannel)
@@ -436,9 +511,9 @@ bool P2p_Manager::get_session_state(CSteamID my_id, CSteamID steamIDRemote, P2PS
if (pConnectionState) {
int32 pending_packets = 0;
int32 pending_bytes = 0;
for (auto& [ch_idx, channel] : conn->channels) {
for (const auto& [ch_idx, channel] : conn->channels) {
pending_packets += (int32)channel.packets.size();
for (auto &msg : channel.packets) {
for (const auto &msg : channel.packets) {
pending_bytes += (int32)msg.data.size();
}
}
@@ -488,14 +563,18 @@ bool P2p_Manager::accept_session(CSteamID my_id, CSteamID steamIDRemote)
void P2p_Manager::periodic_handle_connections(const std::chrono::high_resolution_clock::time_point &now)
{
for (auto conn_it = connections.begin(); connections.end() != conn_it; ) {
auto conn_it = connections.begin();
while (connections.end() != conn_it) {
bool is_remove = false;
if (!conn_it->is_accepted) {
if (check_timedout(conn_it->time_added, SESSION_REQUEST_TIMEOUT, now)) {
is_remove = true;
send_peer_session_failure(conn_it->peer_conn);
conn_it = connections.erase(conn_it);
} else {
++conn_it;
}
}
if (is_remove) {
conn_it = connections.erase(conn_it);
} else {
++conn_it;
}
@@ -509,10 +588,15 @@ void P2p_Manager::periodic_handle_channels(const std::chrono::high_resolution_cl
continue;
}
// remove channels with no packets
auto ch_it = conn.channels.begin();
while (conn.channels.end() != ch_it) {
// TODO anything channel related goes here
++ch_it;
auto &channel = ch_it->second;
if (channel.packets.empty()) {
ch_it = conn.channels.erase(ch_it);
} else {
++ch_it;
}
}
}
}
@@ -524,9 +608,22 @@ void P2p_Manager::periodic_handle_packets(const std::chrono::high_resolution_clo
continue;
}
for (auto& [ch_num, channel] : conn.channels) {
for (auto &msg : channel.packets) {
msg.is_processed = true;
for (auto &[ch_num, channel] : conn.channels) {
// remove outadated packets, and mark the rest as processed
auto packet_it = channel.packets.begin();
while (channel.packets.end() != packet_it) {
bool is_remove = false;
if (check_timedout(packet_it->get_time_created(), PACKET_MAX_TIME_TO_LIVE, now)) {
is_remove = true;
} else {
packet_it->is_processed = true;
}
if (is_remove) {
packet_it = channel.packets.erase(packet_it);
} else {
++packet_it;
}
}
}
}
@@ -548,22 +645,23 @@ void P2p_Manager::periodic_callback()
void P2p_Manager::network_data_packets(Common_Message *msg)
{
const CSteamID src_id = (uint64)msg->source_id();
const CSteamID dest_id = (uint64)msg->dest_id(); // this is us
const CSteamID my_dest_id = (uint64)msg->dest_id(); // this is us
PRINT_DEBUG("got network msg from [%llu], I am [%llu], type <%u>",
src_id.ConvertToUint64(), dest_id.ConvertToUint64(), msg->network().type()
src_id.ConvertToUint64(), my_dest_id.ConvertToUint64(), msg->network().type()
);
switch (msg->network().type()) {
case Network_pb::DATA: {
PRINT_DEBUG("got network data message");
const bool conn_is_accepted = store_packet(
dest_id, src_id,
my_dest_id, src_id,
msg->network().data().c_str(), (uint32)msg->network().data().size(),
(int)msg->network().channel()
(int)msg->network().channel(),
(EP2PSend)msg->network().send_type()
);
if (!conn_is_accepted) {
trigger_session_request(src_id, dest_id);
trigger_session_request(src_id, my_dest_id);
}
}
break;
@@ -574,13 +672,13 @@ void P2p_Manager::network_data_packets(Common_Message *msg)
data.m_steamIDRemote = src_id;
data.m_eP2PSessionError = EP2PSessionError::k_EP2PSessionErrorTimeout;
get_my_callbacks(dest_id)->addCBResult(data.k_iCallback, &data, sizeof(data));
get_my_callbacks(my_dest_id)->addCBResult(data.k_iCallback, &data, sizeof(data));
{
std::lock_guard lock(p2p_mtx);
remove_connection(src_id, dest_id);
remove_connection(dest_id, src_id);
remove_connection(src_id, my_dest_id);
remove_connection(my_dest_id, src_id);
}
}
break;
@@ -590,7 +688,7 @@ void P2p_Manager::network_data_packets(Common_Message *msg)
void P2p_Manager::network_low_level(Common_Message *msg)
{
const CSteamID src_id = (uint64)msg->source_id();
const CSteamID dest_id = (uint64)msg->dest_id(); // this is us
const CSteamID my_dest_id = (uint64)msg->dest_id(); // this is us
switch (msg->low_level().type()) {
case Low_Level::CONNECT: {
@@ -603,20 +701,20 @@ void P2p_Manager::network_low_level(Common_Message *msg)
{
std::lock_guard lock(p2p_mtx);
any_conn_removed |= remove_connection(src_id, dest_id);
any_conn_removed |= remove_connection(dest_id, src_id);
any_conn_removed |= remove_connection(src_id, my_dest_id);
any_conn_removed |= remove_connection(my_dest_id, src_id);
}
if (any_conn_removed) {
PRINT_DEBUG(
"[X] remote user [%llu] disconnected, sending P2PSessionConnectFail_t, I am [%llu]",
src_id.ConvertToUint64(), dest_id.ConvertToUint64()
src_id.ConvertToUint64(), my_dest_id.ConvertToUint64()
);
P2PSessionConnectFail_t data{};
data.m_steamIDRemote = src_id;
data.m_eP2PSessionError = k_EP2PSessionErrorDestinationNotLoggedIn;
get_my_callbacks(dest_id)->addCBResult(data.k_iCallback, &data, sizeof(data));
get_my_callbacks(my_dest_id)->addCBResult(data.k_iCallback, &data, sizeof(data));
}
}

View File

@@ -1583,6 +1583,13 @@ static void parse_simple_features(class Settings *settings_client, class Setting
settings_client->download_steamhttp_requests = ini.GetBoolValue("main::connectivity", "download_steamhttp_requests", settings_client->download_steamhttp_requests);
settings_server->download_steamhttp_requests = ini.GetBoolValue("main::connectivity", "download_steamhttp_requests", settings_server->download_steamhttp_requests);
settings_client->old_p2p_behavior.mode = OldP2pBehavior::to_share_mode(
(int)ini.GetLongValue("main::connectivity", "old_p2p_packet_sharing_mode", (unsigned)settings_client->old_p2p_behavior.mode)
);
settings_server->old_p2p_behavior.mode = OldP2pBehavior::to_share_mode(
(int)ini.GetLongValue("main::connectivity", "old_p2p_packet_sharing_mode", (unsigned)settings_server->old_p2p_behavior.mode)
);
// [main::misc]
settings_client->achievement_bypass = ini.GetBoolValue("main::misc", "achievements_bypass", settings_client->achievement_bypass);

View File

@@ -125,6 +125,19 @@ disable_lobby_creation=0
# this will **not** work if the app is using native/OS web APIs
# default=0
download_steamhttp_requests=0
# sharing mode of the old P2P networking interface (ISteamNetworking).
# this defines how the gameserver and the client intances can read P2P packets received from other players.
#
# some older games require sharing the received packets, i.e if the gameserver instance received a packet,
# then the client instance should be able to detect/read it as well.
# other games won't work properly if the packets were shared,
# and the original receiver (either gameserver or client) must handle that packet
#
# 0=share packets flagged as "unreliable" (k_EP2PSendUnreliable | k_EP2PSendUnreliableNoDelay), otherwise don't share packets
# 1=always share the packets
# 2=never share the packets
# default=0
old_p2p_packet_sharing_mode=0
# mostly workarounds for specific problems
[main::misc]