Commit 8f653d3c authored by Steven Allen's avatar Steven Allen

chore: explicitly handle errors

parent 26bf7962
...@@ -95,7 +95,7 @@ func BenchmarkDups2Nodes(b *testing.B) { ...@@ -95,7 +95,7 @@ func BenchmarkDups2Nodes(b *testing.B) {
subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll) subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll)
}) })
out, _ := json.MarshalIndent(benchmarkLog, "", " ") out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("tmp/benchmark.json", out, 0666) _ = ioutil.WriteFile("tmp/benchmark.json", out, 0666)
} }
const fastSpeed = 60 * time.Millisecond const fastSpeed = 60 * time.Millisecond
...@@ -145,7 +145,7 @@ func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) { ...@@ -145,7 +145,7 @@ func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll) subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
}) })
out, _ := json.MarshalIndent(benchmarkLog, "", " ") out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666) _ = ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
} }
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) { func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
...@@ -267,7 +267,10 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) ...@@ -267,7 +267,10 @@ func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block)
// but we're mostly just testing performance of the sync algorithm // but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) { func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks { for _, blk := range blks {
provs[rand.Intn(len(provs))].Blockstore().Put(blk) err := provs[rand.Intn(len(provs))].Blockstore().Put(blk)
if err != nil {
b.Fatal(err)
}
} }
} }
......
...@@ -44,7 +44,10 @@ func TestClose(t *testing.T) { ...@@ -44,7 +44,10 @@ func TestClose(t *testing.T) {
bitswap := ig.Next() bitswap := ig.Next()
bitswap.Exchange.Close() bitswap.Exchange.Close()
bitswap.Exchange.GetBlock(context.Background(), block.Cid()) _, err := bitswap.Exchange.GetBlock(context.Background(), block.Cid())
if err == nil {
t.Fatal("expected GetBlock to fail")
}
} }
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
...@@ -56,14 +59,17 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this ...@@ -56,14 +59,17 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
block := blocks.NewBlock([]byte("block")) block := blocks.NewBlock([]byte("block"))
pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
rs.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network err := rs.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network
if err != nil {
t.Fatal(err)
}
solo := ig.Next() solo := ig.Next()
defer solo.Exchange.Close() defer solo.Exchange.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
defer cancel() defer cancel()
_, err := solo.Exchange.GetBlock(ctx, block.Cid()) _, err = solo.Exchange.GetBlock(ctx, block.Cid())
if err != context.DeadlineExceeded { if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error") t.Fatal("Expected DeadlineExceeded error")
...@@ -224,7 +230,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -224,7 +230,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
first := instances[0] first := instances[0]
for _, b := range blocks { for _, b := range blocks {
blkeys = append(blkeys, b.Cid()) blkeys = append(blkeys, b.Cid())
first.Exchange.HasBlock(b) err := first.Exchange.HasBlock(b)
if err != nil {
t.Fatal(err)
}
} }
t.Log("Distribute!") t.Log("Distribute!")
......
...@@ -113,7 +113,7 @@ func (mq *MessageQueue) runQueue() { ...@@ -113,7 +113,7 @@ func (mq *MessageQueue) runQueue() {
return return
case <-mq.ctx.Done(): case <-mq.ctx.Done():
if mq.sender != nil { if mq.sender != nil {
mq.sender.Reset() _ = mq.sender.Reset()
} }
return return
} }
...@@ -220,7 +220,7 @@ func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) boo ...@@ -220,7 +220,7 @@ func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) boo
} }
log.Infof("bitswap send error: %s", err) log.Infof("bitswap send error: %s", err)
mq.sender.Reset() _ = mq.sender.Reset()
mq.sender = nil mq.sender = nil
select { select {
......
...@@ -133,12 +133,13 @@ func (bsnet *impl) SendMessage( ...@@ -133,12 +133,13 @@ func (bsnet *impl) SendMessage(
} }
if err = bsnet.msgToStream(ctx, s, outgoing); err != nil { if err = bsnet.msgToStream(ctx, s, outgoing); err != nil {
s.Reset() _ = s.Reset()
return err return err
} }
atomic.AddUint64(&bsnet.stats.MessagesSent, 1) atomic.AddUint64(&bsnet.stats.MessagesSent, 1)
// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine. // TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
//nolint
go helpers.AwaitEOF(s) go helpers.AwaitEOF(s)
return s.Close() return s.Close()
...@@ -189,7 +190,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) { ...@@ -189,7 +190,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
defer s.Close() defer s.Close()
if bsnet.receiver == nil { if bsnet.receiver == nil {
s.Reset() _ = s.Reset()
return return
} }
...@@ -198,7 +199,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) { ...@@ -198,7 +199,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
received, err := bsmsg.FromMsgReader(reader) received, err := bsmsg.FromMsgReader(reader)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
s.Reset() _ = s.Reset()
go bsnet.receiver.ReceiveError(err) go bsnet.receiver.ReceiveError(err)
log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err) log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
} }
......
...@@ -81,14 +81,23 @@ func TestMessageSendAndReceive(t *testing.T) { ...@@ -81,14 +81,23 @@ func TestMessageSendAndReceive(t *testing.T) {
bsnet1.SetDelegate(r1) bsnet1.SetDelegate(r1)
bsnet2.SetDelegate(r2) bsnet2.SetDelegate(r2)
mn.LinkAll() err = mn.LinkAll()
bsnet1.ConnectTo(ctx, p2.ID()) if err != nil {
t.Fatal(err)
}
err = bsnet1.ConnectTo(ctx, p2.ID())
if err != nil {
t.Fatal(err)
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("did not connect peer") t.Fatal("did not connect peer")
case <-r1.connectionEvent: case <-r1.connectionEvent:
} }
bsnet2.ConnectTo(ctx, p1.ID()) err = bsnet2.ConnectTo(ctx, p1.ID())
if err != nil {
t.Fatal(err)
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("did not connect peer") t.Fatal("did not connect peer")
...@@ -107,7 +116,10 @@ func TestMessageSendAndReceive(t *testing.T) { ...@@ -107,7 +116,10 @@ func TestMessageSendAndReceive(t *testing.T) {
sent.AddEntry(block1.Cid(), 1) sent.AddEntry(block1.Cid(), 1)
sent.AddBlock(block2) sent.AddBlock(block2)
bsnet1.SendMessage(ctx, p2.ID(), sent) err = bsnet1.SendMessage(ctx, p2.ID(), sent)
if err != nil {
t.Fatal(err)
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
......
...@@ -65,7 +65,10 @@ func (g *InstanceGenerator) Instances(n int) []Instance { ...@@ -65,7 +65,10 @@ func (g *InstanceGenerator) Instances(n int) []Instance {
for i, inst := range instances { for i, inst := range instances {
for j := i + 1; j < len(instances); j++ { for j := i + 1; j < len(instances); j++ {
oinst := instances[j] oinst := instances[j]
inst.Adapter.ConnectTo(context.Background(), oinst.Peer) err := inst.Adapter.ConnectTo(context.Background(), oinst.Peer)
if err != nil {
panic(err.Error())
}
} }
} }
return instances return instances
......
...@@ -35,7 +35,10 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -35,7 +35,10 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
msgToWaiter := bsmsg.New(true) msgToWaiter := bsmsg.New(true)
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
waiter.SendMessage(ctx, fromWaiter, msgToWaiter) err := waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
if err != nil {
t.Error(err)
}
})) }))
waiter.SetDelegate(lambda(func( waiter.SetDelegate(lambda(func(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment