[libc] Factor specifics of packet type out of process

NFC. Simplifies process slightly, gives more options for testing it.

Reviewed By: jhuber6

Differential Revision: https://reviews.llvm.org/D153604
This commit is contained in:
Jon Chesterfield 2023-06-23 03:45:22 +01:00
parent 7e799342e1
commit d4d8cd8446
2 changed files with 53 additions and 47 deletions

View File

@ -74,7 +74,7 @@ constexpr uint64_t DEFAULT_PORT_COUNT = 64;
/// - The client will always start with a 'send' operation.
/// - The server will always start with a 'recv' operation.
/// - Every 'send' or 'recv' call is mirrored by the other process.
template <bool Invert, uint32_t lane_size> struct Process {
template <bool Invert, typename Packet> struct Process {
LIBC_INLINE Process() = default;
LIBC_INLINE Process(const Process &) = delete;
LIBC_INLINE Process &operator=(const Process &) = delete;
@ -85,7 +85,7 @@ template <bool Invert, uint32_t lane_size> struct Process {
uint64_t port_count;
cpp::Atomic<uint32_t> *inbox;
cpp::Atomic<uint32_t> *outbox;
Packet<lane_size> *packet;
Packet *packet;
cpp::Atomic<uint32_t> lock[DEFAULT_PORT_COUNT] = {0};
@ -96,8 +96,8 @@ template <bool Invert, uint32_t lane_size> struct Process {
advance(buffer, inbox_offset(port_count)));
this->outbox = reinterpret_cast<cpp::Atomic<uint32_t> *>(
advance(buffer, outbox_offset(port_count)));
this->packet = reinterpret_cast<Packet<lane_size> *>(
advance(buffer, buffer_offset(port_count)));
this->packet =
reinterpret_cast<Packet *>(advance(buffer, buffer_offset(port_count)));
}
/// Returns the beginning of the unified buffer. Intended for initializing the
@ -221,30 +221,6 @@ template <bool Invert, uint32_t lane_size> struct Process {
gpu::sync_lane(lane_mask);
}
/// Invokes a function accross every active buffer across the total lane size.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
Packet<lane_size> &packet) {
if constexpr (is_process_gpu()) {
fn(&packet.payload.slot[gpu::get_lane_id()]);
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
if (packet.header.mask & 1ul << i)
fn(&packet.payload.slot[i]);
}
}
/// Alternate version that also provides the index of the current lane.
LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
Packet<lane_size> &packet) {
if constexpr (is_process_gpu()) {
fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
if (packet.header.mask & 1ul << i)
fn(&packet.payload.slot[i], i);
}
}
/// Number of bytes to allocate for an inbox or outbox.
LIBC_INLINE static constexpr uint64_t mailbox_bytes(uint64_t port_count) {
return port_count * sizeof(cpp::Atomic<uint32_t>);
@ -252,7 +228,7 @@ template <bool Invert, uint32_t lane_size> struct Process {
/// Number of bytes to allocate for the buffer containing the packets.
LIBC_INLINE static constexpr uint64_t buffer_bytes(uint64_t port_count) {
return port_count * sizeof(Packet<lane_size>);
return port_count * sizeof(Packet);
}
/// Offset of the inbox in memory. This is the same as the outbox if inverted.
@ -267,14 +243,40 @@ template <bool Invert, uint32_t lane_size> struct Process {
/// Offset of the buffer containing the packets after the inbox and outbox.
LIBC_INLINE static constexpr uint64_t buffer_offset(uint64_t port_count) {
return align_up(2 * mailbox_bytes(port_count), alignof(Packet<lane_size>));
return align_up(2 * mailbox_bytes(port_count), alignof(Packet));
}
};
/// Invokes a function accross every active buffer across the total lane size.
template <uint32_t lane_size>
static LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
Packet<lane_size> &packet) {
if constexpr (is_process_gpu()) {
fn(&packet.payload.slot[gpu::get_lane_id()]);
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
if (packet.header.mask & 1ul << i)
fn(&packet.payload.slot[i]);
}
}
/// Alternate version that also provides the index of the current lane.
template <uint32_t lane_size>
static LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
Packet<lane_size> &packet) {
if constexpr (is_process_gpu()) {
fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
} else {
for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
if (packet.header.mask & 1ul << i)
fn(&packet.payload.slot[i], i);
}
}
/// The port provides the interface to communicate between the multiple
/// processes. A port is conceptually an index into the memory provided by the
/// underlying process that is guarded by a lock bit.
template <bool T, uint32_t S> struct Port {
template <bool T, typename S> struct Port {
LIBC_INLINE Port(Process<T, S> &process, uint64_t lane_mask, uint64_t index,
uint32_t out)
: process(process), lane_mask(lane_mask), index(index), out(out),
@ -330,7 +332,7 @@ struct Client {
LIBC_INLINE Client &operator=(const Client &) = delete;
LIBC_INLINE ~Client() = default;
using Port = rpc::Port<false, gpu::LANE_SIZE>;
using Port = rpc::Port<false, Packet<gpu::LANE_SIZE>>;
template <uint16_t opcode> LIBC_INLINE cpp::optional<Port> try_open();
template <uint16_t opcode> LIBC_INLINE Port open();
@ -339,7 +341,7 @@ struct Client {
}
private:
Process<false, gpu::LANE_SIZE> process;
Process<false, Packet<gpu::LANE_SIZE>> process;
};
/// The RPC server used to respond to the client.
@ -349,7 +351,7 @@ template <uint32_t lane_size> struct Server {
LIBC_INLINE Server &operator=(const Server &) = delete;
LIBC_INLINE ~Server() = default;
using Port = rpc::Port<true, lane_size>;
using Port = rpc::Port<true, Packet<lane_size>>;
LIBC_INLINE cpp::optional<Port> try_open();
LIBC_INLINE Port open();
@ -362,15 +364,15 @@ template <uint32_t lane_size> struct Server {
}
LIBC_INLINE static uint64_t allocation_size(uint64_t port_count) {
return Process<true, lane_size>::allocation_size(port_count);
return Process<true, Packet<lane_size>>::allocation_size(port_count);
}
private:
Process<true, lane_size> process;
Process<true, Packet<lane_size>> process;
};
/// Applies \p fill to the shared buffer and initiates a send operation.
template <bool T, uint32_t S>
template <bool T, typename S>
template <typename F>
LIBC_INLINE void Port<T, S>::send(F fill) {
uint32_t in = owns_buffer ? out ^ T : process.load_inbox(index);
@ -379,14 +381,14 @@ LIBC_INLINE void Port<T, S>::send(F fill) {
process.wait_for_ownership(index, out, in);
// Apply the \p fill function to initialize the buffer and release the memory.
process.invoke_rpc(fill, process.packet[index]);
invoke_rpc(fill, process.packet[index]);
out = process.invert_outbox(index, out);
owns_buffer = false;
receive = false;
}
/// Applies \p use to the shared buffer and acknowledges the send.
template <bool T, uint32_t S>
template <bool T, typename S>
template <typename U>
LIBC_INLINE void Port<T, S>::recv(U use) {
// We only exchange ownership of the buffer during a receive if we are waiting
@ -402,13 +404,13 @@ LIBC_INLINE void Port<T, S>::recv(U use) {
process.wait_for_ownership(index, out, in);
// Apply the \p use function to read the memory out of the buffer.
process.invoke_rpc(use, process.packet[index]);
invoke_rpc(use, process.packet[index]);
receive = true;
owns_buffer = true;
}
/// Combines a send and receive into a single function.
template <bool T, uint32_t S>
template <bool T, typename S>
template <typename F, typename U>
LIBC_INLINE void Port<T, S>::send_and_recv(F fill, U use) {
send(fill);
@ -418,7 +420,7 @@ LIBC_INLINE void Port<T, S>::send_and_recv(F fill, U use) {
/// Combines a receive and send operation into a single function. The \p work
/// function modifies the buffer in-place and the send is only used to initiate
/// the copy back.
template <bool T, uint32_t S>
template <bool T, typename S>
template <typename W>
LIBC_INLINE void Port<T, S>::recv_and_send(W work) {
recv(work);
@ -427,7 +429,7 @@ LIBC_INLINE void Port<T, S>::recv_and_send(W work) {
/// Helper routine to simplify the interface when sending from the GPU using
/// thread private pointers to the underlying value.
template <bool T, uint32_t S>
template <bool T, typename S>
LIBC_INLINE void Port<T, S>::send_n(const void *src, uint64_t size) {
static_assert(is_process_gpu(), "Only valid when running on the GPU");
const void **src_ptr = &src;
@ -437,7 +439,7 @@ LIBC_INLINE void Port<T, S>::send_n(const void *src, uint64_t size) {
/// Sends an arbitrarily sized data buffer \p src across the shared channel in
/// multiples of the packet length.
template <bool T, uint32_t S>
template <bool T, typename S>
LIBC_INLINE void Port<T, S>::send_n(const void *const *src, uint64_t *size) {
uint64_t num_sends = 0;
send([&](Buffer *buffer, uint32_t id) {
@ -467,7 +469,7 @@ LIBC_INLINE void Port<T, S>::send_n(const void *const *src, uint64_t *size) {
/// Receives an arbitrarily sized data buffer across the shared channel in
/// multiples of the packet length. The \p alloc function is called with the
/// size of the data so that we can initialize the size of the \p dst buffer.
template <bool T, uint32_t S>
template <bool T, typename S>
template <typename A>
LIBC_INLINE void Port<T, S>::recv_n(void **dst, uint64_t *size, A &&alloc) {
uint64_t num_recvs = 0;

View File

@ -13,8 +13,12 @@
namespace {
enum { lane_size = 8, port_count = 4 };
using ProcAType = __llvm_libc::rpc::Process<false, lane_size>;
using ProcBType = __llvm_libc::rpc::Process<true, lane_size>;
struct Packet {
uint64_t unused;
};
using ProcAType = __llvm_libc::rpc::Process<false, Packet>;
using ProcBType = __llvm_libc::rpc::Process<true, Packet>;
static_assert(ProcAType::inbox_offset(port_count) ==
ProcBType::outbox_offset(port_count));