Commit c68129e9 authored by Brian Tiger Chow's avatar Brian Tiger Chow

fix(blockservice) fully async exchange.HasBlock

parent 4e4f388e
......@@ -22,6 +22,7 @@ func TestBlocks(t *testing.T) {
t.Error("failed to construct block service", err)
return
}
defer bs.Close()
b := blocks.NewBlock([]byte("beep boop"))
h := u.Hash([]byte("beep boop"))
......@@ -61,6 +62,9 @@ func TestBlocks(t *testing.T) {
func TestGetBlocksSequential(t *testing.T) {
var servs = Mocks(t, 4)
for _, s := range servs {
defer s.Close()
}
bg := blocksutil.NewBlockGenerator()
blks := bg.Blocks(50)
......@@ -73,7 +77,7 @@ func TestGetBlocksSequential(t *testing.T) {
t.Log("one instance at a time, get blocks concurrently")
for i := 1; i < len(servs); i++ {
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
ctx, _ := context.WithTimeout(context.TODO(), time.Second*50)
out := servs[i].GetBlocks(ctx, keys)
gotten := make(map[u.Key]*blocks.Block)
for blk := range out {
......
......@@ -8,20 +8,33 @@ import (
"fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procrl "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
blocks "github.com/jbenet/go-ipfs/blocks"
"github.com/jbenet/go-ipfs/blocks/blockstore"
worker "github.com/jbenet/go-ipfs/blockservice/worker"
exchange "github.com/jbenet/go-ipfs/exchange"
u "github.com/jbenet/go-ipfs/util"
)
var wc = worker.Config{
// When running on a single core, NumWorkers has a harsh negative effect on
// throughput. (-80% when < 25)
// Running a lot more workers appears to have very little effect on both
// single and multicore configurations.
NumWorkers: 25,
// These have no effect on when running on multiple cores, but harsh
// negative effect on throughput when running on a single core
// On multicore configurations these buffers have little effect on
// throughput.
// On single core configurations, larger buffers have severe adverse
// effects on throughput.
ClientBufferSize: 0,
WorkerBufferSize: 0,
}
var log = u.Logger("blockservice")
var ErrNotFound = errors.New("blockservice: key not found")
// MaxExchangeAddWorkers rate limits the number of exchange workers
var MaxExchangeAddWorkers = 100
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values.
......@@ -30,8 +43,7 @@ type BlockService struct {
Blockstore blockstore.Blockstore
Exchange exchange.Interface
rateLimiter *procrl.RateLimiter
exchangeAdd chan blocks.Block
worker *worker.Worker
}
// NewBlockService creates a BlockService with given datastore instance.
......@@ -43,15 +55,10 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error
log.Warning("blockservice running in local (offline) mode.")
}
// exchangeAdd is a channel for async workers to add to the exchange.
// 100 blocks buffer. not clear what this number should be
exchangeAdd := make(chan blocks.Block, 100)
return &BlockService{
Blockstore: bs,
Exchange: rem,
exchangeAdd: exchangeAdd,
rateLimiter: procrl.NewRateLimiter(process.Background(), MaxExchangeAddWorkers),
Blockstore: bs,
Exchange: rem,
worker: worker.NewWorker(rem, wc),
}, nil
}
......@@ -63,22 +70,8 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
if err != nil {
return k, err
}
// this operation rate-limits blockservice operations, so it is
// now an async process.
if s.Exchange != nil {
// LimitedGo will spawn a goroutine but provide proper backpressure.
// it will not spawn the goroutine until the ratelimiter's work load
// is under the threshold.
s.rateLimiter.LimitedGo(func(worker process.Process) {
ctx := context.TODO()
if err := s.Exchange.HasBlock(ctx, b); err != nil {
// suppress error, as the client shouldn't care about bitswap.
// the client only cares about the blockstore.Put.
log.Errorf("Exchange.HasBlock error: %s", err)
}
})
if err := s.worker.HasBlock(b); err != nil {
return "", errors.New("blockservice is closed")
}
return k, nil
}
......@@ -148,3 +141,8 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks
func (s *BlockService) DeleteBlock(k u.Key) error {
return s.Blockstore.DeleteBlock(k)
}
func (s *BlockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.worker.Close()
}
package main
import (
"log"
"math"
"testing"
"time"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
worker "github.com/jbenet/go-ipfs/blockservice/worker"
"github.com/jbenet/go-ipfs/exchange/offline"
"github.com/jbenet/go-ipfs/thirdparty/delay"
"github.com/jbenet/go-ipfs/util/datastore2"
)
const kEstRoutingDelay = time.Second
const kBlocksPerOp = 100
func main() {
var bestConfig worker.Config
var quickestNsPerOp int64 = math.MaxInt64
for NumWorkers := 1; NumWorkers < 10; NumWorkers++ {
for ClientBufferSize := 0; ClientBufferSize < 10; ClientBufferSize++ {
for WorkerBufferSize := 0; WorkerBufferSize < 10; WorkerBufferSize++ {
c := worker.Config{
NumWorkers: NumWorkers,
ClientBufferSize: ClientBufferSize,
WorkerBufferSize: WorkerBufferSize,
}
result := testing.Benchmark(BenchmarkWithConfig(c))
if result.NsPerOp() < quickestNsPerOp {
bestConfig = c
quickestNsPerOp = result.NsPerOp()
}
log.Printf("benched %+v \t result: %+v", c, result)
}
}
}
log.Println(bestConfig)
}
func BenchmarkWithConfig(c worker.Config) func(b *testing.B) {
return func(b *testing.B) {
routingDelay := delay.Fixed(0) // during setup
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), routingDelay))
bstore := blockstore.NewBlockstore(dstore)
var testdata []*blocks.Block
var i int64
for i = 0; i < kBlocksPerOp; i++ {
testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
}
b.ResetTimer()
b.SetBytes(kBlocksPerOp)
for i := 0; i < b.N; i++ {
b.StopTimer()
w := worker.NewWorker(offline.Exchange(bstore), c)
b.StartTimer()
prev := routingDelay.Set(kEstRoutingDelay) // during measured section
for _, block := range testdata {
if err := w.HasBlock(block); err != nil {
b.Fatal(err)
}
}
routingDelay.Set(prev) // to hasten the unmeasured close period
b.StopTimer()
w.Close()
b.StartTimer()
}
}
}
package worker
import (
"testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
"github.com/jbenet/go-ipfs/exchange/offline"
)
func BenchmarkHandle10KBlocks(b *testing.B) {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
var testdata []*blocks.Block
for i := 0; i < 10000; i++ {
testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
}
b.ResetTimer()
b.SetBytes(10000)
for i := 0; i < b.N; i++ {
b.StopTimer()
w := NewWorker(offline.Exchange(bstore), Config{
NumWorkers: 1,
ClientBufferSize: 0,
WorkerBufferSize: 0,
})
b.StartTimer()
for _, block := range testdata {
if err := w.HasBlock(block); err != nil {
b.Fatal(err)
}
}
b.StopTimer()
w.Close()
b.StartTimer()
}
}
// TODO FIXME name me
package worker
import (
"container/list"
"errors"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
blocks "github.com/jbenet/go-ipfs/blocks"
exchange "github.com/jbenet/go-ipfs/exchange"
util "github.com/jbenet/go-ipfs/util"
)
var log = util.Logger("blockservice")
var DefaultConfig = Config{
NumWorkers: 1,
ClientBufferSize: 0,
WorkerBufferSize: 0,
}
type Config struct {
// NumWorkers sets the number of background workers that provide blocks to
// the exchange.
NumWorkers int
// ClientBufferSize allows clients of HasBlock to send up to
// |ClientBufferSize| blocks without blocking.
ClientBufferSize int
// WorkerBufferSize can be used in conjunction with NumWorkers to reduce
// communication-coordination within the worker.
WorkerBufferSize int
}
// TODO FIXME name me
type Worker struct {
// added accepts blocks from client
added chan *blocks.Block
exchange exchange.Interface
// workQueue is owned by the client worker
// process manages life-cycle
process process.Process
}
func NewWorker(e exchange.Interface, c Config) *Worker {
if c.NumWorkers < 1 {
c.NumWorkers = 1 // provide a sane default
}
w := &Worker{
exchange: e,
added: make(chan *blocks.Block, c.ClientBufferSize),
process: process.WithParent(process.Background()), // internal management
}
w.start(c)
return w
}
func (w *Worker) HasBlock(b *blocks.Block) error {
select {
case <-w.process.Closed():
return errors.New("blockservice worker is closed")
case w.added <- b:
return nil
}
}
func (w *Worker) Close() error {
log.Debug("blockservice provide worker is shutting down...")
return w.process.Close()
}
func (w *Worker) start(c Config) {
workerChan := make(chan *blocks.Block, c.WorkerBufferSize)
// clientWorker handles incoming blocks from |w.added| and sends to
// |workerChan|. This will never block the client.
w.process.Go(func(proc process.Process) {
defer close(workerChan)
var workQueue BlockList
for {
// take advantage of the fact that sending on nil channel always
// blocks so that a message is only sent if a block exists
sendToWorker := workerChan
nextBlock := workQueue.Pop()
if nextBlock == nil {
sendToWorker = nil
}
select {
// if worker is ready and there's a block to process, send the
// block
case sendToWorker <- nextBlock:
case <-time.Tick(5 * time.Second):
if workQueue.Len() > 0 {
log.Debugf("%d blocks in blockservice provide queue...", workQueue.Len())
}
case block := <-w.added:
if nextBlock != nil {
workQueue.Push(nextBlock) // missed the chance to send it
}
// if the client sends another block, add it to the queue.
workQueue.Push(block)
case <-proc.Closing():
return
}
}
})
for i := 0; i < c.NumWorkers; i++ {
// reads from |workerChan| until process closes
w.process.Go(func(proc process.Process) {
ctx, cancel := context.WithCancel(context.Background())
// shuts down an in-progress HasBlock operation
proc.Go(func(proc process.Process) {
<-proc.Closing()
cancel()
})
for {
select {
case <-proc.Closing():
return
case block, ok := <-workerChan:
if !ok {
return
}
if err := w.exchange.HasBlock(ctx, block); err != nil {
// TODO log event?
}
}
}
})
}
}
type BlockList struct {
list list.List
}
func (s *BlockList) PushFront(b *blocks.Block) {
// FIXME find figures
s.list.PushFront(b)
}
func (s *BlockList) Push(b *blocks.Block) {
s.list.PushBack(b)
}
func (s *BlockList) Pop() *blocks.Block {
if s.list.Len() == 0 {
return nil
}
e := s.list.Front()
s.list.Remove(e)
return e.Value.(*blocks.Block)
}
func (s *BlockList) Len() int {
return s.list.Len()
}
package worker
import "testing"
func TestStartClose(t *testing.T) {
numRuns := 50
if testing.Short() {
numRuns = 5
}
for i := 0; i < numRuns; i++ {
w := NewWorker(nil, DefaultConfig)
w.Close()
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment