Commit eb0d1ffc authored by taylor, committed by Steven Allen

bitswap: Bitswap now sends multiple blocks per message

Updated PeerRequestTask to hold multiple wantlist.Entry values. This allows Bitswap to send multiple blocks in bulk in response to a peer's request. Also added a limit on how much block data goes into a single message: currently 512 * 1024 bytes.

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
parent e21d842c
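Before the diff itself, a minimal sketch of the batching rule this change introduces in Engine.MessageReceived: wanted entries are accumulated for a single peerRequestTask until the next block would push the payload past maxMessageSize, at which point the batch is flushed to the peer request queue and a new one starts. The helper name batchBySize and the plain int sizes are illustrative only; the real code works on wantlist entries and asks the blockstore for each block's size with GetSize.

// batchBySize mirrors the accumulation loop added to MessageReceived below:
// flush the current batch whenever adding the next block would exceed the
// per-message byte limit.
func batchBySize(blockSizes []int, maxMessageSize int) [][]int {
	var batches [][]int
	var current []int
	msgSize := 0
	for _, size := range blockSizes {
		if len(current) > 0 && msgSize+size > maxMessageSize {
			batches = append(batches, current)
			current, msgSize = nil, 0
		}
		current = append(current, size)
		msgSize += size
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

// Example: three 200 KiB blocks against the 512 KiB limit fit into two
// messages, [200 KiB, 200 KiB] followed by [200 KiB]:
//   batchBySize([]int{200 << 10, 200 << 10, 200 << 10}, 512<<10)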
@@ -25,6 +25,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
for i := 0; i < b.N; i++ {
c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
q.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32}, peers[i%len(peers)])
q.Push(peers[i%len(peers)], &wantlist.Entry{Cid: c, Priority: math.MaxInt32})
}
}
@@ -52,6 +52,8 @@ var log = logging.Logger("engine")
const (
// outboxChanBuffer must be 0 to prevent stale messages from being sent
outboxChanBuffer = 0
// maxMessageSize is the maximum size of the batched payload
maxMessageSize = 512 * 1024
)
// Envelope contains a message for a Peer
@@ -59,8 +61,8 @@ type Envelope struct {
// Peer is the intended recipient
Peer peer.ID
// Block is the payload
Block blocks.Block
// Message is the payload
Message bsmsg.BitSwapMessage
// A callback to notify the decision queue that the task is complete
Sent func()
@@ -166,21 +168,28 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
}
// with a task in hand, we're ready to prepare the envelope...
msg := bsmsg.New(true)
for _, entry := range nextTask.Entries {
block, err := e.bs.Get(entry.Cid)
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
continue
}
msg.AddBlock(block)
}
block, err := e.bs.Get(nextTask.Entry.Cid)
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
if msg.Empty() {
// If we don't have the block, don't hold that against the peer
// make sure to update that the task has been 'completed'
nextTask.Done()
nextTask.Done(nextTask.Entries)
continue
}
return &Envelope{
Peer: nextTask.Target,
Block: block,
Peer: nextTask.Target,
Message: msg,
Sent: func() {
nextTask.Done()
nextTask.Done(nextTask.Entries)
select {
case e.workSignal <- struct{}{}:
// work completing may mean that our queue will provide new
@@ -231,6 +240,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
l.wantList = wl.New()
}
var msgSize int
var activeEntries []*wl.Entry
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("%s cancel %s", p, entry.Cid)
@@ -239,13 +250,28 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
} else {
log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
l.Wants(entry.Cid, entry.Priority)
if exists, err := e.bs.Has(entry.Cid); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
blockSize, err := e.bs.GetSize(entry.Cid)
if err != nil {
if err == bstore.ErrNotFound {
continue
}
log.Error(err)
} else {
// we have the block
newWorkExists = true
if msgSize + blockSize > maxMessageSize {
e.peerRequestQueue.Push(p, activeEntries...)
activeEntries = []*wl.Entry{}
msgSize = 0
}
activeEntries = append(activeEntries, entry.Entry)
msgSize += blockSize
}
}
}
if len(activeEntries) > 0 {
e.peerRequestQueue.Push(p, activeEntries...)
}
for _, block := range m.Blocks() {
log.Debugf("got block %s %d bytes", block, len(block.RawData()))
l.ReceivedBytes(len(block.RawData()))
@@ -259,7 +285,7 @@ func (e *Engine) addBlock(block blocks.Block) {
for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Cid()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
e.peerRequestQueue.Push(l.Partner, entry)
work = true
}
l.lk.Unlock()
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"sync"
"testing"
@@ -139,6 +138,19 @@ func TestPartnerWantsThenCancels(t *testing.T) {
},
{
alphabet, stringsComplement(alphabet, vowels),
alphabet[1:25], stringsComplement(alphabet[1:25], vowels), alphabet[2:25], stringsComplement(alphabet[2:25], vowels),
alphabet[3:25], stringsComplement(alphabet[3:25], vowels), alphabet[4:25], stringsComplement(alphabet[4:25], vowels),
alphabet[5:25], stringsComplement(alphabet[5:25], vowels), alphabet[6:25], stringsComplement(alphabet[6:25], vowels),
alphabet[7:25], stringsComplement(alphabet[7:25], vowels), alphabet[8:25], stringsComplement(alphabet[8:25], vowels),
alphabet[9:25], stringsComplement(alphabet[9:25], vowels), alphabet[10:25], stringsComplement(alphabet[10:25], vowels),
alphabet[11:25], stringsComplement(alphabet[11:25], vowels), alphabet[12:25], stringsComplement(alphabet[12:25], vowels),
alphabet[13:25], stringsComplement(alphabet[13:25], vowels), alphabet[14:25], stringsComplement(alphabet[14:25], vowels),
alphabet[15:25], stringsComplement(alphabet[15:25], vowels), alphabet[16:25], stringsComplement(alphabet[16:25], vowels),
alphabet[17:25], stringsComplement(alphabet[17:25], vowels), alphabet[18:25], stringsComplement(alphabet[18:25], vowels),
alphabet[19:25], stringsComplement(alphabet[19:25], vowels), alphabet[20:25], stringsComplement(alphabet[20:25], vowels),
alphabet[21:25], stringsComplement(alphabet[21:25], vowels), alphabet[22:25], stringsComplement(alphabet[22:25], vowels),
alphabet[23:25], stringsComplement(alphabet[23:25], vowels), alphabet[24:25], stringsComplement(alphabet[24:25], vowels),
alphabet[25:25], stringsComplement(alphabet[25:25], vowels),
},
}
@@ -151,20 +163,22 @@ func TestPartnerWantsThenCancels(t *testing.T) {
}
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := NewEngine(context.Background(), bs)
for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
keeps := stringsComplement(set, cancels)
expected = append(expected, keeps)
e := NewEngine(context.Background(), bs)
partner := testutil.RandPeerIDFatal(t)
partnerWants(e, set, partner)
partnerCancels(e, cancels, partner)
if err := checkHandledInOrder(t, e, keeps); err != nil {
t.Logf("run #%d of %d", i, numRounds)
t.Fatal(err)
}
}
if err := checkHandledInOrder(t, e, expected); err != nil {
t.Logf("run #%d of %d", i, numRounds)
t.Fatal(err)
}
}
}
@@ -173,7 +187,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) {
add := message.New(false)
for i, letter := range keys {
block := blocks.NewBlock([]byte(letter))
add.AddEntry(block.Cid(), math.MaxInt32-i)
add.AddEntry(block.Cid(), len(keys)-i)
}
e.MessageReceived(partner, add)
}
@@ -187,14 +201,28 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) {
e.MessageReceived(partner, cancels)
}
func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
for _, k := range keys {
func checkHandledInOrder(t *testing.T, e *Engine, expected [][]string) error {
for _, keys := range expected {
next := <-e.Outbox()
envelope := <-next
received := envelope.Block
expected := blocks.NewBlock([]byte(k))
if !received.Cid().Equals(expected.Cid()) {
return errors.New(fmt.Sprintln("received", string(received.RawData()), "expected", string(expected.RawData())))
received := envelope.Message.Blocks()
// Verify payload message length
if len(received) != len(keys) {
return errors.New(fmt.Sprintln("# blocks received", len(received), "# blocks expected", len(keys)))
}
// Verify payload message contents
for _, k := range keys {
found := false
expected := blocks.NewBlock([]byte(k))
for _, block := range received {
if block.Cid().Equals(expected.Cid()) {
found = true
break
}
}
if !found {
return errors.New(fmt.Sprintln("received", received, "expected", string(expected.RawData())))
}
}
}
return nil
@@ -14,7 +14,7 @@ import (
type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(entry *wantlist.Entry, to peer.ID)
Push(to peer.ID, entries ...*wantlist.Entry)
Remove(k cid.Cid, p peer.ID)
// NB: cannot simply expose taskQueue.Len because trashed elements
@@ -46,7 +46,7 @@ type prq struct {
}
// Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
tl.lock.Lock()
defer tl.lock.Unlock()
partner, ok := tl.partners[to]
@@ -58,31 +58,49 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
partner.activelk.Lock()
defer partner.activelk.Unlock()
if partner.activeBlocks.Has(entry.Cid) {
return
var priority int
newEntries := make([]*wantlist.Entry, 0, len(entries))
for _, entry := range entries {
if partner.activeBlocks.Has(entry.Cid) {
continue
}
if task, ok := tl.taskMap[taskEntryKey(to, entry.Cid)]; ok {
if entry.Priority > task.Priority {
task.Priority = entry.Priority
partner.taskQueue.Update(task.index)
}
continue
}
if entry.Priority > priority {
priority = entry.Priority
}
newEntries = append(newEntries, entry)
}
if task, ok := tl.taskMap[taskKey(to, entry.Cid)]; ok {
task.Entry.Priority = entry.Priority
partner.taskQueue.Update(task.index)
if len(newEntries) == 0 {
return
}
task := &peerRequestTask{
Entry: entry,
Entries: newEntries,
Target: to,
created: time.Now(),
Done: func() {
Done: func(e []*wantlist.Entry) {
tl.lock.Lock()
partner.TaskDone(entry.Cid)
for _, entry := range e {
partner.TaskDone(entry.Cid)
}
tl.pQueue.Update(partner.Index())
tl.lock.Unlock()
},
}
task.Priority = priority
partner.taskQueue.Push(task)
tl.taskMap[task.Key()] = task
partner.requests++
for _, entry := range newEntries {
tl.taskMap[taskEntryKey(to, entry.Cid)] = task
}
partner.requests += len(newEntries)
tl.pQueue.Update(partner.Index())
}
@@ -98,14 +116,23 @@ func (tl *prq) Pop() *peerRequestTask {
var out *peerRequestTask
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
out = partner.taskQueue.Pop().(*peerRequestTask)
delete(tl.taskMap, out.Key())
if out.trash {
out = nil
continue // discarding tasks that have been removed
}
partner.StartTask(out.Entry.Cid)
partner.requests--
newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
for _, entry := range out.Entries {
delete(tl.taskMap, taskEntryKey(out.Target, entry.Cid))
if entry.Trash {
continue
}
partner.requests--
partner.StartTask(entry.Cid)
newEntries = append(newEntries, entry)
}
if len(newEntries) > 0 {
out.Entries = newEntries
} else {
out = nil // discarding tasks that have been removed
continue
}
break // and return |out|
}
@@ -116,12 +143,17 @@ func (tl *prq) Pop() *peerRequestTask {
// Remove removes a task from the queue
func (tl *prq) Remove(k cid.Cid, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskKey(p, k)]
t, ok := tl.taskMap[taskEntryKey(p, k)]
if ok {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
t.trash = true
for _, entry := range t.Entries {
if entry.Cid.Equals(k) {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
entry.Trash = true
break
}
}
// having canceled a block, we now account for that in the given partner
partner := tl.partners[p]
@@ -166,24 +198,18 @@ func (tl *prq) thawRound() {
}
type peerRequestTask struct {
Entry *wantlist.Entry
Target peer.ID
Entries []*wantlist.Entry
Priority int
Target peer.ID
// A callback to signal that this task has been completed
Done func()
Done func([]*wantlist.Entry)
// trash is a book-keeping field
trash bool
// created marks the time that the task was added to the queue
created time.Time
index int // book-keeping field used by the pq container
}
// Key uniquely identifies a task.
func (t *peerRequestTask) Key() string {
return taskKey(t.Target, t.Entry.Cid)
}
// Index implements pq.Elem
func (t *peerRequestTask) Index() int {
return t.index
@@ -194,8 +220,8 @@ func (t *peerRequestTask) SetIndex(i int) {
t.index = i
}
// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k cid.Cid) string {
// taskEntryKey returns a key that uniquely identifies a task.
func taskEntryKey(p peer.ID, k cid.Cid) string {
return string(p) + k.KeyString()
}
@@ -208,7 +234,7 @@ var FIFO = func(a, b *peerRequestTask) bool {
// different peers, the oldest task is prioritized.
var V1 = func(a, b *peerRequestTask) bool {
if a.Target == b.Target {
return a.Entry.Priority > b.Entry.Priority
return a.Priority > b.Priority
}
return FIFO(a, b)
}
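For reference, a hypothetical end-to-end use of the reworked peerRequestQueue shown above; newPRQ(), partner, c1 and c2 are assumed names (the constructor and test fixtures are not part of this diff), everything else follows the new signatures.

q := newPRQ() // assumed package-internal constructor

// One variadic Push queues several wanted entries as a single task for the peer.
q.Push(partner,
	&wantlist.Entry{Cid: c1, Priority: 10},
	&wantlist.Entry{Cid: c2, Priority: 5},
)

// Pop returns one peerRequestTask; its Entries slice holds every CID that
// survived de-duplication and Remove(), and its Priority is the highest
// entry priority seen when the task was pushed.
task := q.Pop()
for _, entry := range task.Entries {
	// fetch and send the block for entry.Cid ...
	_ = entry
}

// Done now takes the entries that were actually handled, so the partner's
// active-block accounting is released per CID.
task.Done(task.Entries)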
@@ -45,7 +45,7 @@ func TestPushPop(t *testing.T) {
t.Log(partner.String())
c := cid.NewCidV0(u.Hash([]byte(letter)))
prq.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32 - index}, partner)
prq.Push(partner, &wantlist.Entry{Cid: c, Priority: math.MaxInt32 - index})
}
for _, consonant := range consonants {
c := cid.NewCidV0(u.Hash([]byte(consonant)))
@@ -61,7 +61,9 @@ func TestPushPop(t *testing.T) {
break
}
out = append(out, received.Entry.Cid.String())
for _, entry := range received.Entries {
out = append(out, entry.Cid.String())
}
}
// Entries popped should already be in correct order
@@ -85,10 +87,10 @@ func TestPeerRepeats(t *testing.T) {
for i := 0; i < 5; i++ {
elcid := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
prq.Push(&wantlist.Entry{Cid: elcid}, a)
prq.Push(&wantlist.Entry{Cid: elcid}, b)
prq.Push(&wantlist.Entry{Cid: elcid}, c)
prq.Push(&wantlist.Entry{Cid: elcid}, d)
prq.Push(a, &wantlist.Entry{Cid: elcid})
prq.Push(b, &wantlist.Entry{Cid: elcid})
prq.Push(c, &wantlist.Entry{Cid: elcid})
prq.Push(d, &wantlist.Entry{Cid: elcid})
}
// now, pop off four entries, there should be one from each
@@ -117,7 +119,7 @@ func TestPeerRepeats(t *testing.T) {
for blockI := 0; blockI < 4; blockI++ {
for i := 0; i < 4; i++ {
// it's okay to mark the same task done multiple times here (JUST FOR TESTING)
tasks[i].Done()
tasks[i].Done(tasks[i].Entries)
ntask := prq.Pop()
if ntask.Target != tasks[i].Target {
@@ -24,6 +24,8 @@ type Entry struct {
Priority int
SesTrk map[uint64]struct{}
// Trash is a book-keeping field
Trash bool
}
// NewRefEntry creates a new reference tracked wantlist entry
@@ -114,16 +114,20 @@ func (pm *WantManager) ConnectedPeers() []peer.ID {
return <-resp
}
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
func (pm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()
pm.sentHistogram.Observe(float64(len(env.Block.RawData())))
msgSize := 0
msg := bsmsg.New(false)
msg.AddBlock(env.Block)
log.Infof("Sending block %s to %s", env.Block, env.Peer)
for _, block := range env.Message.Blocks() {
msgSize += len(block.RawData())
msg.AddBlock(block)
log.Infof("Sending block %s to %s", block, env.Peer)
}
pm.sentHistogram.Observe(float64(msgSize))
err := pm.network.SendMessage(ctx, env.Peer, msg)
if err != nil {
log.Infof("sendblock error: %s", err)
@@ -59,24 +59,27 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
if !ok {
continue
}
log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
return logging.LoggableMap{
"ID": id,
"Target": envelope.Peer.Pretty(),
"Block": envelope.Block.Cid().String(),
}
}))
// update the BS ledger to reflect sent message
// TODO: Should only track *useful* messages in ledger
outgoing := bsmsg.New(false)
outgoing.AddBlock(envelope.Block)
for _, block := range envelope.Message.Blocks() {
log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
return logging.LoggableMap{
"ID": id,
"Target": envelope.Peer.Pretty(),
"Block": block.Cid().String(),
}
}))
outgoing.AddBlock(block)
}
bs.engine.MessageSent(envelope.Peer, outgoing)
bs.wm.SendBlock(ctx, envelope)
bs.wm.SendBlocks(ctx, envelope)
bs.counterLk.Lock()
bs.counters.blocksSent++
bs.counters.dataSent += uint64(len(envelope.Block.RawData()))
for _, block := range envelope.Message.Blocks() {
bs.counters.blocksSent++
bs.counters.dataSent += uint64(len(block.RawData()))
}
bs.counterLk.Unlock()
case <-ctx.Done():
return