Commit d0ca2e1d authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #91 from libp2p/feat/update-stream-muxer

update stream muxer
parents 228ce2ef 5bc13bf6
......@@ -35,6 +35,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
// receive msg
pmes := new(pb.Message)
if err := r.ReadMsg(pmes); err != nil {
s.Reset()
log.Debugf("Error unmarshaling data: %s", err)
return
}
......@@ -45,6 +46,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
if handler == nil {
s.Reset()
log.Debug("got back nil handler from handlerForMsgType")
return
}
......@@ -52,6 +54,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
// dispatch handler.
rpmes, err := handler(ctx, mPeer, pmes)
if err != nil {
s.Reset()
log.Debugf("handle message error: %s", err)
return
}
......@@ -64,6 +67,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
// send out response msg
if err := w.WriteMsg(rpmes); err != nil {
s.Reset()
log.Debugf("send response error: %s", err)
return
}
......@@ -161,73 +165,88 @@ const streamReuseTries = 3
func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
ms.lk.Lock()
defer ms.lk.Unlock()
if err := ms.prep(); err != nil {
return err
}
if err := ms.writeMessage(pmes); err != nil {
return err
}
if ms.singleMes > streamReuseTries {
ms.s.Close()
ms.s = nil
}
return nil
}
func (ms *messageSender) writeMessage(pmes *pb.Message) error {
err := ms.w.WriteMsg(pmes)
if err != nil {
// If the other side isnt expecting us to be reusing streams, we're gonna
// end up erroring here. To make sure things work seamlessly, lets retry once
// before continuing
log.Infof("error writing message: ", err)
ms.s.Close()
ms.s = nil
retry := false
for {
if err := ms.prep(); err != nil {
return err
}
if err := ms.w.WriteMsg(pmes); err != nil {
return err
ms.s.Reset()
ms.s = nil
if retry {
log.Info("error writing message, bailing: ", err)
return err
} else {
log.Info("error writing message, trying again: ", err)
retry = true
continue
}
}
log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
if ms.singleMes > streamReuseTries {
ms.s.Close()
ms.s = nil
} else if retry {
ms.singleMes++
}
// keep track of this happening. If it happens a few times, its
// likely we can assume the otherside will never support stream reuse
ms.singleMes++
return nil
}
return nil
}
func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
ms.lk.Lock()
defer ms.lk.Unlock()
if err := ms.prep(); err != nil {
return nil, err
}
retry := false
for {
if err := ms.prep(); err != nil {
return nil, err
}
if err := ms.writeMessage(pmes); err != nil {
return nil, err
}
if err := ms.w.WriteMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
if retry {
log.Info("error writing message, bailing: ", err)
return nil, err
} else {
log.Info("error writing message, trying again: ", err)
retry = true
continue
}
}
log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
mes := new(pb.Message)
if err := ms.ctxReadMsg(ctx, mes); err != nil {
ms.s.Reset()
ms.s = nil
if retry {
log.Info("error reading message, bailing: ", err)
return nil, err
} else {
log.Info("error reading message, trying again: ", err)
retry = true
continue
}
}
mes := new(pb.Message)
if err := ms.ctxReadMsg(ctx, mes); err != nil {
ms.s.Close()
ms.s = nil
return nil, err
}
log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
if ms.singleMes > streamReuseTries {
ms.s.Close()
ms.s = nil
}
if ms.singleMes > streamReuseTries {
ms.s.Close()
ms.s = nil
} else if retry {
ms.singleMes++
}
return mes, nil
return mes, nil
}
}
func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
......
......@@ -192,7 +192,6 @@ func TestNotFound(t *testing.T) {
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
}
......@@ -288,3 +287,68 @@ func TestLessThanKResponses(t *testing.T) {
}
t.Fatal("Expected to recieve an error.")
}
// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d.Update(ctx, hosts[1].ID())
// It would be nice to be able to just get a value and succeed but then
// we'd need to deal with selectors and validators...
hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
switch pmes.GetType() {
case pb.Message_GET_VALUE:
pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
}
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
}
})
// long timeout to ensure timing is not at play.
ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
for i := 0; i < 10; i++ {
if _, err := d.GetValue(ctx, "hello"); err != nil {
switch err {
case routing.ErrNotFound:
//Success!
continue
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
}
......@@ -120,21 +120,21 @@
},
{
"author": "whyrusleeping",
"hash": "QmUwW8jMQDxXhLD2j4EfWqLEMX3MsvyWcWGvJPVDh1aTmu",
"hash": "QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6",
"name": "go-libp2p-host",
"version": "1.3.19"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmXZyBQMkqSYigxhJResC6fLWDGFhbphK67eZoqMDUvBmK",
"hash": "QmZ3ma9g2NTg7GNF1ntWNRa1GW9jVzGq8UE9cKCwRKv6dS",
"name": "go-libp2p",
"version": "4.5.5"
"version": "5.0.1"
},
{
"author": "whyrusleeping",
"hash": "QmahYsGWry85Y7WUe2SX5G4JkH2zifEQAUtJVLZ24aC9DF",
"hash": "QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1",
"name": "go-libp2p-net",
"version": "1.6.12"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
......@@ -156,9 +156,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmQ1bJEsmdEiGfTQRoj6CsshWmAKduAEDEbwzbvk5QT5Ui",
"hash": "QmP4cEjmvf8tC6ykxKXrvmYLo8vqtGsgduMatjbAKnBzv8",
"name": "go-libp2p-netutil",
"version": "0.2.25"
"version": "0.3.1"
},
{
"author": "multiformats",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment