From f6433a99bc6e156e4e09bdd50d7a07ac68f99711 Mon Sep 17 00:00:00 2001 From: "rui.zheng" Date: Sun, 18 Mar 2018 19:33:48 +0800 Subject: [PATCH] rtcp over SOCKS5 now support multiplexing --- forward.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++--- mux.go | 8 +++++++ socks.go | 24 +++++++++++++++------ 3 files changed, 84 insertions(+), 9 deletions(-) diff --git a/forward.go b/forward.go index f5d3592..718c0a8 100644 --- a/forward.go +++ b/forward.go @@ -10,6 +10,7 @@ import ( "github.com/ginuerzh/gosocks5" "github.com/go-log/log" + smux "gopkg.in/xtaci/smux.v1" ) type forwardConnector struct { @@ -426,7 +427,6 @@ type tcpRemoteForwardListener struct { chain *Chain ln net.Listener session *muxSession - once sync.Once mutex sync.Mutex closed chan struct{} } @@ -502,11 +502,66 @@ func (l *tcpRemoteForwardListener) accept() (conn net.Conn, err error) { } func (l *tcpRemoteForwardListener) muxAccept() (conn net.Conn, err error) { + session, err := l.getSession() + if err != nil { + return nil, err + } + cc, err := session.Accept() + if err != nil { + session.Close() + return nil, err + } + + return cc, nil +} + +func (l *tcpRemoteForwardListener) getSession() (*muxSession, error) { l.mutex.Lock() defer l.mutex.Unlock() - - return nil, nil + if l.session != nil && !l.session.IsClosed() { + return l.session, nil + } + + conn, err := l.chain.Conn() + if err != nil { + return nil, err + } + + conn, err = socks5Handshake(conn, l.chain.LastNode().User) + if err != nil { + return nil, err + } + req := gosocks5.NewRequest(CmdMuxBind, toSocksAddr(l.addr)) + if err := req.Write(conn); err != nil { + log.Log("[rtcp] SOCKS5 BIND request: ", err) + return nil, err + } + + conn.SetReadDeadline(time.Now().Add(ReadTimeout)) + rep, err := gosocks5.ReadReply(conn) + if err != nil { + log.Log("[rtcp] SOCKS5 BIND reply: ", err) + return nil, err + } + conn.SetReadDeadline(time.Time{}) + if rep.Rep != gosocks5.Succeeded { + log.Logf("[rtcp] bind on %s failure", l.addr) + return nil, fmt.Errorf("Bind on %s failure", l.addr.String()) + } + log.Logf("[rtcp] BIND ON %s OK", rep.Addr) + + // Upgrade connection to multiplex stream. + session, err := smux.Server(conn, smux.DefaultConfig()) + if err != nil { + return nil, err + } + l.session = &muxSession{ + conn: conn, + session: session, + } + + return l.session, nil } func (l *tcpRemoteForwardListener) waitConnectSOCKS5(conn net.Conn) (net.Conn, error) { diff --git a/mux.go b/mux.go index c84463d..8bcea11 100644 --- a/mux.go +++ b/mux.go @@ -36,6 +36,14 @@ func (session *muxSession) GetConn() (net.Conn, error) { return &muxStreamConn{Conn: session.conn, stream: stream}, nil } +func (session *muxSession) Accept() (net.Conn, error) { + stream, err := session.session.AcceptStream() + if err != nil { + return nil, err + } + return &muxStreamConn{Conn: session.conn, stream: stream}, nil +} + func (session *muxSession) Close() error { return session.session.Close() } diff --git a/socks.go b/socks.go index 156cf04..65bfc1f 100644 --- a/socks.go +++ b/socks.go @@ -27,9 +27,9 @@ const ( ) const ( - // CMDMuxBind is an extended SOCKS5 request CMD for + // CmdMuxBind is an extended SOCKS5 request CMD for // multiplexing transport with the binding server. - CMDMuxBind uint8 = 0xF2 + CmdMuxBind uint8 = 0xF2 // CmdUDPTun is an extended SOCKS5 request CMD for UDP over TCP. CmdUDPTun uint8 = 0xF3 ) @@ -397,7 +397,7 @@ func (h *socks5Handler) Handle(conn net.Conn) { case gosocks5.CmdUdp: h.handleUDPRelay(conn, req) - case CMDMuxBind: + case CmdMuxBind: h.handleMuxBind(conn, req) case CmdUDPTun: @@ -1019,14 +1019,26 @@ func (h *socks5Handler) muxBindOn(conn net.Conn, addr string) { conn: conn, session: s, } + defer session.Close() + + go func() { + for { + conn, err := session.Accept() + if err != nil { + ln.Close() + return + } + conn.Close() // we do not handle incoming connection. + } + }() for { cc, err := ln.Accept() if err != nil { - log.Logf("[socks5-mbind] %s <- %s : %v", conn.RemoteAddr(), socksAddr, err) + // log.Logf("[socks5-mbind] %s <- %s : %v", conn.RemoteAddr(), socksAddr, err) return } - log.Logf("[socks5-mbind %s <- %s : ACCEPT peer %s", + log.Logf("[socks5-mbind] %s <- %s : ACCEPT peer %s", conn.RemoteAddr(), socksAddr, cc.RemoteAddr()) go func(c net.Conn) { @@ -1034,7 +1046,7 @@ func (h *socks5Handler) muxBindOn(conn net.Conn, addr string) { sc, err := session.GetConn() if err != nil { - log.Logf("[socks5-mbind %s <- %s : %s", conn.RemoteAddr(), socksAddr, err) + log.Logf("[socks5-mbind] %s <- %s : %s", conn.RemoteAddr(), socksAddr, err) return } transport(sc, c)