add config for udp forward listener

This commit is contained in:
ginuerzh 2020-01-01 15:09:27 +08:00
parent 2e428aaba9
commit 56d90dfa70
4 changed files with 88 additions and 25 deletions

View File

@ -297,6 +297,11 @@ func (r *route) GenRouters() ([]router, error) {
wsOpts.WriteBufferSize = node.GetInt("wbuf")
wsOpts.Path = node.Get("path")
ttl, err := time.ParseDuration(node.Get("ttl"))
if err != nil {
ttl = time.Duration(node.GetInt("ttl")) * time.Second
}
var ln gost.Listener
switch node.Transport {
case "tls":
@ -361,11 +366,27 @@ func (r *route) GenRouters() ([]router, error) {
}
ln, err = gost.TCPRemoteForwardListener(node.Addr, chain)
case "udp":
ln, err = gost.UDPDirectForwardListener(node.Addr, time.Duration(node.GetInt("ttl"))*time.Second)
ln, err = gost.UDPDirectForwardListener(node.Addr, &gost.UDPForwardListenConfig{
TTL: ttl,
Backlog: node.GetInt("backlog"),
QueueSize: node.GetInt("queue"),
})
case "rudp":
ln, err = gost.UDPRemoteForwardListener(node.Addr, chain, time.Duration(node.GetInt("ttl"))*time.Second)
ln, err = gost.UDPRemoteForwardListener(node.Addr,
chain,
&gost.UDPForwardListenConfig{
TTL: ttl,
Backlog: node.GetInt("backlog"),
QueueSize: node.GetInt("queue"),
})
case "ssu":
ln, err = gost.ShadowUDPListener(node.Addr, node.User, time.Duration(node.GetInt("ttl"))*time.Second)
ln, err = gost.ShadowUDPListener(node.Addr,
node.User,
&gost.UDPForwardListenConfig{
TTL: ttl,
Backlog: node.GetInt("backlog"),
QueueSize: node.GetInt("queue"),
})
case "obfs4":
if err = gost.Obfs4Init(node, true); err != nil {
return nil, err

View File

@ -372,16 +372,22 @@ func (m *udpConnMap) Size() int64 {
return atomic.LoadInt64(&m.size)
}
type UDPForwardListenConfig struct {
TTL time.Duration
Backlog int
QueueSize int
}
type udpDirectForwardListener struct {
ln net.PacketConn
connChan chan net.Conn
errChan chan error
ttl time.Duration
connMap udpConnMap
config *UDPForwardListenConfig
}
// UDPDirectForwardListener creates a Listener for UDP port forwarding server.
func UDPDirectForwardListener(addr string, ttl time.Duration) (Listener, error) {
func UDPDirectForwardListener(addr string, cfg *UDPForwardListenConfig) (Listener, error) {
laddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
@ -390,11 +396,21 @@ func UDPDirectForwardListener(addr string, ttl time.Duration) (Listener, error)
if err != nil {
return nil, err
}
if cfg == nil {
cfg = &UDPForwardListenConfig{}
}
backlog := cfg.Backlog
if backlog <= 0 {
backlog = defaultBacklog
}
l := &udpDirectForwardListener{
ln: ln,
connChan: make(chan net.Conn, 128),
connChan: make(chan net.Conn, backlog),
errChan: make(chan error, 1),
ttl: ttl,
config: cfg,
}
go l.listenLoop()
return l, nil
@ -414,7 +430,7 @@ func (l *udpDirectForwardListener) listenLoop() {
conn, ok := l.connMap.Get(raddr.String())
if !ok {
conn = newUDPServerConn(l.ln, raddr, l.ttl)
conn = newUDPServerConn(l.ln, raddr, l.config.TTL, l.config.QueueSize)
conn.onClose = func() {
l.connMap.Delete(raddr.String())
log.Logf("[udp] %s closed (%d)", raddr, l.connMap.Size())
@ -426,7 +442,7 @@ func (l *udpDirectForwardListener) listenLoop() {
log.Logf("[udp] %s -> %s (%d)", raddr, l.Addr(), l.connMap.Size())
default:
conn.Close()
log.Logf("[udp] %s - %s: connection queue is full", raddr, l.Addr())
log.Logf("[udp] %s - %s: connection queue is full (%d)", raddr, l.Addr(), cap(l.connChan))
}
}
@ -436,7 +452,7 @@ func (l *udpDirectForwardListener) listenLoop() {
log.Logf("[udp] %s >>> %s : length %d", raddr, l.Addr(), n)
}
default:
log.Logf("[udp] %s -> %s : read queue is full", raddr, l.Addr())
log.Logf("[udp] %s -> %s : recv queue is full (%d)", raddr, l.Addr(), cap(conn.rChan))
}
}
}
@ -478,11 +494,14 @@ type udpServerConn struct {
onClose func()
}
func newUDPServerConn(conn net.PacketConn, raddr net.Addr, ttl time.Duration) *udpServerConn {
func newUDPServerConn(conn net.PacketConn, raddr net.Addr, ttl time.Duration, qsize int) *udpServerConn {
if qsize <= 0 {
qsize = defaultQueueSize
}
c := &udpServerConn{
conn: conn,
raddr: raddr,
rChan: make(chan []byte, 128),
rChan: make(chan []byte, qsize),
closed: make(chan struct{}),
nopChan: make(chan int),
ttl: ttl,
@ -853,22 +872,32 @@ type udpRemoteForwardListener struct {
closed chan struct{}
closeMux sync.Mutex
once sync.Once
config *UDPForwardListenConfig
}
// UDPRemoteForwardListener creates a Listener for UDP remote port forwarding server.
func UDPRemoteForwardListener(addr string, chain *Chain, ttl time.Duration) (Listener, error) {
func UDPRemoteForwardListener(addr string, chain *Chain, cfg *UDPForwardListenConfig) (Listener, error) {
laddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
if cfg == nil {
cfg = &UDPForwardListenConfig{}
}
backlog := cfg.Backlog
if backlog <= 0 {
backlog = defaultBacklog
}
ln := &udpRemoteForwardListener{
addr: laddr,
chain: chain,
connChan: make(chan net.Conn, 128),
connChan: make(chan net.Conn, backlog),
errChan: make(chan error, 1),
ttl: ttl,
closed: make(chan struct{}),
config: cfg,
}
go ln.listenLoop()
@ -904,7 +933,7 @@ func (l *udpRemoteForwardListener) listenLoop() {
uc, ok := l.connMap.Get(raddr.String())
if !ok {
uc = newUDPServerConn(conn, raddr, l.ttl)
uc = newUDPServerConn(conn, raddr, l.config.TTL, l.config.QueueSize)
uc.onClose = func() {
l.connMap.Delete(raddr.String())
log.Logf("[rudp] %s closed (%d)", raddr, l.connMap.Size())
@ -916,7 +945,8 @@ func (l *udpRemoteForwardListener) listenLoop() {
log.Logf("[rudp] %s -> %s (%d)", raddr, l.Addr(), l.connMap.Size())
default:
uc.Close()
log.Logf("[rudp] %s - %s: connection queue is full", raddr, l.Addr())
log.Logf("[rudp] %s - %s: connection queue is full (%d)",
raddr, l.Addr(), cap(l.connChan))
}
}
@ -926,7 +956,7 @@ func (l *udpRemoteForwardListener) listenLoop() {
log.Logf("[rudp] %s >>> %s : length %d", raddr, l.Addr(), n)
}
default:
log.Logf("[rudp] %s -> %s : write queue is full", raddr, l.Addr())
log.Logf("[rudp] %s -> %s : recv queue is full", raddr, l.Addr(), cap(uc.rChan))
}
}
}()

View File

@ -68,7 +68,9 @@ var (
// PingRetries is the reties of ping.
PingRetries = 1
// default udp node TTL in second for udp port forwarding.
defaultTTL = 60 * time.Second
defaultTTL = 60 * time.Second
defaultBacklog = 128
defaultQueueSize = 128
)
var (

22
ss.go
View File

@ -302,10 +302,11 @@ type shadowUDPListener struct {
errChan chan error
ttl time.Duration
connMap udpConnMap
config *UDPForwardListenConfig
}
// ShadowUDPListener creates a Listener for shadowsocks UDP relay server.
func ShadowUDPListener(addr string, cipher *url.Userinfo, ttl time.Duration) (Listener, error) {
func ShadowUDPListener(addr string, cipher *url.Userinfo, cfg *UDPForwardListenConfig) (Listener, error) {
laddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
@ -326,11 +327,20 @@ func ShadowUDPListener(addr string, cipher *url.Userinfo, ttl time.Duration) (Li
return nil, err
}
if cfg == nil {
cfg = &UDPForwardListenConfig{}
}
backlog := cfg.Backlog
if backlog <= 0 {
backlog = defaultBacklog
}
l := &shadowUDPListener{
ln: ss.NewSecurePacketConn(ln, cp, false),
connChan: make(chan net.Conn, 128),
connChan: make(chan net.Conn, backlog),
errChan: make(chan error, 1),
ttl: ttl,
config: cfg,
}
go l.listenLoop()
return l, nil
@ -350,7 +360,7 @@ func (l *shadowUDPListener) listenLoop() {
conn, ok := l.connMap.Get(raddr.String())
if !ok {
conn = newUDPServerConn(l.ln, raddr, l.ttl)
conn = newUDPServerConn(l.ln, raddr, l.config.TTL, l.config.QueueSize)
conn.onClose = func() {
l.connMap.Delete(raddr.String())
log.Logf("[ssu] %s closed (%d)", raddr, l.connMap.Size())
@ -362,7 +372,7 @@ func (l *shadowUDPListener) listenLoop() {
log.Logf("[ssu] %s -> %s (%d)", raddr, l.Addr(), l.connMap.Size())
default:
conn.Close()
log.Logf("[ssu] %s - %s: connection queue is full", raddr, l.Addr())
log.Logf("[ssu] %s - %s: connection queue is full (%d)", raddr, l.Addr(), cap(l.connChan))
}
}
@ -372,7 +382,7 @@ func (l *shadowUDPListener) listenLoop() {
log.Logf("[ssu] %s >>> %s : length %d", raddr, l.Addr(), n)
}
default:
log.Logf("[ssu] %s -> %s : read queue is full", raddr, l.Addr())
log.Logf("[ssu] %s -> %s : recv queue is full (%d)", raddr, l.Addr(), cap(conn.rChan))
}
}
}