diff --git a/exchange/bitswap/README.md b/exchange/bitswap/README.md
index bfa0aaa868b7173680a07d68fc71d0fcc70954ac..cfdbd27e08e599034f333d5094787a6819176bff 100644
--- a/exchange/bitswap/README.md
+++ b/exchange/bitswap/README.md
@@ -1,47 +1,37 @@
-#Welcome to Bitswap
-###(The data trading engine)
+# Bitswap
+
+## Protocol
+Bitswap is the data trading module for ipfs. It manages requesting and sending
+blocks to and from other peers in the network. Bitswap has two main jobs: the
+first is to acquire blocks requested by the client from the network; the
+second is to judiciously send blocks in its possession to other peers who want
+them.
+
+Bitswap is a message-based protocol, as opposed to request-response. All
+messages contain wantlists or blocks. Upon receiving a wantlist, a node should
+consider sending out wanted blocks if it has them. Upon receiving blocks, the
+node should send out a notification called a 'Cancel' signifying that it no
+longer wants the block. At a protocol level, bitswap is very simple.
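+
+As an illustration, here is a minimal, self-contained sketch of that exchange.
+The types are hypothetical stand-ins, not the actual go-ipfs API:
+
+```go
+package main
+
+import "fmt"
+
+// Key and Entry are simplified stand-ins for the real bitswap types.
+type Key string
+
+type Entry struct {
+	Key    Key
+	Cancel bool // true signals "I no longer want this block"
+}
+
+// Message carries a wantlist, blocks, or both.
+type Message struct {
+	Wantlist []Entry
+	Blocks   map[Key][]byte
+}
+
+// handle reacts to an incoming message: offer wanted blocks that we have,
+// and send cancels for blocks we just received.
+func handle(in Message, store map[Key][]byte, wants map[Key]bool) (out Message) {
+	out.Blocks = make(map[Key][]byte)
+	for _, e := range in.Wantlist {
+		if data, ok := store[e.Key]; ok && !e.Cancel {
+			out.Blocks[e.Key] = data
+		}
+	}
+	for k := range in.Blocks {
+		if wants[k] {
+			delete(wants, k)
+			out.Wantlist = append(out.Wantlist, Entry{Key: k, Cancel: true})
+		}
+	}
+	return out
+}
+
+func main() {
+	store := map[Key][]byte{"QmA": []byte("hello")}
+	wants := map[Key]bool{"QmB": true}
+	reply := handle(Message{
+		Wantlist: []Entry{{Key: "QmA"}},
+		Blocks:   map[Key][]byte{"QmB": []byte("world")},
+	}, store, wants)
+	fmt.Println(len(reply.Blocks), len(reply.Wantlist)) // 1 1
+}
+```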
+
+## go-ipfs Implementation
+Internally, when a message with a wantlist is received, it is sent to the
+decision engine to be considered. A task is created in the peer request queue
+for every wanted block that we possess. The peer request queue is a priority
+queue that sorts available tasks by some metric; currently, that metric is
+very simple and aims to address the tasks of each peer fairly. More advanced
+decision logic will be implemented in the future. Task workers pull tasks off
+of the queue, retrieve the block to be sent, and send it off. The number of
+task workers is limited by a constant factor.
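+
+The peer request queue behaves like a standard priority queue. A rough sketch
+of the idea using `container/heap` (a hypothetical task type, not the real
+`peerRequestTask`):
+
+```go
+package main
+
+import (
+	"container/heap"
+	"fmt"
+)
+
+// task is a hypothetical "send this block to this peer" work item.
+type task struct {
+	peer     string
+	key      string
+	priority int
+}
+
+// taskQueue orders tasks so that higher-priority work is popped first.
+type taskQueue []*task
+
+func (q taskQueue) Len() int            { return len(q) }
+func (q taskQueue) Less(i, j int) bool  { return q[i].priority > q[j].priority }
+func (q taskQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
+func (q *taskQueue) Push(x interface{}) { *q = append(*q, x.(*task)) }
+func (q *taskQueue) Pop() interface{} {
+	old := *q
+	t := old[len(old)-1]
+	*q = old[:len(old)-1]
+	return t
+}
+
+func main() {
+	q := &taskQueue{}
+	heap.Push(q, &task{peer: "peerA", key: "QmX", priority: 1})
+	heap.Push(q, &task{peer: "peerB", key: "QmY", priority: 5})
+
+	// A task worker pops the best task, retrieves the block, and sends it.
+	for q.Len() > 0 {
+		t := heap.Pop(q).(*task)
+		fmt.Printf("send %s to %s\n", t.key, t.peer)
+	}
+}
+```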
+
+Client requests for new blocks are handled by the want manager. For every new
+block (or set of blocks) wanted, the 'WantBlocks' method is invoked. The want
+manager then ensures that connected peers are notified of the new blocks we
+want, by sending the new entries to a message queue for each peer. The message
+queue will loop while there is work available and do the following: 1) ensure
+it has a connection to its peer, 2) grab the message to be sent, and 3) send
+it. If new messages are added while the loop is in steps 1 or 3, they are
+combined into the held message to avoid keeping an actual queue and sending
+multiple messages. The same process occurs when the client receives a block
+and sends a cancel message for it.
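+
+A rough sketch of that coalescing loop, with simplified types standing in for
+the real `msgQueue`:
+
+```go
+package main
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+// msgQueue holds at most one pending message; new entries are merged into it
+// rather than queued separately.
+type msgQueue struct {
+	lk      sync.Mutex
+	pending []string      // entries combined into the next message
+	work    chan struct{} // buffered: remembers at most one wake-up
+}
+
+func (mq *msgQueue) add(entry string) {
+	mq.lk.Lock()
+	mq.pending = append(mq.pending, entry) // combine with the held message
+	mq.lk.Unlock()
+	select {
+	case mq.work <- struct{}{}:
+	default: // a wake-up is already pending
+	}
+}
+
+func (mq *msgQueue) run(done chan struct{}) {
+	for {
+		select {
+		case <-mq.work:
+			// 1) connecting to the peer would happen here, 2) grab the message
+			mq.lk.Lock()
+			msg := mq.pending
+			mq.pending = nil
+			mq.lk.Unlock()
+			if len(msg) == 0 {
+				continue
+			}
+			// 3) send it; entries added meanwhile merge into the next message
+			fmt.Println("send:", msg)
+		case <-done:
+			return
+		}
+	}
+}
+
+func main() {
+	mq := &msgQueue{work: make(chan struct{}, 1)}
+	done := make(chan struct{})
+	go mq.run(done)
+	mq.add("want QmA")
+	mq.add("cancel QmB") // likely coalesced into the same send
+	time.Sleep(100 * time.Millisecond)
+	close(done)
+}
+```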
 
-Bitswap is the module that is responsible for requesting and providing data
-blocks over the network to and from other ipfs peers. The role of bitswap is
-to be a merchant in the large global marketplace of data.
-
-##Main Operations
-Bitswap has three high level operations:
-
-- **GetBlocks**
-  - `GetBlocks` is a bitswap method used to request multiple blocks that are likely
-to all be provided by the same set of peers (part of a single file, for example).
-
-- **GetBlock**
-  - `GetBlock` is a special case of `GetBlocks` that just requests a single block.
-
-- **HasBlock**
-  - `HasBlock` registers a local block with bitswap. Bitswap will then send that
-block to any connected peers who want it (with the strategies approval), record
-that transaction in the ledger and announce to the DHT that the block is being
-provided.
-
-##Internal Details
-All `GetBlock` requests are relayed into a single for-select loop via channels.
-Calls to `GetBlocks` will have `FindProviders` called for only the first key in
-the set initially, This is an optimization attempting to cut down on the number
-of RPCs required. After a timeout (specified by the strategies
-`GetRebroadcastDelay`) Bitswap will iterate through all keys still in the local
-wantlist, perform a find providers call for each, and sent the wantlist out to
-those providers. This is the fallback behaviour for cases where our initial
-assumption about one peer potentially having multiple blocks in a set does not
-hold true.
-
-When receiving messages, Bitswaps `ReceiveMessage` method is called. A bitswap
-message may contain the wantlist of the peer who sent the message, and an array
-of blocks that were on our local wantlist. Any blocks we receive in a bitswap
-message will be passed to `HasBlock`, and the other peers wantlist gets updated
-in the strategy by `bs.strategy.MessageReceived`.
-If another peers wantlist is received, Bitswap will call its strategies
-`ShouldSendBlockToPeer` method to determine whether or not the other peer will
-be sent the block they are requesting (if we even have it).
-
-##Outstanding TODOs:
-- [ ] Ensure only one request active per key
-- [ ] More involved strategies
-- [ ] Ensure only wanted blocks are counted in ledgers
diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go
index 757c9067eb9062bbaa9b19c2e18e14cc8a751a97..db7bc033f5a7f9ae242e5a20c5d5c95218477d23 100644
--- a/exchange/bitswap/bitswap.go
+++ b/exchange/bitswap/bitswap.go
@@ -4,7 +4,6 @@ package bitswap
 
 import (
 	"errors"
-	"fmt"
 	"math"
 	"sync"
 	"time"
@@ -23,7 +22,6 @@ import (
 	"github.com/ipfs/go-ipfs/thirdparty/delay"
 	eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
 	u "github.com/ipfs/go-ipfs/util"
-	pset "github.com/ipfs/go-ipfs/util/peerset" // TODO move this to peerstore
 )
 
 var log = eventlog.Logger("bitswap")
@@ -45,9 +43,7 @@ const (
 	provideWorkers     = 4
 )
 
-var (
-	rebroadcastDelay = delay.Fixed(time.Second * 10)
-)
+var rebroadcastDelay = delay.Fixed(time.Second * 10)
 
 // New initializes a BitSwap instance that communicates over the provided
 // BitSwapNetwork. This function registers the returned instance as the network
@@ -86,12 +82,13 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
 		notifications: notif,
 		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
 		network:       network,
-		wantlist:      wantlist.NewThreadSafe(),
 		batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
 		process:       px,
 		newBlocks:     make(chan *blocks.Block, HasBlockBufferSize),
 		provideKeys:   make(chan u.Key),
+		wm:            NewWantManager(ctx, network),
 	}
+	go bs.wm.Run()
 	network.SetDelegate(bs)
 
 	// Start up bitswaps async worker routines
@@ -108,6 +105,10 @@ type Bitswap struct {
 	// network delivers messages on behalf of the session
 	network bsnet.BitSwapNetwork
 
+	// the want manager handles sending messages to peers in a way that
+	// won't block bitswap operation
+	wm *WantManager
+
 	// blockstore is the local database
 	// NB: ensure threadsafety
 	blockstore blockstore.Blockstore
@@ -121,14 +122,13 @@ type Bitswap struct {
 
 	engine *decision.Engine
 
-	wantlist *wantlist.ThreadSafe
-
 	process process.Process
 
 	newBlocks chan *blocks.Block
 
 	provideKeys chan u.Key
 
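+	// counterLk guards the block-receipt counters below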
+	counterLk      sync.Mutex
 	blocksRecvd    int
 	dupBlocksRecvd int
 }
@@ -217,7 +217,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
 // HasBlock announces the existance of a block to this bitswap service. The
 // service will potentially notify its peers.
 func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
-	log.Event(ctx, "hasBlock", blk)
 	select {
 	case <-bs.process.Closing():
 		return errors.New("bitswap is closed")
@@ -227,76 +226,22 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
 	if err := bs.blockstore.Put(blk); err != nil {
 		return err
 	}
-	bs.wantlist.Remove(blk.Key())
+
 	bs.notifications.Publish(blk)
 	select {
 	case bs.newBlocks <- blk:
+		// send block off to be reprovided
 	case <-ctx.Done():
 		return ctx.Err()
 	}
 	return nil
 }
 
-func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
-	set := pset.New()
-	wg := sync.WaitGroup{}
-
-loop:
-	for {
-		select {
-		case peerToQuery, ok := <-peers:
-			if !ok {
-				break loop
-			}
-
-			if !set.TryAdd(peerToQuery) { //Do once per peer
-				continue
-			}
-
-			wg.Add(1)
-			go func(p peer.ID) {
-				defer wg.Done()
-				if err := bs.send(ctx, p, m); err != nil {
-					log.Debug(err) // TODO remove if too verbose
-				}
-			}(peerToQuery)
-		case <-ctx.Done():
-			return nil
-		}
-	}
-	done := make(chan struct{})
-	go func() {
-		wg.Wait()
-		close(done)
-	}()
-
-	select {
-	case <-done:
-	case <-ctx.Done():
-		// NB: we may be abandoning goroutines here before they complete
-		// this shouldnt be an issue because they will complete soon anyways
-		// we just don't want their being slow to impact bitswap transfer speeds
-	}
-	return nil
-}
-
-func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
-	message := bsmsg.New()
-	message.SetFull(true)
-	for _, wanted := range bs.wantlist.Entries() {
-		message.AddEntry(wanted.Key, wanted.Priority)
-	}
-	return bs.sendWantlistMsgToPeers(ctx, message, peers)
-}
-
-func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) {
+func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) {
 
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	// prepare a channel to hand off to sendWantlistToPeers
-	sendToPeers := make(chan peer.ID)
-
 	// Get providers for all entries in wantlist (could take a while)
 	wg := sync.WaitGroup{}
 	for _, e := range entries {
@@ -308,154 +253,76 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
 			defer cancel()
 			providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
 			for prov := range providers {
-				sendToPeers <- prov
+				go func(p peer.ID) {
+					bs.network.ConnectTo(ctx, p)
+				}(prov)
 			}
 		}(e.Key)
 	}
 
-	go func() {
-		wg.Wait() // make sure all our children do finish.
-		close(sendToPeers)
-	}()
-
-	err := bs.sendWantlistToPeers(ctx, sendToPeers)
-	if err != nil {
-		log.Debugf("sendWantlistToPeers error: %s", err)
-	}
+	wg.Wait() // make sure all our children do finish.
 }
 
-// TODO(brian): handle errors
-func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
-	defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
-
+func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
 	// This call records changes to wantlists, blocks received,
 	// and number of bytes transfered.
 	bs.engine.MessageReceived(p, incoming)
 	// TODO: this is bad, and could be easily abused.
 	// Should only track *useful* messages in ledger
 
+	if len(incoming.Blocks()) == 0 {
+		return
+	}
+
+	// Send out cancels right away; this reduces the chance of receiving duplicate blocks.
 	var keys []u.Key
 	for _, block := range incoming.Blocks() {
+		keys = append(keys, block.Key())
+	}
+	bs.wm.CancelWants(keys)
+
+	for _, block := range incoming.Blocks() {
+		bs.counterLk.Lock()
 		bs.blocksRecvd++
 		if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
 			bs.dupBlocksRecvd++
 		}
-		log.Debugf("got block %s from %s", block, p)
+		blocksRecvd, dupBlocksRecvd := bs.blocksRecvd, bs.dupBlocksRecvd
+		bs.counterLk.Unlock()
+		log.Infof("got block %s from %s (%d,%d)", block, p, blocksRecvd, dupBlocksRecvd)
+
 		hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
 		if err := bs.HasBlock(hasBlockCtx, block); err != nil {
-			return fmt.Errorf("ReceiveMessage HasBlock error: %s", err)
+			log.Warningf("ReceiveMessage HasBlock error: %s", err)
 		}
 		cancel()
-		keys = append(keys, block.Key())
 	}
-
-	bs.cancelBlocks(ctx, keys)
-	return nil
 }
 
 // Connected/Disconnected warns bitswap about peer connections
 func (bs *Bitswap) PeerConnected(p peer.ID) {
 	// TODO: add to clientWorker??
-	peers := make(chan peer.ID, 1)
-	peers <- p
-	close(peers)
-	err := bs.sendWantlistToPeers(context.TODO(), peers)
-	if err != nil {
-		log.Debugf("error sending wantlist: %s", err)
-	}
+	bs.wm.Connected(p)
 }
 
 // Connected/Disconnected warns bitswap about peer connections
 func (bs *Bitswap) PeerDisconnected(p peer.ID) {
+	bs.wm.Disconnected(p)
 	bs.engine.PeerDisconnected(p)
 }
 
-func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
-	if len(bkeys) < 1 {
-		return
-	}
-	message := bsmsg.New()
-	message.SetFull(false)
-	for _, k := range bkeys {
-		log.Debug("cancel block: %s", k)
-		message.Cancel(k)
-	}
-
-	wg := sync.WaitGroup{}
-	for _, p := range bs.engine.Peers() {
-		wg.Add(1)
-		go func(p peer.ID) {
-			defer wg.Done()
-			err := bs.send(ctx, p, message)
-			if err != nil {
-				log.Warningf("Error sending message: %s", err)
-				return
-			}
-		}(p)
-	}
-	wg.Wait()
-	return
-}
-
-func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
-	if len(bkeys) < 1 {
-		return
-	}
-
-	message := bsmsg.New()
-	message.SetFull(false)
-	for i, k := range bkeys {
-		message.AddEntry(k, kMaxPriority-i)
-	}
-
-	wg := sync.WaitGroup{}
-	for _, p := range bs.engine.Peers() {
-		wg.Add(1)
-		go func(p peer.ID) {
-			defer wg.Done()
-			err := bs.send(ctx, p, message)
-			if err != nil {
-				log.Debugf("Error sending message: %s", err)
-			}
-		}(p)
-	}
-	done := make(chan struct{})
-	go func() {
-		wg.Wait()
-		close(done)
-	}()
-	select {
-	case <-done:
-	case <-ctx.Done():
-		// NB: we may be abandoning goroutines here before they complete
-		// this shouldnt be an issue because they will complete soon anyways
-		// we just don't want their being slow to impact bitswap transfer speeds
-	}
-}
-
 func (bs *Bitswap) ReceiveError(err error) {
 	log.Debugf("Bitswap ReceiveError: %s", err)
 	// TODO log the network error
 	// TODO bubble the network error up to the parent context/error logger
 }
 
-// send strives to ensure that accounting is always performed when a message is
-// sent
-func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
-	defer log.EventBegin(ctx, "sendMessage", p, m).Done()
-	if err := bs.network.SendMessage(ctx, p, m); err != nil {
-		return err
-	}
-	return bs.engine.MessageSent(p, m)
-}
-
 func (bs *Bitswap) Close() error {
 	return bs.process.Close()
 }
 
 func (bs *Bitswap) GetWantlist() []u.Key {
 	var out []u.Key
-	for _, e := range bs.wantlist.Entries() {
+	for _, e := range bs.wm.wl.Entries() {
 		out = append(out, e.Key)
 	}
 	return out
diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go
index 354eb73e5afeed20c81450fb73f1416f8a6af36c..803bcd223b80c853aed3b9b9aa52a54129777cb4 100644
--- a/exchange/bitswap/bitswap_test.go
+++ b/exchange/bitswap/bitswap_test.go
@@ -58,8 +58,6 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
 	}
 }
 
-// TestGetBlockAfterRequesting...
-
 func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 
 	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
@@ -67,14 +65,15 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 	g := NewTestSessionGenerator(net)
 	defer g.Close()
 
-	hasBlock := g.Next()
+	peers := g.Instances(2)
+	hasBlock := peers[0]
 	defer hasBlock.Exchange.Close()
 
 	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
 		t.Fatal(err)
 	}
 
-	wantsBlock := g.Next()
+	wantsBlock := peers[1]
 	defer wantsBlock.Exchange.Close()
 
 	ctx, _ := context.WithTimeout(context.Background(), time.Second)
@@ -93,7 +92,7 @@ func TestLargeSwarm(t *testing.T) {
 	if testing.Short() {
 		t.SkipNow()
 	}
-	numInstances := 500
+	numInstances := 100
 	numBlocks := 2
 	if detectrace.WithRace() {
 		// when running with the race detector, 500 instances launches
@@ -121,6 +120,27 @@ func TestLargeFile(t *testing.T) {
 	PerformDistributionTest(t, numInstances, numBlocks)
 }
 
+func TestLargeFileNoRebroadcast(t *testing.T) {
+	if testing.Short() {
+		t.SkipNow()
+	}
+	rbd := rebroadcastDelay.Get()
+	rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
+	defer func() { rebroadcastDelay.Set(rbd) }()
+	numInstances := 10
+	numBlocks := 100
+	PerformDistributionTest(t, numInstances, numBlocks)
+}
+
+func TestLargeFileTwoPeers(t *testing.T) {
+	if testing.Short() {
+		t.SkipNow()
+	}
+	numInstances := 2
+	numBlocks := 100
+	PerformDistributionTest(t, numInstances, numBlocks)
+}
+
 func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	if testing.Short() {
 		t.SkipNow()
@@ -130,8 +150,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	defer sg.Close()
 	bg := blocksutil.NewBlockGenerator()
 
-	t.Log("Test a few nodes trying to get one file with a lot of blocks")
-
 	instances := sg.Instances(numInstances)
 	blocks := bg.Blocks(numBlocks)
 
@@ -196,8 +214,9 @@ func TestSendToWantingPeer(t *testing.T) {
 	prev := rebroadcastDelay.Set(time.Second / 2)
 	defer func() { rebroadcastDelay.Set(prev) }()
 
-	peerA := sg.Next()
-	peerB := sg.Next()
+	peers := sg.Instances(2)
+	peerA := peers[0]
+	peerB := peers[1]
 
 	t.Logf("Session %v\n", peerA.Peer)
 	t.Logf("Session %v\n", peerB.Peer)
@@ -238,7 +257,7 @@ func TestBasicBitswap(t *testing.T) {
 	defer sg.Close()
 	bg := blocksutil.NewBlockGenerator()
 
-	t.Log("Test a few nodes trying to get one file with a lot of blocks")
+	t.Log("Test a one node trying to get one block from another")
 
 	instances := sg.Instances(2)
 	blocks := bg.Blocks(1)
diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go
index 60b95e469b7ca9664d7b26c24975f3c88695c9fb..d08636d800c0386929032ef6173525195fe2509c 100644
--- a/exchange/bitswap/decision/engine.go
+++ b/exchange/bitswap/decision/engine.go
@@ -5,6 +5,7 @@ import (
 	"sync"
 
 	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
+	blocks "github.com/ipfs/go-ipfs/blocks"
 	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
 	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
 	wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
@@ -53,8 +54,9 @@ const (
 type Envelope struct {
 	// Peer is the intended recipient
 	Peer peer.ID
-	// Message is the payload
-	Message bsmsg.BitSwapMessage
+
+	// Block is the payload
+	Block *blocks.Block
 
 	// A callback to notify the decision queue that the task is complete
 	Sent func()
@@ -90,7 +92,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
 		bs:               bs,
 		peerRequestQueue: newPRQ(),
 		outbox:           make(chan (<-chan *Envelope), outboxChanBuffer),
-		workSignal:       make(chan struct{}),
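+		// buffered so that a completing task can signal more work without blocking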
+		workSignal:       make(chan struct{}, 1),
 	}
 	go e.taskWorker(ctx)
 	return e
@@ -151,12 +153,18 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
 			continue
 		}
 
-		m := bsmsg.New() // TODO: maybe add keys from our wantlist?
-		m.AddBlock(block)
 		return &Envelope{
-			Peer:    nextTask.Target,
-			Message: m,
-			Sent:    nextTask.Done,
+			Peer:  nextTask.Target,
+			Block: block,
+			Sent: func() {
+				nextTask.Done()
+				select {
+				case e.workSignal <- struct{}{}:
+					// work completing may mean that our queue will provide new
+					// work to be done.
+				default:
+				}
+			},
 		}, nil
 	}
 }
@@ -185,7 +193,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
 	defer e.lock.Unlock()
 
 	if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
-		log.Debug("received empty message from", p)
+		log.Debugf("received empty message from %s", p)
 	}
 
 	newWorkExists := false
@@ -202,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
 
 	for _, entry := range m.Wantlist() {
 		if entry.Cancel {
-			log.Debug("cancel", entry.Key)
+			log.Debugf("cancel %s", entry.Key)
 			l.CancelWant(entry.Key)
 			e.peerRequestQueue.Remove(entry.Key, p)
 		} else {
-			log.Debug("wants", entry.Key, entry.Priority)
+			log.Debugf("wants %s - %d", entry.Key, entry.Priority)
 			l.Wants(entry.Key, entry.Priority)
 			if exists, err := e.bs.Has(entry.Key); err == nil && exists {
 				e.peerRequestQueue.Push(entry.Entry, p)
@@ -216,7 +224,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
 	}
 
 	for _, block := range m.Blocks() {
-		log.Debug("got block %s %d bytes", block.Key(), len(block.Data))
+		log.Debugf("got block %s %d bytes", block.Key(), len(block.Data))
 		l.ReceivedBytes(len(block.Data))
 		for _, l := range e.ledgerMap {
 			if entry, ok := l.WantListContains(block.Key()); ok {
diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go
index afe6ba9adddf7e1f720b3dee9e330593720c50dc..8337c480032d1ab2e2b2108aaa1d980abf66b3ab 100644
--- a/exchange/bitswap/decision/engine_test.go
+++ b/exchange/bitswap/decision/engine_test.go
@@ -41,7 +41,7 @@ func TestConsistentAccounting(t *testing.T) {
 	// Send messages from Ernie to Bert
 	for i := 0; i < 1000; i++ {
 
-		m := message.New()
+		m := message.New(false)
 		content := []string{"this", "is", "message", "i"}
 		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
 
@@ -73,7 +73,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
 	sanfrancisco := newEngine(ctx, "sf")
 	seattle := newEngine(ctx, "sea")
 
-	m := message.New()
+	m := message.New(true)
 
 	sanfrancisco.Engine.MessageSent(seattle.Peer, m)
 	seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
@@ -164,7 +164,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
 }
 
 func partnerWants(e *Engine, keys []string, partner peer.ID) {
-	add := message.New()
+	add := message.New(false)
 	for i, letter := range keys {
 		block := blocks.NewBlock([]byte(letter))
 		add.AddEntry(block.Key(), math.MaxInt32-i)
@@ -173,7 +173,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) {
 }
 
 func partnerCancels(e *Engine, keys []string, partner peer.ID) {
-	cancels := message.New()
+	cancels := message.New(false)
 	for _, k := range keys {
 		block := blocks.NewBlock([]byte(k))
 		cancels.Cancel(block.Key())
@@ -185,7 +185,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
 	for _, k := range keys {
 		next := <-e.Outbox()
 		envelope := <-next
-		received := envelope.Message.Blocks()[0]
+		received := envelope.Block
 		expected := blocks.NewBlock([]byte(k))
 		if received.Key() != expected.Key() {
 			return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data)))
diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go
index 42928487dcc2e52e5f7aab37b40b4cb0a477e04e..397a1622332f17466fd98b7310b187c9ac1db756 100644
--- a/exchange/bitswap/decision/peer_request_queue.go
+++ b/exchange/bitswap/decision/peer_request_queue.go
@@ -51,12 +51,6 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
 		tl.partners[to] = partner
 	}
 
-	if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
-		task.Entry.Priority = entry.Priority
-		partner.taskQueue.Update(task.index)
-		return
-	}
-
 	partner.activelk.Lock()
 	defer partner.activelk.Unlock()
 	_, ok = partner.activeBlocks[entry.Key]
@@ -64,13 +58,19 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
 		return
 	}
 
+	if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
+		task.Entry.Priority = entry.Priority
+		partner.taskQueue.Update(task.index)
+		return
+	}
+
 	task := &peerRequestTask{
 		Entry:   entry,
 		Target:  to,
 		created: time.Now(),
 		Done: func() {
-			partner.TaskDone(entry.Key)
 			tl.lock.Lock()
+			partner.TaskDone(entry.Key)
 			tl.pQueue.Update(partner.Index())
 			tl.lock.Unlock()
 		},
@@ -156,7 +156,7 @@ func (t *peerRequestTask) SetIndex(i int) {
 
 // taskKey returns a key that uniquely identifies a task.
 func taskKey(p peer.ID, k u.Key) string {
-	return string(p.String() + k.String())
+	return string(p) + string(k)
 }
 
 // FIFO is a basic task comparator that returns tasks in the order created.
@@ -220,6 +220,12 @@ func partnerCompare(a, b pq.Elem) bool {
 	if pb.requests == 0 {
 		return true
 	}
+	if pa.active == pb.active {
+		// sorting by taskQueue.Len() aids in cleaning out trash entries faster
+		// if we sorted instead by requests, one peer could potentially build up
+		// a huge number of cancelled entries in the queue resulting in a memory leak
+		return pa.taskQueue.Len() > pb.taskQueue.Len()
+	}
 	return pa.active < pb.active
 }
 
diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go
index 3a7d70aae0eb39ca9b31a82a575408b04e96d6a8..d885bb373a84ce1f4b6590de435d1b5ddcfec093 100644
--- a/exchange/bitswap/message/message.go
+++ b/exchange/bitswap/message/message.go
@@ -29,12 +29,9 @@ type BitSwapMessage interface {
 
 	Cancel(key u.Key)
 
-	// Sets whether or not the contained wantlist represents the entire wantlist
-	// true = full wantlist
-	// false = wantlist 'patch'
-	// default: true
-	SetFull(isFull bool)
+	Empty() bool
 
+	// A full wantlist is an authoritative copy; a 'non-full' wantlist is a patch-set
 	Full() bool
 
 	AddBlock(*blocks.Block)
@@ -51,18 +48,18 @@ type Exportable interface {
 type impl struct {
 	full     bool
 	wantlist map[u.Key]Entry
-	blocks   map[u.Key]*blocks.Block // map to detect duplicates
+	blocks   map[u.Key]*blocks.Block
 }
 
-func New() BitSwapMessage {
-	return newMsg()
+func New(full bool) BitSwapMessage {
+	return newMsg(full)
 }
 
-func newMsg() *impl {
+func newMsg(full bool) *impl {
 	return &impl{
 		blocks:   make(map[u.Key]*blocks.Block),
 		wantlist: make(map[u.Key]Entry),
-		full:     true,
+		full:     full,
 	}
 }
 
@@ -72,8 +69,7 @@ type Entry struct {
 }
 
 func newMessageFromProto(pbm pb.Message) BitSwapMessage {
-	m := newMsg()
-	m.SetFull(pbm.GetWantlist().GetFull())
+	m := newMsg(pbm.GetWantlist().GetFull())
 	for _, e := range pbm.GetWantlist().GetEntries() {
 		m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
 	}
@@ -84,14 +80,14 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage {
 	return m
 }
 
-func (m *impl) SetFull(full bool) {
-	m.full = full
-}
-
 func (m *impl) Full() bool {
 	return m.full
 }
 
+func (m *impl) Empty() bool {
+	return len(m.blocks) == 0 && len(m.wantlist) == 0
+}
+
 func (m *impl) Wantlist() []Entry {
 	var out []Entry
 	for _, e := range m.wantlist {
@@ -101,7 +97,7 @@ func (m *impl) Wantlist() []Entry {
 }
 
 func (m *impl) Blocks() []*blocks.Block {
-	bs := make([]*blocks.Block, 0)
+	bs := make([]*blocks.Block, 0, len(m.blocks))
 	for _, block := range m.blocks {
 		bs = append(bs, block)
 	}
@@ -109,6 +105,7 @@ func (m *impl) Blocks() []*blocks.Block {
 }
 
 func (m *impl) Cancel(k u.Key) {
+	delete(m.wantlist, k)
 	m.addEntry(k, 0, true)
 }
 
@@ -155,7 +152,7 @@ func (m *impl) ToProto() *pb.Message {
 		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
 			Block:    proto.String(string(e.Key)),
 			Priority: proto.Int32(int32(e.Priority)),
-			Cancel:   &e.Cancel,
+			Cancel:   proto.Bool(e.Cancel),
 		})
 	}
 	for _, b := range m.Blocks() {
diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go
index cbeed88927dfc51abca6bc482f6ba9b3d1b80ebc..7a6a28a0430c6d80778cc3b86eee3eec9f1c179e 100644
--- a/exchange/bitswap/message/message_test.go
+++ b/exchange/bitswap/message/message_test.go
@@ -13,7 +13,7 @@ import (
 
 func TestAppendWanted(t *testing.T) {
 	const str = "foo"
-	m := New()
+	m := New(true)
 	m.AddEntry(u.Key(str), 1)
 
 	if !wantlistContains(m.ToProto().GetWantlist(), str) {
@@ -44,7 +44,7 @@ func TestAppendBlock(t *testing.T) {
 	strs = append(strs, "Celeritas")
 	strs = append(strs, "Incendia")
 
-	m := New()
+	m := New(true)
 	for _, str := range strs {
 		block := blocks.NewBlock([]byte(str))
 		m.AddBlock(block)
@@ -61,7 +61,7 @@ func TestAppendBlock(t *testing.T) {
 
 func TestWantlist(t *testing.T) {
 	keystrs := []string{"foo", "bar", "baz", "bat"}
-	m := New()
+	m := New(true)
 	for _, s := range keystrs {
 		m.AddEntry(u.Key(s), 1)
 	}
@@ -84,7 +84,7 @@ func TestWantlist(t *testing.T) {
 
 func TestCopyProtoByValue(t *testing.T) {
 	const str = "foo"
-	m := New()
+	m := New(true)
 	protoBeforeAppend := m.ToProto()
 	m.AddEntry(u.Key(str), 1)
 	if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
@@ -93,7 +93,7 @@ func TestCopyProtoByValue(t *testing.T) {
 }
 
 func TestToNetFromNetPreservesWantList(t *testing.T) {
-	original := New()
+	original := New(true)
 	original.AddEntry(u.Key("M"), 1)
 	original.AddEntry(u.Key("B"), 1)
 	original.AddEntry(u.Key("D"), 1)
@@ -124,7 +124,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
 
 func TestToAndFromNetMessage(t *testing.T) {
 
-	original := New()
+	original := New(true)
 	original.AddBlock(blocks.NewBlock([]byte("W")))
 	original.AddBlock(blocks.NewBlock([]byte("E")))
 	original.AddBlock(blocks.NewBlock([]byte("F")))
@@ -172,7 +172,7 @@ func contains(strs []string, x string) bool {
 
 func TestDuplicates(t *testing.T) {
 	b := blocks.NewBlock([]byte("foo"))
-	msg := New()
+	msg := New(true)
 
 	msg.AddEntry(b.Key(), 1)
 	msg.AddEntry(b.Key(), 1)
diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go
index a6ed070c03c8c9137e733407db44f881b770830a..83fca07937a195326b6fe0a19a890a2d1b458799 100644
--- a/exchange/bitswap/network/interface.go
+++ b/exchange/bitswap/network/interface.go
@@ -23,6 +23,8 @@ type BitSwapNetwork interface {
 	// network.
 	SetDelegate(Receiver)
 
+	ConnectTo(context.Context, peer.ID) error
+
 	Routing
 }
 
@@ -31,7 +33,7 @@ type Receiver interface {
 	ReceiveMessage(
 		ctx context.Context,
 		sender peer.ID,
-		incoming bsmsg.BitSwapMessage) error
+		incoming bsmsg.BitSwapMessage)
 
 	ReceiveError(error)
 
diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go
index 97745e32da2e612efacc4d16f8fcf822dd126b2a..4e5a1317f58bcc5e1f033bdbd8bf928c39673d52 100644
--- a/exchange/bitswap/network/ipfs_impl.go
+++ b/exchange/bitswap/network/ipfs_impl.go
@@ -97,6 +97,10 @@ func (bsnet *impl) SetDelegate(r Receiver) {
 	bsnet.receiver = r
 }
 
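+// ConnectTo ensures the network has an open connection to the given peer.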
+func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
+	return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p})
+}
+
 // FindProvidersAsync returns a channel of providers for the given key
 func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
 
diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go
index ceab4b2ee2aef994b51bd4c6ccdeca734f8da6c8..a4db4c9c578783ec219647d4b9479715b5d8350f 100644
--- a/exchange/bitswap/stat.go
+++ b/exchange/bitswap/stat.go
@@ -17,8 +17,10 @@ func (bs *Bitswap) Stat() (*Stat, error) {
 	st := new(Stat)
 	st.ProvideBufLen = len(bs.newBlocks)
 	st.Wantlist = bs.GetWantlist()
+	bs.counterLk.Lock()
 	st.BlocksReceived = bs.blocksRecvd
 	st.DupBlksReceived = bs.dupBlocksRecvd
+	bs.counterLk.Unlock()
 
 	for _, p := range bs.engine.Peers() {
 		st.Peers = append(st.Peers, p.Pretty())
diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go
index 9091ff255c65aa77a13a3d7e44c09dfb4ba80f6b..9624df5f8e87125374f5b3637e47feab9d1c6a7c 100644
--- a/exchange/bitswap/testnet/network_test.go
+++ b/exchange/bitswap/testnet/network_test.go
@@ -29,19 +29,17 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
 	responder.SetDelegate(lambda(func(
 		ctx context.Context,
 		fromWaiter peer.ID,
-		msgFromWaiter bsmsg.BitSwapMessage) error {
+		msgFromWaiter bsmsg.BitSwapMessage) {
 
-		msgToWaiter := bsmsg.New()
+		msgToWaiter := bsmsg.New(true)
 		msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
 		waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
-
-		return nil
 	}))
 
 	waiter.SetDelegate(lambda(func(
 		ctx context.Context,
 		fromResponder peer.ID,
-		msgFromResponder bsmsg.BitSwapMessage) error {
+		msgFromResponder bsmsg.BitSwapMessage) {
 
 		// TODO assert that this came from the correct peer and that the message contents are as expected
 		ok := false
@@ -54,12 +52,10 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
 
 		if !ok {
 			t.Fatal("Message not received from the responder")
-
 		}
-		return nil
 	}))
 
-	messageSentAsync := bsmsg.New()
+	messageSentAsync := bsmsg.New(true)
 	messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
 	errSending := waiter.SendMessage(
 		context.Background(), responderPeer.ID(), messageSentAsync)
@@ -71,7 +67,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
 }
 
 type receiverFunc func(ctx context.Context, p peer.ID,
-	incoming bsmsg.BitSwapMessage) error
+	incoming bsmsg.BitSwapMessage)
 
 // lambda returns a Receiver instance given a receiver function
 func lambda(f receiverFunc) bsnet.Receiver {
@@ -81,12 +77,12 @@ func lambda(f receiverFunc) bsnet.Receiver {
 }
 
 type lambdaImpl struct {
-	f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error
+	f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage)
 }
 
 func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
-	p peer.ID, incoming bsmsg.BitSwapMessage) error {
-	return lam.f(ctx, p, incoming)
+	p peer.ID, incoming bsmsg.BitSwapMessage) {
+	lam.f(ctx, p, incoming)
 }
 
 func (lam *lambdaImpl) ReceiveError(err error) {
diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go
index feb5fd722a955df895bde208fe6096a8bafc34e7..f8ca0cd55166ca400b59fec93263788ddb3d599b 100644
--- a/exchange/bitswap/testnet/virtual.go
+++ b/exchange/bitswap/testnet/virtual.go
@@ -72,7 +72,8 @@ func (n *network) deliver(
 
 	n.delay.Wait()
 
-	return r.ReceiveMessage(context.TODO(), from, message)
+	r.ReceiveMessage(context.TODO(), from, message)
+	return nil
 }
 
 type networkClient struct {
@@ -119,3 +120,12 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error {
 func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
 	nc.Receiver = r
 }
+
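+// ConnectTo simulates a connection by notifying both peers' receivers directly.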
+func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
+	if !nc.network.HasPeer(p) {
+		return errors.New("no such peer in network")
+	}
+	nc.network.clients[p].PeerConnected(nc.local)
+	nc.Receiver.PeerConnected(p)
+	return nil
+}
diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go
index 2ce035c3de8e323bb12be73704993e1be1be9175..47930de694d6a582a08e2da3a4012611a080647c 100644
--- a/exchange/bitswap/testutils.go
+++ b/exchange/bitswap/testutils.go
@@ -7,7 +7,6 @@ import (
 	ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
 	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
 	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
-	exchange "github.com/ipfs/go-ipfs/exchange"
 	tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
 	peer "github.com/ipfs/go-ipfs/p2p/peer"
 	p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
@@ -56,12 +55,18 @@ func (g *SessionGenerator) Instances(n int) []Instance {
 		inst := g.Next()
 		instances = append(instances, inst)
 	}
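+	// connect every pair of instances so each treats the others as peers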
+	for i, inst := range instances {
+		for j := i + 1; j < len(instances); j++ {
+			oinst := instances[j]
+			inst.Exchange.PeerConnected(oinst.Peer)
+		}
+	}
 	return instances
 }
 
 type Instance struct {
 	Peer       peer.ID
-	Exchange   exchange.Interface
+	Exchange   *Bitswap
 	blockstore blockstore.Blockstore
 
 	blockstoreDelay delay.D
@@ -94,7 +99,7 @@ func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance
 
 	const alwaysSendToPeer = true
 
-	bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer)
+	bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer).(*Bitswap)
 
 	return Instance{
 		Peer:            p.ID(),
diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go
new file mode 100644
index 0000000000000000000000000000000000000000..29706710f99d62dc0a891612922f1959da24ed96
--- /dev/null
+++ b/exchange/bitswap/wantmanager.go
@@ -0,0 +1,259 @@
+package bitswap
+
+import (
+	"sync"
+	"time"
+
+	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
+	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
+	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
+	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
+	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
+	peer "github.com/ipfs/go-ipfs/p2p/peer"
+	u "github.com/ipfs/go-ipfs/util"
+)
+
+type WantManager struct {
+	// sync channels for Run loop
+	incoming   chan []*bsmsg.Entry
+	connect    chan peer.ID // notification channel for new peers connecting
+	disconnect chan peer.ID // notification channel for peers disconnecting
+
+	// synchronized by the Run loop; only touch these inside it
+	peers map[peer.ID]*msgQueue
+	wl    *wantlist.Wantlist
+
+	network bsnet.BitSwapNetwork
+	ctx     context.Context
+}
+
+func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
+	return &WantManager{
+		incoming:   make(chan []*bsmsg.Entry, 10),
+		connect:    make(chan peer.ID, 10),
+		disconnect: make(chan peer.ID, 10),
+		peers:      make(map[peer.ID]*msgQueue),
+		wl:         wantlist.New(),
+		network:    network,
+		ctx:        ctx,
+	}
+}
+
+type msgPair struct {
+	to  peer.ID
+	msg bsmsg.BitSwapMessage
+}
+
+type cancellation struct {
+	who peer.ID
+	blk u.Key
+}
+
+type msgQueue struct {
+	p peer.ID
+
+	outlk   sync.Mutex
+	out     bsmsg.BitSwapMessage
+	network bsnet.BitSwapNetwork
+
+	work chan struct{}
+	done chan struct{}
+}
+
+func (pm *WantManager) WantBlocks(ks []u.Key) {
+	log.Infof("want blocks: %s", ks)
+	pm.addEntries(ks, false)
+}
+
+func (pm *WantManager) CancelWants(ks []u.Key) {
+	pm.addEntries(ks, true)
+}
+
+func (pm *WantManager) addEntries(ks []u.Key, cancel bool) {
+	var entries []*bsmsg.Entry
+	for i, k := range ks {
+		entries = append(entries, &bsmsg.Entry{
+			Cancel: cancel,
+			Entry: wantlist.Entry{
+				Key:      k,
+				Priority: kMaxPriority - i,
+			},
+		})
+	}
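+	// hand the new entries to the Run loop, unless we are shutting down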
+	select {
+	case pm.incoming <- entries:
+	case <-pm.ctx.Done():
+	}
+}
+
+func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
+	// Blocks need to be sent synchronously to maintain proper backpressure
+	// throughout the network stack
+	defer env.Sent()
+
+	msg := bsmsg.New(false)
+	msg.AddBlock(env.Block)
+	log.Infof("Sending block %s to %s", env.Peer, env.Block)
+	err := pm.network.SendMessage(ctx, env.Peer, msg)
+	if err != nil {
+		log.Noticef("sendblock error: %s", err)
+	}
+}
+
+func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
+	_, ok := pm.peers[p]
+	if ok {
+		// TODO: log an error?
+		return nil
+	}
+
+	mq := pm.newMsgQueue(p)
+
+	// a new peer should be sent our full wantlist
+	fullwantlist := bsmsg.New(true)
+	for _, e := range pm.wl.Entries() {
+		fullwantlist.AddEntry(e.Key, e.Priority)
+	}
+	mq.out = fullwantlist
+	mq.work <- struct{}{}
+
+	pm.peers[p] = mq
+	go mq.runQueue(pm.ctx)
+	return mq
+}
+
+func (pm *WantManager) stopPeerHandler(p peer.ID) {
+	pq, ok := pm.peers[p]
+	if !ok {
+		// TODO: log error?
+		return
+	}
+
+	close(pq.done)
+	delete(pm.peers, p)
+}
+
+func (mq *msgQueue) runQueue(ctx context.Context) {
+	for {
+		select {
+		case <-mq.work: // there is work to be done
+
+			err := mq.network.ConnectTo(ctx, mq.p)
+			if err != nil {
+				log.Errorf("cant connect to peer %s: %s", mq.p, err)
+				// TODO: cant connect, what now?
+				continue
+			}
+
+			// grab outgoing message
+			mq.outlk.Lock()
+			wlm := mq.out
+			if wlm == nil || wlm.Empty() {
+				mq.outlk.Unlock()
+				continue
+			}
+			mq.out = nil
+			mq.outlk.Unlock()
+
+			// send wantlist updates
+			err = mq.network.SendMessage(ctx, mq.p, wlm)
+			if err != nil {
+				log.Noticef("bitswap send error: %s", err)
+				// TODO: what do we do if this fails?
+			}
+		case <-mq.done:
+			return
+		}
+	}
+}
+
+func (pm *WantManager) Connected(p peer.ID) {
+	pm.connect <- p
+}
+
+func (pm *WantManager) Disconnected(p peer.ID) {
+	pm.disconnect <- p
+}
+
+// TODO: use goprocess here once I trust it
+func (pm *WantManager) Run() {
+	tock := time.NewTicker(rebroadcastDelay.Get())
+	defer tock.Stop()
+	for {
+		select {
+		case entries := <-pm.incoming:
+
+			// add changes to our wantlist
+			for _, e := range entries {
+				if e.Cancel {
+					pm.wl.Remove(e.Key)
+				} else {
+					pm.wl.Add(e.Key, e.Priority)
+				}
+			}
+
+			// broadcast those wantlist changes
+			for _, p := range pm.peers {
+				p.addMessage(entries)
+			}
+
+		case <-tock.C:
+			// resend entire wantlist every so often (REALLY SHOULDN'T BE NECESSARY)
+			var es []*bsmsg.Entry
+			for _, e := range pm.wl.Entries() {
+				es = append(es, &bsmsg.Entry{Entry: e})
+			}
+			for _, p := range pm.peers {
+				p.outlk.Lock()
+				p.out = bsmsg.New(true)
+				p.outlk.Unlock()
+
+				p.addMessage(es)
+			}
+		case p := <-pm.connect:
+			pm.startPeerHandler(p)
+		case p := <-pm.disconnect:
+			pm.stopPeerHandler(p)
+		case <-pm.ctx.Done():
+			return
+		}
+	}
+}
+
+func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
+	mq := new(msgQueue)
+	mq.done = make(chan struct{})
+	mq.work = make(chan struct{}, 1)
+	mq.network = wm.network
+	mq.p = p
+
+	return mq
+}
+
+func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
+	mq.outlk.Lock()
+	defer func() {
+		mq.outlk.Unlock()
+		select {
+		case mq.work <- struct{}{}:
+		default:
+		}
+	}()
+
+	// if we have no message currently held, start a fresh patch message
+	if mq.out == nil {
+		mq.out = bsmsg.New(false)
+	}
+
+	// TODO: add a msg.Combine(...) method
+	// fold the given entries into the message we are holding
+	for _, e := range entries {
+		if e.Cancel {
+			mq.out.Cancel(e.Key)
+		} else {
+			mq.out.AddEntry(e.Key, e.Priority)
+		}
+	}
+}
diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go
index dff3d911c2088f8b61adcc53a926471d2da8f170..1083566a1fb10b6fb23b4eaca82e6fec3ee2edad 100644
--- a/exchange/bitswap/workers.go
+++ b/exchange/bitswap/workers.go
@@ -46,6 +46,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
 		bs.rebroadcastWorker(ctx)
 	})
 
+	// Start up a worker to manage sending out provide announcements
 	px.Go(func(px process.Process) {
 		bs.provideCollector(ctx)
 	})
@@ -70,9 +71,8 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
 				if !ok {
 					continue
 				}
-				log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
-				bs.send(ctx, envelope.Peer, envelope.Message)
-				envelope.Sent()
+
+				bs.wm.SendBlock(ctx, envelope)
 			case <-ctx.Done():
 				return
 			}
@@ -146,30 +146,19 @@ func (bs *Bitswap) clientWorker(parent context.Context) {
 				log.Warning("Received batch request for zero blocks")
 				continue
 			}
-			for i, k := range keys {
-				bs.wantlist.Add(k, kMaxPriority-i)
-			}
 
-			done := make(chan struct{})
-			go func() {
-				bs.wantNewBlocks(req.ctx, keys)
-				close(done)
-			}()
+			bs.wm.WantBlocks(keys)
 
 			// NB: Optimization. Assumes that providers of key[0] are likely to
 			// be able to provide for all keys. This currently holds true in most
 			// every situation. Later, this assumption may not hold as true.
 			child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
 			providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
-			err := bs.sendWantlistToPeers(req.ctx, providers)
-			if err != nil {
-				log.Debugf("error sending wantlist: %s", err)
+			for p := range providers {
+				go bs.network.ConnectTo(req.ctx, p)
 			}
 			cancel()
 
-			// Wait for wantNewBlocks to finish
-			<-done
-
 		case <-parent.Done():
 			return
 		}
@@ -180,22 +169,24 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
 	ctx, cancel := context.WithCancel(parent)
 	defer cancel()
 
-	broadcastSignal := time.After(rebroadcastDelay.Get())
-	tick := time.Tick(10 * time.Second)
+	broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
+	defer broadcastSignal.Stop()
+
+	tick := time.NewTicker(10 * time.Second)
+	defer tick.Stop()
 
 	for {
 		select {
-		case <-tick:
-			n := bs.wantlist.Len()
+		case <-tick.C:
+			n := bs.wm.wl.Len()
 			if n > 0 {
 				log.Debug(n, "keys in bitswap wantlist")
 			}
-		case <-broadcastSignal: // resend unfulfilled wantlist keys
-			entries := bs.wantlist.Entries()
+		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
+			entries := bs.wm.wl.Entries()
 			if len(entries) > 0 {
-				bs.sendWantlistToProviders(ctx, entries)
+				bs.connectToProviders(ctx, entries)
 			}
-			broadcastSignal = time.After(rebroadcastDelay.Get())
 		case <-parent.Done():
 			return
 		}