From faee10effee1ad1ab79004532978e0b58426d359 Mon Sep 17 00:00:00 2001
From: Brian Tiger Chow <brian.holderchow@gmail.com>
Date: Sun, 21 Sep 2014 21:39:45 -0700
Subject: [PATCH] test(bitswap) send entire wantlist to peers

fix(bitswap) pass go vet

fixes #97

https://github.com/jbenet/go-ipfs/issues/97
---
 exchange/bitswap/bitswap.go      | 70 ++++++++++++++++++++++++++++----
 exchange/bitswap/bitswap_test.go | 50 +++++++++++++++--------
 2 files changed, 93 insertions(+), 27 deletions(-)

diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go
index 2dc73ca8..cf530329 100644
--- a/exchange/bitswap/bitswap.go
+++ b/exchange/bitswap/bitswap.go
@@ -2,6 +2,7 @@ package bitswap
 
 import (
 	"errors"
+	"sync"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
@@ -28,6 +29,9 @@ func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageS
 		strategy:      strategy.New(),
 		routing:       directory,
 		sender:        networkAdapter,
+		wantlist: WantList{
+			data: make(map[u.Key]struct{}),
+		},
 	}
 	networkAdapter.SetDelegate(bs)
 
@@ -53,6 +57,39 @@ type bitswap struct {
 	// interact with partners.
 	// TODO(brian): save the strategy's state to the datastore
 	strategy strategy.Strategy
+
+	wantlist WantList
+}
+
+type WantList struct {
+	lock sync.RWMutex
+	data map[u.Key]struct{}
+}
+
+func (wl *WantList) Add(k u.Key) {
+	u.DOut("Adding %v to Wantlist\n", k.Pretty())
+	wl.lock.Lock()
+	defer wl.lock.Unlock()
+
+	wl.data[k] = struct{}{}
+}
+
+func (wl *WantList) Remove(k u.Key) {
+	u.DOut("Removing %v from Wantlist\n", k.Pretty())
+	wl.lock.Lock()
+	defer wl.lock.Unlock()
+
+	delete(wl.data, k)
+}
+
+func (wl *WantList) Keys() []u.Key {
+	wl.lock.RLock()
+	defer wl.lock.RUnlock()
+	keys := make([]u.Key, 0)
+	for k, _ := range wl.data {
+		keys = append(keys, k)
+	}
+	return keys
 }
 
 // GetBlock attempts to retrieve a particular block from peers within the
@@ -60,9 +97,10 @@ type bitswap struct {
 //
 // TODO ensure only one active request per key
 func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
+	u.DOut("Get Block %v\n", k.Pretty())
 
 	ctx, cancelFunc := context.WithCancel(parent)
-	// TODO add to wantlist
+	bs.wantlist.Add(k)
 	promise := bs.notifications.Subscribe(ctx, k)
 
 	const maxProviders = 20
@@ -70,6 +108,9 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
 
 	go func() {
 		message := bsmsg.New()
+		for _, wanted := range bs.wantlist.Keys() {
+			message.AppendWanted(wanted)
+		}
 		message.AppendWanted(k)
 		for iiiii := range peersToQuery {
 			// u.DOut("bitswap got peersToQuery: %s\n", iiiii)
@@ -94,6 +135,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
 	select {
 	case block := <-promise:
 		cancelFunc()
+		bs.wantlist.Remove(k)
 		// TODO remove from wantlist
 		return &block, nil
 	case <-parent.Done():
@@ -104,6 +146,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
 // HasBlock announces the existance of a block to bitswap, potentially sending
 // it to peers (Partners) whose WantLists include it.
 func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
+	u.DOut("Has Block %v\n", blk.Key().Pretty())
+	bs.wantlist.Remove(blk.Key())
 	bs.sendToPeersThatWant(ctx, blk)
 	return bs.routing.Provide(ctx, blk.Key())
 }
@@ -111,6 +155,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
 // TODO(brian): handle errors
 func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
 	*peer.Peer, bsmsg.BitSwapMessage, error) {
+	u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())
 
 	if p == nil {
 		return nil, nil, errors.New("Received nil Peer")
@@ -132,19 +177,21 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs
 		}(block)
 	}
 
+	message := bsmsg.New()
+	for _, wanted := range bs.wantlist.Keys() {
+		message.AppendWanted(wanted)
+	}
 	for _, key := range incoming.Wantlist() {
 		if bs.strategy.ShouldSendBlockToPeer(key, p) {
-			block, errBlockNotFound := bs.blockstore.Get(key)
-			if errBlockNotFound != nil {
-				return nil, nil, errBlockNotFound
+			if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
+				continue
+			} else {
+				message.AppendBlock(*block)
 			}
-			message := bsmsg.New()
-			message.AppendBlock(*block)
-			defer bs.strategy.MessageSent(p, message)
-			return p, message, nil
 		}
 	}
-	return nil, nil, nil
+	defer bs.strategy.MessageSent(p, message)
+	return p, message, nil
 }
 
 // send strives to ensure that accounting is always performed when a message is
@@ -155,11 +202,16 @@ func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessag
 }
 
 func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
+	u.DOut("Sending %v to peers that want it\n", block.Key().Pretty())
 	for _, p := range bs.strategy.Peers() {
 		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
+			u.DOut("%v wants %v\n", p.Key().Pretty(), block.Key().Pretty())
 			if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
 				message := bsmsg.New()
 				message.AppendBlock(block)
+				for _, wanted := range bs.wantlist.Keys() {
+					message.AppendWanted(wanted)
+				}
 				go bs.send(ctx, p, message)
 			}
 		}
diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go
index 60ba7bf0..6ec45f21 100644
--- a/exchange/bitswap/bitswap_test.go
+++ b/exchange/bitswap/bitswap_test.go
@@ -16,6 +16,7 @@ import (
 	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
 	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
 	peer "github.com/jbenet/go-ipfs/peer"
+	util "github.com/jbenet/go-ipfs/util"
 	testutil "github.com/jbenet/go-ipfs/util/testutil"
 )
 
@@ -145,7 +146,10 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGro
 	wg.Done()
 }
 
+// TODO simplify this test. get to the _essence_!
 func TestSendToWantingPeer(t *testing.T) {
+	util.Debug = true
+
 	net := tn.VirtualNetwork()
 	rs := tn.VirtualRoutingServer()
 	sg := NewSessionGenerator(net, rs)
@@ -155,48 +159,55 @@ func TestSendToWantingPeer(t *testing.T) {
 	w := sg.Next()
 	o := sg.Next()
 
+	t.Logf("Session %v\n", me.peer.Key().Pretty())
+	t.Logf("Session %v\n", w.peer.Key().Pretty())
+	t.Logf("Session %v\n", o.peer.Key().Pretty())
+
 	alpha := bg.Next()
 
-	const timeout = 100 * time.Millisecond
-	const wait = 100 * time.Millisecond
+	const timeout = 1 * time.Millisecond // FIXME don't depend on time
 
-	t.Log("Peer |w| attempts to get a file |alpha|. NB: alpha not available")
+	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
 	ctx, _ := context.WithTimeout(context.Background(), timeout)
 	_, err := w.exchange.Block(ctx, alpha.Key())
 	if err == nil {
-		t.Error("Expected alpha to NOT be available")
+		t.Fatalf("Expected %v to NOT be available", alpha.Key().Pretty())
 	}
-	time.Sleep(wait)
 
-	t.Log("Peer |w| announces availability of a file |beta|")
 	beta := bg.Next()
+	t.Logf("Peer %v announes availability  of %v\n", w.peer.Key().Pretty(), beta.Key().Pretty())
 	ctx, _ = context.WithTimeout(context.Background(), timeout)
+	if err := w.blockstore.Put(beta); err != nil {
+		t.Fatal(err)
+	}
 	w.exchange.HasBlock(ctx, beta)
-	time.Sleep(wait)
 
-	t.Log("I request and get |beta| from |w|. In the message, I receive |w|'s wants [alpha]")
-	t.Log("I don't have alpha, but I keep it on my wantlist.")
+	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer.Key().Pretty(), beta.Key().Pretty(), w.peer.Key().Pretty(), alpha.Key().Pretty())
 	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	me.exchange.Block(ctx, beta.Key())
-	time.Sleep(wait)
+	if _, err := me.exchange.Block(ctx, beta.Key()); err != nil {
+		t.Fatal(err)
+	}
 
-	t.Log("Peer |o| announces the availability of |alpha|")
+	t.Logf("%v announces availability of %v\n", o.peer.Key().Pretty(), alpha.Key().Pretty())
 	ctx, _ = context.WithTimeout(context.Background(), timeout)
+	if err := o.blockstore.Put(alpha); err != nil {
+		t.Fatal(err)
+	}
 	o.exchange.HasBlock(ctx, alpha)
-	time.Sleep(wait)
 
-	t.Log("I request |alpha| for myself.")
+	t.Logf("%v requests %v\n", me.peer.Key().Pretty(), alpha.Key().Pretty())
 	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	me.exchange.Block(ctx, alpha.Key())
-	time.Sleep(wait)
+	if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil {
+		t.Fatal(err)
+	}
 
-	t.Log("After receiving |f| from |o|, I send it to the wanting peer |w|")
+	t.Logf("%v should now have %v\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
 	block, err := w.blockstore.Get(alpha.Key())
 	if err != nil {
 		t.Fatal("Should not have received an error")
 	}
 	if block.Key() != alpha.Key() {
-		t.Error("Expected to receive alpha from me")
+		t.Fatal("Expected to receive alpha from me")
 	}
 }
 
@@ -278,6 +289,9 @@ func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance {
 		strategy:      strategy.New(),
 		routing:       htc,
 		sender:        adapter,
+		wantlist: WantList{
+			data: make(map[util.Key]struct{}),
+		},
 	}
 	adapter.SetDelegate(bs)
 	return instance{
-- 
GitLab