add p2p exit on broken irc

This commit is contained in:
Zhixing Wang 2018-05-23 20:46:40 +01:00
parent 25d223c384
commit a7167ac467

91
p2p.go
View File

@ -8,6 +8,7 @@ import (
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
"gopkg.in/sorcix/irc.v2" "gopkg.in/sorcix/irc.v2"
"net" "net"
"os"
"sync" "sync"
"time" "time"
) )
@ -19,17 +20,44 @@ import (
// 3. P2PListener uses P2PSocket to accept incoming peers and their streams // 3. P2PListener uses P2PSocket to accept incoming peers and their streams
// connections could be interrupted, will drop any broken sessions with peers // connections could be interrupted, will drop any broken sessions with peers
// Proxy chain is not implemented. See comments below on p2pTransporter for more details. // Proxy chain is not implemented. See comments below on p2pTransporter for more details.
// (can only be the first node in a proxy chain)
// all-in-one config covering socket, transporter and listener
// the option 'peer' shouldn't belong here, but I'm too lazy
// to create a seperate config type for transporter XD
type P2PConfig struct {
// irc related
Peer string
User string
Pass string
Addr string
PingIntvl time.Duration
Timeout time.Duration
// others
StunAddr string
// quic will use default certs & timeouts and will always keepalive
// since the authentication is done on irc server
// and p2p connections need keepalive anyway
// (probably should be configurable, leave for now)
}
// p2p socket for dialing and accepting udp connections
type p2pSocket struct {
i *irc.Conn
c *P2PConfig
lastPong time.Time
}
// a helper wrapper for the Encode method // a helper wrapper for the Encode method
func ircSend(i *irc.Conn, command string, params ...string) error { func (p *p2pSocket) ircSend(command string, params ...string) error {
return i.Encoder.Encode(&irc.Message{ return p.i.Encoder.Encode(&irc.Message{
Command: command, Command: command,
Params: params, Params: params,
}) })
} }
// wait for the first msg satisfying condition, or timeout // wait for the first msg satisfying condition, or timeout
func ircRecv(i *irc.Conn, condition func(*irc.Message) bool, timeout time.Duration) ( func (p *p2pSocket) ircRecv(condition func(*irc.Message) bool, timeout time.Duration) (
msg *irc.Message, err error) { msg *irc.Message, err error) {
mChan := make(chan *irc.Message, 1) mChan := make(chan *irc.Message, 1)
@ -37,7 +65,7 @@ func ircRecv(i *irc.Conn, condition func(*irc.Message) bool, timeout time.Durati
go func() { go func() {
for { for {
msg, err := i.Decoder.Decode() msg, err := p.i.Decoder.Decode()
if err != nil { if err != nil {
eChan <- err eChan <- err
break break
@ -46,6 +74,9 @@ func ircRecv(i *irc.Conn, condition func(*irc.Message) bool, timeout time.Durati
eChan <- errors.New("[p2p] ircRecv error code " + msg.Command) eChan <- errors.New("[p2p] ircRecv error code " + msg.Command)
break break
} }
if msg.Command == "PONG" {
p.lastPong = time.Now()
}
if condition(msg) { if condition(msg) {
mChan <- msg mChan <- msg
break break
@ -71,10 +102,10 @@ func ircRecv(i *irc.Conn, condition func(*irc.Message) bool, timeout time.Durati
} }
// wait for a message from peer_ (leave blank for any) // wait for a message from peer_ (leave blank for any)
func ircWaitForPeer(i *irc.Conn, peer_ string, timeout time.Duration) ( func (p *p2pSocket) ircWaitForPeer(peer_ string, timeout time.Duration) (
peer string, peerAddr net.Addr, err error) { peer string, peerAddr net.Addr, err error) {
msg, err := ircRecv(i, func(msg *irc.Message) bool { msg, err := p.ircRecv(func(msg *irc.Message) bool {
return msg.Command == "PRIVMSG" && return msg.Command == "PRIVMSG" &&
(peer_ == msg.Name || peer_ == "") (peer_ == msg.Name || peer_ == "")
}, timeout) }, timeout)
@ -88,30 +119,6 @@ func ircWaitForPeer(i *irc.Conn, peer_ string, timeout time.Duration) (
return return
} }
// all-in-one config covering socket, transporter and listener
// the option 'peer' shouldn't belong here, but I'm too lazy
// to create a seperate config type for transporter XD
type P2PConfig struct {
// irc related
Peer string
User string
Pass string
Addr string
PingIntvl time.Duration
Timeout time.Duration
// others
StunAddr string
// quic will use default certs & timeouts and will always keepalive
// since the authentication is done on irc server
// and p2p connections need keepalive anyway
// (probably should be configurable, leave for now)
}
// p2p socket for dialing and accepting udp connections
type p2pSocket struct {
i *irc.Conn
c *P2PConfig
}
// create a new p2p socket, all configs can be left empty except for user // create a new p2p socket, all configs can be left empty except for user
// (and for peer if you are using the socket to dial) // (and for peer if you are using the socket to dial)
@ -149,19 +156,19 @@ func P2PSocket(c *P2PConfig) (p *p2pSocket, err error) {
} }
// irc login // irc login
err = ircSend(p.i, "PASS", p.c.Pass) err = p.ircSend("PASS", p.c.Pass)
if err != nil { if err != nil {
return return
} }
err = ircSend(p.i, "NICK", p.c.User) err = p.ircSend("NICK", p.c.User)
if err != nil { if err != nil {
return return
} }
err = ircSend(p.i, "USER", p.c.User, "*", "*", "*") err = p.ircSend("USER", p.c.User, "*", "*", "*")
if err != nil { if err != nil {
return return
} }
_, err = ircRecv(p.i, func(msg *irc.Message) bool { _, err = p.ircRecv(func(msg *irc.Message) bool {
return msg.Command == "MODE" return msg.Command == "MODE"
}, p.c.Timeout) }, p.c.Timeout)
if err != nil { if err != nil {
@ -169,14 +176,20 @@ func P2PSocket(c *P2PConfig) (p *p2pSocket, err error) {
} }
// irc keepalive (ping) // irc keepalive (ping)
p.lastPong = time.Now()
go func() { go func() {
for { for {
time.Sleep(p.c.PingIntvl) time.Sleep(p.c.PingIntvl)
err := ircSend(p.i, "PING", p.c.Addr) err := p.ircSend("PING", p.c.Addr)
if err != nil { if err != nil {
break break
} }
if time.Now().Sub(p.lastPong) > 2 * p.c.PingIntvl {
break
} }
}
log.Log("[p2p] irc broken")
os.Exit(1)
}() }()
log.Log("[p2p] irc ready") log.Log("[p2p] irc ready")
@ -197,12 +210,12 @@ func (p *p2pSocket) Dial() (conn net.PacketConn, peerAddr net.Addr, err error) {
return return
} }
err = ircSend(p.i, "PRIVMSG", p.c.Peer, addr.String()) err = p.ircSend("PRIVMSG", p.c.Peer, addr.String())
if err != nil { if err != nil {
return return
} }
_, peerAddr, err = ircWaitForPeer(p.i, p.c.Peer, p.c.Timeout) _, peerAddr, err = p.ircWaitForPeer(p.c.Peer, p.c.Timeout)
if err == nil { if err == nil {
log.Log("[p2p] irc connected with", p.c.Peer, "at", peerAddr) log.Log("[p2p] irc connected with", p.c.Peer, "at", peerAddr)
} }
@ -214,7 +227,7 @@ func (p *p2pSocket) Dial() (conn net.PacketConn, peerAddr net.Addr, err error) {
func (p *p2pSocket) Accept() (conn net.PacketConn, err error) { func (p *p2pSocket) Accept() (conn net.PacketConn, err error) {
// wait indefintely for an unknown peer :) // wait indefintely for an unknown peer :)
peer, peerAddr, err := ircWaitForPeer(p.i, "", 0) peer, peerAddr, err := p.ircWaitForPeer("", 0)
if err != nil { if err != nil {
return return
} }
@ -224,7 +237,7 @@ func (p *p2pSocket) Accept() (conn net.PacketConn, err error) {
return return
} }
err = ircSend(p.i, "PRIVMSG", peer, addr.String()) err = p.ircSend("PRIVMSG", peer, addr.String())
if err != nil { if err != nil {
return return
} }