mirror of
https://github.com/golang/go.git
synced 2026-02-07 02:09:55 +00:00
internal/poll: move buffer pinning inside execIO
This is a step towards deferring adding the handle to IOCP until the first IO operation. The goal of this CL is to avoid the fd.isBlocking check in fd.pin, which was happening outside execIO, and making buffer pinning less error-prone. This also fixes an issue where buffer used in Pwrite and WriteTo were unpinned too early when the write buffer was larger than the maximum chunk size. For #76391 Cq-Include-Trybots: luci.golang.try:gotip-windows-amd64-longtest,gotip-windows-amd64-race Change-Id: Ia181dcb57a559ae466a4341c36a307ad6678aac0 Reviewed-on: https://go-review.googlesource.com/c/go/+/740561 Reviewed-by: Damien Neil <dneil@google.com> Reviewed-by: Michael Pratt <mpratt@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
This commit is contained in:
parent
3b2a451cef
commit
cce3fea08f
2 changed files with 57 additions and 85 deletions
|
|
@ -232,27 +232,10 @@ func (fd *FD) waitIO(o *operation) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// pin pins ptr for the duration of the IO operation.
|
||||
// If fd is in blocking mode, pin does nothing.
|
||||
func (fd *FD) pin(mode int, ptr any) {
|
||||
if fd.isBlocking {
|
||||
return
|
||||
}
|
||||
if mode == 'r' {
|
||||
fd.readPinner.Pin(ptr)
|
||||
} else {
|
||||
fd.writePinner.Pin(ptr)
|
||||
}
|
||||
}
|
||||
|
||||
// execIO executes a single IO operation o.
|
||||
// It supports both synchronous and asynchronous IO.
|
||||
func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int, error) {
|
||||
if mode == 'r' {
|
||||
defer fd.readPinner.Unpin()
|
||||
} else {
|
||||
defer fd.writePinner.Unpin()
|
||||
}
|
||||
// buf, if not nil, will be pinned during the lifetime of the operation.
|
||||
func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error), buf []byte) (int, error) {
|
||||
// Notify runtime netpoll about starting IO.
|
||||
err := fd.pd.prepare(mode, fd.isFile)
|
||||
if err != nil {
|
||||
|
|
@ -268,21 +251,37 @@ func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int,
|
|||
runtimeCtx: fd.pd.runtimeCtx,
|
||||
mode: int32(mode),
|
||||
}
|
||||
// Start IO.
|
||||
if !fd.isBlocking && !fd.pollable() {
|
||||
// If the handle is opened for overlapped IO but we can't
|
||||
// use the runtime poller, then we need to use an
|
||||
// event to wait for the IO to complete.
|
||||
h, err := windows.CreateEvent(nil, 0, 0, nil)
|
||||
if err != nil {
|
||||
// This shouldn't happen when all CreateEvent arguments are zero.
|
||||
panic(err)
|
||||
if !fd.isBlocking {
|
||||
if len(buf) > 0 {
|
||||
ptr := unsafe.SliceData(buf)
|
||||
if mode == 'r' {
|
||||
fd.readPinner.Pin(ptr)
|
||||
} else {
|
||||
fd.writePinner.Pin(ptr)
|
||||
}
|
||||
defer func() {
|
||||
if mode == 'r' {
|
||||
fd.readPinner.Unpin()
|
||||
} else {
|
||||
fd.writePinner.Unpin()
|
||||
}
|
||||
}()
|
||||
}
|
||||
if !fd.pollable() {
|
||||
// If the handle is opened for overlapped IO but we can't
|
||||
// use the runtime poller, then we need to use an
|
||||
// event to wait for the IO to complete.
|
||||
h, err := windows.CreateEvent(nil, 0, 0, nil)
|
||||
if err != nil {
|
||||
// This shouldn't happen when all CreateEvent arguments are zero.
|
||||
panic(err)
|
||||
}
|
||||
// Set the low bit so that the external IOCP doesn't receive the completion packet.
|
||||
o.o.HEvent = h | 1
|
||||
defer syscall.CloseHandle(h)
|
||||
}
|
||||
// Set the low bit so that the external IOCP doesn't receive the completion packet.
|
||||
o.o.HEvent = h | 1
|
||||
defer syscall.CloseHandle(h)
|
||||
}
|
||||
fd.pin(mode, o)
|
||||
// Start IO.
|
||||
qty, err := submit(o)
|
||||
var waitErr error
|
||||
// Blocking operations shouldn't return ERROR_IO_PENDING.
|
||||
|
|
@ -544,10 +543,6 @@ func (fd *FD) Read(buf []byte) (int, error) {
|
|||
defer fd.readUnlock()
|
||||
}
|
||||
|
||||
if len(buf) > 0 {
|
||||
fd.pin('r', &buf[0])
|
||||
}
|
||||
|
||||
if len(buf) > maxRW {
|
||||
buf = buf[:maxRW]
|
||||
}
|
||||
|
|
@ -561,7 +556,7 @@ func (fd *FD) Read(buf []byte) (int, error) {
|
|||
n, err = fd.execIO('r', func(o *operation) (qty uint32, err error) {
|
||||
err = syscall.ReadFile(fd.Sysfd, buf, &qty, fd.overlapped(o))
|
||||
return qty, err
|
||||
})
|
||||
}, buf)
|
||||
fd.addOffset(n)
|
||||
switch err {
|
||||
case syscall.ERROR_HANDLE_EOF:
|
||||
|
|
@ -577,7 +572,7 @@ func (fd *FD) Read(buf []byte) (int, error) {
|
|||
var flags uint32
|
||||
err = syscall.WSARecv(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, buf)
|
||||
if race.Enabled {
|
||||
race.Acquire(unsafe.Pointer(&ioSync))
|
||||
}
|
||||
|
|
@ -674,10 +669,6 @@ func (fd *FD) Pread(buf []byte, off int64) (int, error) {
|
|||
}
|
||||
defer fd.readWriteUnlock()
|
||||
|
||||
if len(buf) > 0 {
|
||||
fd.pin('r', &buf[0])
|
||||
}
|
||||
|
||||
if len(buf) > maxRW {
|
||||
buf = buf[:maxRW]
|
||||
}
|
||||
|
|
@ -702,7 +693,7 @@ func (fd *FD) Pread(buf []byte, off int64) (int, error) {
|
|||
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
|
||||
err = syscall.ReadFile(fd.Sysfd, buf, &qty, &o.o)
|
||||
return qty, err
|
||||
})
|
||||
}, buf)
|
||||
if err == syscall.ERROR_HANDLE_EOF {
|
||||
err = io.EOF
|
||||
}
|
||||
|
|
@ -725,8 +716,6 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
|
|||
}
|
||||
defer fd.readUnlock()
|
||||
|
||||
fd.pin('r', &buf[0])
|
||||
|
||||
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
|
||||
defer wsaRsaPool.Put(rsa)
|
||||
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
|
||||
|
|
@ -734,7 +723,7 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
|
|||
var flags uint32
|
||||
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, buf)
|
||||
err = fd.eofError(n, err)
|
||||
if err != nil {
|
||||
return n, nil, err
|
||||
|
|
@ -756,8 +745,6 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
|
|||
}
|
||||
defer fd.readUnlock()
|
||||
|
||||
fd.pin('r', &buf[0])
|
||||
|
||||
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
|
||||
defer wsaRsaPool.Put(rsa)
|
||||
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
|
||||
|
|
@ -765,7 +752,7 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
|
|||
var flags uint32
|
||||
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, buf)
|
||||
err = fd.eofError(n, err)
|
||||
if err != nil {
|
||||
return n, err
|
||||
|
|
@ -787,8 +774,6 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
|
|||
}
|
||||
defer fd.readUnlock()
|
||||
|
||||
fd.pin('r', &buf[0])
|
||||
|
||||
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
|
||||
defer wsaRsaPool.Put(rsa)
|
||||
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
|
||||
|
|
@ -796,7 +781,7 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
|
|||
var flags uint32
|
||||
err = syscall.WSARecvFrom(fd.Sysfd, newWsaBuf(buf), 1, &qty, &flags, rsa, &rsan, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, buf)
|
||||
err = fd.eofError(n, err)
|
||||
if err != nil {
|
||||
return n, err
|
||||
|
|
@ -819,9 +804,6 @@ func (fd *FD) Write(buf []byte) (int, error) {
|
|||
defer fd.writeUnlock()
|
||||
}
|
||||
|
||||
if len(buf) > 0 {
|
||||
fd.pin('w', &buf[0])
|
||||
}
|
||||
var ntotal int
|
||||
for {
|
||||
max := len(buf)
|
||||
|
|
@ -838,7 +820,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
|
|||
n, err = fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = syscall.WriteFile(fd.Sysfd, b, &qty, fd.overlapped(o))
|
||||
return qty, err
|
||||
})
|
||||
}, b)
|
||||
fd.addOffset(n)
|
||||
case kindNet:
|
||||
if race.Enabled {
|
||||
|
|
@ -847,7 +829,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
|
|||
n, err = fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = syscall.WSASend(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, b)
|
||||
}
|
||||
ntotal += n
|
||||
if ntotal == len(buf) || err != nil {
|
||||
|
|
@ -914,10 +896,6 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
|
|||
}
|
||||
defer fd.readWriteUnlock()
|
||||
|
||||
if len(buf) > 0 {
|
||||
fd.pin('w', &buf[0])
|
||||
}
|
||||
|
||||
if fd.isBlocking {
|
||||
curoffset, err := syscall.Seek(fd.Sysfd, 0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
|
|
@ -945,7 +923,7 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = syscall.WriteFile(fd.Sysfd, buf[ntotal:max], &qty, &o.o)
|
||||
return qty, err
|
||||
})
|
||||
}, buf[ntotal:max])
|
||||
if n > 0 {
|
||||
ntotal += n
|
||||
}
|
||||
|
|
@ -975,7 +953,7 @@ func (fd *FD) Writev(buf *[][]byte) (int64, error) {
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = syscall.WSASend(fd.Sysfd, &(*bufs)[0], uint32(len(*bufs)), &qty, 0, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
TestHookDidWritev(n)
|
||||
consume(buf, int64(n))
|
||||
return int64(n), err
|
||||
|
|
@ -993,12 +971,10 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = syscall.WSASendto(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
return n, err
|
||||
}
|
||||
|
||||
fd.pin('w', &buf[0])
|
||||
|
||||
ntotal := 0
|
||||
for len(buf) > 0 {
|
||||
b := buf
|
||||
|
|
@ -1008,7 +984,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = syscall.WSASendto(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, b)
|
||||
ntotal += int(n)
|
||||
if err != nil {
|
||||
return ntotal, err
|
||||
|
|
@ -1030,12 +1006,10 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSASendtoInet4(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa4, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
return n, err
|
||||
}
|
||||
|
||||
fd.pin('w', &buf[0])
|
||||
|
||||
ntotal := 0
|
||||
for len(buf) > 0 {
|
||||
b := buf
|
||||
|
|
@ -1045,7 +1019,7 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSASendtoInet4(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa4, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, b)
|
||||
ntotal += int(n)
|
||||
if err != nil {
|
||||
return ntotal, err
|
||||
|
|
@ -1067,12 +1041,10 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSASendtoInet6(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, 0, sa6, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
return n, err
|
||||
}
|
||||
|
||||
fd.pin('w', &buf[0])
|
||||
|
||||
ntotal := 0
|
||||
for len(buf) > 0 {
|
||||
b := buf
|
||||
|
|
@ -1082,7 +1054,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSASendtoInet6(fd.Sysfd, newWsaBuf(b), 1, &qty, 0, sa6, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, b)
|
||||
ntotal += int(n)
|
||||
if err != nil {
|
||||
return ntotal, err
|
||||
|
|
@ -1098,7 +1070,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
|
|||
func (fd *FD) ConnectEx(ra syscall.Sockaddr) error {
|
||||
_, err := fd.execIO('w', func(o *operation) (uint32, error) {
|
||||
return 0, ConnectExFunc(fd.Sysfd, ra, nil, 0, nil, &o.o)
|
||||
})
|
||||
}, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -1109,7 +1081,7 @@ func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny) (strin
|
|||
err = AcceptFunc(fd.Sysfd, s, (*byte)(unsafe.Pointer(&rawsa[0])), 0, rsan, rsan, &qty, &o.o)
|
||||
return qty, err
|
||||
|
||||
})
|
||||
}, nil)
|
||||
if err != nil {
|
||||
CloseFunc(s)
|
||||
return "acceptex", err
|
||||
|
|
@ -1275,7 +1247,7 @@ func (fd *FD) RawRead(f func(uintptr) bool) error {
|
|||
}
|
||||
err = syscall.WSARecv(fd.Sysfd, &syscall.WSABuf{}, 1, &qty, &flags, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
if err == windows.WSAEMSGSIZE {
|
||||
// expected with a 0-byte peek, ignore.
|
||||
} else if err != nil {
|
||||
|
|
@ -1366,7 +1338,7 @@ func (fd *FD) ReadMsg(p []byte, oob []byte, flags int) (int, int, int, syscall.S
|
|||
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
err = fd.eofError(n, err)
|
||||
var sa syscall.Sockaddr
|
||||
if err == nil {
|
||||
|
|
@ -1391,7 +1363,7 @@ func (fd *FD) ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.Sockadd
|
|||
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
err = fd.eofError(n, err)
|
||||
if err == nil {
|
||||
rawToSockaddrInet4(msg.Name, sa4)
|
||||
|
|
@ -1415,7 +1387,7 @@ func (fd *FD) ReadMsgInet6(p []byte, oob []byte, flags int, sa6 *syscall.Sockadd
|
|||
n, err := fd.execIO('r', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSARecvMsg(fd.Sysfd, msg, &qty, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
err = fd.eofError(n, err)
|
||||
if err == nil {
|
||||
rawToSockaddrInet6(msg.Name, sa6)
|
||||
|
|
@ -1446,7 +1418,7 @@ func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, err
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
return n, int(msg.Control.Len), err
|
||||
}
|
||||
|
||||
|
|
@ -1469,7 +1441,7 @@ func (fd *FD) WriteMsgInet4(p []byte, oob []byte, sa *syscall.SockaddrInet4) (in
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
return n, int(msg.Control.Len), err
|
||||
}
|
||||
|
||||
|
|
@ -1492,7 +1464,7 @@ func (fd *FD) WriteMsgInet6(p []byte, oob []byte, sa *syscall.SockaddrInet6) (in
|
|||
n, err := fd.execIO('w', func(o *operation) (qty uint32, err error) {
|
||||
err = windows.WSASendMsg(fd.Sysfd, msg, 0, nil, &o.o, nil)
|
||||
return qty, err
|
||||
})
|
||||
}, nil)
|
||||
return n, int(msg.Control.Len), err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ func SendFile(fd *FD, src uintptr, size int64) (written int64, err error, handle
|
|||
return 0, err
|
||||
}
|
||||
return uint32(chunkSize), nil
|
||||
})
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return written, err, written > 0
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue