From a159e6825c2c8c82b734e290e8fc2116d3afd429 Mon Sep 17 00:00:00 2001
From: Jeromy <jeromyj@gmail.com>
Date: Fri, 8 May 2015 23:55:35 -0700
Subject: [PATCH] implement peermanager to control outgoing messages

Also more refactoring of bitswap in general, including some perf
improvements and eventlog removal.

clean up, and buffer channels

move some things around

correctly buffer work messages

more cleanup, and improve test perf

remove unneccessary test

revert changes to bitswap message, they werent necessary
---
 exchange/bitswap/bitswap.go                   |  88 ++------
 exchange/bitswap/bitswap_test.go              |  35 +--
 exchange/bitswap/decision/engine.go           |  22 +-
 exchange/bitswap/decision/engine_test.go      |   2 +-
 .../bitswap/decision/peer_request_queue.go    |   2 +-
 exchange/bitswap/message/message.go           |  11 +-
 exchange/bitswap/network/interface.go         |   2 +
 exchange/bitswap/network/ipfs_impl.go         |   4 +
 exchange/bitswap/peermanager.go               | 203 ++++++++++++++++++
 exchange/bitswap/testnet/virtual.go           |   9 +
 exchange/bitswap/testutils.go                 |  11 +-
 exchange/bitswap/workers.go                   |   6 +-
 12 files changed, 275 insertions(+), 120 deletions(-)
 create mode 100644 exchange/bitswap/peermanager.go

diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go
index 757c9067e..b8dcdab1e 100644
--- a/exchange/bitswap/bitswap.go
+++ b/exchange/bitswap/bitswap.go
@@ -91,7 +91,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
 		process:       px,
 		newBlocks:     make(chan *blocks.Block, HasBlockBufferSize),
 		provideKeys:   make(chan u.Key),
+		pm:            NewPeerManager(network),
 	}
+	go bs.pm.Run(ctx)
 	network.SetDelegate(bs)
 
 	// Start up bitswaps async worker routines
@@ -108,6 +110,10 @@ type Bitswap struct {
 	// network delivers messages on behalf of the session
 	network bsnet.BitSwapNetwork
 
+	// the peermanager manages sending messages to peers in a way that
+	// wont block bitswap operation
+	pm *PeerManager
+
 	// blockstore is the local database
 	// NB: ensure threadsafety
 	blockstore blockstore.Blockstore
@@ -217,7 +223,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
 // HasBlock announces the existance of a block to this bitswap service. The
 // service will potentially notify its peers.
 func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
-	log.Event(ctx, "hasBlock", blk)
 	select {
 	case <-bs.process.Closing():
 		return errors.New("bitswap is closed")
@@ -227,6 +232,7 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
 	if err := bs.blockstore.Put(blk); err != nil {
 		return err
 	}
+
 	bs.wantlist.Remove(blk.Key())
 	bs.notifications.Publish(blk)
 	select {
@@ -239,7 +245,6 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
 
 func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
 	set := pset.New()
-	wg := sync.WaitGroup{}
 
 loop:
 	for {
@@ -253,37 +258,22 @@ loop:
 				continue
 			}
 
-			wg.Add(1)
-			go func(p peer.ID) {
-				defer wg.Done()
-				if err := bs.send(ctx, p, m); err != nil {
-					log.Debug(err) // TODO remove if too verbose
-				}
-			}(peerToQuery)
+			bs.pm.Send(peerToQuery, m)
 		case <-ctx.Done():
 			return nil
 		}
 	}
-	done := make(chan struct{})
-	go func() {
-		wg.Wait()
-		close(done)
-	}()
-
-	select {
-	case <-done:
-	case <-ctx.Done():
-		// NB: we may be abandoning goroutines here before they complete
-		// this shouldnt be an issue because they will complete soon anyways
-		// we just don't want their being slow to impact bitswap transfer speeds
-	}
 	return nil
 }
 
 func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
+	entries := bs.wantlist.Entries()
+	if len(entries) == 0 {
+		return nil
+	}
 	message := bsmsg.New()
 	message.SetFull(true)
-	for _, wanted := range bs.wantlist.Entries() {
+	for _, wanted := range entries {
 		message.AddEntry(wanted.Key, wanted.Priority)
 	}
 	return bs.sendWantlistMsgToPeers(ctx, message, peers)
@@ -326,7 +316,7 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
 
 // TODO(brian): handle errors
 func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
-	defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
+	//defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
 
 	// This call records changes to wantlists, blocks received,
 	// and number of bytes transfered.
@@ -356,6 +346,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
 // Connected/Disconnected warns bitswap about peer connections
 func (bs *Bitswap) PeerConnected(p peer.ID) {
 	// TODO: add to clientWorker??
+	bs.pm.Connected(p)
 	peers := make(chan peer.ID, 1)
 	peers <- p
 	close(peers)
@@ -367,6 +358,7 @@ func (bs *Bitswap) PeerConnected(p peer.ID) {
 
 // Connected/Disconnected warns bitswap about peer connections
 func (bs *Bitswap) PeerDisconnected(p peer.ID) {
+	bs.pm.Disconnected(p)
 	bs.engine.PeerDisconnected(p)
 }
 
@@ -381,19 +373,7 @@ func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
 		message.Cancel(k)
 	}
 
-	wg := sync.WaitGroup{}
-	for _, p := range bs.engine.Peers() {
-		wg.Add(1)
-		go func(p peer.ID) {
-			defer wg.Done()
-			err := bs.send(ctx, p, message)
-			if err != nil {
-				log.Warningf("Error sending message: %s", err)
-				return
-			}
-		}(p)
-	}
-	wg.Wait()
+	bs.pm.Broadcast(message)
 	return
 }
 
@@ -408,29 +388,7 @@ func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
 		message.AddEntry(k, kMaxPriority-i)
 	}
 
-	wg := sync.WaitGroup{}
-	for _, p := range bs.engine.Peers() {
-		wg.Add(1)
-		go func(p peer.ID) {
-			defer wg.Done()
-			err := bs.send(ctx, p, message)
-			if err != nil {
-				log.Debugf("Error sending message: %s", err)
-			}
-		}(p)
-	}
-	done := make(chan struct{})
-	go func() {
-		wg.Wait()
-		close(done)
-	}()
-	select {
-	case <-done:
-	case <-ctx.Done():
-		// NB: we may be abandoning goroutines here before they complete
-		// this shouldnt be an issue because they will complete soon anyways
-		// we just don't want their being slow to impact bitswap transfer speeds
-	}
+	bs.pm.Broadcast(message)
 }
 
 func (bs *Bitswap) ReceiveError(err error) {
@@ -439,16 +397,6 @@ func (bs *Bitswap) ReceiveError(err error) {
 	// TODO bubble the network error up to the parent context/error logger
 }
 
-// send strives to ensure that accounting is always performed when a message is
-// sent
-func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
-	defer log.EventBegin(ctx, "sendMessage", p, m).Done()
-	if err := bs.network.SendMessage(ctx, p, m); err != nil {
-		return err
-	}
-	return bs.engine.MessageSent(p, m)
-}
-
 func (bs *Bitswap) Close() error {
 	return bs.process.Close()
 }
diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go
index 354eb73e5..c04946692 100644
--- a/exchange/bitswap/bitswap_test.go
+++ b/exchange/bitswap/bitswap_test.go
@@ -13,7 +13,6 @@ import (
 	blocks "github.com/ipfs/go-ipfs/blocks"
 	blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
 	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
-	p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
 	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
 	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
 	u "github.com/ipfs/go-ipfs/util"
@@ -36,30 +35,6 @@ func TestClose(t *testing.T) {
 	bitswap.Exchange.GetBlock(context.Background(), block.Key())
 }
 
-func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
-
-	rs := mockrouting.NewServer()
-	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
-	g := NewTestSessionGenerator(net)
-	defer g.Close()
-
-	block := blocks.NewBlock([]byte("block"))
-	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
-	rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
-
-	solo := g.Next()
-	defer solo.Exchange.Close()
-
-	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
-	_, err := solo.Exchange.GetBlock(ctx, block.Key())
-
-	if err != context.DeadlineExceeded {
-		t.Fatal("Expected DeadlineExceeded error")
-	}
-}
-
-// TestGetBlockAfterRequesting...
-
 func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
@@ -67,14 +42,15 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 	g := NewTestSessionGenerator(net)
 	defer g.Close()
 
-	hasBlock := g.Next()
+	peers := g.Instances(2)
+	hasBlock := peers[0]
 	defer hasBlock.Exchange.Close()
 
 	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
 		t.Fatal(err)
 	}
 
-	wantsBlock := g.Next()
+	wantsBlock := peers[1]
 	defer wantsBlock.Exchange.Close()
 
 	ctx, _ := context.WithTimeout(context.Background(), time.Second)
@@ -196,8 +172,9 @@ func TestSendToWantingPeer(t *testing.T) {
 	prev := rebroadcastDelay.Set(time.Second / 2)
 	defer func() { rebroadcastDelay.Set(prev) }()
 
-	peerA := sg.Next()
-	peerB := sg.Next()
+	peers := sg.Instances(2)
+	peerA := peers[0]
+	peerB := peers[1]
 
 	t.Logf("Session %v\n", peerA.Peer)
 	t.Logf("Session %v\n", peerB.Peer)
diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go
index 60b95e469..0b08a55fb 100644
--- a/exchange/bitswap/decision/engine.go
+++ b/exchange/bitswap/decision/engine.go
@@ -5,6 +5,7 @@ import (
 	"sync"
 
 	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
+	blocks "github.com/ipfs/go-ipfs/blocks"
 	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
 	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
 	wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
@@ -53,8 +54,9 @@ const (
 type Envelope struct {
 	// Peer is the intended recipient
 	Peer peer.ID
-	// Message is the payload
-	Message bsmsg.BitSwapMessage
+
+	// Block is the payload
+	Block *blocks.Block
 
 	// A callback to notify the decision queue that the task is complete
 	Sent func()
@@ -151,12 +153,10 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
 			continue
 		}
 
-		m := bsmsg.New() // TODO: maybe add keys from our wantlist?
-		m.AddBlock(block)
 		return &Envelope{
-			Peer:    nextTask.Target,
-			Message: m,
-			Sent:    nextTask.Done,
+			Peer:  nextTask.Target,
+			Block: block,
+			Sent:  nextTask.Done,
 		}, nil
 	}
 }
@@ -185,7 +185,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
 	defer e.lock.Unlock()
 
 	if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
-		log.Debug("received empty message from", p)
+		log.Debugf("received empty message from %s", p)
 	}
 
 	newWorkExists := false
@@ -202,11 +202,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
 
 	for _, entry := range m.Wantlist() {
 		if entry.Cancel {
-			log.Debug("cancel", entry.Key)
+			log.Debugf("cancel %s", entry.Key)
 			l.CancelWant(entry.Key)
 			e.peerRequestQueue.Remove(entry.Key, p)
 		} else {
-			log.Debug("wants", entry.Key, entry.Priority)
+			log.Debugf("wants %s", entry.Key, entry.Priority)
 			l.Wants(entry.Key, entry.Priority)
 			if exists, err := e.bs.Has(entry.Key); err == nil && exists {
 				e.peerRequestQueue.Push(entry.Entry, p)
@@ -216,7 +216,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
 	}
 
 	for _, block := range m.Blocks() {
-		log.Debug("got block %s %d bytes", block.Key(), len(block.Data))
+		log.Debugf("got block %s %d bytes", block.Key(), len(block.Data))
 		l.ReceivedBytes(len(block.Data))
 		for _, l := range e.ledgerMap {
 			if entry, ok := l.WantListContains(block.Key()); ok {
diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go
index afe6ba9ad..31e46c776 100644
--- a/exchange/bitswap/decision/engine_test.go
+++ b/exchange/bitswap/decision/engine_test.go
@@ -185,7 +185,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
 	for _, k := range keys {
 		next := <-e.Outbox()
 		envelope := <-next
-		received := envelope.Message.Blocks()[0]
+		received := envelope.Block
 		expected := blocks.NewBlock([]byte(k))
 		if received.Key() != expected.Key() {
 			return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data)))
diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go
index 42928487d..15f52da74 100644
--- a/exchange/bitswap/decision/peer_request_queue.go
+++ b/exchange/bitswap/decision/peer_request_queue.go
@@ -156,7 +156,7 @@ func (t *peerRequestTask) SetIndex(i int) {
 
 // taskKey returns a key that uniquely identifies a task.
 func taskKey(p peer.ID, k u.Key) string {
-	return string(p.String() + k.String())
+	return string(p) + string(k)
 }
 
 // FIFO is a basic task comparator that returns tasks in the order created.
diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go
index 3a7d70aae..4e88e738c 100644
--- a/exchange/bitswap/message/message.go
+++ b/exchange/bitswap/message/message.go
@@ -29,6 +29,8 @@ type BitSwapMessage interface {
 
 	Cancel(key u.Key)
 
+	Empty() bool
+
 	// Sets whether or not the contained wantlist represents the entire wantlist
 	// true = full wantlist
 	// false = wantlist 'patch'
@@ -51,7 +53,7 @@ type Exportable interface {
 type impl struct {
 	full     bool
 	wantlist map[u.Key]Entry
-	blocks   map[u.Key]*blocks.Block // map to detect duplicates
+	blocks   map[u.Key]*blocks.Block
 }
 
 func New() BitSwapMessage {
@@ -92,6 +94,10 @@ func (m *impl) Full() bool {
 	return m.full
 }
 
+func (m *impl) Empty() bool {
+	return len(m.blocks) == 0 && len(m.wantlist) == 0
+}
+
 func (m *impl) Wantlist() []Entry {
 	var out []Entry
 	for _, e := range m.wantlist {
@@ -101,7 +107,7 @@ func (m *impl) Wantlist() []Entry {
 }
 
 func (m *impl) Blocks() []*blocks.Block {
-	bs := make([]*blocks.Block, 0)
+	bs := make([]*blocks.Block, 0, len(m.blocks))
 	for _, block := range m.blocks {
 		bs = append(bs, block)
 	}
@@ -109,6 +115,7 @@ func (m *impl) Blocks() []*blocks.Block {
 }
 
 func (m *impl) Cancel(k u.Key) {
+	delete(m.wantlist, k)
 	m.addEntry(k, 0, true)
 }
 
diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go
index a6ed070c0..849a1c28e 100644
--- a/exchange/bitswap/network/interface.go
+++ b/exchange/bitswap/network/interface.go
@@ -23,6 +23,8 @@ type BitSwapNetwork interface {
 	// network.
 	SetDelegate(Receiver)
 
+	ConnectTo(context.Context, peer.ID) error
+
 	Routing
 }
 
diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go
index 97745e32d..4e5a1317f 100644
--- a/exchange/bitswap/network/ipfs_impl.go
+++ b/exchange/bitswap/network/ipfs_impl.go
@@ -97,6 +97,10 @@ func (bsnet *impl) SetDelegate(r Receiver) {
 	bsnet.receiver = r
 }
 
+func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
+	return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p})
+}
+
 // FindProvidersAsync returns a channel of providers for the given key
 func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
 
diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go
new file mode 100644
index 000000000..ff3d9ab31
--- /dev/null
+++ b/exchange/bitswap/peermanager.go
@@ -0,0 +1,203 @@
+package bitswap
+
+import (
+	"sync"
+
+	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
+	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
+	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
+	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
+	peer "github.com/ipfs/go-ipfs/p2p/peer"
+	u "github.com/ipfs/go-ipfs/util"
+)
+
+type PeerManager struct {
+	receiver bsnet.Receiver
+
+	incoming   chan *msgPair
+	connect    chan peer.ID
+	disconnect chan peer.ID
+
+	peers map[peer.ID]*msgQueue
+
+	network bsnet.BitSwapNetwork
+}
+
+func NewPeerManager(network bsnet.BitSwapNetwork) *PeerManager {
+	return &PeerManager{
+		incoming:   make(chan *msgPair, 10),
+		connect:    make(chan peer.ID, 10),
+		disconnect: make(chan peer.ID, 10),
+		peers:      make(map[peer.ID]*msgQueue),
+		network:    network,
+	}
+}
+
+type msgPair struct {
+	to  peer.ID
+	msg bsmsg.BitSwapMessage
+}
+
+type cancellation struct {
+	who peer.ID
+	blk u.Key
+}
+
+type msgQueue struct {
+	p peer.ID
+
+	lk    sync.Mutex
+	wlmsg bsmsg.BitSwapMessage
+
+	work chan struct{}
+	done chan struct{}
+}
+
+func (pm *PeerManager) SendBlock(env *engine.Envelope) {
+	// Blocks need to be sent synchronously to maintain proper backpressure
+	// throughout the network stack
+	defer env.Sent()
+
+	msg := bsmsg.New()
+	msg.AddBlock(env.Block)
+	err := pm.network.SendMessage(context.TODO(), env.Peer, msg)
+	if err != nil {
+		log.Error(err)
+	}
+}
+
+func (pm *PeerManager) startPeerHandler(p peer.ID) {
+	_, ok := pm.peers[p]
+	if ok {
+		// TODO: log an error?
+		return
+	}
+
+	mq := new(msgQueue)
+	mq.done = make(chan struct{})
+	mq.work = make(chan struct{}, 1)
+	mq.p = p
+
+	pm.peers[p] = mq
+	go pm.runQueue(mq)
+}
+
+func (pm *PeerManager) stopPeerHandler(p peer.ID) {
+	pq, ok := pm.peers[p]
+	if !ok {
+		// TODO: log error?
+		return
+	}
+
+	close(pq.done)
+	delete(pm.peers, p)
+}
+
+func (pm *PeerManager) runQueue(mq *msgQueue) {
+	for {
+		select {
+		case <-mq.work: // there is work to be done
+
+			// TODO: this might not need to be done every time, figure out
+			// a good heuristic
+			err := pm.network.ConnectTo(context.TODO(), mq.p)
+			if err != nil {
+				log.Error(err)
+				// TODO: cant connect, what now?
+			}
+
+			// grab messages from queue
+			mq.lk.Lock()
+			wlm := mq.wlmsg
+			mq.wlmsg = nil
+			mq.lk.Unlock()
+
+			if wlm != nil && !wlm.Empty() {
+				// send wantlist updates
+				err = pm.network.SendMessage(context.TODO(), mq.p, wlm)
+				if err != nil {
+					log.Error("bitswap send error: ", err)
+					// TODO: what do we do if this fails?
+				}
+			}
+		case <-mq.done:
+			return
+		}
+	}
+}
+
+func (pm *PeerManager) Send(to peer.ID, msg bsmsg.BitSwapMessage) {
+	if len(msg.Blocks()) > 0 {
+		panic("no blocks here!")
+	}
+	pm.incoming <- &msgPair{to: to, msg: msg}
+}
+
+func (pm *PeerManager) Broadcast(msg bsmsg.BitSwapMessage) {
+	pm.incoming <- &msgPair{msg: msg}
+}
+
+func (pm *PeerManager) Connected(p peer.ID) {
+	pm.connect <- p
+}
+
+func (pm *PeerManager) Disconnected(p peer.ID) {
+	pm.disconnect <- p
+}
+
+// TODO: use goprocess here once i trust it
+func (pm *PeerManager) Run(ctx context.Context) {
+	for {
+		select {
+		case msgp := <-pm.incoming:
+
+			// Broadcast message to all if recipient not set
+			if msgp.to == "" {
+				for _, p := range pm.peers {
+					p.addMessage(msgp.msg)
+				}
+				continue
+			}
+
+			p, ok := pm.peers[msgp.to]
+			if !ok {
+				//TODO: decide, drop message? or dial?
+				pm.startPeerHandler(msgp.to)
+				p = pm.peers[msgp.to]
+			}
+
+			p.addMessage(msgp.msg)
+		case p := <-pm.connect:
+			pm.startPeerHandler(p)
+		case p := <-pm.disconnect:
+			pm.stopPeerHandler(p)
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
+	mq.lk.Lock()
+	defer func() {
+		mq.lk.Unlock()
+		select {
+		case mq.work <- struct{}{}:
+		default:
+		}
+	}()
+
+	if mq.wlmsg == nil || msg.Full() {
+		mq.wlmsg = msg
+		return
+	}
+
+	// TODO: add a msg.Combine(...) method
+	for _, e := range msg.Wantlist() {
+		if e.Cancel {
+			mq.wlmsg.Cancel(e.Key)
+		} else {
+			mq.wlmsg.AddEntry(e.Key, e.Priority)
+		}
+	}
+}
diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go
index feb5fd722..f2c814f81 100644
--- a/exchange/bitswap/testnet/virtual.go
+++ b/exchange/bitswap/testnet/virtual.go
@@ -119,3 +119,12 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error {
 func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
 	nc.Receiver = r
 }
+
+func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
+	if !nc.network.HasPeer(p) {
+		return errors.New("no such peer in network")
+	}
+	nc.network.clients[p].PeerConnected(nc.local)
+	nc.Receiver.PeerConnected(p)
+	return nil
+}
diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go
index 2ce035c3d..47930de69 100644
--- a/exchange/bitswap/testutils.go
+++ b/exchange/bitswap/testutils.go
@@ -7,7 +7,6 @@ import (
 	ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
 	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
 	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
-	exchange "github.com/ipfs/go-ipfs/exchange"
 	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
 	peer "github.com/ipfs/go-ipfs/p2p/peer"
 	p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
@@ -56,12 +55,18 @@ func (g *SessionGenerator) Instances(n int) []Instance {
 		inst := g.Next()
 		instances = append(instances, inst)
 	}
+	for i, inst := range instances {
+		for j := i + 1; j < len(instances); j++ {
+			oinst := instances[j]
+			inst.Exchange.PeerConnected(oinst.Peer)
+		}
+	}
 	return instances
 }
 
 type Instance struct {
 	Peer       peer.ID
-	Exchange   exchange.Interface
+	Exchange   *Bitswap
 	blockstore blockstore.Blockstore
 
 	blockstoreDelay delay.D
@@ -94,7 +99,7 @@ func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance
 
 	const alwaysSendToPeer = true
 
-	bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer)
+	bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer).(*Bitswap)
 
 	return Instance{
 		Peer:            p.ID(),
diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go
index dff3d911c..c6c2bbb25 100644
--- a/exchange/bitswap/workers.go
+++ b/exchange/bitswap/workers.go
@@ -70,9 +70,9 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
 				if !ok {
 					continue
 				}
-				log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
-				bs.send(ctx, envelope.Peer, envelope.Message)
-				envelope.Sent()
+
+				//log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
+				bs.pm.SendBlock(envelope)
 			case <-ctx.Done():
 				return
 			}
-- 
GitLab