Unverified Commit 4d5f6019 authored by Matt Joiner's avatar Matt Joiner Committed by GitHub

Rework handleNewStream (#267)

parent 0c13c2cc
......@@ -45,10 +45,15 @@ func (w *bufferedDelimitedWriter) Flush() error {
// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
go dht.handleNewMessage(s)
defer s.Reset()
if dht.handleNewMessage(s) {
// Gracefully close the stream for writes.
s.Close()
}
}
func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
// Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
ctx := dht.Context()
cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
......@@ -57,54 +62,48 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
mPeer := s.Conn().RemotePeer()
for {
// receive msg
pmes := new(pb.Message)
switch err := r.ReadMsg(pmes); err {
var req pb.Message
switch err := r.ReadMsg(&req); err {
case io.EOF:
s.Close()
return
case nil:
return true
default:
s.Reset()
logger.Debugf("Error unmarshaling data: %s", err)
return
// This string test is necessary because there isn't a single stream reset error
// instance in use.
if err.Error() != "stream reset" {
logger.Debugf("error reading message: %#v", err)
}
return false
case nil:
}
// update the peer (on valid msgs only)
dht.updateFromMessage(ctx, mPeer, pmes)
// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
handler := dht.handlerForMsgType(req.GetType())
if handler == nil {
s.Reset()
logger.Debug("got back nil handler from handlerForMsgType")
return
logger.Warningf("can't handle received message of type %v", req.GetType())
return false
}
// dispatch handler.
rpmes, err := handler(ctx, mPeer, pmes)
resp, err := handler(ctx, mPeer, &req)
if err != nil {
s.Reset()
logger.Debugf("handle message error: %s", err)
return
logger.Debugf("error handling message: %v", err)
return false
}
// if nil response, return it before serializing
if rpmes == nil {
logger.Debug("got back nil response from request")
dht.updateFromMessage(ctx, mPeer, &req)
if resp == nil {
continue
}
// send out response msg
err = w.WriteMsg(rpmes)
err = w.WriteMsg(resp)
if err == nil {
err = w.Flush()
}
if err != nil {
s.Reset()
logger.Debugf("send response error: %s", err)
return
logger.Debugf("error writing response: %v", err)
return false
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment