Commit 55523af9 authored by Jeromy

move blocking calls out of single threaded loops, cancel contexts ASAP

parent 5ff4f16b
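The pattern behind the change, in outline: a single-threaded select loop must never perform a blocking call inline, because every other case (including shutdown) stalls until the call returns. Instead, the loop hands the work to its own goroutine, and every derived context is cancelled the moment its work completes. A minimal sketch of that pattern; the names sketch, request, loop, and handle are illustrative, not from this commit:

    package sketch

    import (
    	"context"
    	"time"
    )

    type request struct {
    	do func(context.Context) // the potentially blocking call
    }

    // loop stays responsive: each request runs in its own goroutine, so
    // new requests and shutdown are never stuck behind a slow call.
    func loop(ctx context.Context, work <-chan request) {
    	for {
    		select {
    		case req := <-work:
    			go handle(ctx, req)
    		case <-ctx.Done():
    			return
    		}
    	}
    }

    // handle bounds each request with a timeout and cancels the derived
    // context as soon as the request finishes, releasing its resources.
    func handle(parent context.Context, req request) {
    	ctx, cancel := context.WithTimeout(parent, time.Minute)
    	defer cancel()
    	req.do(ctx)
    }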
@@ -84,7 +84,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan []u.Key, sizeBatchRequestChan),
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
}
network.SetDelegate(bs)
@@ -94,6 +94,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
px.Go(func(px process.Process) {
bs.taskWorker(ctx)
})
px.Go(func(px process.Process) {
bs.rebroadcastWorker(ctx)
})
return bs
}
@@ -116,7 +119,7 @@ type bitswap struct {
// Requests for a set of related blocks
// the assumption is made that the same peer is likely to
// have more than a single block in the set
batchRequests chan []u.Key
batchRequests chan *blockRequest
engine *decision.Engine
@@ -125,6 +128,11 @@ type bitswap struct {
process process.Process
}
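// blockRequest couples a set of wanted keys with the requesting caller's
// context, so downstream sends and provider lookups run under the
// caller's deadline rather than the worker's.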
type blockRequest struct {
keys []u.Key
ctx context.Context
}
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
@@ -175,15 +183,19 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
select {
case <-bs.process.Closing():
return nil, errors.New("bitswap is closed")
default:
}
promise := bs.notifications.Subscribe(ctx, keys...)
req := &blockRequest{
keys: keys,
ctx: ctx,
}
select {
case bs.batchRequests <- keys:
case bs.batchRequests <- req:
return promise, nil
case <-ctx.Done():
return nil, ctx.Err()
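A usage sketch of GetBlocks under that advice (hypothetical caller, not part of the commit; fetch and its 30-second deadline are illustrative):

    // fetch shows the bounded-context discipline the doc comment asks for.
    func fetch(parent context.Context, bs *bitswap, keys []u.Key) error {
    	ctx, cancel := context.WithTimeout(parent, 30*time.Second)
    	// cancel as soon as we return, releasing the request's resources
    	defer cancel()

    	blockCh, err := bs.GetBlocks(ctx, keys)
    	if err != nil {
    		return err
    	}
    	for range blockCh {
    		// consume blocks; the promise channel is assumed to close once
    		// ctx expires or every requested block has arrived
    	}
    	return nil
    }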
@@ -321,8 +333,8 @@ func (bs *bitswap) PeerConnected(p peer.ID) {
}
// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerDisconnected(peer.ID) {
// TODO: release resources.
func (bs *bitswap) PeerDisconnected(p peer.ID) {
bs.engine.PeerDisconnected(p)
}
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
@@ -342,6 +354,24 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
}
}
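// wantNewBlocks broadcasts a partial (non-full) wantlist update for the
// newly requested keys to every peer the decision engine knows about.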
func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 {
return
}
message := bsmsg.New()
message.SetFull(false)
for i, k := range bkeys {
message.AddEntry(k, kMaxPriority-i)
}
for _, p := range bs.engine.Peers() {
err := bs.send(ctx, p, message)
if err != nil {
log.Debugf("Error sending message: %s", err)
}
}
}
func (bs *bitswap) ReceiveError(err error) {
log.Debugf("Bitswap ReceiveError: %s", err)
// TODO log the network error
@@ -385,13 +415,42 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
for {
select {
case req := <-bs.batchRequests:
keys := req.keys
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
bs.wantNewBlocks(req.ctx, keys)
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(req.ctx, providers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
}
}
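// rebroadcastWorker periodically resends the current wantlist to
// providers of wanted blocks, so outstanding requests are retried.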
func (bs *bitswap) rebroadcastWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent)
defer cancel()
broadcastSignal := time.After(rebroadcastDelay.Get())
for {
select {
@@ -406,23 +465,6 @@ func (bs *bitswap) clientWorker(parent context.Context) {
bs.sendWantlistToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case keys := <-bs.batchRequests:
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(ctx, providers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
@@ -228,6 +228,10 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
return nil
}
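// PeerDisconnected is invoked by bitswap when a peer drops; it is a stub
// until per-peer ledger state is released here.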
func (e *Engine) PeerDisconnected(p peer.ID) {
// TODO: release ledger
}
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
// NB not threadsafe
return e.findOrCreate(p).Accounting.BytesSent