internal/poll: remove buf field from operation

WSASend and WSARecv functions capture the WSABuf structure before
returning, so there is no need to keep a copy of it in the
operation structure.

Write and Read functions don't need it, they can operate directly
on the byte slice.

To be on the safe side, pin the input byte slice so that stack-allocated
slices don't get moved while overlapped I/O is in progress.

Cq-Include-Trybots: luci.golang.try:gotip-windows-amd64-longtest,gotip-windows-amd64-race
Change-Id: I474bed94e11acafa0bdd8392b5dcf8993d8e1ed5
Reviewed-on: https://go-review.googlesource.com/c/go/+/704155
Reviewed-by: Junyang Shao <shaojunyang@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
This commit is contained in:
qmuntal 2025-09-16 10:25:58 +02:00 committed by Quim Muntal
parent eaf2345256
commit 690fc2fb05
2 changed files with 100 additions and 77 deletions

View file

@ -9,6 +9,7 @@ import (
"internal/race"
"internal/syscall/windows"
"io"
"runtime"
"sync"
"sync/atomic"
"syscall"
@ -75,9 +76,6 @@ type operation struct {
// fields used by runtime.netpoll
runtimeCtx uintptr
mode int32
// fields used only by net package
buf syscall.WSABuf
}
func (o *operation) setEvent() {
@ -107,9 +105,8 @@ func (fd *FD) overlapped(o *operation) *syscall.Overlapped {
return &o.o
}
func (o *operation) InitBuf(buf []byte) {
o.buf.Len = uint32(len(buf))
o.buf.Buf = unsafe.SliceData(buf)
func newWsaBuf(b []byte) *syscall.WSABuf {
return &syscall.WSABuf{Buf: unsafe.SliceData(b), Len: uint32(len(b))}
}
var wsaBufsPool = sync.Pool{
@ -362,6 +359,9 @@ type FD struct {
isBlocking bool
disassociated atomic.Bool
readPinner runtime.Pinner
writePinner runtime.Pinner
}
// setOffset sets the offset fields of the overlapped object
@ -537,6 +537,11 @@ func (fd *FD) Read(buf []byte) (int, error) {
defer fd.readUnlock()
}
if len(buf) > 0 && !fd.isBlocking {
fd.readPinner.Pin(&buf[0])
defer fd.readPinner.Unpin()
}
if len(buf) > maxRW {
buf = buf[:maxRW]
}
@ -547,10 +552,8 @@ func (fd *FD) Read(buf []byte) (int, error) {
case kindConsole:
n, err = fd.readConsole(buf)
case kindFile, kindPipe:
o := &fd.rop
o.InitBuf(buf)
n, err = fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = syscall.ReadFile(fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &qty, fd.overlapped(o))
n, err = fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
err = syscall.ReadFile(fd.Sysfd, buf, &qty, fd.overlapped(o))
return qty, err
})
fd.addOffset(n)
@ -564,11 +567,9 @@ func (fd *FD) Read(buf []byte) (int, error) {
}
}
case kindNet:
o := &fd.rop
o.InitBuf(buf)
n, err = fd.execIO(o, func(o *operation) (qty uint32, err error) {
n, err = fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
var flags uint32
err = syscall.WSARecv(fd.Sysfd, &o.buf, 1, &qty, &flags, &o.o, nil)
err = syscall.WSARecv(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, &o.o, nil)
return qty, err
})
if race.Enabled {
@ -656,7 +657,7 @@ func (fd *FD) readConsole(b []byte) (int, error) {
}
// Pread emulates the Unix pread system call.
func (fd *FD) Pread(b []byte, off int64) (int, error) {
func (fd *FD) Pread(buf []byte, off int64) (int, error) {
if fd.kind == kindPipe {
// Pread does not work with pipes
return 0, syscall.ESPIPE
@ -667,8 +668,13 @@ func (fd *FD) Pread(b []byte, off int64) (int, error) {
}
defer fd.readWriteUnlock()
if len(b) > maxRW {
b = b[:maxRW]
if len(buf) > 0 && !fd.isBlocking {
fd.readPinner.Pin(&buf[0])
defer fd.readPinner.Unpin()
}
if len(buf) > maxRW {
buf = buf[:maxRW]
}
if fd.isBlocking {
@ -687,17 +693,15 @@ func (fd *FD) Pread(b []byte, off int64) (int, error) {
curoffset := fd.offset
defer fd.setOffset(curoffset)
}
o := &fd.rop
o.InitBuf(b)
fd.setOffset(off)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = syscall.ReadFile(fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &qty, &o.o)
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
err = syscall.ReadFile(fd.Sysfd, buf, &qty, &o.o)
return qty, err
})
if err == syscall.ERROR_HANDLE_EOF {
err = io.EOF
}
if len(b) != 0 {
if len(buf) != 0 {
err = fd.eofError(n, err)
}
return n, err
@ -715,14 +719,18 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
return 0, nil, err
}
defer fd.readUnlock()
o := &fd.rop
o.InitBuf(buf)
if !fd.isBlocking {
fd.readPinner.Pin(&buf[0])
defer fd.readPinner.Unpin()
}
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
rsan := int32(unsafe.Sizeof(*rsa))
var flags uint32
err = syscall.WSARecvFrom(fd.Sysfd, &o.buf, 1, &qty, &flags, rsa, &rsan, &o.o, nil)
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
return qty, err
})
err = fd.eofError(n, err)
@ -745,14 +753,18 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
return 0, err
}
defer fd.readUnlock()
o := &fd.rop
o.InitBuf(buf)
if !fd.isBlocking {
fd.readPinner.Pin(&buf[0])
defer fd.readPinner.Unpin()
}
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
rsan := int32(unsafe.Sizeof(*rsa))
var flags uint32
err = syscall.WSARecvFrom(fd.Sysfd, &o.buf, 1, &qty, &flags, rsa, &rsan, &o.o, nil)
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
return qty, err
})
err = fd.eofError(n, err)
@ -775,14 +787,18 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
return 0, err
}
defer fd.readUnlock()
o := &fd.rop
o.InitBuf(buf)
if !fd.isBlocking {
fd.readPinner.Pin(&buf[0])
defer fd.readPinner.Unpin()
}
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
rsan := int32(unsafe.Sizeof(*rsa))
var flags uint32
err = syscall.WSARecvFrom(fd.Sysfd, &o.buf, 1, &qty, &flags, rsa, &rsan, &o.o, nil)
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
return qty, err
})
err = fd.eofError(n, err)
@ -807,6 +823,11 @@ func (fd *FD) Write(buf []byte) (int, error) {
defer fd.writeUnlock()
}
if len(buf) > 0 && !fd.isBlocking {
fd.writePinner.Pin(&buf[0])
defer fd.writePinner.Unpin()
}
var ntotal int
for {
max := len(buf)
@ -820,10 +841,8 @@ func (fd *FD) Write(buf []byte) (int, error) {
case kindConsole:
n, err = fd.writeConsole(b)
case kindPipe, kindFile:
o := &fd.wop
o.InitBuf(b)
n, err = fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = syscall.WriteFile(fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &qty, fd.overlapped(o))
n, err = fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
err = syscall.WriteFile(fd.Sysfd, b, &qty, fd.overlapped(o))
return qty, err
})
fd.addOffset(n)
@ -831,10 +850,8 @@ func (fd *FD) Write(buf []byte) (int, error) {
if race.Enabled {
race.ReleaseMerge(unsafe.Pointer(&ioSync))
}
o := &fd.wop
o.InitBuf(b)
n, err = fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = syscall.WSASend(fd.Sysfd, &o.buf, 1, &qty, 0, &o.o, nil)
n, err = fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
err = syscall.WSASend(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, &o.o, nil)
return qty, err
})
}
@ -903,6 +920,11 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
}
defer fd.readWriteUnlock()
if len(buf) > 0 && !fd.isBlocking {
fd.writePinner.Pin(&buf[0])
defer fd.writePinner.Unpin()
}
if fd.isBlocking {
curoffset, err := syscall.Seek(fd.Sysfd, 0, io.SeekCurrent)
if err != nil {
@ -926,12 +948,9 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
if max-ntotal > maxRW {
max = ntotal + maxRW
}
b := buf[ntotal:max]
o := &fd.wop
o.InitBuf(b)
fd.setOffset(off + int64(ntotal))
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = syscall.WriteFile(fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &qty, &o.o)
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
err = syscall.WriteFile(fd.Sysfd, buf[ntotal:max], &qty, &o.o)
return qty, err
})
if n > 0 {
@ -978,25 +997,26 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
if len(buf) == 0 {
// handle zero-byte payload
o := &fd.wop
o.InitBuf(buf)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = syscall.WSASendto(fd.Sysfd, &o.buf, 1, &qty, 0, sa, &o.o, nil)
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
err = syscall.WSASendto(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa, &o.o, nil)
return qty, err
})
return n, err
}
if !fd.isBlocking {
fd.writePinner.Pin(&buf[0])
defer fd.writePinner.Unpin()
}
ntotal := 0
for len(buf) > 0 {
b := buf
if len(b) > maxRW {
b = b[:maxRW]
}
o := &fd.wop
o.InitBuf(b)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = syscall.WSASendto(fd.Sysfd, &o.buf, 1, &qty, 0, sa, &o.o, nil)
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
err = syscall.WSASendto(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa, &o.o, nil)
return qty, err
})
ntotal += int(n)
@ -1017,25 +1037,26 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
if len(buf) == 0 {
// handle zero-byte payload
o := &fd.wop
o.InitBuf(buf)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet4(fd.Sysfd, &o.buf, 1, &qty, 0, sa4, &o.o, nil)
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet4(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa4, &o.o, nil)
return qty, err
})
return n, err
}
if !fd.isBlocking {
fd.writePinner.Pin(&buf[0])
defer fd.writePinner.Unpin()
}
ntotal := 0
for len(buf) > 0 {
b := buf
if len(b) > maxRW {
b = b[:maxRW]
}
o := &fd.wop
o.InitBuf(b)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet4(fd.Sysfd, &o.buf, 1, &qty, 0, sa4, &o.o, nil)
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet4(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa4, &o.o, nil)
return qty, err
})
ntotal += int(n)
@ -1056,25 +1077,26 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
if len(buf) == 0 {
// handle zero-byte payload
o := &fd.wop
o.InitBuf(buf)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet6(fd.Sysfd, &o.buf, 1, &qty, 0, sa6, &o.o, nil)
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet6(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa6, &o.o, nil)
return qty, err
})
return n, err
}
if !fd.isBlocking {
fd.writePinner.Pin(&buf[0])
defer fd.writePinner.Unpin()
}
ntotal := 0
for len(buf) > 0 {
b := buf
if len(b) > maxRW {
b = b[:maxRW]
}
o := &fd.wop
o.InitBuf(b)
n, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet6(fd.Sysfd, &o.buf, 1, &qty, 0, sa6, &o.o, nil)
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet6(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa6, &o.o, nil)
return qty, err
})
ntotal += int(n)
@ -1264,14 +1286,12 @@ func (fd *FD) RawRead(f func(uintptr) bool) error {
// Use a zero-byte read as a way to get notified when this
// socket is readable. h/t https://stackoverflow.com/a/42019668/332798
o := &fd.rop
o.InitBuf(nil)
_, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
_, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
var flags uint32
if !fd.IsStream {
flags |= windows.MSG_PEEK
}
err = syscall.WSARecv(fd.Sysfd, &o.buf, 1, &qty, &flags, &o.o, nil)
err = syscall.WSARecv(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, &flags, &o.o, nil)
return qty, err
})
if err == windows.WSAEMSGSIZE {

View file

@ -475,6 +475,9 @@ func TestTCPReadWriteAllocs(t *testing.T) {
t.Skipf("not supported on %s", runtime.GOOS)
}
// Optimizations are required to remove the allocs.
testenv.SkipIfOptimizationOff(t)
ln, err := Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
@ -509,7 +512,7 @@ func TestTCPReadWriteAllocs(t *testing.T) {
}
})
if allocs > 0 {
t.Fatalf("got %v; want 0", allocs)
t.Errorf("got %v; want 0", allocs)
}
var bufwrt [128]byte
@ -531,7 +534,7 @@ func TestTCPReadWriteAllocs(t *testing.T) {
}
})
if allocs > 0 {
t.Fatalf("got %v; want 0", allocs)
t.Errorf("got %v; want 0", allocs)
}
}