From a7167ac467d406559ce87eb7b1f4dc0044bb5315 Mon Sep 17 00:00:00 2001 From: Zhixing Wang Date: Wed, 23 May 2018 20:46:40 +0100 Subject: [PATCH] add p2p exit on broken irc --- p2p.go | 91 +++++++++++++++++++++++++++++++++------------------------- 1 file changed, 52 insertions(+), 39 deletions(-) diff --git a/p2p.go b/p2p.go index 542c435..7d1735f 100644 --- a/p2p.go +++ b/p2p.go @@ -8,6 +8,7 @@ import ( "github.com/lucas-clemente/quic-go" "gopkg.in/sorcix/irc.v2" "net" + "os" "sync" "time" ) @@ -19,17 +20,44 @@ import ( // 3. P2PListener uses P2PSocket to accept incoming peers and their streams // connections could be interrupted, will drop any broken sessions with peers // Proxy chain is not implemented. See comments below on p2pTransporter for more details. +// (can only be the first node in a proxy chain) + +// all-in-one config covering socket, transporter and listener +// the option 'peer' shouldn't belong here, but I'm too lazy +// to create a seperate config type for transporter XD +type P2PConfig struct { + // irc related + Peer string + User string + Pass string + Addr string + PingIntvl time.Duration + Timeout time.Duration + // others + StunAddr string + // quic will use default certs & timeouts and will always keepalive + // since the authentication is done on irc server + // and p2p connections need keepalive anyway + // (probably should be configurable, leave for now) +} + +// p2p socket for dialing and accepting udp connections +type p2pSocket struct { + i *irc.Conn + c *P2PConfig + lastPong time.Time +} // a helper wrapper for the Encode method -func ircSend(i *irc.Conn, command string, params ...string) error { - return i.Encoder.Encode(&irc.Message{ +func (p *p2pSocket) ircSend(command string, params ...string) error { + return p.i.Encoder.Encode(&irc.Message{ Command: command, Params: params, }) } // wait for the first msg satisfying condition, or timeout -func ircRecv(i *irc.Conn, condition func(*irc.Message) bool, timeout time.Duration) ( +func (p *p2pSocket) ircRecv(condition func(*irc.Message) bool, timeout time.Duration) ( msg *irc.Message, err error) { mChan := make(chan *irc.Message, 1) @@ -37,7 +65,7 @@ func ircRecv(i *irc.Conn, condition func(*irc.Message) bool, timeout time.Durati go func() { for { - msg, err := i.Decoder.Decode() + msg, err := p.i.Decoder.Decode() if err != nil { eChan <- err break @@ -46,6 +74,9 @@ func ircRecv(i *irc.Conn, condition func(*irc.Message) bool, timeout time.Durati eChan <- errors.New("[p2p] ircRecv error code " + msg.Command) break } + if msg.Command == "PONG" { + p.lastPong = time.Now() + } if condition(msg) { mChan <- msg break @@ -71,10 +102,10 @@ func ircRecv(i *irc.Conn, condition func(*irc.Message) bool, timeout time.Durati } // wait for a message from peer_ (leave blank for any) -func ircWaitForPeer(i *irc.Conn, peer_ string, timeout time.Duration) ( +func (p *p2pSocket) ircWaitForPeer(peer_ string, timeout time.Duration) ( peer string, peerAddr net.Addr, err error) { - msg, err := ircRecv(i, func(msg *irc.Message) bool { + msg, err := p.ircRecv(func(msg *irc.Message) bool { return msg.Command == "PRIVMSG" && (peer_ == msg.Name || peer_ == "") }, timeout) @@ -88,30 +119,6 @@ func ircWaitForPeer(i *irc.Conn, peer_ string, timeout time.Duration) ( return } -// all-in-one config covering socket, transporter and listener -// the option 'peer' shouldn't belong here, but I'm too lazy -// to create a seperate config type for transporter XD -type P2PConfig struct { - // irc related - Peer string - User string - Pass string - Addr string - PingIntvl time.Duration - Timeout time.Duration - // others - StunAddr string - // quic will use default certs & timeouts and will always keepalive - // since the authentication is done on irc server - // and p2p connections need keepalive anyway - // (probably should be configurable, leave for now) -} - -// p2p socket for dialing and accepting udp connections -type p2pSocket struct { - i *irc.Conn - c *P2PConfig -} // create a new p2p socket, all configs can be left empty except for user // (and for peer if you are using the socket to dial) @@ -149,19 +156,19 @@ func P2PSocket(c *P2PConfig) (p *p2pSocket, err error) { } // irc login - err = ircSend(p.i, "PASS", p.c.Pass) + err = p.ircSend("PASS", p.c.Pass) if err != nil { return } - err = ircSend(p.i, "NICK", p.c.User) + err = p.ircSend("NICK", p.c.User) if err != nil { return } - err = ircSend(p.i, "USER", p.c.User, "*", "*", "*") + err = p.ircSend("USER", p.c.User, "*", "*", "*") if err != nil { return } - _, err = ircRecv(p.i, func(msg *irc.Message) bool { + _, err = p.ircRecv(func(msg *irc.Message) bool { return msg.Command == "MODE" }, p.c.Timeout) if err != nil { @@ -169,14 +176,20 @@ func P2PSocket(c *P2PConfig) (p *p2pSocket, err error) { } // irc keepalive (ping) + p.lastPong = time.Now() go func() { for { time.Sleep(p.c.PingIntvl) - err := ircSend(p.i, "PING", p.c.Addr) + err := p.ircSend("PING", p.c.Addr) if err != nil { break } + if time.Now().Sub(p.lastPong) > 2 * p.c.PingIntvl { + break + } } + log.Log("[p2p] irc broken") + os.Exit(1) }() log.Log("[p2p] irc ready") @@ -197,12 +210,12 @@ func (p *p2pSocket) Dial() (conn net.PacketConn, peerAddr net.Addr, err error) { return } - err = ircSend(p.i, "PRIVMSG", p.c.Peer, addr.String()) + err = p.ircSend("PRIVMSG", p.c.Peer, addr.String()) if err != nil { return } - _, peerAddr, err = ircWaitForPeer(p.i, p.c.Peer, p.c.Timeout) + _, peerAddr, err = p.ircWaitForPeer(p.c.Peer, p.c.Timeout) if err == nil { log.Log("[p2p] irc connected with", p.c.Peer, "at", peerAddr) } @@ -214,7 +227,7 @@ func (p *p2pSocket) Dial() (conn net.PacketConn, peerAddr net.Addr, err error) { func (p *p2pSocket) Accept() (conn net.PacketConn, err error) { // wait indefintely for an unknown peer :) - peer, peerAddr, err := ircWaitForPeer(p.i, "", 0) + peer, peerAddr, err := p.ircWaitForPeer("", 0) if err != nil { return } @@ -224,7 +237,7 @@ func (p *p2pSocket) Accept() (conn net.PacketConn, err error) { return } - err = ircSend(p.i, "PRIVMSG", peer, addr.String()) + err = p.ircSend("PRIVMSG", peer, addr.String()) if err != nil { return }