mirror of
https://github.com/golang/go.git
synced 2025-12-08 06:10:04 +00:00
internal/poll: implement a pipe pool for splice() call
In scenarios where splice() is called, splice() is usually called not just once, but many times, which means that a lot of pipes will be created and destroyed frequently, costing an amount of system resources and slowing down performance, thus I suggest that we add a pipe pool for reusing pipes. Benchmark tests: goos: linux goarch: amd64 pkg: internal/poll cpu: AMD EPYC 7K62 48-Core Processor name old time/op new time/op delta SplicePipe-8 1.36µs ± 1% 0.02µs ± 0% -98.57% (p=0.001 n=7+7) SplicePipeParallel-8 747ns ± 4% 4ns ± 0% -99.41% (p=0.001 n=7+7) name old alloc/op new alloc/op delta SplicePipe-8 24.0B ± 0% 0.0B -100.00% (p=0.001 n=7+7) SplicePipeParallel-8 24.0B ± 0% 0.0B -100.00% (p=0.001 n=7+7) name old allocs/op new allocs/op delta SplicePipe-8 1.00 ± 0% 0.00 -100.00% (p=0.001 n=7+7) SplicePipeParallel-8 1.00 ± 0% 0.00 -100.00% (p=0.001 n=7+7) Fixes #42740 Change-Id: Idff654b7264342084e089b5ba796c87c380c471b Reviewed-on: https://go-review.googlesource.com/c/go/+/271537 Reviewed-by: Ian Lance Taylor <iant@golang.org> Run-TryBot: Ian Lance Taylor <iant@golang.org> TryBot-Result: Go Bot <gobot@golang.org> Trust: Brad Fitzpatrick <bradfitz@golang.org>
This commit is contained in:
parent
d33e2192a7
commit
643d240a11
3 changed files with 185 additions and 20 deletions
22
src/internal/poll/export_linux_test.go
Normal file
22
src/internal/poll/export_linux_test.go
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
// Copyright 2021 The Go Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// Export guts for testing on linux.
|
||||||
|
// Since testing imports os and os imports internal/poll,
|
||||||
|
// the internal/poll tests can not be in package poll.
|
||||||
|
|
||||||
|
package poll
|
||||||
|
|
||||||
|
var (
|
||||||
|
GetPipe = getPipe
|
||||||
|
PutPipe = putPipe
|
||||||
|
NewPipe = newPipe
|
||||||
|
DestroyPipe = destroyPipe
|
||||||
|
)
|
||||||
|
|
||||||
|
func GetPipeFds(p *SplicePipe) (int, int) {
|
||||||
|
return p.rfd, p.wfd
|
||||||
|
}
|
||||||
|
|
||||||
|
type SplicePipe = splicePipe
|
||||||
|
|
@ -6,6 +6,8 @@ package poll
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"internal/syscall/unix"
|
"internal/syscall/unix"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
@ -23,23 +25,23 @@ const (
|
||||||
// Splice transfers at most remain bytes of data from src to dst, using the
|
// Splice transfers at most remain bytes of data from src to dst, using the
|
||||||
// splice system call to minimize copies of data from and to userspace.
|
// splice system call to minimize copies of data from and to userspace.
|
||||||
//
|
//
|
||||||
// Splice creates a temporary pipe, to serve as a buffer for the data transfer.
|
// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
|
||||||
// src and dst must both be stream-oriented sockets.
|
// src and dst must both be stream-oriented sockets.
|
||||||
//
|
//
|
||||||
// If err != nil, sc is the system call which caused the error.
|
// If err != nil, sc is the system call which caused the error.
|
||||||
func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
|
func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
|
||||||
prfd, pwfd, sc, err := newTempPipe()
|
p, sc, err := getPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, false, sc, err
|
return 0, false, sc, err
|
||||||
}
|
}
|
||||||
defer destroyTempPipe(prfd, pwfd)
|
defer putPipe(p)
|
||||||
var inPipe, n int
|
var inPipe, n int
|
||||||
for err == nil && remain > 0 {
|
for err == nil && remain > 0 {
|
||||||
max := maxSpliceSize
|
max := maxSpliceSize
|
||||||
if int64(max) > remain {
|
if int64(max) > remain {
|
||||||
max = int(remain)
|
max = int(remain)
|
||||||
}
|
}
|
||||||
inPipe, err = spliceDrain(pwfd, src, max)
|
inPipe, err = spliceDrain(p.wfd, src, max)
|
||||||
// The operation is considered handled if splice returns no
|
// The operation is considered handled if splice returns no
|
||||||
// error, or an error other than EINVAL. An EINVAL means the
|
// error, or an error other than EINVAL. An EINVAL means the
|
||||||
// kernel does not support splice for the socket type of src.
|
// kernel does not support splice for the socket type of src.
|
||||||
|
|
@ -55,10 +57,13 @@ func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string,
|
||||||
if err != nil || inPipe == 0 {
|
if err != nil || inPipe == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
n, err = splicePump(dst, prfd, inPipe)
|
p.data += inPipe
|
||||||
|
|
||||||
|
n, err = splicePump(dst, p.rfd, inPipe)
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
written += int64(n)
|
written += int64(n)
|
||||||
remain -= int64(n)
|
remain -= int64(n)
|
||||||
|
p.data -= n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -149,13 +154,57 @@ func splice(out int, in int, max int, flags int) (int, error) {
|
||||||
return int(n), err
|
return int(n), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type splicePipe struct {
|
||||||
|
rfd int
|
||||||
|
wfd int
|
||||||
|
data int
|
||||||
|
}
|
||||||
|
|
||||||
|
// splicePipePool caches pipes to avoid high frequency construction and destruction of pipe buffers.
|
||||||
|
// The garbage collector will free all pipes in the sync.Pool in periodically, thus we need to set up
|
||||||
|
// a finalizer for each pipe to close the its file descriptors before the actual GC.
|
||||||
|
var splicePipePool = sync.Pool{New: newPoolPipe}
|
||||||
|
|
||||||
|
func newPoolPipe() interface{} {
|
||||||
|
// Discard the error which occurred during the creation of pipe buffer,
|
||||||
|
// redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
|
||||||
|
p := newPipe()
|
||||||
|
if p != nil {
|
||||||
|
runtime.SetFinalizer(p, destroyPipe)
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from cache.
|
||||||
|
//
|
||||||
|
// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error
|
||||||
|
// and system call name splice in string as the indication.
|
||||||
|
func getPipe() (*splicePipe, string, error) {
|
||||||
|
v := splicePipePool.Get()
|
||||||
|
if v == nil {
|
||||||
|
return nil, "splice", syscall.EINVAL
|
||||||
|
}
|
||||||
|
return v.(*splicePipe), "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func putPipe(p *splicePipe) {
|
||||||
|
// If there is still data left in the pipe,
|
||||||
|
// then close and discard it instead of putting it back into the pool.
|
||||||
|
if p.data != 0 {
|
||||||
|
runtime.SetFinalizer(p, nil)
|
||||||
|
destroyPipe(p)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
splicePipePool.Put(p)
|
||||||
|
}
|
||||||
|
|
||||||
var disableSplice unsafe.Pointer
|
var disableSplice unsafe.Pointer
|
||||||
|
|
||||||
// newTempPipe sets up a temporary pipe for a splice operation.
|
// newPipe sets up a pipe for a splice operation.
|
||||||
func newTempPipe() (prfd, pwfd int, sc string, err error) {
|
func newPipe() (sp *splicePipe) {
|
||||||
p := (*bool)(atomic.LoadPointer(&disableSplice))
|
p := (*bool)(atomic.LoadPointer(&disableSplice))
|
||||||
if p != nil && *p {
|
if p != nil && *p {
|
||||||
return -1, -1, "splice", syscall.EINVAL
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var fds [2]int
|
var fds [2]int
|
||||||
|
|
@ -165,9 +214,11 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) {
|
||||||
// closed.
|
// closed.
|
||||||
const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
|
const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
|
||||||
if err := syscall.Pipe2(fds[:], flags); err != nil {
|
if err := syscall.Pipe2(fds[:], flags); err != nil {
|
||||||
return -1, -1, "pipe2", err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sp = &splicePipe{rfd: fds[0], wfd: fds[1]}
|
||||||
|
|
||||||
if p == nil {
|
if p == nil {
|
||||||
p = new(bool)
|
p = new(bool)
|
||||||
defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p))
|
defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p))
|
||||||
|
|
@ -175,20 +226,16 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) {
|
||||||
// F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug.
|
// F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug.
|
||||||
if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 {
|
if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 {
|
||||||
*p = true
|
*p = true
|
||||||
destroyTempPipe(fds[0], fds[1])
|
destroyPipe(sp)
|
||||||
return -1, -1, "fcntl", errno
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return fds[0], fds[1], "", nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// destroyTempPipe destroys a temporary pipe.
|
// destroyPipe destroys a pipe.
|
||||||
func destroyTempPipe(prfd, pwfd int) error {
|
func destroyPipe(p *splicePipe) {
|
||||||
err := CloseFunc(prfd)
|
CloseFunc(p.rfd)
|
||||||
err1 := CloseFunc(pwfd)
|
CloseFunc(p.wfd)
|
||||||
if err == nil {
|
|
||||||
return err1
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
96
src/internal/poll/splice_linux_test.go
Normal file
96
src/internal/poll/splice_linux_test.go
Normal file
|
|
@ -0,0 +1,96 @@
|
||||||
|
// Copyright 2021 The Go Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package poll_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"internal/poll"
|
||||||
|
"runtime"
|
||||||
|
"syscall"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// checkPipes returns true if all pipes are closed properly, false otherwise.
|
||||||
|
func checkPipes(fds []int) bool {
|
||||||
|
for _, fd := range fds {
|
||||||
|
// Check if each pipe fd has been closed.
|
||||||
|
err := syscall.FcntlFlock(uintptr(fd), syscall.F_GETFD, nil)
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSplicePipePool(t *testing.T) {
|
||||||
|
const N = 64
|
||||||
|
var (
|
||||||
|
p *poll.SplicePipe
|
||||||
|
ps []*poll.SplicePipe
|
||||||
|
fds []int
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
p, _, err = poll.GetPipe()
|
||||||
|
if err != nil {
|
||||||
|
t.Skip("failed to create pipe, skip this test")
|
||||||
|
}
|
||||||
|
prfd, pwfd := poll.GetPipeFds(p)
|
||||||
|
fds = append(fds, prfd, pwfd)
|
||||||
|
ps = append(ps, p)
|
||||||
|
}
|
||||||
|
for _, p = range ps {
|
||||||
|
poll.PutPipe(p)
|
||||||
|
}
|
||||||
|
ps = nil
|
||||||
|
|
||||||
|
var ok bool
|
||||||
|
// Trigger garbage collection to free the pipes in sync.Pool and check whether or not
|
||||||
|
// those pipe buffers have been closed as we expected.
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
runtime.GC()
|
||||||
|
time.Sleep(time.Duration(i*100+10) * time.Millisecond)
|
||||||
|
if ok = checkPipes(fds); ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("at least one pipe is still open")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkSplicePipe(b *testing.B) {
|
||||||
|
b.Run("SplicePipeWithPool", func(b *testing.B) {
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
p, _, _ := poll.GetPipe()
|
||||||
|
poll.PutPipe(p)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
b.Run("SplicePipeWithoutPool", func(b *testing.B) {
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
p := poll.NewPipe()
|
||||||
|
poll.DestroyPipe(p)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkSplicePipePoolParallel(b *testing.B) {
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
p, _, _ := poll.GetPipe()
|
||||||
|
poll.PutPipe(p)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkSplicePipeNativeParallel(b *testing.B) {
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
p := poll.NewPipe()
|
||||||
|
poll.DestroyPipe(p)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue