Commit 63a223c1 authored by Jeromy

remove context from HasBlock, use bitswap process instead

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
parent 418787d0
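For callers the change is mechanical: HasBlock loses its context.Context parameter, and the exchange (bitswap) scopes the operation to its own internal process instead of a caller-supplied context. A rough before/after sketch of the method on exchange.Interface (a reconstruction of just this one method, not the full interface):

// Before: every caller had to supply a context, typically context.TODO().
HasBlock(ctx context.Context, b *blocks.Block) error

// After: the exchange ties the operation to its own process lifetime.
HasBlock(b *blocks.Block) error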
@@ -47,7 +47,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) {
if err != nil {
return k, err
}
- if err := s.Exchange.HasBlock(context.TODO(), b); err != nil {
+ if err := s.Exchange.HasBlock(b); err != nil {
return "", errors.New("blockservice is closed")
}
return k, nil
@@ -61,7 +61,7 @@ func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) {
var ks []key.Key
for _, b := range bs {
- if err := s.Exchange.HasBlock(context.TODO(), b); err != nil {
+ if err := s.Exchange.HasBlock(b); err != nil {
return nil, errors.New("blockservice is closed")
}
ks = append(ks, b.Key())
/*
Benchmark github.com/ipfs/go-ipfs/blockservice/worker.
Loop over a range of worker counts and buffer sizes and measure the time
taken per block-transfer operation for each configuration. Run with:
$ go run "${GOPATH}/src/github.com/ipfs/go-ipfs/blockservice/worker/bench/main.go"
*/
package main
import (
"log"
"math"
"testing"
"time"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
worker "github.com/ipfs/go-ipfs/blockservice/worker"
"github.com/ipfs/go-ipfs/exchange/offline"
"github.com/ipfs/go-ipfs/thirdparty/delay"
"github.com/ipfs/go-ipfs/util/datastore2"
)
const kEstRoutingDelay = time.Second
const kBlocksPerOp = 100
func main() {
var bestConfig worker.Config
var quickestNsPerOp int64 = math.MaxInt64
for NumWorkers := 1; NumWorkers < 10; NumWorkers++ {
for ClientBufferSize := 0; ClientBufferSize < 10; ClientBufferSize++ {
for WorkerBufferSize := 0; WorkerBufferSize < 10; WorkerBufferSize++ {
c := worker.Config{
NumWorkers: NumWorkers,
ClientBufferSize: ClientBufferSize,
WorkerBufferSize: WorkerBufferSize,
}
result := testing.Benchmark(BenchmarkWithConfig(c))
if result.NsPerOp() < quickestNsPerOp {
bestConfig = c
quickestNsPerOp = result.NsPerOp()
}
log.Printf("benched %+v \t result: %+v", c, result)
}
}
}
log.Println(bestConfig)
}
func BenchmarkWithConfig(c worker.Config) func(b *testing.B) {
return func(b *testing.B) {
routingDelay := delay.Fixed(0) // during setup
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), routingDelay))
bstore := blockstore.NewBlockstore(dstore)
var testdata []*blocks.Block
var i int64
for i = 0; i < kBlocksPerOp; i++ {
testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
}
b.ResetTimer()
b.SetBytes(kBlocksPerOp)
for i := 0; i < b.N; i++ {
b.StopTimer()
w := worker.NewWorker(offline.Exchange(bstore), c)
b.StartTimer()
prev := routingDelay.Set(kEstRoutingDelay) // during measured section
for _, block := range testdata {
if err := w.HasBlock(block); err != nil {
b.Fatal(err)
}
}
routingDelay.Set(prev) // to hasten the unmeasured close period
b.StopTimer()
w.Close()
b.StartTimer()
}
}
}
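The sweep above drives benchmarks programmatically: testing.Benchmark runs a func(*testing.B) outside of `go test`, chooses b.N itself, and returns a testing.BenchmarkResult whose NsPerOp is compared across configurations. A minimal standalone sketch of that stdlib pattern (the busy-work body is a hypothetical stand-in):

package main

import (
	"fmt"
	"testing"
	"time"
)

func main() {
	// testing.Benchmark picks b.N itself, exactly as `go test -bench` would.
	result := testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			time.Sleep(time.Microsecond) // stand-in for the measured work
		}
	})
	fmt.Printf("%d iterations, %d ns/op\n", result.N, result.NsPerOp())
}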
package worker
import (
"testing"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/exchange/offline"
)
func BenchmarkHandle10KBlocks(b *testing.B) {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
var testdata []*blocks.Block
for i := 0; i < 10000; i++ {
testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
}
b.ResetTimer()
b.SetBytes(10000)
for i := 0; i < b.N; i++ {
b.StopTimer()
w := NewWorker(offline.Exchange(bstore), Config{
NumWorkers: 1,
ClientBufferSize: 0,
WorkerBufferSize: 0,
})
b.StartTimer()
for _, block := range testdata {
if err := w.HasBlock(block); err != nil {
b.Fatal(err)
}
}
b.StopTimer()
w.Close()
b.StartTimer()
}
}
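Both benchmarks bracket setup and teardown with b.StopTimer and b.StartTimer so that only the HasBlock loop contributes to ns/op. A small sketch of that timer pattern in isolation (the workload here is hypothetical):

package worker

import "testing"

// Only the section between StartTimer and StopTimer is measured;
// per-iteration setup and teardown are excluded from ns/op.
func BenchmarkTimedSectionOnly(b *testing.B) {
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		data := make([]byte, 1024) // setup, excluded
		b.StartTimer()
		var sum int
		for _, v := range data { // measured work
			sum += int(v)
		}
		b.StopTimer()
		_ = sum // keep the result live; excluded
		b.StartTimer()
	}
}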
// TODO FIXME name me
package worker
import (
"container/list"
"errors"
"time"
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
blocks "github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
exchange "github.com/ipfs/go-ipfs/exchange"
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
)
var log = logging.Logger("blockservice")
var DefaultConfig = Config{
NumWorkers: 1,
ClientBufferSize: 0,
WorkerBufferSize: 0,
}
type Config struct {
// NumWorkers sets the number of background workers that provide blocks to
// the exchange.
NumWorkers int
// ClientBufferSize allows clients of HasBlock to send up to
// |ClientBufferSize| blocks without blocking.
ClientBufferSize int
// WorkerBufferSize can be used in conjunction with NumWorkers to reduce
// coordination overhead between the dispatch loop and the workers.
WorkerBufferSize int
}
// TODO FIXME name me
type Worker struct {
// added accepts blocks from client
added chan *blocks.Block
exchange exchange.Interface
// workQueue is owned by the client worker
// process manages life-cycle
process process.Process
}
func NewWorker(e exchange.Interface, c Config) *Worker {
if c.NumWorkers < 1 {
c.NumWorkers = 1 // provide a sane default
}
w := &Worker{
exchange: e,
added: make(chan *blocks.Block, c.ClientBufferSize),
process: process.WithParent(process.Background()), // internal management
}
w.start(c)
return w
}
func (w *Worker) HasBlock(b *blocks.Block) error {
select {
case <-w.process.Closed():
return errors.New("blockservice worker is closed")
case w.added <- b:
return nil
}
}
func (w *Worker) Close() error {
log.Debug("blockservice provide worker is shutting down...")
return w.process.Close()
}
func (w *Worker) start(c Config) {
workerChan := make(chan *blocks.Block, c.WorkerBufferSize)
// clientWorker handles incoming blocks from |w.added| and sends to
// |workerChan|. This will never block the client.
w.process.Go(func(proc process.Process) {
defer close(workerChan)
var workQueue BlockList
debugInfo := time.NewTicker(5 * time.Second)
defer debugInfo.Stop()
for {
// take advantage of the fact that sending on nil channel always
// blocks so that a message is only sent if a block exists
sendToWorker := workerChan
nextBlock := workQueue.Pop()
if nextBlock == nil {
sendToWorker = nil
}
select {
// if worker is ready and there's a block to process, send the
// block
case sendToWorker <- nextBlock:
case <-debugInfo.C:
if workQueue.Len() > 0 {
log.Debugf("%d blocks in blockservice provide queue...", workQueue.Len())
}
case block := <-w.added:
if nextBlock != nil {
workQueue.PushFront(nextBlock) // missed the chance to send it; keep it next in line
}
// if the client sends another block, add it to the queue.
workQueue.Push(block)
case <-proc.Closing():
return
}
}
})
// reads from |workerChan| until w.process closes
limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
limiter.Go(func(proc process.Process) {
ctx := procctx.OnClosingContext(proc) // shut down in-progress HasBlock when time to die
for {
select {
case <-proc.Closing():
return
case block, ok := <-workerChan:
if !ok {
return
}
limiter.LimitedGo(func(proc process.Process) {
if err := w.exchange.HasBlock(ctx, block); err != nil {
log.Infof("blockservice worker error: %s", err)
}
})
}
}
})
}
type BlockList struct {
list list.List
uniques map[key.Key]*list.Element
}
func (s *BlockList) PushFront(b *blocks.Block) {
if s.uniques == nil {
s.uniques = make(map[key.Key]*list.Element)
}
_, ok := s.uniques[b.Key()]
if !ok {
e := s.list.PushFront(b)
s.uniques[b.Key()] = e
}
}
func (s *BlockList) Push(b *blocks.Block) {
if s.uniques == nil {
s.uniques = make(map[key.Key]*list.Element)
}
_, ok := s.uniques[b.Key()]
if !ok {
e := s.list.PushBack(b)
s.uniques[b.Key()] = e
}
}
func (s *BlockList) Pop() *blocks.Block {
if s.list.Len() == 0 {
return nil
}
e := s.list.Front()
s.list.Remove(e)
b := e.Value.(*blocks.Block)
delete(s.uniques, b.Key())
return b
}
func (s *BlockList) Len() int {
return s.list.Len()
}
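The clientWorker loop in start relies on the fact that a send on a nil channel blocks forever: when the queue is empty, sendToWorker is set to nil, which disables that select case until there is a block to send. A self-contained sketch of the same technique (the producer/consumer here are hypothetical, not the worker's types):

package main

import "fmt"

func main() {
	in := make(chan int)
	out := make(chan int)
	done := make(chan struct{})

	// Producer: feed a few items, then close.
	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		close(in)
	}()

	// Consumer.
	go func() {
		for v := range out {
			fmt.Println("got", v)
		}
		close(done)
	}()

	var queue []int
	for in != nil || len(queue) > 0 {
		// Sending (or receiving) on a nil channel blocks forever, so
		// nil-ing a channel disables its select case: send is only
		// live while the queue is non-empty, mirroring sendToWorker.
		var send chan int
		var next int
		if len(queue) > 0 {
			send = out
			next = queue[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // no more input; drain the queue
				continue
			}
			queue = append(queue, v)
		case send <- next:
			queue = queue[1:]
		}
	}
	close(out)
	<-done
}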
package worker
import (
blocks "github.com/ipfs/go-ipfs/blocks"
"testing"
)
func TestStartClose(t *testing.T) {
numRuns := 50
if testing.Short() {
numRuns = 5
}
for i := 0; i < numRuns; i++ {
w := NewWorker(nil, DefaultConfig)
w.Close()
}
}
func TestQueueDeduplication(t *testing.T) {
numUniqBlocks := 5 // arbitrary
var firstBatch []*blocks.Block
for i := 0; i < numUniqBlocks; i++ {
firstBatch = append(firstBatch, blockFromInt(i))
}
// to get different pointer values and prevent the implementation from
// cheating. The impl must check equality using Key.
var secondBatch []*blocks.Block
for i := 0; i < numUniqBlocks; i++ {
secondBatch = append(secondBatch, blockFromInt(i))
}
var workQueue BlockList
for _, b := range append(firstBatch, secondBatch...) {
workQueue.Push(b)
}
for i := 0; i < numUniqBlocks; i++ {
b := workQueue.Pop()
if b.Key() != firstBatch[i].Key() {
t.Fatal("list is not FIFO")
}
}
if b := workQueue.Pop(); b != nil {
t.Fatal("the workQueue did not de-duplicate the blocks")
}
}
func TestPushPopPushPop(t *testing.T) {
var workQueue BlockList
orig := blockFromInt(1)
dup := blockFromInt(1)
workQueue.PushFront(orig)
workQueue.Pop()
workQueue.Push(dup)
if workQueue.Len() != 1 {
t.Fatal("the block list's internal state is corrupt")
}
}
func blockFromInt(i int) *blocks.Block {
return blocks.NewBlock([]byte(string(i)))
}
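The Worker's shutdown path is what the commit message alludes to: lifecycle comes from jbenet/goprocess rather than contexts. process.WithParent creates a process, Go spawns children that must return once proc.Closing() fires, and Close blocks until they all have. A minimal sketch using just those calls (the import path shown is the upstream one, not the Godeps-vendored path used above):

package main

import (
	"fmt"
	"time"

	process "github.com/jbenet/goprocess"
)

func main() {
	p := process.WithParent(process.Background())

	// Child goroutine tied to the process; it must return once
	// proc.Closing() fires, or p.Close() would block forever.
	p.Go(func(proc process.Process) {
		for {
			select {
			case <-time.After(50 * time.Millisecond):
				fmt.Println("providing...")
			case <-proc.Closing():
				return
			}
		}
	})

	time.Sleep(120 * time.Millisecond)
	p.Close() // like Worker.Close: waits for all children to finish
}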