Commit face34bd authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

blockservice: async HasBlock with ratelimit

parent c356c710
......@@ -8,6 +8,8 @@ import (
"fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procrl "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
blocks "github.com/jbenet/go-ipfs/blocks"
"github.com/jbenet/go-ipfs/blocks/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
......@@ -17,6 +19,9 @@ import (
var log = u.Logger("blockservice")
var ErrNotFound = errors.New("blockservice: key not found")
// MaxExchangeAddWorkers rate limits the number of exchange workers
var MaxExchangeAddWorkers = 100
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values.
......@@ -24,6 +29,9 @@ type BlockService struct {
// TODO don't expose underlying impl details
Blockstore blockstore.Blockstore
Exchange exchange.Interface
rateLimiter *procrl.RateLimiter
exchangeAdd chan blocks.Block
}
// NewBlockService creates a BlockService with given datastore instance.
......@@ -34,7 +42,17 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error
if rem == nil {
log.Warning("blockservice running in local (offline) mode.")
}
return &BlockService{Blockstore: bs, Exchange: rem}, nil
// exchangeAdd is a channel for async workers to add to the exchange.
// 100 blocks buffer. not clear what this number should be
exchangeAdd := make(chan blocks.Block, 100)
return &BlockService{
Blockstore: bs,
Exchange: rem,
exchangeAdd: exchangeAdd,
rateLimiter: procrl.NewRateLimiter(process.Background(), MaxExchangeAddWorkers),
}, nil
}
// AddBlock adds a particular block to the service, Putting it into the datastore.
......@@ -46,15 +64,21 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
return k, err
}
// TODO this operation rate-limits blockservice operations, we should
// consider moving this to an sync process.
// this operation rate-limits blockservice operations, so it is
// now an async process.
if s.Exchange != nil {
ctx := context.TODO()
if err := s.Exchange.HasBlock(ctx, b); err != nil {
// suppress error, as the client shouldn't care about bitswap.
// the client only cares about the blockstore.Put.
log.Errorf("Exchange.HasBlock error: %s", err)
}
// LimitedGo will spawn a goroutine but provide proper backpressure.
// it will not spawn the goroutine until the ratelimiter's work load
// is under the threshold.
s.rateLimiter.LimitedGo(func(worker process.Process) {
ctx := context.TODO()
if err := s.Exchange.HasBlock(ctx, b); err != nil {
// suppress error, as the client shouldn't care about bitswap.
// the client only cares about the blockstore.Put.
log.Errorf("Exchange.HasBlock error: %s", err)
}
})
}
return k, nil
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment