merge branch ping

This commit is contained in:
rui.zheng 2017-01-14 13:51:16 +08:00
commit 14561f9ee2
14 changed files with 996 additions and 281 deletions

View File

@ -1,19 +1,21 @@
package gost package gost
import ( import (
"crypto/rand"
"crypto/tls" "crypto/tls"
"encoding/base64" "encoding/base64"
"errors" "errors"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"io" "io"
//"io/ioutil"
"net" "net"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
"strconv"
"strings" "strings"
"sync" "sync"
"time"
) )
// Proxy chain holds a list of proxy nodes // Proxy chain holds a list of proxy nodes
@ -134,7 +136,15 @@ func (c *ProxyChain) initHttp2Client(config *tls.Config, nodes ...ProxyNode) {
if err != nil { if err != nil {
return conn, err return conn, err
} }
return tls.Client(conn, cfg), nil conn = tls.Client(conn, cfg)
// enable HTTP2 ping-pong
pingIntvl, _ := strconv.Atoi(http2Node.Get("ping"))
if pingIntvl > 0 {
enablePing(conn, time.Duration(pingIntvl)*time.Second)
}
return conn, nil
}, },
} }
c.http2Client = &http.Client{Transport: &tr} c.http2Client = &http.Client{Transport: &tr}
@ -142,6 +152,36 @@ func (c *ProxyChain) initHttp2Client(config *tls.Config, nodes ...ProxyNode) {
} }
func enablePing(conn net.Conn, interval time.Duration) {
if conn == nil || interval == 0 {
return
}
glog.V(LINFO).Infoln("[http2] ping enabled, interval:", interval)
go func() {
t := time.NewTicker(interval)
var framer *http2.Framer
for {
select {
case <-t.C:
if framer == nil {
framer = http2.NewFramer(conn, conn)
}
var p [8]byte
rand.Read(p[:])
err := framer.WritePing(false, p)
if err != nil {
t.Stop()
framer = nil
glog.V(LWARNING).Infoln("[http2] ping:", err)
return
}
}
}
}()
}
// Connect to addr through proxy chain // Connect to addr through proxy chain
func (c *ProxyChain) Dial(addr string) (net.Conn, error) { func (c *ProxyChain) Dial(addr string) (net.Conn, error) {
if !strings.Contains(addr, ":") { if !strings.Contains(addr, ":") {

View File

@ -1,19 +1,21 @@
package gost package gost
import ( import (
"crypto/rand"
"crypto/tls" "crypto/tls"
"encoding/base64" "encoding/base64"
"errors" "errors"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"io" "io"
//"io/ioutil"
"net" "net"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
"strconv"
"strings" "strings"
"sync" "sync"
"time"
) )
// Proxy chain holds a list of proxy nodes // Proxy chain holds a list of proxy nodes
@ -134,7 +136,15 @@ func (c *ProxyChain) initHttp2Client(config *tls.Config, nodes ...ProxyNode) {
if err != nil { if err != nil {
return conn, err return conn, err
} }
return tls.Client(conn, cfg), nil conn = tls.Client(conn, cfg)
// enable HTTP2 ping-pong
pingIntvl, _ := strconv.Atoi(http2Node.Get("ping"))
if pingIntvl > 0 {
enablePing(conn, time.Duration(pingIntvl)*time.Second)
}
return conn, nil
}, },
} }
c.http2Client = &http.Client{Transport: &tr} c.http2Client = &http.Client{Transport: &tr}
@ -142,6 +152,37 @@ func (c *ProxyChain) initHttp2Client(config *tls.Config, nodes ...ProxyNode) {
} }
func enablePing(conn net.Conn, interval time.Duration) {
if conn == nil || interval == 0 {
return
}
glog.V(LINFO).Infoln("[http2] ping enabled, interval:", interval)
go func() {
t := time.NewTicker(interval)
var framer *http2.Framer
for {
select {
case <-t.C:
if framer == nil {
framer = http2.NewFramer(conn, conn)
}
var p [8]byte
rand.Read(p[:])
err := framer.WritePing(false, p)
if err != nil {
t.Stop()
framer = nil
glog.V(LWARNING).Infoln("[http2] ping:", err)
return
}
glog.V(LINFO).Infoln("[http2] ping OK")
}
}
}()
}
// Connect to addr through proxy chain // Connect to addr through proxy chain
func (c *ProxyChain) Dial(addr string) (net.Conn, error) { func (c *ProxyChain) Dial(addr string) (net.Conn, error) {
if !strings.Contains(addr, ":") { if !strings.Contains(addr, ":") {

View File

@ -11,7 +11,7 @@ import (
) )
const ( const (
Version = "2.3-rc1" Version = "2.3-rc2"
) )
// Log level for glog // Log level for glog

View File

@ -317,10 +317,12 @@ type Framer struct {
// non-Continuation or Continuation on a different stream is // non-Continuation or Continuation on a different stream is
// attempted to be written. // attempted to be written.
logReads bool logReads, logWrites bool
debugFramer *Framer // only use for logging written writes debugFramer *Framer // only use for logging written writes
debugFramerBuf *bytes.Buffer debugFramerBuf *bytes.Buffer
debugReadLoggerf func(string, ...interface{})
debugWriteLoggerf func(string, ...interface{})
} }
func (fr *Framer) maxHeaderListSize() uint32 { func (fr *Framer) maxHeaderListSize() uint32 {
@ -355,7 +357,7 @@ func (f *Framer) endWrite() error {
byte(length>>16), byte(length>>16),
byte(length>>8), byte(length>>8),
byte(length)) byte(length))
if logFrameWrites { if f.logWrites {
f.logWrite() f.logWrite()
} }
@ -378,10 +380,10 @@ func (f *Framer) logWrite() {
f.debugFramerBuf.Write(f.wbuf) f.debugFramerBuf.Write(f.wbuf)
fr, err := f.debugFramer.ReadFrame() fr, err := f.debugFramer.ReadFrame()
if err != nil { if err != nil {
log.Printf("http2: Framer %p: failed to decode just-written frame", f) f.debugWriteLoggerf("http2: Framer %p: failed to decode just-written frame", f)
return return
} }
log.Printf("http2: Framer %p: wrote %v", f, summarizeFrame(fr)) f.debugWriteLoggerf("http2: Framer %p: wrote %v", f, summarizeFrame(fr))
} }
func (f *Framer) writeByte(v byte) { f.wbuf = append(f.wbuf, v) } func (f *Framer) writeByte(v byte) { f.wbuf = append(f.wbuf, v) }
@ -402,6 +404,9 @@ func NewFramer(w io.Writer, r io.Reader) *Framer {
w: w, w: w,
r: r, r: r,
logReads: logFrameReads, logReads: logFrameReads,
logWrites: logFrameWrites,
debugReadLoggerf: log.Printf,
debugWriteLoggerf: log.Printf,
} }
fr.getReadBuf = func(size uint32) []byte { fr.getReadBuf = func(size uint32) []byte {
if cap(fr.readBuf) >= int(size) { if cap(fr.readBuf) >= int(size) {
@ -483,7 +488,7 @@ func (fr *Framer) ReadFrame() (Frame, error) {
return nil, err return nil, err
} }
if fr.logReads { if fr.logReads {
log.Printf("http2: Framer %p: read %v", fr, summarizeFrame(f)) fr.debugReadLoggerf("http2: Framer %p: read %v", fr, summarizeFrame(f))
} }
if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil { if fh.Type == FrameHeaders && fr.ReadMetaHeaders != nil {
return fr.readMetaFrame(f.(*HeadersFrame)) return fr.readMetaFrame(f.(*HeadersFrame))
@ -1419,8 +1424,8 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
hdec.SetEmitEnabled(true) hdec.SetEmitEnabled(true)
hdec.SetMaxStringLength(fr.maxHeaderStringLen()) hdec.SetMaxStringLength(fr.maxHeaderStringLen())
hdec.SetEmitFunc(func(hf hpack.HeaderField) { hdec.SetEmitFunc(func(hf hpack.HeaderField) {
if VerboseLogs && logFrameReads { if VerboseLogs && fr.logReads {
log.Printf("http2: decoded hpack field %+v", hf) fr.debugReadLoggerf("http2: decoded hpack field %+v", hf)
} }
if !httplex.ValidHeaderFieldValue(hf.Value) { if !httplex.ValidHeaderFieldValue(hf.Value) {
invalid = headerFieldValueError(hf.Value) invalid = headerFieldValueError(hf.Value)

View File

@ -6,6 +6,45 @@
package http2 package http2
import "crypto/tls" import (
"crypto/tls"
"io"
"net/http"
)
func cloneTLSConfig(c *tls.Config) *tls.Config { return c.Clone() } func cloneTLSConfig(c *tls.Config) *tls.Config { return c.Clone() }
var _ http.Pusher = (*responseWriter)(nil)
// Push implements http.Pusher.
func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
internalOpts := pushOptions{}
if opts != nil {
internalOpts.Method = opts.Method
internalOpts.Header = opts.Header
}
return w.push(target, internalOpts)
}
func configureServer18(h1 *http.Server, h2 *Server) error {
if h2.IdleTimeout == 0 {
if h1.IdleTimeout != 0 {
h2.IdleTimeout = h1.IdleTimeout
} else {
h2.IdleTimeout = h1.ReadTimeout
}
}
return nil
}
func shouldLogPanic(panicValue interface{}) bool {
return panicValue != nil && panicValue != http.ErrAbortHandler
}
func reqGetBody(req *http.Request) func() (io.ReadCloser, error) {
return req.GetBody
}
func reqBodyIsNoBody(body io.ReadCloser) bool {
return body == http.NoBody
}

View File

@ -78,13 +78,23 @@ var (
type streamState int type streamState int
// HTTP/2 stream states.
//
// See http://tools.ietf.org/html/rfc7540#section-5.1.
//
// For simplicity, the server code merges "reserved (local)" into
// "half-closed (remote)". This is one less state transition to track.
// The only downside is that we send PUSH_PROMISEs slightly less
// liberally than allowable. More discussion here:
// https://lists.w3.org/Archives/Public/ietf-http-wg/2016JulSep/0599.html
//
// "reserved (remote)" is omitted since the client code does not
// support server push.
const ( const (
stateIdle streamState = iota stateIdle streamState = iota
stateOpen stateOpen
stateHalfClosedLocal stateHalfClosedLocal
stateHalfClosedRemote stateHalfClosedRemote
stateResvLocal
stateResvRemote
stateClosed stateClosed
) )
@ -93,8 +103,6 @@ var stateName = [...]string{
stateOpen: "Open", stateOpen: "Open",
stateHalfClosedLocal: "HalfClosedLocal", stateHalfClosedLocal: "HalfClosedLocal",
stateHalfClosedRemote: "HalfClosedRemote", stateHalfClosedRemote: "HalfClosedRemote",
stateResvLocal: "ResvLocal",
stateResvRemote: "ResvRemote",
stateClosed: "Closed", stateClosed: "Closed",
} }

27
cmd/gost/vendor/golang.org/x/net/http2/not_go18.go generated vendored Normal file
View File

@ -0,0 +1,27 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.8
package http2
import (
"io"
"net/http"
)
func configureServer18(h1 *http.Server, h2 *Server) error {
// No IdleTimeout to sync prior to Go 1.8.
return nil
}
func shouldLogPanic(panicValue interface{}) bool {
return panicValue != nil
}
func reqGetBody(req *http.Request) func() (io.ReadCloser, error) {
return nil
}
func reqBodyIsNoBody(io.ReadCloser) bool { return false }

File diff suppressed because it is too large Load Diff

View File

@ -191,6 +191,7 @@ type clientStream struct {
ID uint32 ID uint32
resc chan resAndError resc chan resAndError
bufPipe pipe // buffered pipe with the flow-controlled response payload bufPipe pipe // buffered pipe with the flow-controlled response payload
startedWrite bool // started request body write; guarded by cc.mu
requestedGzip bool requestedGzip bool
on100 func() // optional code to run if get a 100 continue response on100 func() // optional code to run if get a 100 continue response
@ -199,6 +200,7 @@ type clientStream struct {
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read readErr error // sticky read error; owned by transportResponseBody.Read
stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu
peerReset chan struct{} // closed on peer reset peerReset chan struct{} // closed on peer reset
resetErr error // populated before peerReset is closed resetErr error // populated before peerReset is closed
@ -226,15 +228,26 @@ func (cs *clientStream) awaitRequestCancel(req *http.Request) {
} }
select { select {
case <-req.Cancel: case <-req.Cancel:
cs.cancelStream()
cs.bufPipe.CloseWithError(errRequestCanceled) cs.bufPipe.CloseWithError(errRequestCanceled)
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
case <-ctx.Done(): case <-ctx.Done():
cs.cancelStream()
cs.bufPipe.CloseWithError(ctx.Err()) cs.bufPipe.CloseWithError(ctx.Err())
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
case <-cs.done: case <-cs.done:
} }
} }
func (cs *clientStream) cancelStream() {
cs.cc.mu.Lock()
didReset := cs.didReset
cs.didReset = true
cs.cc.mu.Unlock()
if !didReset {
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
}
}
// checkResetOrDone reports any error sent in a RST_STREAM frame by the // checkResetOrDone reports any error sent in a RST_STREAM frame by the
// server, or errStreamClosed if the stream is complete. // server, or errStreamClosed if the stream is complete.
func (cs *clientStream) checkResetOrDone() error { func (cs *clientStream) checkResetOrDone() error {
@ -302,6 +315,10 @@ func authorityAddr(scheme string, authority string) (addr string) {
if a, err := idna.ToASCII(host); err == nil { if a, err := idna.ToASCII(host); err == nil {
host = a host = a
} }
// IPv6 address literal, without a port:
if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
return host + ":" + port
}
return net.JoinHostPort(host, port) return net.JoinHostPort(host, port)
} }
@ -320,9 +337,11 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
} }
traceGotConn(req, cc) traceGotConn(req, cc)
res, err := cc.RoundTrip(req) res, err := cc.RoundTrip(req)
if shouldRetryRequest(req, err) { if err != nil {
if req, err = shouldRetryRequest(req, err); err == nil {
continue continue
} }
}
if err != nil { if err != nil {
t.vlogf("RoundTrip failure: %v", err) t.vlogf("RoundTrip failure: %v", err)
return nil, err return nil, err
@ -343,12 +362,41 @@ func (t *Transport) CloseIdleConnections() {
var ( var (
errClientConnClosed = errors.New("http2: client conn is closed") errClientConnClosed = errors.New("http2: client conn is closed")
errClientConnUnusable = errors.New("http2: client conn not usable") errClientConnUnusable = errors.New("http2: client conn not usable")
errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
) )
func shouldRetryRequest(req *http.Request, err error) bool { // shouldRetryRequest is called by RoundTrip when a request fails to get
// TODO: retry GET requests (no bodies) more aggressively, if shutdown // response headers. It is always called with a non-nil error.
// before response. // It returns either a request to retry (either the same request, or a
return err == errClientConnUnusable // modified clone), or an error if the request can't be replayed.
func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
switch err {
default:
return nil, err
case errClientConnUnusable, errClientConnGotGoAway:
return req, nil
case errClientConnGotGoAwayAfterSomeReqBody:
// If the Body is nil (or http.NoBody), it's safe to reuse
// this request and its Body.
if req.Body == nil || reqBodyIsNoBody(req.Body) {
return req, nil
}
// Otherwise we depend on the Request having its GetBody
// func defined.
getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody
if getBody == nil {
return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
}
body, err := getBody()
if err != nil {
return nil, err
}
newReq := *req
newReq.Body = body
return &newReq, nil
}
} }
func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) { func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
@ -501,6 +549,15 @@ func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
if old != nil && old.ErrCode != ErrCodeNo { if old != nil && old.ErrCode != ErrCodeNo {
cc.goAway.ErrCode = old.ErrCode cc.goAway.ErrCode = old.ErrCode
} }
last := f.LastStreamID
for streamID, cs := range cc.streams {
if streamID > last {
select {
case cs.resc <- resAndError{err: errClientConnGotGoAway}:
default:
}
}
}
} }
func (cc *ClientConn) CanTakeNewRequest() bool { func (cc *ClientConn) CanTakeNewRequest() bool {
@ -601,8 +658,6 @@ func commaSeparatedTrailers(req *http.Request) (string, error) {
} }
if len(keys) > 0 { if len(keys) > 0 {
sort.Strings(keys) sort.Strings(keys)
// TODO: could do better allocation-wise here, but trailers are rare,
// so being lazy for now.
return strings.Join(keys, ","), nil return strings.Join(keys, ","), nil
} }
return "", nil return "", nil
@ -761,6 +816,13 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
cs.abortRequestBodyWrite(errStopReqBodyWrite) cs.abortRequestBodyWrite(errStopReqBodyWrite)
} }
if re.err != nil { if re.err != nil {
if re.err == errClientConnGotGoAway {
cc.mu.Lock()
if cs.startedWrite {
re.err = errClientConnGotGoAwayAfterSomeReqBody
}
cc.mu.Unlock()
}
cc.forgetStreamID(cs.ID) cc.forgetStreamID(cs.ID)
return nil, re.err return nil, re.err
} }
@ -1666,9 +1728,10 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
cc.bw.Flush() cc.bw.Flush()
cc.wmu.Unlock() cc.wmu.Unlock()
} }
didReset := cs.didReset
cc.mu.Unlock() cc.mu.Unlock()
if len(data) > 0 { if len(data) > 0 && !didReset {
if _, err := cs.bufPipe.Write(data); err != nil { if _, err := cs.bufPipe.Write(data); err != nil {
rl.endStreamError(cs, err) rl.endStreamError(cs, err)
return err return err
@ -2000,6 +2063,9 @@ func (t *Transport) getBodyWriterState(cs *clientStream, body io.Reader) (s body
resc := make(chan error, 1) resc := make(chan error, 1)
s.resc = resc s.resc = resc
s.fn = func() { s.fn = func() {
cs.cc.mu.Lock()
cs.startedWrite = true
cs.cc.mu.Unlock()
resc <- cs.writeRequestBody(body, cs.req.Body) resc <- cs.writeRequestBody(body, cs.req.Body)
} }
s.delay = t.expectContinueTimeout() s.delay = t.expectContinueTimeout()

View File

@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"net/url"
"time" "time"
"golang.org/x/net/http2/hpack" "golang.org/x/net/http2/hpack"
@ -44,9 +45,10 @@ type writeContext interface {
HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer)
} }
// endsStream reports whether the given frame writer w will locally // writeEndsStream reports whether w writes a frame that will transition
// close the stream. // the stream to a half-closed local state. This returns false for RST_STREAM,
func endsStream(w writeFramer) bool { // which closes the entire stream (not just the local half).
func writeEndsStream(w writeFramer) bool {
switch v := w.(type) { switch v := w.(type) {
case *writeData: case *writeData:
return v.endStream return v.endStream
@ -56,7 +58,7 @@ func endsStream(w writeFramer) bool {
// This can only happen if the caller reuses w after it's // This can only happen if the caller reuses w after it's
// been intentionally nil'ed out to prevent use. Keep this // been intentionally nil'ed out to prevent use. Keep this
// here to catch future refactoring breaking it. // here to catch future refactoring breaking it.
panic("endsStream called on nil writeFramer") panic("writeEndsStream called on nil writeFramer")
} }
return false return false
} }
@ -150,6 +152,33 @@ func (writeSettingsAck) writeFrame(ctx writeContext) error {
func (writeSettingsAck) staysWithinBuffer(max int) bool { return frameHeaderLen <= max } func (writeSettingsAck) staysWithinBuffer(max int) bool { return frameHeaderLen <= max }
// splitHeaderBlock splits headerBlock into fragments so that each fragment fits
// in a single frame, then calls fn for each fragment. firstFrag/lastFrag are true
// for the first/last fragment, respectively.
func splitHeaderBlock(ctx writeContext, headerBlock []byte, fn func(ctx writeContext, frag []byte, firstFrag, lastFrag bool) error) error {
// For now we're lazy and just pick the minimum MAX_FRAME_SIZE
// that all peers must support (16KB). Later we could care
// more and send larger frames if the peer advertised it, but
// there's little point. Most headers are small anyway (so we
// generally won't have CONTINUATION frames), and extra frames
// only waste 9 bytes anyway.
const maxFrameSize = 16384
first := true
for len(headerBlock) > 0 {
frag := headerBlock
if len(frag) > maxFrameSize {
frag = frag[:maxFrameSize]
}
headerBlock = headerBlock[len(frag):]
if err := fn(ctx, frag, first, len(headerBlock) == 0); err != nil {
return err
}
first = false
}
return nil
}
// writeResHeaders is a request to write a HEADERS and 0+ CONTINUATION frames // writeResHeaders is a request to write a HEADERS and 0+ CONTINUATION frames
// for HTTP response headers or trailers from a server handler. // for HTTP response headers or trailers from a server handler.
type writeResHeaders struct { type writeResHeaders struct {
@ -207,39 +236,69 @@ func (w *writeResHeaders) writeFrame(ctx writeContext) error {
panic("unexpected empty hpack") panic("unexpected empty hpack")
} }
// For now we're lazy and just pick the minimum MAX_FRAME_SIZE return splitHeaderBlock(ctx, headerBlock, w.writeHeaderBlock)
// that all peers must support (16KB). Later we could care }
// more and send larger frames if the peer advertised it, but
// there's little point. Most headers are small anyway (so we
// generally won't have CONTINUATION frames), and extra frames
// only waste 9 bytes anyway.
const maxFrameSize = 16384
first := true func (w *writeResHeaders) writeHeaderBlock(ctx writeContext, frag []byte, firstFrag, lastFrag bool) error {
for len(headerBlock) > 0 { if firstFrag {
frag := headerBlock return ctx.Framer().WriteHeaders(HeadersFrameParam{
if len(frag) > maxFrameSize {
frag = frag[:maxFrameSize]
}
headerBlock = headerBlock[len(frag):]
endHeaders := len(headerBlock) == 0
var err error
if first {
first = false
err = ctx.Framer().WriteHeaders(HeadersFrameParam{
StreamID: w.streamID, StreamID: w.streamID,
BlockFragment: frag, BlockFragment: frag,
EndStream: w.endStream, EndStream: w.endStream,
EndHeaders: endHeaders, EndHeaders: lastFrag,
}) })
} else { } else {
err = ctx.Framer().WriteContinuation(w.streamID, endHeaders, frag) return ctx.Framer().WriteContinuation(w.streamID, lastFrag, frag)
} }
if err != nil { }
return err
// writePushPromise is a request to write a PUSH_PROMISE and 0+ CONTINUATION frames.
type writePushPromise struct {
streamID uint32 // pusher stream
method string // for :method
url *url.URL // for :scheme, :authority, :path
h http.Header
// Creates an ID for a pushed stream. This runs on serveG just before
// the frame is written. The returned ID is copied to promisedID.
allocatePromisedID func() (uint32, error)
promisedID uint32
}
func (w *writePushPromise) staysWithinBuffer(max int) bool {
// TODO: see writeResHeaders.staysWithinBuffer
return false
}
func (w *writePushPromise) writeFrame(ctx writeContext) error {
enc, buf := ctx.HeaderEncoder()
buf.Reset()
encKV(enc, ":method", w.method)
encKV(enc, ":scheme", w.url.Scheme)
encKV(enc, ":authority", w.url.Host)
encKV(enc, ":path", w.url.RequestURI())
encodeHeaders(enc, w.h, nil)
headerBlock := buf.Bytes()
if len(headerBlock) == 0 {
panic("unexpected empty hpack")
} }
return splitHeaderBlock(ctx, headerBlock, w.writeHeaderBlock)
}
func (w *writePushPromise) writeHeaderBlock(ctx writeContext, frag []byte, firstFrag, lastFrag bool) error {
if firstFrag {
return ctx.Framer().WritePushPromise(PushPromiseParam{
StreamID: w.streamID,
PromiseID: w.promisedID,
BlockFragment: frag,
EndHeaders: lastFrag,
})
} else {
return ctx.Framer().WriteContinuation(w.streamID, lastFrag, frag)
} }
return nil
} }
type write100ContinueHeadersFrame struct { type write100ContinueHeadersFrame struct {
@ -274,6 +333,8 @@ func (wu writeWindowUpdate) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteWindowUpdate(wu.streamID, wu.n) return ctx.Framer().WriteWindowUpdate(wu.streamID, wu.n)
} }
// encodeHeaders encodes an http.Header. If keys is not nil, then (k, h[k])
// is encoded only only if k is in keys.
func encodeHeaders(enc *hpack.Encoder, h http.Header, keys []string) { func encodeHeaders(enc *hpack.Encoder, h http.Header, keys []string) {
if keys == nil { if keys == nil {
sorter := sorterPool.Get().(*sorter) sorter := sorterPool.Get().(*sorter)

View File

@ -25,7 +25,9 @@ type WriteScheduler interface {
// https://tools.ietf.org/html/rfc7540#section-5.1 // https://tools.ietf.org/html/rfc7540#section-5.1
AdjustStream(streamID uint32, priority PriorityParam) AdjustStream(streamID uint32, priority PriorityParam)
// Push queues a frame in the scheduler. // Push queues a frame in the scheduler. In most cases, this will not be
// called with wr.StreamID()!=0 unless that stream is currently open. The one
// exception is RST_STREAM frames, which may be sent on idle or closed streams.
Push(wr FrameWriteRequest) Push(wr FrameWriteRequest)
// Pop dequeues the next frame to write. Returns false if no frames can // Pop dequeues the next frame to write. Returns false if no frames can
@ -62,6 +64,13 @@ type FrameWriteRequest struct {
// 0 is used for non-stream frames such as PING and SETTINGS. // 0 is used for non-stream frames such as PING and SETTINGS.
func (wr FrameWriteRequest) StreamID() uint32 { func (wr FrameWriteRequest) StreamID() uint32 {
if wr.stream == nil { if wr.stream == nil {
if se, ok := wr.write.(StreamError); ok {
// (*serverConn).resetStream doesn't set
// stream because it doesn't necessarily have
// one. So special case this type of write
// message.
return se.StreamID
}
return 0 return 0
} }
return wr.stream.id return wr.stream.id
@ -142,17 +151,27 @@ func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteReque
// String is for debugging only. // String is for debugging only.
func (wr FrameWriteRequest) String() string { func (wr FrameWriteRequest) String() string {
var streamID uint32
if wr.stream != nil {
streamID = wr.stream.id
}
var des string var des string
if s, ok := wr.write.(fmt.Stringer); ok { if s, ok := wr.write.(fmt.Stringer); ok {
des = s.String() des = s.String()
} else { } else {
des = fmt.Sprintf("%T", wr.write) des = fmt.Sprintf("%T", wr.write)
} }
return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", streamID, wr.done != nil, des) return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
}
// replyToWriter sends err to wr.done and panics if the send must block
// This does nothing if wr.done is nil.
func (wr *FrameWriteRequest) replyToWriter(err error) {
if wr.done == nil {
return
}
select {
case wr.done <- err:
default:
panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
}
wr.write = nil // prevent use (assume it's tainted after wr.done send)
} }
// writeQueue is used by implementations of WriteScheduler. // writeQueue is used by implementations of WriteScheduler.

View File

@ -388,7 +388,15 @@ func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
} else { } else {
n = ws.nodes[id] n = ws.nodes[id]
if n == nil { if n == nil {
panic("add on non-open stream") // id is an idle or closed stream. wr should not be a HEADERS or
// DATA frame. However, wr can be a RST_STREAM. In this case, we
// push wr onto the root, rather than creating a new priorityNode,
// since RST_STREAM is tiny and the stream's priority is unknown
// anyway. See issue #17919.
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
n = &ws.root
} }
} }
n.q.push(wr) n.q.push(wr)

View File

@ -225,10 +225,10 @@
"revisionTime": "2016-10-24T22:38:16Z" "revisionTime": "2016-10-24T22:38:16Z"
}, },
{ {
"checksumSHA1": "UovsbmfW33+DGfUh1geZpGzIoVo=", "checksumSHA1": "N1akwAdrHVfPPrsFOhG2ouP21VA=",
"path": "golang.org/x/net/http2", "path": "golang.org/x/net/http2",
"revision": "65dfc08770ce66f74becfdff5f8ab01caef4e946", "revision": "60c41d1de8da134c05b7b40154a9a82bf5b7edb9",
"revisionTime": "2016-10-24T22:38:16Z" "revisionTime": "2017-01-10T03:16:11Z"
}, },
{ {
"checksumSHA1": "HzuGD7AwgC0p1az1WAQnEFnEk98=", "checksumSHA1": "HzuGD7AwgC0p1az1WAQnEFnEk98=",

View File

@ -11,7 +11,7 @@ import (
) )
const ( const (
Version = "2.3-rc1" Version = "2.3-rc2"
) )
// Log level for glog // Log level for glog