internal/poll: move buffer pinning inside execIO

This is a step towards deferring adding the handle to IOCP until the
first IO operation.

The goal of this CL is to avoid the fd.isBlocking check in fd.pin,
which was happening outside execIO, and making buffer pinning less
error-prone.

This also fixes an issue where buffer used in Pwrite and WriteTo
were unpinned too early when the write buffer was larger than the
maximum chunk size.

For #76391

Cq-Include-Trybots: luci.golang.try:gotip-windows-amd64-longtest,gotip-windows-amd64-race
Change-Id: Ia181dcb57a559ae466a4341c36a307ad6678aac0
Reviewed-on: https://go-review.googlesource.com/c/go/+/740561
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
This commit is contained in:
qmuntal 2026-01-30 14:29:58 +01:00 committed by Quim Muntal
parent 3b2a451cef
commit cce3fea08f
2 changed files with 57 additions and 85 deletions

View file

@ -232,27 +232,10 @@ func (fd *FD) waitIO(o *operation) error {
return err
}
// pin pins ptr for the duration of the IO operation.
// If fd is in blocking mode, pin does nothing.
func (fd *FD) pin(mode int, ptr any) {
if fd.isBlocking {
return
}
if mode == 'r' {
fd.readPinner.Pin(ptr)
} else {
fd.writePinner.Pin(ptr)
}
}
// execIO executes a single IO operation o.
// It supports both synchronous and asynchronous IO.
func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int, error) {
if mode == 'r' {
defer fd.readPinner.Unpin()
} else {
defer fd.writePinner.Unpin()
}
// buf, if not nil, will be pinned during the lifetime of the operation.
func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error), buf []byte) (int, error) {
// Notify runtime netpoll about starting IO.
err := fd.pd.prepare(mode, fd.isFile)
if err != nil {
@ -268,21 +251,37 @@ func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int,
runtimeCtx: fd.pd.runtimeCtx,
mode: int32(mode),
}
// Start IO.
if !fd.isBlocking && !fd.pollable() {
// If the handle is opened for overlapped IO but we can't
// use the runtime poller, then we need to use an
// event to wait for the IO to complete.
h, err := windows.CreateEvent(nil, 0, 0, nil)
if err != nil {
// This shouldn't happen when all CreateEvent arguments are zero.
panic(err)
if !fd.isBlocking {
if len(buf) > 0 {
ptr := unsafe.SliceData(buf)
if mode == 'r' {
fd.readPinner.Pin(ptr)
} else {
fd.writePinner.Pin(ptr)
}
defer func() {
if mode == 'r' {
fd.readPinner.Unpin()
} else {
fd.writePinner.Unpin()
}
}()
}
if !fd.pollable() {
// If the handle is opened for overlapped IO but we can't
// use the runtime poller, then we need to use an
// event to wait for the IO to complete.
h, err := windows.CreateEvent(nil, 0, 0, nil)
if err != nil {
// This shouldn't happen when all CreateEvent arguments are zero.
panic(err)
}
// Set the low bit so that the external IOCP doesn't receive the completion packet.
o.o.HEvent = h | 1
defer syscall.CloseHandle(h)
}
// Set the low bit so that the external IOCP doesn't receive the completion packet.
o.o.HEvent = h | 1
defer syscall.CloseHandle(h)
}
fd.pin(mode, o)
// Start IO.
qty, err := submit(o)
var waitErr error
// Blocking operations shouldn't return ERROR_IO_PENDING.
@ -544,10 +543,6 @@ func (fd *FD) Read(buf []byte) (int, error) {
defer fd.readUnlock()
}
if len(buf) > 0 {
fd.pin('r', &buf[0])
}
if len(buf) > maxRW {
buf = buf[:maxRW]
}
@ -561,7 +556,7 @@ func (fd *FD) Read(buf []byte) (int, error) {
n, err = fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = syscall.ReadFile(fd.Sysfd, buf, &qty, fd.overlapped(o))
return qty, err
})
}, buf)
fd.addOffset(n)
switch err {
case syscall.ERROR_HANDLE_EOF:
@ -577,7 +572,7 @@ func (fd *FD) Read(buf []byte) (int, error) {
var flags uint32
err = syscall.WSARecv(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, &o.o, nil)
return qty, err
})
}, buf)
if race.Enabled {
race.Acquire(unsafe.Pointer(&ioSync))
}
@ -674,10 +669,6 @@ func (fd *FD) Pread(buf []byte, off int64) (int, error) {
}
defer fd.readWriteUnlock()
if len(buf) > 0 {
fd.pin('r', &buf[0])
}
if len(buf) > maxRW {
buf = buf[:maxRW]
}
@ -702,7 +693,7 @@ func (fd *FD) Pread(buf []byte, off int64) (int, error) {
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = syscall.ReadFile(fd.Sysfd, buf, &qty, &o.o)
return qty, err
})
}, buf)
if err == syscall.ERROR_HANDLE_EOF {
err = io.EOF
}
@ -725,8 +716,6 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
}
defer fd.readUnlock()
fd.pin('r', &buf[0])
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
@ -734,7 +723,7 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
var flags uint32
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
return qty, err
})
}, buf)
err = fd.eofError(n, err)
if err != nil {
return n, nil, err
@ -756,8 +745,6 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
}
defer fd.readUnlock()
fd.pin('r', &buf[0])
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
@ -765,7 +752,7 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
var flags uint32
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
return qty, err
})
}, buf)
err = fd.eofError(n, err)
if err != nil {
return n, err
@ -787,8 +774,6 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
}
defer fd.readUnlock()
fd.pin('r', &buf[0])
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
@ -796,7 +781,7 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
var flags uint32
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
return qty, err
})
}, buf)
err = fd.eofError(n, err)
if err != nil {
return n, err
@ -819,9 +804,6 @@ func (fd *FD) Write(buf []byte) (int, error) {
defer fd.writeUnlock()
}
if len(buf) > 0 {
fd.pin('w', &buf[0])
}
var ntotal int
for {
max := len(buf)
@ -838,7 +820,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
n, err = fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WriteFile(fd.Sysfd, b, &qty, fd.overlapped(o))
return qty, err
})
}, b)
fd.addOffset(n)
case kindNet:
if race.Enabled {
@ -847,7 +829,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
n, err = fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WSASend(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, &o.o, nil)
return qty, err
})
}, b)
}
ntotal += n
if ntotal == len(buf) || err != nil {
@ -914,10 +896,6 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
}
defer fd.readWriteUnlock()
if len(buf) > 0 {
fd.pin('w', &buf[0])
}
if fd.isBlocking {
curoffset, err := syscall.Seek(fd.Sysfd, 0, io.SeekCurrent)
if err != nil {
@ -945,7 +923,7 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WriteFile(fd.Sysfd, buf[ntotal:max], &qty, &o.o)
return qty, err
})
}, buf[ntotal:max])
if n > 0 {
ntotal += n
}
@ -975,7 +953,7 @@ func (fd *FD) Writev(buf *[][]byte) (int64, error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WSASend(fd.Sysfd, &(*bufs)[0], uint32(len(*bufs)), &qty, 0, &o.o, nil)
return qty, err
})
}, nil)
TestHookDidWritev(n)
consume(buf, int64(n))
return int64(n), err
@ -993,12 +971,10 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WSASendto(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa, &o.o, nil)
return qty, err
})
}, nil)
return n, err
}
fd.pin('w', &buf[0])
ntotal := 0
for len(buf) > 0 {
b := buf
@ -1008,7 +984,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WSASendto(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa, &o.o, nil)
return qty, err
})
}, b)
ntotal += int(n)
if err != nil {
return ntotal, err
@ -1030,12 +1006,10 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet4(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa4, &o.o, nil)
return qty, err
})
}, nil)
return n, err
}
fd.pin('w', &buf[0])
ntotal := 0
for len(buf) > 0 {
b := buf
@ -1045,7 +1019,7 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet4(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa4, &o.o, nil)
return qty, err
})
}, b)
ntotal += int(n)
if err != nil {
return ntotal, err
@ -1067,12 +1041,10 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet6(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa6, &o.o, nil)
return qty, err
})
}, nil)
return n, err
}
fd.pin('w', &buf[0])
ntotal := 0
for len(buf) > 0 {
b := buf
@ -1082,7 +1054,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet6(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa6, &o.o, nil)
return qty, err
})
}, b)
ntotal += int(n)
if err != nil {
return ntotal, err
@ -1098,7 +1070,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
func (fd *FD) ConnectEx(ra syscall.Sockaddr) error {
_, err := fd.execIO('w', func(o *operation) (uint32, error) {
return 0, ConnectExFunc(fd.Sysfd, ra, nil, 0, nil, &o.o)
})
}, nil)
return err
}
@ -1109,7 +1081,7 @@ func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny) (strin
err = AcceptFunc(fd.Sysfd, s, (*byte)(unsafe.Pointer(&rawsa[0])), 0, rsan, rsan, &qty, &o.o)
return qty, err
})
}, nil)
if err != nil {
CloseFunc(s)
return "acceptex", err
@ -1275,7 +1247,7 @@ func (fd *FD) RawRead(f func(uintptr) bool) error {
}
err = syscall.WSARecv(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, &flags, &o.o, nil)
return qty, err
})
}, nil)
if err == windows.WSAEMSGSIZE {
// expected with a 0-byte peek, ignore.
} else if err != nil {
@ -1366,7 +1338,7 @@ func (fd *FD) ReadMsg(p []byte, oob []byte, flags int) (int, int, int, syscall.S
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil)
return qty, err
})
}, nil)
err = fd.eofError(n, err)
var sa syscall.Sockaddr
if err == nil {
@ -1391,7 +1363,7 @@ func (fd *FD) ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.Sockadd
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil)
return qty, err
})
}, nil)
err = fd.eofError(n, err)
if err == nil {
rawToSockaddrInet4(msg.Name, sa4)
@ -1415,7 +1387,7 @@ func (fd *FD) ReadMsgInet6(p []byte, oob []byte, flags int, sa6 *syscall.Sockadd
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil)
return qty, err
})
}, nil)
err = fd.eofError(n, err)
if err == nil {
rawToSockaddrInet6(msg.Name, sa6)
@ -1446,7 +1418,7 @@ func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, err
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil)
return qty, err
})
}, nil)
return n, int(msg.Control.Len), err
}
@ -1469,7 +1441,7 @@ func (fd *FD) WriteMsgInet4(p []byte, oob []byte, sa *syscall.SockaddrInet4) (in
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil)
return qty, err
})
}, nil)
return n, int(msg.Control.Len), err
}
@ -1492,7 +1464,7 @@ func (fd *FD) WriteMsgInet6(p []byte, oob []byte, sa *syscall.SockaddrInet6) (in
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil)
return qty, err
})
}, nil)
return n, int(msg.Control.Len), err
}

View file

@ -75,7 +75,7 @@ func SendFile(fd *FD, src uintptr, size int64) (written int64, err error, handle
return 0, err
}
return uint32(chunkSize), nil
})
}, nil)
if err != nil {
return written, err, written > 0
}