Unverified Commit dcbe1f29 authored by dirkmc's avatar dirkmc Committed by GitHub

Merge PR Parallelize engine reads (#216)

* feat: parallelize reads

* feat: concurent engine task workers and concurrent bstore reads

* fix: lint

* fix: address review comments

* refactor: in BlockstoreManager wait for process.Closing() instead of Context.Done()

* fix: use channel size 0 for BlockstoreManager reads

* fix: change blockstore error logs from warnings to errors

* fix: flaky test

* fix: lint
parent 291b2674
......@@ -130,9 +130,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}
notif := notifications.New()
engine := decision.NewEngine(ctx, bstore, network.ConnectionManager()) // TODO close the engine with Close() method
bs := &Bitswap{
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore, network.ConnectionManager()), // TODO close the engine with Close() method
engine: engine,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
......@@ -161,6 +162,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
// Start up bitswaps async worker routines
bs.startWorkers(ctx, px)
engine.StartWorkers(ctx, px)
// bind the context and process.
// do it over here to avoid closing before all setup is done.
......@@ -372,7 +374,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.engine.MessageReceived(p, incoming)
bs.engine.MessageReceived(ctx, p, incoming)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger
......
package decision
import (
"context"
"sync"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
process "github.com/jbenet/goprocess"
)
// blockstoreManager maintains a pool of workers that make requests to the blockstore.
type blockstoreManager struct {
bs bstore.Blockstore
workerCount int
jobs chan func()
px process.Process
}
// newBlockstoreManager creates a new blockstoreManager with the given context
// and number of workers
func newBlockstoreManager(ctx context.Context, bs bstore.Blockstore, workerCount int) *blockstoreManager {
return &blockstoreManager{
bs: bs,
workerCount: workerCount,
jobs: make(chan func()),
}
}
func (bsm *blockstoreManager) start(px process.Process) {
bsm.px = px
// Start up workers
for i := 0; i < bsm.workerCount; i++ {
px.Go(func(px process.Process) {
bsm.worker()
})
}
}
func (bsm *blockstoreManager) worker() {
for {
select {
case <-bsm.px.Closing():
return
case job := <-bsm.jobs:
job()
}
}
}
func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) {
select {
case <-ctx.Done():
case <-bsm.px.Closing():
case bsm.jobs <- job:
}
}
func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) map[cid.Cid]int {
res := make(map[cid.Cid]int)
if len(ks) == 0 {
return res
}
var lk sync.Mutex
bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
size, err := bsm.bs.GetSize(c)
if err != nil {
if err != bstore.ErrNotFound {
log.Errorf("blockstore.GetSize(%s) error: %s", c, err)
}
} else {
lk.Lock()
res[c] = size
lk.Unlock()
}
})
return res
}
func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[cid.Cid]blocks.Block {
res := make(map[cid.Cid]blocks.Block)
if len(ks) == 0 {
return res
}
var lk sync.Mutex
bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
blk, err := bsm.bs.Get(c)
if err != nil {
if err != bstore.ErrNotFound {
log.Errorf("blockstore.Get(%s) error: %s", c, err)
}
} else {
lk.Lock()
res[c] = blk
lk.Unlock()
}
})
return res
}
func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) {
wg := sync.WaitGroup{}
for _, k := range ks {
c := k
wg.Add(1)
bsm.addJob(ctx, func() {
jobFn(c)
wg.Done()
})
}
wg.Wait()
}
package decision
import (
"context"
"crypto/rand"
"errors"
"sync"
"testing"
"time"
"github.com/ipfs/go-bitswap/testutil"
cid "github.com/ipfs/go-cid"
blocks "github.com/ipfs/go-block-format"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/delayed"
ds_sync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
delay "github.com/ipfs/go-ipfs-delay"
process "github.com/jbenet/goprocess"
)
func TestBlockstoreManagerNotFoundKey(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(ctx, bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))
cids := testutil.GenerateCids(4)
sizes := bsm.getBlockSizes(ctx, cids)
if len(sizes) != 0 {
t.Fatal("Wrong response length")
}
for _, c := range cids {
if _, ok := sizes[c]; ok {
t.Fatal("Non-existent block should have no size")
}
}
blks := bsm.getBlocks(ctx, cids)
if len(blks) != 0 {
t.Fatal("Wrong response length")
}
for _, c := range cids {
if _, ok := blks[c]; ok {
t.Fatal("Non-existent block should have no size")
}
}
}
func TestBlockstoreManager(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(ctx, bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))
exp := make(map[cid.Cid]blocks.Block)
var blks []blocks.Block
for i := 0; i < 32; i++ {
buf := make([]byte, 1024*(i+1))
_, _ = rand.Read(buf)
b := blocks.NewBlock(buf)
blks = append(blks, b)
exp[b.Cid()] = b
}
// Put all blocks in the blockstore except the last one
if err := bstore.PutMany(blks[:len(blks)-1]); err != nil {
t.Fatal(err)
}
var cids []cid.Cid
for _, b := range blks {
cids = append(cids, b.Cid())
}
sizes := bsm.getBlockSizes(ctx, cids)
if len(sizes) != len(blks)-1 {
t.Fatal("Wrong response length")
}
for _, c := range cids {
expSize := len(exp[c].RawData())
size, ok := sizes[c]
// Only the last key should be missing
if c.Equals(cids[len(cids)-1]) {
if ok {
t.Fatal("Non-existent block should not be in sizes map")
}
} else {
if !ok {
t.Fatal("Block should be in sizes map")
}
if size != expSize {
t.Fatal("Block has wrong size")
}
}
}
fetched := bsm.getBlocks(ctx, cids)
if len(fetched) != len(blks)-1 {
t.Fatal("Wrong response length")
}
for _, c := range cids {
blk, ok := fetched[c]
// Only the last key should be missing
if c.Equals(cids[len(cids)-1]) {
if ok {
t.Fatal("Non-existent block should not be in blocks map")
}
} else {
if !ok {
t.Fatal("Block should be in blocks map")
}
if !blk.Cid().Equals(c) {
t.Fatal("Block has wrong cid")
}
}
}
}
func TestBlockstoreManagerConcurrency(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
workerCount := 5
bsm := newBlockstoreManager(ctx, bstore, workerCount)
bsm.start(process.WithTeardown(func() error { return nil }))
blkSize := int64(8 * 1024)
blks := testutil.GenerateBlocksOfSize(32, blkSize)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
}
err := bstore.PutMany(blks)
if err != nil {
t.Fatal(err)
}
// Create more concurrent requests than the number of workers
wg := sync.WaitGroup{}
for i := 0; i < 16; i++ {
wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
sizes := bsm.getBlockSizes(ctx, ks)
if len(sizes) != len(blks) {
err = errors.New("Wrong response length")
}
}(t)
}
wg.Wait()
if err != nil {
t.Fatal(err)
}
}
func TestBlockstoreManagerClose(t *testing.T) {
ctx := context.Background()
delayTime := 20 * time.Millisecond
bsdelay := delay.Fixed(delayTime)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(ctx, bstore, 3)
px := process.WithTeardown(func() error { return nil })
bsm.start(px)
blks := testutil.GenerateBlocksOfSize(3, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
}
err := bstore.PutMany(blks)
if err != nil {
t.Fatal(err)
}
go px.Close()
time.Sleep(5 * time.Millisecond)
fnCallDone := make(chan struct{})
go func() {
bsm.getBlockSizes(ctx, ks)
fnCallDone <- struct{}{}
}()
select {
case <-fnCallDone:
t.Fatal("call to BlockstoreManager should be cancelled")
case <-px.Closed():
}
}
func TestBlockstoreManagerCtxDone(t *testing.T) {
delayTime := 20 * time.Millisecond
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
defer cancel()
bsdelay := delay.Fixed(delayTime)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(ctx, bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)
blks := testutil.GenerateBlocksOfSize(3, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
}
err := bstore.PutMany(blks)
if err != nil {
t.Fatal(err)
}
fnCallDone := make(chan struct{})
go func() {
bsm.getBlockSizes(ctx, ks)
fnCallDone <- struct{}{}
}()
select {
case <-fnCallDone:
t.Fatal("call to BlockstoreManager should be cancelled")
case <-ctx.Done():
}
}
......@@ -15,6 +15,7 @@ import (
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
process "github.com/jbenet/goprocess"
peer "github.com/libp2p/go-libp2p-core/peer"
)
......@@ -55,6 +56,8 @@ var log = logging.Logger("engine")
const (
// outboxChanBuffer must be 0 to prevent stale messages from being sent
outboxChanBuffer = 0
// Number of concurrent workers that pull tasks off the request queue
taskWorkerCount = 8
// maxMessageSize is the maximum size of the batched payload
maxMessageSize = 512 * 1024
// tagFormat is the tag given to peers associated an engine
......@@ -78,6 +81,9 @@ const (
// long/short term scores for tagging peers
longTermScore = 10 // this is a high tag but it grows _very_ slowly.
shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
// Number of concurrent workers that process requests to the blockstore
blockstoreWorkerCount = 128
)
var (
......@@ -125,7 +131,7 @@ type Engine struct {
// taskWorker goroutine
outbox chan (<-chan *Envelope)
bs bstore.Blockstore
bsm *blockstoreManager
peerTagger PeerTagger
......@@ -136,26 +142,43 @@ type Engine struct {
ledgerMap map[peer.ID]*ledger
ticker *time.Ticker
taskWorkerLock sync.Mutex
taskWorkerCount int
}
// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger) *Engine {
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
bs: bs,
peerTagger: peerTagger,
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
ledgerMap: make(map[peer.ID]*ledger),
bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
peerTagger: peerTagger,
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
taskWorkerCount: taskWorkerCount,
}
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
e.peerRequestQueue = peertaskqueue.New(peertaskqueue.OnPeerAddedHook(e.onPeerAdded), peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved))
go e.taskWorker(ctx)
e.peerRequestQueue = peertaskqueue.New(
peertaskqueue.OnPeerAddedHook(e.onPeerAdded),
peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved))
go e.scoreWorker(ctx)
return e
}
// Start up workers to handle requests from other nodes for the data on this node
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
// Start up blockstore manager
e.bsm.start(px)
for i := 0; i < e.taskWorkerCount; i++ {
px.Go(func(px process.Process) {
e.taskWorker(ctx)
})
}
}
// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
......@@ -287,8 +310,11 @@ func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
}
}
// Each taskWorker pulls items off the request queue up and adds them to an
// envelope. The envelope is passed off to the bitswap workers, which send
// the message to the network.
func (e *Engine) taskWorker(ctx context.Context) {
defer close(e.outbox) // because taskWorker uses the channel exclusively
defer e.taskWorkerExit()
for {
oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
select {
......@@ -308,6 +334,17 @@ func (e *Engine) taskWorker(ctx context.Context) {
}
}
// taskWorkerExit handles cleanup of task workers
func (e *Engine) taskWorkerExit() {
e.taskWorkerLock.Lock()
defer e.taskWorkerLock.Unlock()
e.taskWorkerCount--
if e.taskWorkerCount == 0 {
close(e.outbox)
}
}
// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
......@@ -326,14 +363,15 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
}
// with a task in hand, we're ready to prepare the envelope...
blockCids := cid.NewSet()
for _, t := range nextTask.Tasks {
blockCids.Add(t.Identifier.(cid.Cid))
}
blks := e.bsm.getBlocks(ctx, blockCids.Keys())
msg := bsmsg.New(true)
for _, entry := range nextTask.Tasks {
block, err := e.bs.Get(entry.Identifier.(cid.Cid))
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
continue
}
msg.AddBlock(block)
for _, b := range blks {
msg.AddBlock(b)
}
if msg.Empty() {
......@@ -379,7 +417,7 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
if m.Empty() {
log.Debugf("received empty message from %s", p)
}
......@@ -391,6 +429,16 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
}
}()
// Get block sizes
entries := m.Wantlist()
wantKs := cid.NewSet()
for _, entry := range entries {
if !entry.Cancel {
wantKs.Add(entry.Cid)
}
}
blockSizes := e.bsm.getBlockSizes(ctx, wantKs.Keys())
l := e.findOrCreate(p)
l.lk.Lock()
defer l.lk.Unlock()
......@@ -408,13 +456,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
} else {
log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
l.Wants(entry.Cid, entry.Priority)
blockSize, err := e.bs.GetSize(entry.Cid)
if err != nil {
if err == bstore.ErrNotFound {
continue
}
log.Error(err)
} else {
blockSize, ok := blockSizes[entry.Cid]
if ok {
// we have the block
newWorkExists = true
if msgSize+blockSize > maxMessageSize {
......@@ -484,9 +527,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
for _, block := range m.Blocks() {
l.SentBytes(len(block.RawData()))
l.wantList.Remove(block.Cid())
e.peerRequestQueue.Remove(block.Cid(), p)
}
}
// PeerConnected is called when a new peer connects, meaning we should start
......
......@@ -15,6 +15,7 @@ import (
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
process "github.com/jbenet/goprocess"
peer "github.com/libp2p/go-libp2p-core/peer"
testutil "github.com/libp2p/go-libp2p-core/test"
)
......@@ -88,13 +89,14 @@ type engineSet struct {
func newEngine(ctx context.Context, idStr string) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := NewEngine(ctx, bs, fpt)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
//Strategy: New(true),
PeerTagger: fpt,
Blockstore: bs,
Engine: NewEngine(ctx,
bs, fpt),
Engine: e,
}
}
......@@ -112,7 +114,7 @@ func TestConsistentAccounting(t *testing.T) {
m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
sender.Engine.MessageSent(receiver.Peer, m)
receiver.Engine.MessageReceived(sender.Peer, m)
receiver.Engine.MessageReceived(ctx, sender.Peer, m)
}
// Ensure sender records the change
......@@ -142,7 +144,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
m := message.New(true)
sanfrancisco.Engine.MessageSent(seattle.Peer, m)
seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
seattle.Engine.MessageReceived(ctx, sanfrancisco.Peer, m)
if seattle.Peer == sanfrancisco.Peer {
t.Fatal("Sanity Check: Peers have same Key!")
......@@ -172,8 +174,10 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
}
func TestOutboxClosedWhenEngineClosed(t *testing.T) {
ctx := context.Background()
t.SkipNow() // TODO implement *Engine.Close
e := NewEngine(context.Background(), blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{})
e := NewEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{})
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
var wg sync.WaitGroup
wg.Add(1)
go func() {
......@@ -228,9 +232,11 @@ func TestPartnerWantsThenCancels(t *testing.T) {
}
}
ctx := context.Background()
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := NewEngine(context.Background(), bs, &fakePeerTagger{})
e := NewEngine(ctx, bs, &fakePeerTagger{})
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
......@@ -310,7 +316,7 @@ func TestTaggingUseful(t *testing.T) {
if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
t.Fatal("peers should still be tagged due to long-term usefulness")
}
time.Sleep(shortTerm * 10)
time.Sleep(shortTerm * 20)
if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
t.Fatal("peers should finally be untagged")
}
......@@ -322,7 +328,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) {
block := blocks.NewBlock([]byte(letter))
add.AddEntry(block.Cid(), len(keys)-i)
}
e.MessageReceived(partner, add)
e.MessageReceived(context.Background(), partner, add)
}
func partnerCancels(e *Engine, keys []string, partner peer.ID) {
......@@ -331,7 +337,7 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) {
block := blocks.NewBlock([]byte(k))
cancels.Cancel(block.Cid())
}
e.MessageReceived(partner, cancels)
e.MessageReceived(context.Background(), partner, cancels)
}
func checkHandledInOrder(t *testing.T, e *Engine, expected [][]string) error {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment