virtio_ring: shadow available ring flags & index

Improves cacheline transfer flow of available ring header.

Virtqueues are implemented as a pair of rings, one producer->consumer
avail ring and one consumer->producer used ring; preceding the
avail ring in memory are two contiguous u16 fields -- avail->flags
and avail->idx. A producer posts work by writing to avail->idx and
a consumer reads avail->idx.

The flags and idx fields only need to be written by a producer CPU
and only read by a consumer CPU; when the producer and consumer are
running on different CPUs and the virtio_ring code is structured to
only have source writes/sink reads, we can continuously transfer the
avail header cacheline between 'M' states between cores. This flow
optimizes core -> core bandwidth on certain CPUs.

(see: "Software Optimization Guide for AMD Family 15h Processors",
Section 11.6; similar language appears in the 10h guide and should
apply to CPUs w/ exclusive caches, using LLC as a transfer cache)

Unfortunately the existing virtio_ring code issued reads to the
avail->idx and read-modify-writes to avail->flags on the producer.

This change shadows the flags and index fields in producer memory;
the vring code now reads from the shadows and only ever writes to
avail->flags and avail->idx, allowing the cacheline to transfer
core -> core optimally.

In a concurrent version of vring_bench, the time required for
10,000,000 buffer checkout/returns was reduced by ~2% (average
across many runs) on an AMD Piledriver (15h) CPU:

(w/o shadowing):
 Performance counter stats for './vring_bench':
     5,451,082,016      L1-dcache-loads
     ...
       2.221477739 seconds time elapsed

(w/ shadowing):
 Performance counter stats for './vring_bench':
     5,405,701,361      L1-dcache-loads
     ...
       2.168405376 seconds time elapsed

The further away (in a NUMA sense) virtio producers and consumers are
from each other, the more we expect to benefit. Physical implementations
of virtio devices and implementations of virtio where the consumer polls
vring avail indexes (vhost) should also benefit.

Signed-off-by: Venkatesh Srinivas <venkateshs@google.com>
Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
This commit is contained in:
Venkatesh Srinivas 2015-11-10 16:21:07 -08:00 committed by Michael S. Tsirkin
parent 82107539bb
commit f277ec42f3

View File

@ -80,6 +80,12 @@ struct vring_virtqueue {
/* Last used index we've seen. */ /* Last used index we've seen. */
u16 last_used_idx; u16 last_used_idx;
/* Last written value to avail->flags */
u16 avail_flags_shadow;
/* Last written value to avail->idx in guest byte order */
u16 avail_idx_shadow;
/* How to notify other side. FIXME: commonalize hcalls! */ /* How to notify other side. FIXME: commonalize hcalls! */
bool (*notify)(struct virtqueue *vq); bool (*notify)(struct virtqueue *vq);
@ -235,13 +241,14 @@ static inline int virtqueue_add(struct virtqueue *_vq,
/* Put entry in available array (but don't update avail->idx until they /* Put entry in available array (but don't update avail->idx until they
* do sync). */ * do sync). */
avail = virtio16_to_cpu(_vq->vdev, vq->vring.avail->idx) & (vq->vring.num - 1); avail = vq->avail_idx_shadow & (vq->vring.num - 1);
vq->vring.avail->ring[avail] = cpu_to_virtio16(_vq->vdev, head); vq->vring.avail->ring[avail] = cpu_to_virtio16(_vq->vdev, head);
/* Descriptors and available array need to be set before we expose the /* Descriptors and available array need to be set before we expose the
* new available array entries. */ * new available array entries. */
virtio_wmb(vq->weak_barriers); virtio_wmb(vq->weak_barriers);
vq->vring.avail->idx = cpu_to_virtio16(_vq->vdev, virtio16_to_cpu(_vq->vdev, vq->vring.avail->idx) + 1); vq->avail_idx_shadow++;
vq->vring.avail->idx = cpu_to_virtio16(_vq->vdev, vq->avail_idx_shadow);
vq->num_added++; vq->num_added++;
pr_debug("Added buffer head %i to %p\n", head, vq); pr_debug("Added buffer head %i to %p\n", head, vq);
@ -354,8 +361,8 @@ bool virtqueue_kick_prepare(struct virtqueue *_vq)
* event. */ * event. */
virtio_mb(vq->weak_barriers); virtio_mb(vq->weak_barriers);
old = virtio16_to_cpu(_vq->vdev, vq->vring.avail->idx) - vq->num_added; old = vq->avail_idx_shadow - vq->num_added;
new = virtio16_to_cpu(_vq->vdev, vq->vring.avail->idx); new = vq->avail_idx_shadow;
vq->num_added = 0; vq->num_added = 0;
#ifdef DEBUG #ifdef DEBUG
@ -510,7 +517,7 @@ void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len)
/* If we expect an interrupt for the next entry, tell host /* If we expect an interrupt for the next entry, tell host
* by writing event index and flush out the write before * by writing event index and flush out the write before
* the read in the next get_buf call. */ * the read in the next get_buf call. */
if (!(vq->vring.avail->flags & cpu_to_virtio16(_vq->vdev, VRING_AVAIL_F_NO_INTERRUPT))) { if (!(vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT)) {
vring_used_event(&vq->vring) = cpu_to_virtio16(_vq->vdev, vq->last_used_idx); vring_used_event(&vq->vring) = cpu_to_virtio16(_vq->vdev, vq->last_used_idx);
virtio_mb(vq->weak_barriers); virtio_mb(vq->weak_barriers);
} }
@ -537,7 +544,11 @@ void virtqueue_disable_cb(struct virtqueue *_vq)
{ {
struct vring_virtqueue *vq = to_vvq(_vq); struct vring_virtqueue *vq = to_vvq(_vq);
vq->vring.avail->flags |= cpu_to_virtio16(_vq->vdev, VRING_AVAIL_F_NO_INTERRUPT); if (!(vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT)) {
vq->avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow);
}
} }
EXPORT_SYMBOL_GPL(virtqueue_disable_cb); EXPORT_SYMBOL_GPL(virtqueue_disable_cb);
@ -565,7 +576,10 @@ unsigned virtqueue_enable_cb_prepare(struct virtqueue *_vq)
/* Depending on the VIRTIO_RING_F_EVENT_IDX feature, we need to /* Depending on the VIRTIO_RING_F_EVENT_IDX feature, we need to
* either clear the flags bit or point the event index at the next * either clear the flags bit or point the event index at the next
* entry. Always do both to keep code simple. */ * entry. Always do both to keep code simple. */
vq->vring.avail->flags &= cpu_to_virtio16(_vq->vdev, ~VRING_AVAIL_F_NO_INTERRUPT); if (vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT) {
vq->avail_flags_shadow &= ~VRING_AVAIL_F_NO_INTERRUPT;
vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow);
}
vring_used_event(&vq->vring) = cpu_to_virtio16(_vq->vdev, last_used_idx = vq->last_used_idx); vring_used_event(&vq->vring) = cpu_to_virtio16(_vq->vdev, last_used_idx = vq->last_used_idx);
END_USE(vq); END_USE(vq);
return last_used_idx; return last_used_idx;
@ -633,9 +647,12 @@ bool virtqueue_enable_cb_delayed(struct virtqueue *_vq)
/* Depending on the VIRTIO_RING_F_USED_EVENT_IDX feature, we need to /* Depending on the VIRTIO_RING_F_USED_EVENT_IDX feature, we need to
* either clear the flags bit or point the event index at the next * either clear the flags bit or point the event index at the next
* entry. Always do both to keep code simple. */ * entry. Always do both to keep code simple. */
vq->vring.avail->flags &= cpu_to_virtio16(_vq->vdev, ~VRING_AVAIL_F_NO_INTERRUPT); if (vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT) {
vq->avail_flags_shadow &= ~VRING_AVAIL_F_NO_INTERRUPT;
vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow);
}
/* TODO: tune this threshold */ /* TODO: tune this threshold */
bufs = (u16)(virtio16_to_cpu(_vq->vdev, vq->vring.avail->idx) - vq->last_used_idx) * 3 / 4; bufs = (u16)(vq->avail_idx_shadow - vq->last_used_idx) * 3 / 4;
vring_used_event(&vq->vring) = cpu_to_virtio16(_vq->vdev, vq->last_used_idx + bufs); vring_used_event(&vq->vring) = cpu_to_virtio16(_vq->vdev, vq->last_used_idx + bufs);
virtio_mb(vq->weak_barriers); virtio_mb(vq->weak_barriers);
if (unlikely((u16)(virtio16_to_cpu(_vq->vdev, vq->vring.used->idx) - vq->last_used_idx) > bufs)) { if (unlikely((u16)(virtio16_to_cpu(_vq->vdev, vq->vring.used->idx) - vq->last_used_idx) > bufs)) {
@ -670,7 +687,8 @@ void *virtqueue_detach_unused_buf(struct virtqueue *_vq)
/* detach_buf clears data, so grab it now. */ /* detach_buf clears data, so grab it now. */
buf = vq->data[i]; buf = vq->data[i];
detach_buf(vq, i); detach_buf(vq, i);
vq->vring.avail->idx = cpu_to_virtio16(_vq->vdev, virtio16_to_cpu(_vq->vdev, vq->vring.avail->idx) - 1); vq->avail_idx_shadow--;
vq->vring.avail->idx = cpu_to_virtio16(_vq->vdev, vq->avail_idx_shadow);
END_USE(vq); END_USE(vq);
return buf; return buf;
} }
@ -735,6 +753,8 @@ struct virtqueue *vring_new_virtqueue(unsigned int index,
vq->weak_barriers = weak_barriers; vq->weak_barriers = weak_barriers;
vq->broken = false; vq->broken = false;
vq->last_used_idx = 0; vq->last_used_idx = 0;
vq->avail_flags_shadow = 0;
vq->avail_idx_shadow = 0;
vq->num_added = 0; vq->num_added = 0;
list_add_tail(&vq->vq.list, &vdev->vqs); list_add_tail(&vq->vq.list, &vdev->vqs);
#ifdef DEBUG #ifdef DEBUG
@ -746,8 +766,10 @@ struct virtqueue *vring_new_virtqueue(unsigned int index,
vq->event = virtio_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX); vq->event = virtio_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX);
/* No callback? Tell other side not to bother us. */ /* No callback? Tell other side not to bother us. */
if (!callback) if (!callback) {
vq->vring.avail->flags |= cpu_to_virtio16(vdev, VRING_AVAIL_F_NO_INTERRUPT); vq->avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
vq->vring.avail->flags = cpu_to_virtio16(vdev, vq->avail_flags_shadow);
}
/* Put everything in free lists. */ /* Put everything in free lists. */
vq->free_head = 0; vq->free_head = 0;