diff --git a/cmd/gost/route.go b/cmd/gost/route.go index 359e81b..836c010 100644 --- a/cmd/gost/route.go +++ b/cmd/gost/route.go @@ -297,6 +297,11 @@ func (r *route) GenRouters() ([]router, error) { wsOpts.WriteBufferSize = node.GetInt("wbuf") wsOpts.Path = node.Get("path") + ttl, err := time.ParseDuration(node.Get("ttl")) + if err != nil { + ttl = time.Duration(node.GetInt("ttl")) * time.Second + } + var ln gost.Listener switch node.Transport { case "tls": @@ -361,11 +366,27 @@ func (r *route) GenRouters() ([]router, error) { } ln, err = gost.TCPRemoteForwardListener(node.Addr, chain) case "udp": - ln, err = gost.UDPDirectForwardListener(node.Addr, time.Duration(node.GetInt("ttl"))*time.Second) + ln, err = gost.UDPDirectForwardListener(node.Addr, &gost.UDPForwardListenConfig{ + TTL: ttl, + Backlog: node.GetInt("backlog"), + QueueSize: node.GetInt("queue"), + }) case "rudp": - ln, err = gost.UDPRemoteForwardListener(node.Addr, chain, time.Duration(node.GetInt("ttl"))*time.Second) + ln, err = gost.UDPRemoteForwardListener(node.Addr, + chain, + &gost.UDPForwardListenConfig{ + TTL: ttl, + Backlog: node.GetInt("backlog"), + QueueSize: node.GetInt("queue"), + }) case "ssu": - ln, err = gost.ShadowUDPListener(node.Addr, node.User, time.Duration(node.GetInt("ttl"))*time.Second) + ln, err = gost.ShadowUDPListener(node.Addr, + node.User, + &gost.UDPForwardListenConfig{ + TTL: ttl, + Backlog: node.GetInt("backlog"), + QueueSize: node.GetInt("queue"), + }) case "obfs4": if err = gost.Obfs4Init(node, true); err != nil { return nil, err diff --git a/forward.go b/forward.go index 1dab036..40c5154 100644 --- a/forward.go +++ b/forward.go @@ -372,16 +372,22 @@ func (m *udpConnMap) Size() int64 { return atomic.LoadInt64(&m.size) } +type UDPForwardListenConfig struct { + TTL time.Duration + Backlog int + QueueSize int +} + type udpDirectForwardListener struct { ln net.PacketConn connChan chan net.Conn errChan chan error - ttl time.Duration connMap udpConnMap + config *UDPForwardListenConfig } // UDPDirectForwardListener creates a Listener for UDP port forwarding server. -func UDPDirectForwardListener(addr string, ttl time.Duration) (Listener, error) { +func UDPDirectForwardListener(addr string, cfg *UDPForwardListenConfig) (Listener, error) { laddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err @@ -390,11 +396,21 @@ func UDPDirectForwardListener(addr string, ttl time.Duration) (Listener, error) if err != nil { return nil, err } + + if cfg == nil { + cfg = &UDPForwardListenConfig{} + } + + backlog := cfg.Backlog + if backlog <= 0 { + backlog = defaultBacklog + } + l := &udpDirectForwardListener{ ln: ln, - connChan: make(chan net.Conn, 128), + connChan: make(chan net.Conn, backlog), errChan: make(chan error, 1), - ttl: ttl, + config: cfg, } go l.listenLoop() return l, nil @@ -414,7 +430,7 @@ func (l *udpDirectForwardListener) listenLoop() { conn, ok := l.connMap.Get(raddr.String()) if !ok { - conn = newUDPServerConn(l.ln, raddr, l.ttl) + conn = newUDPServerConn(l.ln, raddr, l.config.TTL, l.config.QueueSize) conn.onClose = func() { l.connMap.Delete(raddr.String()) log.Logf("[udp] %s closed (%d)", raddr, l.connMap.Size()) @@ -426,7 +442,7 @@ func (l *udpDirectForwardListener) listenLoop() { log.Logf("[udp] %s -> %s (%d)", raddr, l.Addr(), l.connMap.Size()) default: conn.Close() - log.Logf("[udp] %s - %s: connection queue is full", raddr, l.Addr()) + log.Logf("[udp] %s - %s: connection queue is full (%d)", raddr, l.Addr(), cap(l.connChan)) } } @@ -436,7 +452,7 @@ func (l *udpDirectForwardListener) listenLoop() { log.Logf("[udp] %s >>> %s : length %d", raddr, l.Addr(), n) } default: - log.Logf("[udp] %s -> %s : read queue is full", raddr, l.Addr()) + log.Logf("[udp] %s -> %s : recv queue is full (%d)", raddr, l.Addr(), cap(conn.rChan)) } } } @@ -478,11 +494,14 @@ type udpServerConn struct { onClose func() } -func newUDPServerConn(conn net.PacketConn, raddr net.Addr, ttl time.Duration) *udpServerConn { +func newUDPServerConn(conn net.PacketConn, raddr net.Addr, ttl time.Duration, qsize int) *udpServerConn { + if qsize <= 0 { + qsize = defaultQueueSize + } c := &udpServerConn{ conn: conn, raddr: raddr, - rChan: make(chan []byte, 128), + rChan: make(chan []byte, qsize), closed: make(chan struct{}), nopChan: make(chan int), ttl: ttl, @@ -853,22 +872,32 @@ type udpRemoteForwardListener struct { closed chan struct{} closeMux sync.Mutex once sync.Once + config *UDPForwardListenConfig } // UDPRemoteForwardListener creates a Listener for UDP remote port forwarding server. -func UDPRemoteForwardListener(addr string, chain *Chain, ttl time.Duration) (Listener, error) { +func UDPRemoteForwardListener(addr string, chain *Chain, cfg *UDPForwardListenConfig) (Listener, error) { laddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err } + if cfg == nil { + cfg = &UDPForwardListenConfig{} + } + + backlog := cfg.Backlog + if backlog <= 0 { + backlog = defaultBacklog + } + ln := &udpRemoteForwardListener{ addr: laddr, chain: chain, - connChan: make(chan net.Conn, 128), + connChan: make(chan net.Conn, backlog), errChan: make(chan error, 1), - ttl: ttl, closed: make(chan struct{}), + config: cfg, } go ln.listenLoop() @@ -904,7 +933,7 @@ func (l *udpRemoteForwardListener) listenLoop() { uc, ok := l.connMap.Get(raddr.String()) if !ok { - uc = newUDPServerConn(conn, raddr, l.ttl) + uc = newUDPServerConn(conn, raddr, l.config.TTL, l.config.QueueSize) uc.onClose = func() { l.connMap.Delete(raddr.String()) log.Logf("[rudp] %s closed (%d)", raddr, l.connMap.Size()) @@ -916,7 +945,8 @@ func (l *udpRemoteForwardListener) listenLoop() { log.Logf("[rudp] %s -> %s (%d)", raddr, l.Addr(), l.connMap.Size()) default: uc.Close() - log.Logf("[rudp] %s - %s: connection queue is full", raddr, l.Addr()) + log.Logf("[rudp] %s - %s: connection queue is full (%d)", + raddr, l.Addr(), cap(l.connChan)) } } @@ -926,7 +956,7 @@ func (l *udpRemoteForwardListener) listenLoop() { log.Logf("[rudp] %s >>> %s : length %d", raddr, l.Addr(), n) } default: - log.Logf("[rudp] %s -> %s : write queue is full", raddr, l.Addr()) + log.Logf("[rudp] %s -> %s : recv queue is full", raddr, l.Addr(), cap(uc.rChan)) } } }() diff --git a/gost.go b/gost.go index cc07518..93bf8d8 100644 --- a/gost.go +++ b/gost.go @@ -68,7 +68,9 @@ var ( // PingRetries is the reties of ping. PingRetries = 1 // default udp node TTL in second for udp port forwarding. - defaultTTL = 60 * time.Second + defaultTTL = 60 * time.Second + defaultBacklog = 128 + defaultQueueSize = 128 ) var ( diff --git a/ss.go b/ss.go index f779d3d..9645e9e 100644 --- a/ss.go +++ b/ss.go @@ -302,10 +302,11 @@ type shadowUDPListener struct { errChan chan error ttl time.Duration connMap udpConnMap + config *UDPForwardListenConfig } // ShadowUDPListener creates a Listener for shadowsocks UDP relay server. -func ShadowUDPListener(addr string, cipher *url.Userinfo, ttl time.Duration) (Listener, error) { +func ShadowUDPListener(addr string, cipher *url.Userinfo, cfg *UDPForwardListenConfig) (Listener, error) { laddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err @@ -326,11 +327,20 @@ func ShadowUDPListener(addr string, cipher *url.Userinfo, ttl time.Duration) (Li return nil, err } + if cfg == nil { + cfg = &UDPForwardListenConfig{} + } + + backlog := cfg.Backlog + if backlog <= 0 { + backlog = defaultBacklog + } + l := &shadowUDPListener{ ln: ss.NewSecurePacketConn(ln, cp, false), - connChan: make(chan net.Conn, 128), + connChan: make(chan net.Conn, backlog), errChan: make(chan error, 1), - ttl: ttl, + config: cfg, } go l.listenLoop() return l, nil @@ -350,7 +360,7 @@ func (l *shadowUDPListener) listenLoop() { conn, ok := l.connMap.Get(raddr.String()) if !ok { - conn = newUDPServerConn(l.ln, raddr, l.ttl) + conn = newUDPServerConn(l.ln, raddr, l.config.TTL, l.config.QueueSize) conn.onClose = func() { l.connMap.Delete(raddr.String()) log.Logf("[ssu] %s closed (%d)", raddr, l.connMap.Size()) @@ -362,7 +372,7 @@ func (l *shadowUDPListener) listenLoop() { log.Logf("[ssu] %s -> %s (%d)", raddr, l.Addr(), l.connMap.Size()) default: conn.Close() - log.Logf("[ssu] %s - %s: connection queue is full", raddr, l.Addr()) + log.Logf("[ssu] %s - %s: connection queue is full (%d)", raddr, l.Addr(), cap(l.connChan)) } } @@ -372,7 +382,7 @@ func (l *shadowUDPListener) listenLoop() { log.Logf("[ssu] %s >>> %s : length %d", raddr, l.Addr(), n) } default: - log.Logf("[ssu] %s -> %s : read queue is full", raddr, l.Addr()) + log.Logf("[ssu] %s -> %s : recv queue is full (%d)", raddr, l.Addr(), cap(conn.rChan)) } } }