diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index af84caa057d5f52749c40a60a71c7e1f7d5fc5b1..843bed4a963f0d476f81eeb25d13a7183593e203 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -21,15 +21,28 @@ import ( var log = u.Logger("bitswap") // NetMessageSession initializes a BitSwap session that communicates over the -// provided NetMessage service -func NetMessageSession(parent context.Context, p peer.Peer, +// provided NetMessage service. +// Runs until context is cancelled +func NetMessageSession(ctx context.Context, p peer.Peer, net inet.Network, srv inet.Service, directory bsnet.Routing, d ds.ThreadSafeDatastore, nice bool) exchange.Interface { networkAdapter := bsnet.NetMessageAdapter(srv, net, nil) + + notif := notifications.New() + + go func() { + for { + select { + case <-ctx.Done(): + notif.Shutdown() + } + } + }() + bs := &bitswap{ blockstore: blockstore.NewBlockstore(d), - notifications: notifications.New(), // TODO Shutdown() + notifications: notif, strategy: strategy.New(nice), routing: directory, sender: networkAdapter, @@ -119,15 +132,14 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) case block := <-promise: cancelFunc() bs.wantlist.Remove(k) - // TODO remove from wantlist return &block, nil case <-parent.Done(): return nil, parent.Err() } } -// HasBlock announces the existance of a block to bitswap, potentially sending -// it to peers (Partners) whose WantLists include it. +// HasBlock announces the existance of a block to this bitswap service. The +// service will potentially notify its peers. func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { log.Debugf("Has Block %v", blk.Key()) bs.wantlist.Remove(blk.Key()) @@ -162,13 +174,11 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm if err := bs.blockstore.Put(&block); err != nil { continue // FIXME(brian): err ignored } - go bs.notifications.Publish(block) - go func(block blocks.Block) { - err := bs.HasBlock(ctx, block) // FIXME err ignored - if err != nil { - log.Warningf("HasBlock errored: %s", err) - } - }(block) + bs.notifications.Publish(block) + err := bs.HasBlock(ctx, block) + if err != nil { + log.Warningf("HasBlock errored: %s", err) + } } message := bsmsg.New() @@ -202,11 +212,12 @@ func (bs *bitswap) ReceiveError(err error) { // sent func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) { bs.sender.SendMessage(ctx, p, m) - go bs.strategy.MessageSent(p, m) + bs.strategy.MessageSent(p, m) } func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { log.Debugf("Sending %v to peers that want it", block.Key()) + for _, p := range bs.strategy.Peers() { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { log.Debugf("%v wants %v", p, block.Key()) @@ -216,7 +227,7 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) for _, wanted := range bs.wantlist.Keys() { message.AddWanted(wanted) } - go bs.send(ctx, p, message) + bs.send(ctx, p, message) } } }