Commit d18a24cf authored by Brian Tiger Chow's avatar Brian Tiger Chow

fix(bitswap) don't 'go' local function calls

parent fc8168f6
......@@ -21,15 +21,28 @@ import (
var log = u.Logger("bitswap")
// NetMessageSession initializes a BitSwap session that communicates over the
// provided NetMessage service
func NetMessageSession(parent context.Context, p peer.Peer,
// provided NetMessage service.
// Runs until context is cancelled
func NetMessageSession(ctx context.Context, p peer.Peer,
net inet.Network, srv inet.Service, directory bsnet.Routing,
d ds.ThreadSafeDatastore, nice bool) exchange.Interface {
networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)
notif := notifications.New()
go func() {
for {
select {
case <-ctx.Done():
notif.Shutdown()
}
}
}()
bs := &bitswap{
blockstore: blockstore.NewBlockstore(d),
notifications: notifications.New(), // TODO Shutdown()
notifications: notif,
strategy: strategy.New(nice),
routing: directory,
sender: networkAdapter,
......@@ -119,15 +132,14 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
case block := <-promise:
cancelFunc()
bs.wantlist.Remove(k)
// TODO remove from wantlist
return &block, nil
case <-parent.Done():
return nil, parent.Err()
}
}
// HasBlock announces the existance of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it.
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
log.Debugf("Has Block %v", blk.Key())
bs.wantlist.Remove(blk.Key())
......@@ -162,13 +174,11 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
if err := bs.blockstore.Put(&block); err != nil {
continue // FIXME(brian): err ignored
}
go bs.notifications.Publish(block)
go func(block blocks.Block) {
err := bs.HasBlock(ctx, block) // FIXME err ignored
if err != nil {
log.Warningf("HasBlock errored: %s", err)
}
}(block)
bs.notifications.Publish(block)
err := bs.HasBlock(ctx, block)
if err != nil {
log.Warningf("HasBlock errored: %s", err)
}
}
message := bsmsg.New()
......@@ -202,11 +212,12 @@ func (bs *bitswap) ReceiveError(err error) {
// sent
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
bs.sender.SendMessage(ctx, p, m)
go bs.strategy.MessageSent(p, m)
bs.strategy.MessageSent(p, m)
}
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
log.Debugf("Sending %v to peers that want it", block.Key())
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
log.Debugf("%v wants %v", p, block.Key())
......@@ -216,7 +227,7 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block)
for _, wanted := range bs.wantlist.Keys() {
message.AddWanted(wanted)
}
go bs.send(ctx, p, message)
bs.send(ctx, p, message)
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment