internal/poll: pass the I/O mode instead of an overlapped object in execIO

execIO callers should be agnostic to the fact that it uses an overlapped
object. This will unlock future optimizations and simplifications.

Change-Id: I0a58d992101fa74ac75e3538af04cbc44156f0d6
Reviewed-on: https://go-review.googlesource.com/c/go/+/704175
Reviewed-by: Damien Neil <dneil@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Junyang Shao <shaojunyang@google.com>
This commit is contained in:
qmuntal 2025-09-16 10:52:49 +02:00 committed by Quim Muntal
parent fc88e18b4a
commit 75c87df58e
2 changed files with 35 additions and 37 deletions

View file

@ -250,12 +250,16 @@ func (fd *FD) cancelIO(o *operation) {
// It supports both synchronous and asynchronous IO.
// o.qty and o.flags are set to zero before calling submit
// to avoid reusing the values from a previous call.
func (fd *FD) execIO(o *operation, submit func(o *operation) (uint32, error)) (int, error) {
func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int, error) {
// Notify runtime netpoll about starting IO.
err := fd.pd.prepare(int(o.mode), fd.isFile)
err := fd.pd.prepare(mode, fd.isFile)
if err != nil {
return 0, err
}
o := &fd.rop
if mode == 'w' {
o = &fd.wop
}
// Start IO.
if !fd.isBlocking && o.o.HEvent == 0 && !fd.pollable() {
// If the handle is opened for overlapped IO but we can't
@ -552,7 +556,7 @@ func (fd *FD) Read(buf []byte) (int, error) {
case kindConsole:
n, err = fd.readConsole(buf)
case kindFile, kindPipe:
n, err = fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
n, err = fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = syscall.ReadFile(fd.Sysfd, buf, &qty, fd.overlapped(o))
return qty, err
})
@ -567,7 +571,7 @@ func (fd *FD) Read(buf []byte) (int, error) {
}
}
case kindNet:
n, err = fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
n, err = fd.execIO('r', func(o *operation) (qty uint32, err error) {
var flags uint32
err = syscall.WSARecv(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, &o.o, nil)
return qty, err
@ -694,7 +698,7 @@ func (fd *FD) Pread(buf []byte, off int64) (int, error) {
defer fd.setOffset(curoffset)
}
fd.setOffset(off)
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = syscall.ReadFile(fd.Sysfd, buf, &qty, &o.o)
return qty, err
})
@ -727,7 +731,7 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
rsan := int32(unsafe.Sizeof(*rsa))
var flags uint32
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
@ -761,7 +765,7 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
rsan := int32(unsafe.Sizeof(*rsa))
var flags uint32
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
@ -795,7 +799,7 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
rsan := int32(unsafe.Sizeof(*rsa))
var flags uint32
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
@ -841,7 +845,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
case kindConsole:
n, err = fd.writeConsole(b)
case kindPipe, kindFile:
n, err = fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err = fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WriteFile(fd.Sysfd, b, &qty, fd.overlapped(o))
return qty, err
})
@ -850,7 +854,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
if race.Enabled {
race.ReleaseMerge(unsafe.Pointer(&ioSync))
}
n, err = fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err = fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WSASend(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, &o.o, nil)
return qty, err
})
@ -949,7 +953,7 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
max = ntotal + maxRW
}
fd.setOffset(off + int64(ntotal))
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WriteFile(fd.Sysfd, buf[ntotal:max], &qty, &o.o)
return qty, err
})
@ -979,7 +983,7 @@ func (fd *FD) Writev(buf *[][]byte) (int64, error) {
}
bufs := newWSABufs(buf)
defer freeWSABufs(bufs)
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WSASend(fd.Sysfd, &(*bufs)[0], uint32(len(*bufs)), &qty, 0, &o.o, nil)
return qty, err
})
@ -997,7 +1001,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
if len(buf) == 0 {
// handle zero-byte payload
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WSASendto(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa, &o.o, nil)
return qty, err
})
@ -1015,7 +1019,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
if len(b) > maxRW {
b = b[:maxRW]
}
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = syscall.WSASendto(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa, &o.o, nil)
return qty, err
})
@ -1037,7 +1041,7 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
if len(buf) == 0 {
// handle zero-byte payload
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet4(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa4, &o.o, nil)
return qty, err
})
@ -1055,7 +1059,7 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
if len(b) > maxRW {
b = b[:maxRW]
}
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet4(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa4, &o.o, nil)
return qty, err
})
@ -1077,7 +1081,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
if len(buf) == 0 {
// handle zero-byte payload
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet6(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa6, &o.o, nil)
return qty, err
})
@ -1095,7 +1099,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
if len(b) > maxRW {
b = b[:maxRW]
}
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendtoInet6(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa6, &o.o, nil)
return qty, err
})
@ -1112,17 +1116,16 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
// called when the descriptor is first created. This is here rather
// than in the net package so that it can use fd.wop.
func (fd *FD) ConnectEx(ra syscall.Sockaddr) error {
o := &fd.wop
_, err := fd.execIO(o, func(o *operation) (uint32, error) {
_, err := fd.execIO('w', func(o *operation) (uint32, error) {
return 0, ConnectExFunc(fd.Sysfd, ra, nil, 0, nil, &o.o)
})
return err
}
func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny, o *operation) (string, error) {
func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny) (string, error) {
// Submit accept request.
rsan := uint32(unsafe.Sizeof(rawsa[0]))
_, err := fd.execIO(o, func(o *operation) (qty uint32, err error) {
_, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = AcceptFunc(fd.Sysfd, s, (*byte)(unsafe.Pointer(&rawsa[0])), 0, rsan, rsan, &qty, &o.o)
return qty, err
@ -1150,7 +1153,6 @@ func (fd *FD) Accept(sysSocket func() (syscall.Handle, error)) (syscall.Handle,
}
defer fd.readUnlock()
o := &fd.rop
var rawsa [2]syscall.RawSockaddrAny
for {
s, err := sysSocket()
@ -1158,7 +1160,7 @@ func (fd *FD) Accept(sysSocket func() (syscall.Handle, error)) (syscall.Handle,
return syscall.InvalidHandle, nil, 0, "", err
}
errcall, err := fd.acceptOne(s, rawsa[:], o)
errcall, err := fd.acceptOne(s, rawsa[:])
if err == nil {
return s, rawsa[:], uint32(unsafe.Sizeof(rawsa[0])), "", nil
}
@ -1286,7 +1288,7 @@ func (fd *FD) RawRead(f func(uintptr) bool) error {
// Use a zero-byte read as a way to get notified when this
// socket is readable. h/t https://stackoverflow.com/a/42019668/332798
_, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
_, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
var flags uint32
if !fd.IsStream {
flags |= windows.MSG_PEEK
@ -1381,7 +1383,7 @@ func (fd *FD) ReadMsg(p []byte, oob []byte, flags int) (int, int, int, syscall.S
msg := newWSAMsg(p, oob, flags, true)
defer freeWSAMsg(msg)
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil)
return qty, err
})
@ -1406,7 +1408,7 @@ func (fd *FD) ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.Sockadd
msg := newWSAMsg(p, oob, flags, true)
defer freeWSAMsg(msg)
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil)
return qty, err
})
@ -1430,7 +1432,7 @@ func (fd *FD) ReadMsgInet6(p []byte, oob []byte, flags int, sa6 *syscall.Sockadd
msg := newWSAMsg(p, oob, flags, true)
defer freeWSAMsg(msg)
n, err := fd.execIO(&fd.rop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil)
return qty, err
})
@ -1461,7 +1463,7 @@ func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, err
return 0, 0, err
}
}
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil)
return qty, err
})
@ -1484,7 +1486,7 @@ func (fd *FD) WriteMsgInet4(p []byte, oob []byte, sa *syscall.SockaddrInet4) (in
if sa != nil {
msg.Namelen = sockaddrInet4ToRaw(msg.Name, sa)
}
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil)
return qty, err
})
@ -1507,7 +1509,7 @@ func (fd *FD) WriteMsgInet6(p []byte, oob []byte, sa *syscall.SockaddrInet6) (in
if sa != nil {
msg.Namelen = sockaddrInet6ToRaw(msg.Name, sa)
}
n, err := fd.execIO(&fd.wop, func(o *operation) (qty uint32, err error) {
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil)
return qty, err
})

View file

@ -62,18 +62,14 @@ func SendFile(fd *FD, src uintptr, size int64) (written int64, err error, handle
// See https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nf-mswsock-transmitfile
const maxChunkSizePerCall = int64(0x7fffffff - 1)
o := &fd.wop
for size > 0 {
chunkSize := maxChunkSizePerCall
if chunkSize > size {
chunkSize = size
}
off := startpos + written
o.o.Offset = uint32(off)
o.o.OffsetHigh = uint32(off >> 32)
n, err := fd.execIO(o, func(o *operation) (uint32, error) {
fd.setOffset(startpos + written)
n, err := fd.execIO('w', func(o *operation) (uint32, error) {
err := syscall.TransmitFile(fd.Sysfd, hsrc, uint32(chunkSize), 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
if err != nil {
return 0, err