Commit b4cd1252 authored by Brian Tiger Chow's avatar Brian Tiger Chow

feat(bitswap): synchronous close

parent 2027fbe0
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect" inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
...@@ -52,28 +53,47 @@ var ( ...@@ -52,28 +53,47 @@ var (
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore, nice bool) exchange.Interface { bstore blockstore.Blockstore, nice bool) exchange.Interface {
// important to use provided parent context (since it may include important
// loggable data). It's probably not a good idea to allow bitswap to be
// coupled to the concerns of the IPFS daemon in this way.
//
// FIXME(btc) Now that bitswap manages itself using a process, it probably
// shouldn't accept a context anymore. Clients should probably use Close()
// exclusively. We should probably find another way to share logging data
ctx, cancelFunc := context.WithCancel(parent) ctx, cancelFunc := context.WithCancel(parent)
notif := notifications.New() notif := notifications.New()
px := process.WithTeardown(func() error {
notif.Shutdown()
return nil
})
go func() { go func() {
<-ctx.Done() <-px.Closing() // process closes first
cancelFunc() cancelFunc()
notif.Shutdown() }()
go func() {
<-ctx.Done() // parent cancelled first
px.Close()
}() }()
bs := &bitswap{ bs := &bitswap{
self: p, self: p,
blockstore: bstore, blockstore: bstore,
cancelFunc: cancelFunc,
notifications: notif, notifications: notif,
engine: decision.NewEngine(ctx, bstore), engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network, network: network,
wantlist: wantlist.NewThreadSafe(), wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan []u.Key, sizeBatchRequestChan), batchRequests: make(chan []u.Key, sizeBatchRequestChan),
process: px,
} }
network.SetDelegate(bs) network.SetDelegate(bs)
go bs.clientWorker(ctx) px.Go(func(px process.Process) {
go bs.taskWorker(ctx) bs.clientWorker(ctx)
})
px.Go(func(px process.Process) {
bs.taskWorker(ctx)
})
return bs return bs
} }
...@@ -102,8 +122,7 @@ type bitswap struct { ...@@ -102,8 +122,7 @@ type bitswap struct {
wantlist *wantlist.ThreadSafe wantlist *wantlist.ThreadSafe
// cancelFunc signals cancellation to the bitswap event loop process process.Process
cancelFunc func()
} }
// GetBlock attempts to retrieve a particular block from peers within the // GetBlock attempts to retrieve a particular block from peers within the
...@@ -149,6 +168,11 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ...@@ -149,6 +168,11 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
// that lasts throughout the lifetime of the server) // that lasts throughout the lifetime of the server)
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) { func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
select {
case <-bs.process.Closing():
return nil, errors.New("bitswap is closed")
default:
}
promise := bs.notifications.Subscribe(ctx, keys...) promise := bs.notifications.Subscribe(ctx, keys...)
select { select {
case bs.batchRequests <- keys: case bs.batchRequests <- keys:
...@@ -161,6 +185,11 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. ...@@ -161,6 +185,11 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
// HasBlock announces the existance of a block to this bitswap service. The // HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers. // service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
default:
}
if err := bs.blockstore.Put(blk); err != nil { if err := bs.blockstore.Put(blk); err != nil {
return err return err
} }
...@@ -235,6 +264,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli ...@@ -235,6 +264,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
} }
func (bs *bitswap) taskWorker(ctx context.Context) { func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
...@@ -256,6 +286,8 @@ func (bs *bitswap) taskWorker(ctx context.Context) { ...@@ -256,6 +286,8 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
// TODO ensure only one active request per key // TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) { func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
ctx, cancel := context.WithCancel(parent) ctx, cancel := context.WithCancel(parent)
broadcastSignal := time.After(rebroadcastDelay.Get()) broadcastSignal := time.After(rebroadcastDelay.Get())
...@@ -384,6 +416,5 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) ...@@ -384,6 +416,5 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
} }
func (bs *bitswap) Close() error { func (bs *bitswap) Close() error {
bs.cancelFunc() return bs.process.Close()
return nil // to conform to Closer interface
} }
...@@ -22,8 +22,6 @@ import ( ...@@ -22,8 +22,6 @@ import (
const kNetworkDelay = 0 * time.Millisecond const kNetworkDelay = 0 * time.Millisecond
func TestClose(t *testing.T) { func TestClose(t *testing.T) {
// TODO
t.Skip("TODO Bitswap's Close implementation is a WIP")
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sesgen := NewTestSessionGenerator(vnet) sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close() defer sesgen.Close()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment