Commit 63f72a51 authored by Jeromy's avatar Jeromy

remove context from HasBlock, use bitswap process instead

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 47b85c7b
...@@ -47,7 +47,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) { ...@@ -47,7 +47,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) {
if err != nil { if err != nil {
return k, err return k, err
} }
if err := s.Exchange.HasBlock(context.TODO(), b); err != nil { if err := s.Exchange.HasBlock(b); err != nil {
return "", errors.New("blockservice is closed") return "", errors.New("blockservice is closed")
} }
return k, nil return k, nil
...@@ -61,7 +61,7 @@ func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) { ...@@ -61,7 +61,7 @@ func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) {
var ks []key.Key var ks []key.Key
for _, b := range bs { for _, b := range bs {
if err := s.Exchange.HasBlock(context.TODO(), b); err != nil { if err := s.Exchange.HasBlock(b); err != nil {
return nil, errors.New("blockservice is closed") return nil, errors.New("blockservice is closed")
} }
ks = append(ks, b.Key()) ks = append(ks, b.Key())
......
/*
Benchmark github.com/ipfs/go-ipfs/blockservice/worker.
Loop over a range of workers and buffer sizes and measure the time it
per block-transfer operation for each value. Run with:
$ go run "${GOPATH}/src/github.com/ipfs/go-ipfs/blockservice/worker/bench/main.go"
*/
package main
import (
"log"
"math"
"testing"
"time"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
worker "github.com/ipfs/go-ipfs/blockservice/worker"
"github.com/ipfs/go-ipfs/exchange/offline"
"github.com/ipfs/go-ipfs/thirdparty/delay"
"github.com/ipfs/go-ipfs/util/datastore2"
)
const kEstRoutingDelay = time.Second
const kBlocksPerOp = 100
func main() {
var bestConfig worker.Config
var quickestNsPerOp int64 = math.MaxInt64
for NumWorkers := 1; NumWorkers < 10; NumWorkers++ {
for ClientBufferSize := 0; ClientBufferSize < 10; ClientBufferSize++ {
for WorkerBufferSize := 0; WorkerBufferSize < 10; WorkerBufferSize++ {
c := worker.Config{
NumWorkers: NumWorkers,
ClientBufferSize: ClientBufferSize,
WorkerBufferSize: WorkerBufferSize,
}
result := testing.Benchmark(BenchmarkWithConfig(c))
if result.NsPerOp() < quickestNsPerOp {
bestConfig = c
quickestNsPerOp = result.NsPerOp()
}
log.Printf("benched %+v \t result: %+v", c, result)
}
}
}
log.Println(bestConfig)
}
func BenchmarkWithConfig(c worker.Config) func(b *testing.B) {
return func(b *testing.B) {
routingDelay := delay.Fixed(0) // during setup
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), routingDelay))
bstore := blockstore.NewBlockstore(dstore)
var testdata []*blocks.Block
var i int64
for i = 0; i < kBlocksPerOp; i++ {
testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
}
b.ResetTimer()
b.SetBytes(kBlocksPerOp)
for i := 0; i < b.N; i++ {
b.StopTimer()
w := worker.NewWorker(offline.Exchange(bstore), c)
b.StartTimer()
prev := routingDelay.Set(kEstRoutingDelay) // during measured section
for _, block := range testdata {
if err := w.HasBlock(block); err != nil {
b.Fatal(err)
}
}
routingDelay.Set(prev) // to hasten the unmeasured close period
b.StopTimer()
w.Close()
b.StartTimer()
}
}
}
package worker
import (
"testing"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/exchange/offline"
)
func BenchmarkHandle10KBlocks(b *testing.B) {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
var testdata []*blocks.Block
for i := 0; i < 10000; i++ {
testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
}
b.ResetTimer()
b.SetBytes(10000)
for i := 0; i < b.N; i++ {
b.StopTimer()
w := NewWorker(offline.Exchange(bstore), Config{
NumWorkers: 1,
ClientBufferSize: 0,
WorkerBufferSize: 0,
})
b.StartTimer()
for _, block := range testdata {
if err := w.HasBlock(block); err != nil {
b.Fatal(err)
}
}
b.StopTimer()
w.Close()
b.StartTimer()
}
}
// TODO FIXME name me
package worker
import (
"container/list"
"errors"
"time"
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
blocks "github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
exchange "github.com/ipfs/go-ipfs/exchange"
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
)
var log = logging.Logger("blockservice")
var DefaultConfig = Config{
NumWorkers: 1,
ClientBufferSize: 0,
WorkerBufferSize: 0,
}
type Config struct {
// NumWorkers sets the number of background workers that provide blocks to
// the exchange.
NumWorkers int
// ClientBufferSize allows clients of HasBlock to send up to
// |ClientBufferSize| blocks without blocking.
ClientBufferSize int
// WorkerBufferSize can be used in conjunction with NumWorkers to reduce
// communication-coordination within the worker.
WorkerBufferSize int
}
// TODO FIXME name me
type Worker struct {
// added accepts blocks from client
added chan *blocks.Block
exchange exchange.Interface
// workQueue is owned by the client worker
// process manages life-cycle
process process.Process
}
func NewWorker(e exchange.Interface, c Config) *Worker {
if c.NumWorkers < 1 {
c.NumWorkers = 1 // provide a sane default
}
w := &Worker{
exchange: e,
added: make(chan *blocks.Block, c.ClientBufferSize),
process: process.WithParent(process.Background()), // internal management
}
w.start(c)
return w
}
func (w *Worker) HasBlock(b *blocks.Block) error {
select {
case <-w.process.Closed():
return errors.New("blockservice worker is closed")
case w.added <- b:
return nil
}
}
func (w *Worker) Close() error {
log.Debug("blockservice provide worker is shutting down...")
return w.process.Close()
}
func (w *Worker) start(c Config) {
workerChan := make(chan *blocks.Block, c.WorkerBufferSize)
// clientWorker handles incoming blocks from |w.added| and sends to
// |workerChan|. This will never block the client.
w.process.Go(func(proc process.Process) {
defer close(workerChan)
var workQueue BlockList
debugInfo := time.NewTicker(5 * time.Second)
defer debugInfo.Stop()
for {
// take advantage of the fact that sending on nil channel always
// blocks so that a message is only sent if a block exists
sendToWorker := workerChan
nextBlock := workQueue.Pop()
if nextBlock == nil {
sendToWorker = nil
}
select {
// if worker is ready and there's a block to process, send the
// block
case sendToWorker <- nextBlock:
case <-debugInfo.C:
if workQueue.Len() > 0 {
log.Debugf("%d blocks in blockservice provide queue...", workQueue.Len())
}
case block := <-w.added:
if nextBlock != nil {
workQueue.Push(nextBlock) // missed the chance to send it
}
// if the client sends another block, add it to the queue.
workQueue.Push(block)
case <-proc.Closing():
return
}
}
})
// reads from |workerChan| until w.process closes
limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers)
limiter.Go(func(proc process.Process) {
ctx := procctx.OnClosingContext(proc) // shut down in-progress HasBlock when time to die
for {
select {
case <-proc.Closing():
return
case block, ok := <-workerChan:
if !ok {
return
}
limiter.LimitedGo(func(proc process.Process) {
if err := w.exchange.HasBlock(ctx, block); err != nil {
log.Infof("blockservice worker error: %s", err)
}
})
}
}
})
}
type BlockList struct {
list list.List
uniques map[key.Key]*list.Element
}
func (s *BlockList) PushFront(b *blocks.Block) {
if s.uniques == nil {
s.uniques = make(map[key.Key]*list.Element)
}
_, ok := s.uniques[b.Key()]
if !ok {
e := s.list.PushFront(b)
s.uniques[b.Key()] = e
}
}
func (s *BlockList) Push(b *blocks.Block) {
if s.uniques == nil {
s.uniques = make(map[key.Key]*list.Element)
}
_, ok := s.uniques[b.Key()]
if !ok {
e := s.list.PushBack(b)
s.uniques[b.Key()] = e
}
}
func (s *BlockList) Pop() *blocks.Block {
if s.list.Len() == 0 {
return nil
}
e := s.list.Front()
s.list.Remove(e)
b := e.Value.(*blocks.Block)
delete(s.uniques, b.Key())
return b
}
func (s *BlockList) Len() int {
return s.list.Len()
}
package worker
import (
blocks "github.com/ipfs/go-ipfs/blocks"
"testing"
)
func TestStartClose(t *testing.T) {
numRuns := 50
if testing.Short() {
numRuns = 5
}
for i := 0; i < numRuns; i++ {
w := NewWorker(nil, DefaultConfig)
w.Close()
}
}
func TestQueueDeduplication(t *testing.T) {
numUniqBlocks := 5 // arbitrary
var firstBatch []*blocks.Block
for i := 0; i < numUniqBlocks; i++ {
firstBatch = append(firstBatch, blockFromInt(i))
}
// to get different pointer values and prevent the implementation from
// cheating. The impl must check equality using Key.
var secondBatch []*blocks.Block
for i := 0; i < numUniqBlocks; i++ {
secondBatch = append(secondBatch, blockFromInt(i))
}
var workQueue BlockList
for _, b := range append(firstBatch, secondBatch...) {
workQueue.Push(b)
}
for i := 0; i < numUniqBlocks; i++ {
b := workQueue.Pop()
if b.Key() != firstBatch[i].Key() {
t.Fatal("list is not FIFO")
}
}
if b := workQueue.Pop(); b != nil {
t.Fatal("the workQueue did not de-duplicate the blocks")
}
}
func TestPushPopPushPop(t *testing.T) {
var workQueue BlockList
orig := blockFromInt(1)
dup := blockFromInt(1)
workQueue.PushFront(orig)
workQueue.Pop()
workQueue.Push(dup)
if workQueue.Len() != 1 {
t.Fatal("the block list's internal state is corrupt")
}
}
func blockFromInt(i int) *blocks.Block {
return blocks.NewBlock([]byte(string(i)))
}
...@@ -228,7 +228,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) { ...@@ -228,7 +228,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) {
// HasBlock announces the existance of a block to this bitswap service. The // HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers. // service will potentially notify its peers.
func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { func (bs *Bitswap) HasBlock(blk *blocks.Block) error {
select { select {
case <-bs.process.Closing(): case <-bs.process.Closing():
return errors.New("bitswap is closed") return errors.New("bitswap is closed")
...@@ -246,8 +246,8 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { ...@@ -246,8 +246,8 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
select { select {
case bs.newBlocks <- blk: case bs.newBlocks <- blk:
// send block off to be reprovided // send block off to be reprovided
case <-ctx.Done(): case <-bs.process.Closing():
return ctx.Err() return bs.process.Close()
} }
return nil return nil
} }
...@@ -328,9 +328,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -328,9 +328,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
log.Event(ctx, "Bitswap.GetBlockRequest.End", &k) log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
log.Debugf("got block %s from %s", b, p) log.Debugf("got block %s from %s", b, p)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(b); err != nil {
defer cancel()
if err := bs.HasBlock(hasBlockCtx, b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err) log.Warningf("ReceiveMessage HasBlock error: %s", err)
} }
}(block) }(block)
......
...@@ -70,7 +70,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { ...@@ -70,7 +70,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
hasBlock := peers[0] hasBlock := peers[0]
defer hasBlock.Exchange.Close() defer hasBlock.Exchange.Close()
if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { if err := hasBlock.Exchange.HasBlock(block); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -162,7 +162,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -162,7 +162,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
first := instances[0] first := instances[0]
for _, b := range blocks { for _, b := range blocks {
blkeys = append(blkeys, b.Key()) blkeys = append(blkeys, b.Key())
first.Exchange.HasBlock(ctx, b) first.Exchange.HasBlock(b)
} }
t.Log("Distribute!") t.Log("Distribute!")
...@@ -224,7 +224,6 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -224,7 +224,6 @@ func TestSendToWantingPeer(t *testing.T) {
t.Logf("Session %v\n", peerA.Peer) t.Logf("Session %v\n", peerA.Peer)
t.Logf("Session %v\n", peerB.Peer) t.Logf("Session %v\n", peerB.Peer)
timeout := time.Second
waitTime := time.Second * 5 waitTime := time.Second * 5
alpha := bg.Next() alpha := bg.Next()
...@@ -237,9 +236,7 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -237,9 +236,7 @@ func TestSendToWantingPeer(t *testing.T) {
} }
// peerB announces to the network that he has block alpha // peerB announces to the network that he has block alpha
ctx, cancel = context.WithTimeout(context.Background(), timeout) err = peerB.Exchange.HasBlock(alpha)
defer cancel()
err = peerB.Exchange.HasBlock(ctx, alpha)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -266,7 +263,7 @@ func TestBasicBitswap(t *testing.T) { ...@@ -266,7 +263,7 @@ func TestBasicBitswap(t *testing.T) {
instances := sg.Instances(2) instances := sg.Instances(2)
blocks := bg.Blocks(1) blocks := bg.Blocks(1)
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0]) err := instances[0].Exchange.HasBlock(blocks[0])
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -19,7 +19,7 @@ type Interface interface { ...@@ -19,7 +19,7 @@ type Interface interface {
// TODO Should callers be concerned with whether the block was made // TODO Should callers be concerned with whether the block was made
// available on the network? // available on the network?
HasBlock(context.Context, *blocks.Block) error HasBlock(*blocks.Block) error
io.Closer io.Closer
} }
...@@ -28,7 +28,7 @@ func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (*blocks.Block, ...@@ -28,7 +28,7 @@ func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (*blocks.Block,
} }
// HasBlock always returns nil. // HasBlock always returns nil.
func (e *offlineExchange) HasBlock(_ context.Context, b *blocks.Block) error { func (e *offlineExchange) HasBlock(b *blocks.Block) error {
return e.bs.Put(b) return e.bs.Put(b)
} }
......
...@@ -26,7 +26,7 @@ func TestHasBlockReturnsNil(t *testing.T) { ...@@ -26,7 +26,7 @@ func TestHasBlockReturnsNil(t *testing.T) {
ex := Exchange(store) ex := Exchange(store)
block := blocks.NewBlock([]byte("data")) block := blocks.NewBlock([]byte("data"))
err := ex.HasBlock(context.Background(), block) err := ex.HasBlock(block)
if err != nil { if err != nil {
t.Fail() t.Fail()
} }
...@@ -44,7 +44,7 @@ func TestGetBlocks(t *testing.T) { ...@@ -44,7 +44,7 @@ func TestGetBlocks(t *testing.T) {
expected := g.Blocks(2) expected := g.Blocks(2)
for _, b := range expected { for _, b := range expected {
if err := ex.HasBlock(context.Background(), b); err != nil { if err := ex.HasBlock(b); err != nil {
t.Fail() t.Fail()
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment