diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 27be53967dc8153be8ee8926ed6d69358f25952f..020c8d16a5f22fcdb507eeaf452fa1efc28dc7ea 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -82,7 +82,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, notifications: notif, engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method network: network, - batchRequests: make(chan *blockRequest, sizeBatchRequestChan), + findKeys: make(chan *blockRequest, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), @@ -115,10 +115,8 @@ type Bitswap struct { notifications notifications.PubSub - // Requests for a set of related blocks - // the assumption is made that the same peer is likely to - // have more than a single block in the set - batchRequests chan *blockRequest + // send keys to a worker to find and connect to providers for them + findKeys chan *blockRequest engine *decision.Engine @@ -202,12 +200,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. } promise := bs.notifications.Subscribe(ctx, keys...) + bs.wm.WantBlocks(keys) + req := &blockRequest{ keys: keys, ctx: ctx, } select { - case bs.batchRequests <- req: + case bs.findKeys <- req: return promise, nil case <-ctx.Done(): return nil, ctx.Err() @@ -270,39 +270,59 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // TODO: this is bad, and could be easily abused. // Should only track *useful* messages in ledger - if len(incoming.Blocks()) == 0 { + iblocks := incoming.Blocks() + + if len(iblocks) == 0 { return } // quickly send out cancels, reduces chances of duplicate block receives var keys []u.Key - for _, block := range incoming.Blocks() { + for _, block := range iblocks { + if _, found := bs.wm.wl.Contains(block.Key()); !found { + log.Notice("received un-asked-for block: %s", block) + continue + } keys = append(keys, block.Key()) } bs.wm.CancelWants(keys) - for _, block := range incoming.Blocks() { - bs.counterLk.Lock() - bs.blocksRecvd++ - if has, err := bs.blockstore.Has(block.Key()); err == nil && has { - bs.dupBlocksRecvd++ - } - brecvd := bs.blocksRecvd - bdup := bs.dupBlocksRecvd - bs.counterLk.Unlock() - log.Infof("got block %s from %s (%d,%d)", block, p, brecvd, bdup) - - hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) - if err := bs.HasBlock(hasBlockCtx, block); err != nil { - log.Warningf("ReceiveMessage HasBlock error: %s", err) - } - cancel() + wg := sync.WaitGroup{} + for _, block := range iblocks { + wg.Add(1) + go func(b *blocks.Block) { + defer wg.Done() + bs.counterLk.Lock() + bs.blocksRecvd++ + has, err := bs.blockstore.Has(b.Key()) + if err != nil { + bs.counterLk.Unlock() + log.Noticef("blockstore.Has error: %s", err) + return + } + if err == nil && has { + bs.dupBlocksRecvd++ + } + brecvd := bs.blocksRecvd + bdup := bs.dupBlocksRecvd + bs.counterLk.Unlock() + if has { + return + } + + log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup) + hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) + if err := bs.HasBlock(hasBlockCtx, b); err != nil { + log.Warningf("ReceiveMessage HasBlock error: %s", err) + } + cancel() + }(block) } + wg.Wait() } // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { - // TODO: add to clientWorker?? bs.wm.Connected(p) } @@ -313,7 +333,7 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) { } func (bs *Bitswap) ReceiveError(err error) { - log.Debugf("Bitswap ReceiveError: %s", err) + log.Infof("Bitswap ReceiveError: %s", err) // TODO log the network error // TODO bubble the network error up to the parent context/error logger } diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 5405f5074fedf3b6ad421fecce5dd125b9465933..e8745392055ad0c111cd8984c8291d937ce0aad2 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -21,7 +21,7 @@ type WantManager struct { // synchronized by Run loop, only touch inside there peers map[peer.ID]*msgQueue - wl *wantlist.Wantlist + wl *wantlist.ThreadSafe network bsnet.BitSwapNetwork ctx context.Context @@ -33,7 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana connect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10), peers: make(map[peer.ID]*msgQueue), - wl: wantlist.New(), + wl: wantlist.NewThreadSafe(), network: network, ctx: ctx, } diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 1083566a1fb10b6fb23b4eaca82e6fec3ee2edad..7852cf93ec0ec803ff1af50dcc3b8636b4ee7c09 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -31,7 +31,7 @@ func init() { func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Start up a worker to handle block requests this node is making px.Go(func(px process.Process) { - bs.clientWorker(ctx) + bs.providerConnector(ctx) }) // Start up workers to handle requests from other nodes for the data on this node @@ -134,21 +134,19 @@ func (bs *Bitswap) provideCollector(ctx context.Context) { } } -// TODO ensure only one active request per key -func (bs *Bitswap) clientWorker(parent context.Context) { +// connects to providers for the given keys +func (bs *Bitswap) providerConnector(parent context.Context) { defer log.Info("bitswap client worker shutting down...") for { select { - case req := <-bs.batchRequests: + case req := <-bs.findKeys: keys := req.keys if len(keys) == 0 { log.Warning("Received batch request for zero blocks") continue } - bs.wm.WantBlocks(keys) - // NB: Optimization. Assumes that providers of key[0] are likely to // be able to provide for all keys. This currently holds true in most // every situation. Later, this assumption may not hold as true. diff --git a/test/integration/bitswap_wo_routing_test.go b/test/integration/bitswap_wo_routing_test.go index f0f5d5d31e6c9d7ba9aaffc2b5c24f73451e4016..560e20ec3267bf04af8cbe6bbe1e0f4de69f613c 100644 --- a/test/integration/bitswap_wo_routing_test.go +++ b/test/integration/bitswap_wo_routing_test.go @@ -75,7 +75,7 @@ func TestBitswapWithoutRouting(t *testing.T) { } log.Debugf("%d %s get block.", i, n.Identity) - b, err := n.Exchange.GetBlock(ctx, block0.Key()) + b, err := n.Blocks.GetBlock(ctx, block0.Key()) if err != nil { t.Error(err) } else if !bytes.Equal(b.Data, block0.Data) { @@ -92,7 +92,7 @@ func TestBitswapWithoutRouting(t *testing.T) { // get it out. for _, n := range nodes { - b, err := n.Exchange.GetBlock(ctx, block1.Key()) + b, err := n.Blocks.GetBlock(ctx, block1.Key()) if err != nil { t.Error(err) } else if !bytes.Equal(b.Data, block1.Data) {