internal/poll: implement a pipe pool for splice() call

In scenarios where splice() is called, splice() is usually called not just once, but many times,
which means that a lot of pipes will be created and destroyed frequently, costing an amount of system resources
and slowing down performance, thus I suggest that we add a pipe pool for reusing pipes.

Benchmark tests:

goos: linux
goarch: amd64
pkg: internal/poll
cpu: AMD EPYC 7K62 48-Core Processor

name                  old time/op    new time/op    delta
SplicePipe-8            1.36µs ± 1%    0.02µs ± 0%   -98.57%  (p=0.001 n=7+7)
SplicePipeParallel-8     747ns ± 4%       4ns ± 0%   -99.41%  (p=0.001 n=7+7)

name                  old alloc/op   new alloc/op   delta
SplicePipe-8             24.0B ± 0%      0.0B       -100.00%  (p=0.001 n=7+7)
SplicePipeParallel-8     24.0B ± 0%      0.0B       -100.00%  (p=0.001 n=7+7)

name                  old allocs/op  new allocs/op  delta
SplicePipe-8              1.00 ± 0%      0.00       -100.00%  (p=0.001 n=7+7)
SplicePipeParallel-8      1.00 ± 0%      0.00       -100.00%  (p=0.001 n=7+7)

Fixes #42740

Change-Id: Idff654b7264342084e089b5ba796c87c380c471b
Reviewed-on: https://go-review.googlesource.com/c/go/+/271537
Reviewed-by: Ian Lance Taylor <iant@golang.org>
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Trust: Brad Fitzpatrick <bradfitz@golang.org>
This commit is contained in:
Andy Pan 2020-11-19 19:09:14 +08:00 committed by Ian Lance Taylor
parent d33e2192a7
commit 643d240a11
3 changed files with 185 additions and 20 deletions

View file

@ -0,0 +1,22 @@
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Export guts for testing on linux.
// Since testing imports os and os imports internal/poll,
// the internal/poll tests can not be in package poll.
package poll
var (
GetPipe = getPipe
PutPipe = putPipe
NewPipe = newPipe
DestroyPipe = destroyPipe
)
func GetPipeFds(p *SplicePipe) (int, int) {
return p.rfd, p.wfd
}
type SplicePipe = splicePipe

View file

@ -6,6 +6,8 @@ package poll
import ( import (
"internal/syscall/unix" "internal/syscall/unix"
"runtime"
"sync"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"unsafe" "unsafe"
@ -23,23 +25,23 @@ const (
// Splice transfers at most remain bytes of data from src to dst, using the // Splice transfers at most remain bytes of data from src to dst, using the
// splice system call to minimize copies of data from and to userspace. // splice system call to minimize copies of data from and to userspace.
// //
// Splice creates a temporary pipe, to serve as a buffer for the data transfer. // Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
// src and dst must both be stream-oriented sockets. // src and dst must both be stream-oriented sockets.
// //
// If err != nil, sc is the system call which caused the error. // If err != nil, sc is the system call which caused the error.
func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
prfd, pwfd, sc, err := newTempPipe() p, sc, err := getPipe()
if err != nil { if err != nil {
return 0, false, sc, err return 0, false, sc, err
} }
defer destroyTempPipe(prfd, pwfd) defer putPipe(p)
var inPipe, n int var inPipe, n int
for err == nil && remain > 0 { for err == nil && remain > 0 {
max := maxSpliceSize max := maxSpliceSize
if int64(max) > remain { if int64(max) > remain {
max = int(remain) max = int(remain)
} }
inPipe, err = spliceDrain(pwfd, src, max) inPipe, err = spliceDrain(p.wfd, src, max)
// The operation is considered handled if splice returns no // The operation is considered handled if splice returns no
// error, or an error other than EINVAL. An EINVAL means the // error, or an error other than EINVAL. An EINVAL means the
// kernel does not support splice for the socket type of src. // kernel does not support splice for the socket type of src.
@ -55,10 +57,13 @@ func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string,
if err != nil || inPipe == 0 { if err != nil || inPipe == 0 {
break break
} }
n, err = splicePump(dst, prfd, inPipe) p.data += inPipe
n, err = splicePump(dst, p.rfd, inPipe)
if n > 0 { if n > 0 {
written += int64(n) written += int64(n)
remain -= int64(n) remain -= int64(n)
p.data -= n
} }
} }
if err != nil { if err != nil {
@ -149,13 +154,57 @@ func splice(out int, in int, max int, flags int) (int, error) {
return int(n), err return int(n), err
} }
type splicePipe struct {
rfd int
wfd int
data int
}
// splicePipePool caches pipes to avoid high frequency construction and destruction of pipe buffers.
// The garbage collector will free all pipes in the sync.Pool in periodically, thus we need to set up
// a finalizer for each pipe to close the its file descriptors before the actual GC.
var splicePipePool = sync.Pool{New: newPoolPipe}
func newPoolPipe() interface{} {
// Discard the error which occurred during the creation of pipe buffer,
// redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
p := newPipe()
if p != nil {
runtime.SetFinalizer(p, destroyPipe)
}
return p
}
// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from cache.
//
// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error
// and system call name splice in string as the indication.
func getPipe() (*splicePipe, string, error) {
v := splicePipePool.Get()
if v == nil {
return nil, "splice", syscall.EINVAL
}
return v.(*splicePipe), "", nil
}
func putPipe(p *splicePipe) {
// If there is still data left in the pipe,
// then close and discard it instead of putting it back into the pool.
if p.data != 0 {
runtime.SetFinalizer(p, nil)
destroyPipe(p)
return
}
splicePipePool.Put(p)
}
var disableSplice unsafe.Pointer var disableSplice unsafe.Pointer
// newTempPipe sets up a temporary pipe for a splice operation. // newPipe sets up a pipe for a splice operation.
func newTempPipe() (prfd, pwfd int, sc string, err error) { func newPipe() (sp *splicePipe) {
p := (*bool)(atomic.LoadPointer(&disableSplice)) p := (*bool)(atomic.LoadPointer(&disableSplice))
if p != nil && *p { if p != nil && *p {
return -1, -1, "splice", syscall.EINVAL return nil
} }
var fds [2]int var fds [2]int
@ -165,9 +214,11 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) {
// closed. // closed.
const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
if err := syscall.Pipe2(fds[:], flags); err != nil { if err := syscall.Pipe2(fds[:], flags); err != nil {
return -1, -1, "pipe2", err return nil
} }
sp = &splicePipe{rfd: fds[0], wfd: fds[1]}
if p == nil { if p == nil {
p = new(bool) p = new(bool)
defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p)) defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p))
@ -175,20 +226,16 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) {
// F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug. // F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug.
if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 { if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 {
*p = true *p = true
destroyTempPipe(fds[0], fds[1]) destroyPipe(sp)
return -1, -1, "fcntl", errno return nil
} }
} }
return fds[0], fds[1], "", nil return
} }
// destroyTempPipe destroys a temporary pipe. // destroyPipe destroys a pipe.
func destroyTempPipe(prfd, pwfd int) error { func destroyPipe(p *splicePipe) {
err := CloseFunc(prfd) CloseFunc(p.rfd)
err1 := CloseFunc(pwfd) CloseFunc(p.wfd)
if err == nil {
return err1
}
return err
} }

View file

@ -0,0 +1,96 @@
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package poll_test
import (
"internal/poll"
"runtime"
"syscall"
"testing"
"time"
)
// checkPipes returns true if all pipes are closed properly, false otherwise.
func checkPipes(fds []int) bool {
for _, fd := range fds {
// Check if each pipe fd has been closed.
err := syscall.FcntlFlock(uintptr(fd), syscall.F_GETFD, nil)
if err == nil {
return false
}
}
return true
}
func TestSplicePipePool(t *testing.T) {
const N = 64
var (
p *poll.SplicePipe
ps []*poll.SplicePipe
fds []int
err error
)
for i := 0; i < N; i++ {
p, _, err = poll.GetPipe()
if err != nil {
t.Skip("failed to create pipe, skip this test")
}
prfd, pwfd := poll.GetPipeFds(p)
fds = append(fds, prfd, pwfd)
ps = append(ps, p)
}
for _, p = range ps {
poll.PutPipe(p)
}
ps = nil
var ok bool
// Trigger garbage collection to free the pipes in sync.Pool and check whether or not
// those pipe buffers have been closed as we expected.
for i := 0; i < 5; i++ {
runtime.GC()
time.Sleep(time.Duration(i*100+10) * time.Millisecond)
if ok = checkPipes(fds); ok {
break
}
}
if !ok {
t.Fatal("at least one pipe is still open")
}
}
func BenchmarkSplicePipe(b *testing.B) {
b.Run("SplicePipeWithPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
p, _, _ := poll.GetPipe()
poll.PutPipe(p)
}
})
b.Run("SplicePipeWithoutPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
p := poll.NewPipe()
poll.DestroyPipe(p)
}
})
}
func BenchmarkSplicePipePoolParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
p, _, _ := poll.GetPipe()
poll.PutPipe(p)
}
})
}
func BenchmarkSplicePipeNativeParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
p := poll.NewPipe()
poll.DestroyPipe(p)
}
})
}