From e12336fc3fdb8e96fa1b540a151591cfb68452b9 Mon Sep 17 00:00:00 2001 From: "rui.zheng" Date: Wed, 14 Sep 2016 22:11:29 +0800 Subject: [PATCH] update udp local port forwarding policy --- conn.go | 137 ++++++++++++++++++------------------------ forward.go | 172 +++++++++++++++++++++++++---------------------------- 2 files changed, 139 insertions(+), 170 deletions(-) diff --git a/conn.go b/conn.go index 0bb89b3..9f91681 100644 --- a/conn.go +++ b/conn.go @@ -35,7 +35,7 @@ var ( // udp buffer pool udpPool = sync.Pool{ New: func() interface{} { - return make([]byte, 64*1024+262) + return make([]byte, 32*1024) }, } ) @@ -108,35 +108,6 @@ func listenAndServeTcpForward(arg Args) error { } } -func prepareUdpConnectTunnel(addr net.Addr) (net.Conn, error) { - conn, _, err := forwardChain(forwardArgs...) - if err != nil { - return nil, err - } - - conn.SetWriteDeadline(time.Now().Add(time.Second * 90)) - if err = gosocks5.NewRequest(CmdUdpConnect, ToSocksAddr(addr)).Write(conn); err != nil { - conn.Close() - return nil, err - } - conn.SetWriteDeadline(time.Time{}) - - conn.SetReadDeadline(time.Now().Add(90 * time.Second)) - reply, err := gosocks5.ReadReply(conn) - if err != nil { - conn.Close() - return nil, err - } - conn.SetReadDeadline(time.Time{}) - - if reply.Rep != gosocks5.Succeeded { - conn.Close() - return nil, errors.New("udp connect failure") - } - - return conn, nil -} - func listenAndServeUdpForward(arg Args) error { laddr, err := net.ResolveUDPAddr("udp", arg.Addr) if err != nil { @@ -148,55 +119,65 @@ func listenAndServeUdpForward(arg Args) error { return err } + conn, err := net.ListenUDP("udp", laddr) + if err != nil { + glog.V(LWARNING).Infof("[udp-connect] %s -> %s : %s", laddr, raddr, err) + return err + } + defer conn.Close() + + if len(forwardArgs) == 0 { + for { + b := udpPool.Get().([]byte) + + n, addr, err := conn.ReadFromUDP(b) + if err != nil { + glog.V(LWARNING).Infof("[udp-connect] %s -> %s : %s", laddr, raddr, err) + continue + } + go func() { + handleUdpForwardLocal(conn, addr, raddr, b[:n]) + udpPool.Put(b) + }() + } + } + + rChan, wChan := make(chan *gosocks5.UDPDatagram, 32), make(chan *gosocks5.UDPDatagram, 32) + + go func() { + for { + b := make([]byte, 32*1024) + n, addr, err := conn.ReadFromUDP(b) + if err != nil { + glog.V(LWARNING).Infof("[udp-connect] %s -> %s : %s", laddr, raddr, err) + return + } + + select { + case rChan <- gosocks5.NewUDPDatagram(gosocks5.NewUDPHeader(uint16(n), 0, ToSocksAddr(addr)), b[:n]): + default: + // glog.V(LWARNING).Infof("[udp-connect] %s -> %s : rbuf is full", laddr, raddr) + } + } + }() + + go func() { + for { + dgram := <-wChan + addr, err := net.ResolveUDPAddr("udp", dgram.Header.Addr.String()) + if err != nil { + glog.V(LWARNING).Infof("[udp-connect] %s <- %s : %s", laddr, raddr, err) + continue // drop silently + } + if _, err = conn.WriteToUDP(dgram.Data, addr); err != nil { + glog.V(LWARNING).Infof("[udp-connect] %s <- %s : %s", laddr, raddr, err) + return + } + } + }() + for { - var conn *net.UDPConn - - for { - conn, err = net.ListenUDP("udp", laddr) - if err != nil { - glog.V(LWARNING).Infof("[udp-connect] %s -> %s : %s", laddr, raddr, err) - time.Sleep((1) * time.Second) - continue - } - break - } - - if len(forwardArgs) == 0 { - defer conn.Close() - - for { - b := udpPool.Get().([]byte) - - n, addr, err := conn.ReadFromUDP(b) - if err != nil { - glog.V(LWARNING).Infoln(err) - continue - } - go func() { - handleUdpForwardLocal(conn, addr, raddr, b[:n]) - udpPool.Put(b) - }() - } - } - - var tun net.Conn - retry := 0 - for { - tun, err = prepareUdpConnectTunnel(raddr) - if err != nil { - glog.V(LWARNING).Infof("[udp-connect] %s -> %s : %s", laddr, raddr, err) - time.Sleep((1 << uint(retry)) * time.Second) - if retry < 5 { - retry++ - } - continue - } - break - } - glog.V(LWARNING).Infof("[udp-connect] %s <-> %s", laddr, raddr) - tunnelUDP(conn, tun, false) - glog.V(LWARNING).Infof("[udp-connect] %s >-< %s", laddr, raddr) - conn.Close() + handleUdpForwardTunnel(laddr, raddr, rChan, wChan) } } diff --git a/forward.go b/forward.go index e50738a..d35e6d8 100644 --- a/forward.go +++ b/forward.go @@ -6,7 +6,6 @@ import ( "github.com/ginuerzh/gosocks5" "github.com/golang/glog" "net" - "strings" "time" ) @@ -56,109 +55,98 @@ func handleUdpForwardLocal(conn *net.UDPConn, laddr, raddr *net.UDPAddr, data [] return } -func handleUdpForward(conn *net.UDPConn, raddr *net.UDPAddr, data []byte, arg Args) { - if !strings.Contains(arg.Remote, ":") { - arg.Remote += ":53" // default is dns service - } - - faddr, err := net.ResolveUDPAddr("udp", arg.Remote) +func prepareUdpConnectTunnel(addr net.Addr) (net.Conn, error) { + conn, _, err := forwardChain(forwardArgs...) if err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s -> %s : %s", raddr, arg.Remote, err) - return + return nil, err } - glog.V(LINFO).Infof("[udp-forward] %s - %s", raddr, faddr) + conn.SetWriteDeadline(time.Now().Add(time.Second * 90)) + if err = gosocks5.NewRequest(CmdUdpConnect, ToSocksAddr(addr)).Write(conn); err != nil { + conn.Close() + return nil, err + } + conn.SetWriteDeadline(time.Time{}) - if len(forwardArgs) == 0 { - lconn, err := net.ListenUDP("udp", nil) + conn.SetReadDeadline(time.Now().Add(90 * time.Second)) + reply, err := gosocks5.ReadReply(conn) + if err != nil { + conn.Close() + return nil, err + } + conn.SetReadDeadline(time.Time{}) + + if reply.Rep != gosocks5.Succeeded { + conn.Close() + return nil, errors.New("failure") + } + + return conn, nil +} + +func handleUdpForwardTunnel(laddr, raddr *net.UDPAddr, rChan, wChan chan *gosocks5.UDPDatagram) { + var tun net.Conn + var err error + retry := 0 + for { + tun, err = prepareUdpConnectTunnel(raddr) if err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s -> %s : %s", raddr, arg.Remote, err) - return + glog.V(LWARNING).Infof("[udp-connect] %s -> %s : %s", laddr, raddr, err) + time.Sleep((1 << uint(retry)) * time.Second) + if retry < 5 { + retry++ + } + continue } - defer lconn.Close() + break + } - if _, err := lconn.WriteToUDP(data, faddr); err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s -> %s : %s", raddr, arg.Remote, err) - return + glog.V(LINFO).Infof("[udp-connect] %s <-> %s", laddr, raddr) + + rExit := make(chan interface{}) + rErr, wErr := make(chan error, 1), make(chan error, 1) + + go func() { + for { + select { + case dgram := <-rChan: + if err := dgram.Write(tun); err != nil { + glog.V(LWARNING).Infof("[udp-connect] %s -> %s : %s", laddr, raddr, err) + rErr <- err + return + } + glog.V(LDEBUG).Infof("[udp-tun] %s >>> %s length: %d", laddr, raddr, len(dgram.Data)) + case <-rExit: + // glog.V(LDEBUG).Infof("[udp-connect] %s -> %s : exited", laddr, raddr) + return + } } - glog.V(LDEBUG).Infof("[udp-forward] %s >>> %s length %d", raddr, arg.Remote, len(data)) + }() + go func() { + for { + dgram, err := gosocks5.ReadUDPDatagram(tun) + if err != nil { + glog.V(LWARNING).Infof("[udp-connect] %s <- %s : %s", laddr, raddr, err) + close(rExit) + wErr <- err + return + } - b := udpPool.Get().([]byte) - defer udpPool.Put(b) - lconn.SetReadDeadline(time.Now().Add(time.Second * 60)) - n, addr, err := lconn.ReadFromUDP(b) - if err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s <- %s : %s", raddr, arg.Remote, err) - return + select { + case wChan <- dgram: + glog.V(LDEBUG).Infof("[udp-tun] %s <<< %s length: %d", laddr, raddr, len(dgram.Data)) + default: + } } - glog.V(LDEBUG).Infof("[udp-forward] %s <<< %s length %d", raddr, addr, n) + }() - if _, err := conn.WriteToUDP(b[:n], raddr); err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s <- %s : %s", raddr, arg.Remote, err) - } - glog.V(LINFO).Infof("[udp-forward] %s >-< %s", raddr, arg.Remote) - return + select { + case <-rErr: + //log.Println("w exit", err) + case <-wErr: + //log.Println("r exit", err) } - - tun, _, err := forwardChain(forwardArgs...) - if err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s -> %s : %s", raddr, arg.Remote, err) - return - } - defer tun.Close() - - glog.V(LINFO).Infof("[udp-forward] %s -> %s ASSOCIATE", raddr, arg.Remote) - - req := gosocks5.NewRequest(CmdUdpTun, nil) - tun.SetWriteDeadline(time.Now().Add(time.Second * 90)) - if err = req.Write(tun); err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s -> %s ASSOCIATE : %s", raddr, arg.Remote, err) - return - } - tun.SetWriteDeadline(time.Time{}) - glog.V(LDEBUG).Infof("[udp-forward] %s -> %s\n%s", raddr, arg.Remote, req) - - tun.SetReadDeadline(time.Now().Add(90 * time.Second)) - rep, err := gosocks5.ReadReply(tun) - if err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s <- %s ASSOCIATE : %s", raddr, arg.Remote, err) - return - } - tun.SetReadDeadline(time.Time{}) - - glog.V(LDEBUG).Infof("[udp-forward] %s <- %s\n%s", raddr, arg.Remote, rep) - if rep.Rep != gosocks5.Succeeded { - glog.V(LWARNING).Infof("[udp-forward] %s <- %s ASSOCIATE failured", raddr, arg.Remote) - return - } - glog.V(LINFO).Infof("[udp-forward] %s <-> %s ASSOCIATE ON %s", raddr, arg.Remote, rep.Addr) - - dgram := gosocks5.NewUDPDatagram( - gosocks5.NewUDPHeader(uint16(len(data)), 0, ToSocksAddr(faddr)), data) - - tun.SetWriteDeadline(time.Now().Add(time.Second * 90)) - if err = dgram.Write(tun); err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s -> %s : %s", raddr, arg.Remote, err) - return - } - tun.SetWriteDeadline(time.Time{}) - glog.V(LDEBUG).Infof("[udp-forward] %s >>> %s length %d", raddr, arg.Remote, len(data)) - - tun.SetReadDeadline(time.Now().Add(time.Second * 90)) - dgram, err = gosocks5.ReadUDPDatagram(tun) - if err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s <- %s : %s", raddr, arg.Remote, err) - return - } - tun.SetReadDeadline(time.Time{}) - glog.V(LDEBUG).Infof("[udp-forward] %s <<< %s length %d", raddr, dgram.Header.Addr, len(dgram.Data)) - - if _, err = conn.WriteToUDP(dgram.Data, raddr); err != nil { - glog.V(LWARNING).Infof("[udp-forward] %s <- %s : %s", raddr, arg.Remote, err) - } - - // NOTE: for now we only get one response from peer - glog.V(LINFO).Infof("[udp-forward] %s >-< %s", raddr, arg.Remote) + glog.V(LINFO).Infof("[udp-connect] %s >-< %s", laddr, raddr) } func connectRTcpForward(conn net.Conn, arg Args) error {