Commit b8f1e31c authored by Jeromy Johnson, committed by GitHub

Merge pull request #2798 from ipfs/feat/smarter-bitswap

Make bitswap better
parents 53553e64 4ae16a8f
......@@ -3,6 +3,7 @@ package decision
import (
"sync"
"time"
blocks "github.com/ipfs/go-ipfs/blocks"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
......@@ -68,7 +69,7 @@ type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
// Requests are popped from the queue, packaged up, and placed in the
// outbox.
peerRequestQueue peerRequestQueue
peerRequestQueue *prq
// FIXME it's a bit odd for the client and the worker to both share memory
// (both modify the peerRequestQueue) and also to communicate over the
......@@ -86,6 +87,8 @@ type Engine struct {
lock sync.Mutex // protects the fields immediately below
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger
ticker *time.Ticker
}
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
......@@ -95,6 +98,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
peerRequestQueue: newPRQ(),
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
}
go e.taskWorker(ctx)
return e
......@@ -142,6 +146,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
return nil, ctx.Err()
case <-e.workSignal:
nextTask = e.peerRequestQueue.Pop()
case <-e.ticker.C:
e.peerRequestQueue.thawRound()
nextTask = e.peerRequestQueue.Pop()
}
}
......@@ -191,9 +198,6 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
e.lock.Lock()
defer e.lock.Unlock()
if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
log.Debugf("received empty message from %s", p)
}
......@@ -206,6 +210,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
}()
l := e.findOrCreate(p)
l.lk.Lock()
defer l.lk.Unlock()
if m.Full() {
l.wantList = wl.New()
}
......@@ -236,10 +242,12 @@ func (e *Engine) addBlock(block blocks.Block) {
work := false
for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Key()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
work = true
}
l.lk.Unlock()
}
if work {
......@@ -261,9 +269,6 @@ func (e *Engine) AddBlock(block blocks.Block) {
// send happen atomically
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
e.lock.Lock()
defer e.lock.Unlock()
l := e.findOrCreate(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data()))
......@@ -290,11 +295,13 @@ func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
// findOrCreate lazily instantiates a ledger for the given partner
func (e *Engine) findOrCreate(p peer.ID) *ledger {
e.lock.Lock()
l, ok := e.ledgerMap[p]
if !ok {
l = newLedger(p)
e.ledgerMap[p] = l
}
e.lock.Unlock()
return l
}
......
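Taken together, the engine changes mean that even when no new work arrives on workSignal, nextEnvelope now wakes on the 100ms ticker, partially thaws frozen partners, and retries Pop. A minimal standalone sketch of that three-way select, with a toy stand-in for the real queue (nothing below is actual bitswap code):

package main

import (
	"context"
	"fmt"
	"time"
)

// toy stand-in for the peer request queue in the diff above
type toyQueue struct{ frozen bool }

func (q *toyQueue) thawRound() { q.frozen = false }

func (q *toyQueue) Pop() string {
	if q.frozen {
		return "" // nothing poppable while the only partner is frozen
	}
	return "task"
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	q := &toyQueue{frozen: true}
	workSignal := make(chan struct{}, 1) // addBlock would signal here
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done(): // engine shutting down
			return
		case <-workSignal: // new work was pushed
			fmt.Println("work:", q.Pop())
		case <-ticker.C: // periodic thaw, then retry the queue
			q.thawRound()
			if t := q.Pop(); t != "" {
				fmt.Println("after thaw:", t)
				return
			}
		}
	}
}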
package decision
import (
"sync"
"time"
key "github.com/ipfs/go-ipfs/blocks/key"
......@@ -44,6 +45,8 @@ type ledger struct {
// sentToPeer is a set of keys to ensure we don't send duplicate blocks
// to a given peer
sentToPeer map[key.Key]time.Time
lk sync.Mutex
}
type debtRatio struct {
......
......@@ -15,14 +15,16 @@ type peerRequestQueue interface {
Pop() *peerRequestTask
Push(entry wantlist.Entry, to peer.ID)
Remove(k key.Key, p peer.ID)
// NB: cannot simply expose taskQueue.Len because trashed elements
// may exist. These trashed elements should not contribute to the count.
}
func newPRQ() peerRequestQueue {
func newPRQ() *prq {
return &prq{
taskMap: make(map[string]*peerRequestTask),
partners: make(map[peer.ID]*activePartner),
frozen: make(map[peer.ID]*activePartner),
pQueue: pq.New(partnerCompare),
}
}
......@@ -38,6 +40,8 @@ type prq struct {
pQueue pq.PQ
taskMap map[string]*peerRequestTask
partners map[peer.ID]*activePartner
frozen map[peer.ID]*activePartner
}
// Push currently adds a new peerRequestTask to the end of the list
......@@ -92,7 +96,7 @@ func (tl *prq) Pop() *peerRequestTask {
partner := tl.pQueue.Pop().(*activePartner)
var out *peerRequestTask
for partner.taskQueue.Len() > 0 {
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
out = partner.taskQueue.Pop().(*peerRequestTask)
delete(tl.taskMap, out.Key())
if out.trash {
......@@ -120,11 +124,47 @@ func (tl *prq) Remove(k key.Key, p peer.ID) {
t.trash = true
// having canceled a block, we now account for that in the given partner
tl.partners[p].requests--
partner := tl.partners[p]
partner.requests--
// we now also 'freeze' that partner. If they sent us a cancel for a
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
// them a block they potentially already have
if partner.freezeVal == 0 {
tl.frozen[p] = partner
}
partner.freezeVal++
tl.pQueue.Update(partner.index)
}
tl.lock.Unlock()
}
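// fullThaw resets every frozen partner's freezeVal to zero and requeues them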
func (tl *prq) fullThaw() {
tl.lock.Lock()
defer tl.lock.Unlock()
for id, partner := range tl.frozen {
partner.freezeVal = 0
delete(tl.frozen, id)
tl.pQueue.Update(partner.index)
}
}
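// thawRound decays each frozen partner's freezeVal by (freezeVal+1)/2,
// unfreezing any partner that reaches zero; the engine calls this from
// its 100ms ticker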
func (tl *prq) thawRound() {
tl.lock.Lock()
defer tl.lock.Unlock()
for id, partner := range tl.frozen {
partner.freezeVal -= (partner.freezeVal + 1) / 2
if partner.freezeVal <= 0 {
delete(tl.frozen, id)
}
tl.pQueue.Update(partner.index)
}
}
type peerRequestTask struct {
Entry wantlist.Entry
Target peer.ID
......@@ -196,6 +236,8 @@ type activePartner struct {
// for the PQ interface
index int
freezeVal int
// priority queue of tasks belonging to this peer
taskQueue pq.PQ
}
......@@ -208,6 +250,7 @@ func newActivePartner() *activePartner {
}
// partnerCompare implements pq.ElemComparator
// returns true if peer 'a' has higher priority than peer 'b'
func partnerCompare(a, b pq.Elem) bool {
pa := a.(*activePartner)
pb := b.(*activePartner)
......@@ -220,6 +263,14 @@ func partnerCompare(a, b pq.Elem) bool {
if pb.requests == 0 {
return true
}
if pa.freezeVal > pb.freezeVal {
return false
}
if pa.freezeVal < pb.freezeVal {
return true
}
if pa.active == pb.active {
// sorting by taskQueue.Len() aids in cleaning out trash entries faster
// if we sorted instead by requests, one peer could potentially build up
......
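The thaw arithmetic deserves a second look: each round removes (freezeVal + 1) / 2, i.e. at least half of what remains, so a partner who sent a burst of cancels is fully unfrozen after a logarithmic number of 100ms ticks, and partnerCompare deprioritizes them until then. A quick standalone check of that decay (plain arithmetic lifted from the diff, not bitswap code):

package main

import "fmt"

func main() {
	freezeVal := 10 // e.g. ten cancels arrived in quick succession
	for round := 1; freezeVal > 0; round++ {
		freezeVal -= (freezeVal + 1) / 2
		fmt.Printf("after round %d: freezeVal=%d\n", round, freezeVal)
	}
	// Output:
	// after round 1: freezeVal=5
	// after round 2: freezeVal=2
	// after round 3: freezeVal=1
	// after round 4: freezeVal=0
}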
......@@ -47,6 +47,8 @@ func TestPushPop(t *testing.T) {
prq.Remove(key.Key(consonant), partner)
}
prq.fullThaw()
var out []string
for {
received := prq.Pop()
......
......@@ -25,9 +25,16 @@ type BitSwapNetwork interface {
ConnectTo(context.Context, peer.ID) error
NewMessageSender(context.Context, peer.ID) (MessageSender, error)
Routing
}
type MessageSender interface {
SendMsg(bsmsg.BitSwapMessage) error
Close() error
}
// Implement Receiver to receive messages from the BitSwapNetwork
type Receiver interface {
ReceiveMessage(
......
......@@ -42,6 +42,27 @@ type impl struct {
receiver Receiver
}
type streamMessageSender struct {
s inet.Stream
}
func (s *streamMessageSender) Close() error {
return s.s.Close()
}
func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error {
return msg.ToNet(s.s)
}
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
s, err := bsnet.newStreamToPeer(ctx, p)
if err != nil {
return nil, err
}
return &streamMessageSender{s: s}, nil
}
func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {
// first, make sure we're connected.
......
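The point of MessageSender is amortization: before this change every wantlist update dialed a fresh stream and tore it down; now a caller opens one sender, reuses it for many messages, and closes it once. A hedged sketch of that calling pattern against a fake in-memory sender (the interface mirrors the diff; the fake and its messages are invented for illustration):

package main

import "fmt"

// stand-in for bsmsg.BitSwapMessage
type BitSwapMessage string

// mirrors the MessageSender interface added in the diff
type MessageSender interface {
	SendMsg(BitSwapMessage) error
	Close() error
}

type fakeSender struct{ sent int }

func (f *fakeSender) SendMsg(m BitSwapMessage) error {
	f.sent++
	fmt.Printf("message #%d %q went over the same stream\n", f.sent, m)
	return nil
}

func (f *fakeSender) Close() error {
	fmt.Println("stream closed once, after all sends")
	return nil
}

func main() {
	var s MessageSender = &fakeSender{}
	defer s.Close()
	for _, m := range []BitSwapMessage{"want A", "want B", "cancel A"} {
		if err := s.SendMsg(m); err != nil {
			return // a real caller would drop the sender and redial
		}
	}
}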
......@@ -112,6 +112,30 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max
return out
}
type messagePasser struct {
net *network
target peer.ID
local peer.ID
ctx context.Context
}
func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error {
return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m)
}
func (mp *messagePasser) Close() error {
return nil
}
func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
return &messagePasser{
net: n.network,
target: p,
local: n.local,
ctx: ctx,
}, nil
}
// Provide provides the key to the network
func (nc *networkClient) Provide(ctx context.Context, k key.Key) error {
return nc.routing.Provide(ctx, k)
......
......@@ -26,9 +26,11 @@ type WantManager struct {
network bsnet.BitSwapNetwork
ctx context.Context
cancel func()
}
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
ctx, cancel := context.WithCancel(ctx)
return &WantManager{
incoming: make(chan []*bsmsg.Entry, 10),
connect: make(chan peer.ID, 10),
......@@ -38,6 +40,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana
wl: wantlist.NewThreadSafe(),
network: network,
ctx: ctx,
cancel: cancel,
}
}
......@@ -58,6 +61,8 @@ type msgQueue struct {
out bsmsg.BitSwapMessage
network bsnet.BitSwapNetwork
sender bsnet.MessageSender
refcnt int
work chan struct{}
......@@ -150,6 +155,11 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) {
}
func (mq *msgQueue) runQueue(ctx context.Context) {
defer func() {
if mq.sender != nil {
mq.sender.Close()
}
}()
for {
select {
case <-mq.work: // there is work to be done
......@@ -166,6 +176,7 @@ func (mq *msgQueue) doWork(ctx context.Context) {
// allow ten minutes for connections
// this includes looking them up in the dht
// dialing them, and handshaking
if mq.sender == nil {
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()
......@@ -176,6 +187,16 @@ func (mq *msgQueue) doWork(ctx context.Context) {
return
}
nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
log.Infof("cant open new stream to peer %s: %s", mq.p, err)
// TODO: cant open stream, what now?
return
}
mq.sender = nsender
}
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
......@@ -186,13 +207,12 @@ func (mq *msgQueue) doWork(ctx context.Context) {
mq.out = nil
mq.outlk.Unlock()
sendctx, cancel := context.WithTimeout(ctx, time.Minute*5)
defer cancel()
// send wantlist updates
err = mq.network.SendMessage(sendctx, mq.p, wlm)
err := mq.sender.SendMsg(wlm)
if err != nil {
log.Infof("bitswap send error: %s", err)
mq.sender.Close()
mq.sender = nil
// TODO: what do we do if this fails?
return
}
......
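The msgQueue changes tie it all together: the sender is dialed lazily on first use, reused across doWork calls, closed when runQueue exits, and discarded after a send error so the next attempt redials. A self-contained sketch of that lifecycle with stand-in types (flakySender and the dial counter are invented; the real logic is in the diff above):

package main

import (
	"errors"
	"fmt"
)

type sender interface {
	SendMsg(string) error
	Close() error
}

// flakySender fails its second send to simulate a stream reset
type flakySender struct{ sends int }

func (f *flakySender) SendMsg(m string) error {
	f.sends++
	if f.sends == 2 {
		return errors.New("stream reset")
	}
	fmt.Println("sent:", m)
	return nil
}

func (f *flakySender) Close() error { return nil }

type msgQueue struct {
	sender sender
	dials  int
}

func (mq *msgQueue) doWork(msg string) {
	if mq.sender == nil { // lazily open a stream on first use
		mq.dials++
		mq.sender = &flakySender{}
	}
	if err := mq.sender.SendMsg(msg); err != nil {
		fmt.Println("send error:", err)
		mq.sender.Close()
		mq.sender = nil // the next doWork call will redial
	}
}

func main() {
	mq := &msgQueue{}
	for _, m := range []string{"want A", "want B", "want C"} {
		mq.doWork(m)
	}
	fmt.Println("dials:", mq.dials) // 2: one up front, one after the failure
}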