io: flush zerocopy socket error queue on sendmsg failure due to ENOBUF

The kernel allocates extra metadata SKBs in case of a zerocopy send,
eventually used for zerocopy's notification mechanism. This metadata
memory is accounted for in the OPTMEM limit. The kernel queues
completion notifications on the socket error queue and this error queue
is freed when userspace reads it.

Usually, in the case of in-order processing, the kernel will batch the
notifications and merge the metadata into a single SKB and free the
rest. As a result, it never exceeds the OPTMEM limit. However, if there
is any out-of-order processing or intermittent zerocopy failures, this
error chain can grow significantly, exhausting the OPTMEM limit. As a
result, all new sendmsg requests fail to allocate any new SKB, leading
to an ENOBUF error. Depending on the amount of data queued before the
flush (i.e., large live migration iterations), even large OPTMEM limits
are prone to failure.

To work around this, if we encounter an ENOBUF error with a zerocopy
sendmsg, flush the error queue and retry once more.

Co-authored-by: Manish Mishra <manish.mishra@nutanix.com>
Signed-off-by: Tejus GK <tejus.gk@nutanix.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
[DB: change TRUE/FALSE to true/false for 'bool' type;
     add more #ifdef QEMU_MSG_ZEROCOPY blocks]
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
This commit is contained in:
Manish Mishra
2025-10-24 13:15:43 +00:00
committed by Daniel P. Berrangé
parent e52d822716
commit 84005f4a2b
2 changed files with 75 additions and 14 deletions

View File

@@ -50,6 +50,11 @@ struct QIOChannelSocket {
ssize_t zero_copy_queued;
ssize_t zero_copy_sent;
bool blocking;
/**
* This flag indicates whether any new data was successfully sent with
* zerocopy since the last qio_channel_socket_flush() call.
*/
bool new_zero_copy_sent_success;
};

View File

@@ -37,6 +37,12 @@
#define SOCKET_MAX_FDS 16
#ifdef QEMU_MSG_ZEROCOPY
static int qio_channel_socket_flush_internal(QIOChannel *ioc,
bool block,
Error **errp);
#endif
SocketAddress *
qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
Error **errp)
@@ -66,6 +72,7 @@ qio_channel_socket_new(void)
sioc->zero_copy_queued = 0;
sioc->zero_copy_sent = 0;
sioc->blocking = false;
sioc->new_zero_copy_sent_success = false;
ioc = QIO_CHANNEL(sioc);
qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -618,6 +625,10 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
size_t fdsize = sizeof(int) * nfds;
struct cmsghdr *cmsg;
int sflags = 0;
#ifdef QEMU_MSG_ZEROCOPY
bool blocking = sioc->blocking;
bool zerocopy_flushed_once = false;
#endif
memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
@@ -662,13 +673,30 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
return QIO_CHANNEL_ERR_BLOCK;
case EINTR:
goto retry;
#ifdef QEMU_MSG_ZEROCOPY
case ENOBUFS:
if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
error_setg_errno(errp, errno,
"Process can't lock enough memory for using MSG_ZEROCOPY");
return -1;
/**
* Socket error queueing may exhaust the OPTMEM limit. Try
* flushing the error queue once.
*/
if (!zerocopy_flushed_once) {
ret = qio_channel_socket_flush_internal(ioc, blocking,
errp);
if (ret < 0) {
return -1;
}
zerocopy_flushed_once = true;
goto retry;
} else {
error_setg_errno(errp, errno,
"Process can't lock enough memory for "
"using MSG_ZEROCOPY");
return -1;
}
}
break;
#endif
}
error_setg_errno(errp, errno,
@@ -777,8 +805,9 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
#ifdef QEMU_MSG_ZEROCOPY
static int qio_channel_socket_flush(QIOChannel *ioc,
Error **errp)
static int qio_channel_socket_flush_internal(QIOChannel *ioc,
bool block,
Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
struct msghdr msg = {};
@@ -786,7 +815,6 @@ static int qio_channel_socket_flush(QIOChannel *ioc,
struct cmsghdr *cm;
char control[CMSG_SPACE(sizeof(*serr))];
int received;
int ret;
if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
return 0;
@@ -796,16 +824,25 @@ static int qio_channel_socket_flush(QIOChannel *ioc,
msg.msg_controllen = sizeof(control);
memset(control, 0, sizeof(control));
ret = 1;
while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
if (received < 0) {
switch (errno) {
case EAGAIN:
/* Nothing on errqueue, wait until something is available */
qio_channel_wait(ioc, G_IO_ERR);
continue;
if (block) {
/*
* Nothing on errqueue, wait until something is
* available.
*
* Use G_IO_ERR instead of G_IO_IN since MSG_ERRQUEUE reads
* are signaled via POLLERR, not POLLIN, as the kernel
* sets POLLERR when zero-copy notificatons appear on the
* socket error queue.
*/
qio_channel_wait(ioc, G_IO_ERR);
continue;
}
return 0;
case EINTR:
continue;
default:
@@ -843,13 +880,32 @@ static int qio_channel_socket_flush(QIOChannel *ioc,
/* No errors, count successfully finished sendmsg()*/
sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
/* If any sendmsg() succeeded using zero copy, return 0 at the end */
/* If any sendmsg() succeeded using zero copy, mark zerocopy success */
if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
ret = 0;
sioc->new_zero_copy_sent_success = true;
}
}
return ret;
return 0;
}
static int qio_channel_socket_flush(QIOChannel *ioc,
Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
int ret;
ret = qio_channel_socket_flush_internal(ioc, true, errp);
if (ret < 0) {
return ret;
}
if (sioc->new_zero_copy_sent_success) {
sioc->new_zero_copy_sent_success = false;
return 0;
}
return 1;
}
#endif /* QEMU_MSG_ZEROCOPY */