Commit 5853eac3 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #1218 from ipfs/refactor/bitswap

large refactor of bitswap, implement wantmanager to manage wantlist
parents f0276dc3 ce0d2f46
#Welcome to Bitswap
###(The data trading engine)
# Bitswap
## Protocol
Bitswap is the data trading module for ipfs, it manages requesting and sending
blocks to and from other peers in the network. Bitswap has two main jobs, the
first is to acquire blocks requested by the client from the network. The second
is to judiciously send blocks in its posession to other peers who want them.
Bitswap is a message based protocol, as opposed to response-reply. All messages
contain wantlists, or blocks. Upon receiving a wantlist, a node should consider
sending out wanted blocks if they have them. Upon receiving blocks, the node
should send out a notification called a 'Cancel' signifying that they no longer
want the block. At a protocol level, bitswap is very simple.
## go-ipfs Implementation
Internally, when a message with a wantlist is received, it is sent to the
decision engine to be considered, and blocks that we have that are wanted are
placed into the peer request queue. Any block we possess that is wanted by
another peer has a task in the peer request queue created for it. The peer
request queue is a priority queue that sorts available tasks by some metric,
currently, that metric is very simple and aims to fairly address the tasks
of each other peer. More advanced decision logic will be implemented in the
future. Task workers pull tasks to be done off of the queue, retreive the block
to be sent, and send it off. The number of task workers is limited by a constant
factor.
Client requests for new blocks are handled by the want manager, for every new
block (or set of blocks) wanted, the 'WantBlocks' method is invoked. The want
manager then ensures that connected peers are notified of the new block that we
want by sending the new entries to a message queue for each peer. The message
queue will loop while there is work available and do the following: 1) Ensure it
has a connection to its peer, 2) grab the message to be sent, and 3) send it.
If new messages are added while the loop is in steps 1 or 3, the messages are
combined into one to avoid having to keep an actual queue and send multiple
messages. The same process occurs when the client receives a block and sends a
cancel message for it.
Bitswap is the module that is responsible for requesting and providing data
blocks over the network to and from other ipfs peers. The role of bitswap is
to be a merchant in the large global marketplace of data.
##Main Operations
Bitswap has three high level operations:
- **GetBlocks**
- `GetBlocks` is a bitswap method used to request multiple blocks that are likely
to all be provided by the same set of peers (part of a single file, for example).
- **GetBlock**
- `GetBlock` is a special case of `GetBlocks` that just requests a single block.
- **HasBlock**
- `HasBlock` registers a local block with bitswap. Bitswap will then send that
block to any connected peers who want it (with the strategies approval), record
that transaction in the ledger and announce to the DHT that the block is being
provided.
##Internal Details
All `GetBlock` requests are relayed into a single for-select loop via channels.
Calls to `GetBlocks` will have `FindProviders` called for only the first key in
the set initially, This is an optimization attempting to cut down on the number
of RPCs required. After a timeout (specified by the strategies
`GetRebroadcastDelay`) Bitswap will iterate through all keys still in the local
wantlist, perform a find providers call for each, and sent the wantlist out to
those providers. This is the fallback behaviour for cases where our initial
assumption about one peer potentially having multiple blocks in a set does not
hold true.
When receiving messages, Bitswaps `ReceiveMessage` method is called. A bitswap
message may contain the wantlist of the peer who sent the message, and an array
of blocks that were on our local wantlist. Any blocks we receive in a bitswap
message will be passed to `HasBlock`, and the other peers wantlist gets updated
in the strategy by `bs.strategy.MessageReceived`.
If another peers wantlist is received, Bitswap will call its strategies
`ShouldSendBlockToPeer` method to determine whether or not the other peer will
be sent the block they are requesting (if we even have it).
##Outstanding TODOs:
- [ ] Ensure only one request active per key
- [ ] More involved strategies
- [ ] Ensure only wanted blocks are counted in ledgers
......@@ -4,7 +4,6 @@ package bitswap
import (
"errors"
"fmt"
"math"
"sync"
"time"
......@@ -23,7 +22,6 @@ import (
"github.com/ipfs/go-ipfs/thirdparty/delay"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
u "github.com/ipfs/go-ipfs/util"
pset "github.com/ipfs/go-ipfs/util/peerset" // TODO move this to peerstore
)
var log = eventlog.Logger("bitswap")
......@@ -45,9 +43,7 @@ const (
provideWorkers = 4
)
var (
rebroadcastDelay = delay.Fixed(time.Second * 10)
)
var rebroadcastDelay = delay.Fixed(time.Second * 10)
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
......@@ -86,12 +82,13 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan u.Key),
wm: NewWantManager(ctx, network),
}
go bs.wm.Run()
network.SetDelegate(bs)
// Start up bitswaps async worker routines
......@@ -108,6 +105,10 @@ type Bitswap struct {
// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
wm *WantManager
// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore
......@@ -121,14 +122,13 @@ type Bitswap struct {
engine *decision.Engine
wantlist *wantlist.ThreadSafe
process process.Process
newBlocks chan *blocks.Block
provideKeys chan u.Key
counterLk sync.Mutex
blocksRecvd int
dupBlocksRecvd int
}
......@@ -217,7 +217,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Event(ctx, "hasBlock", blk)
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
......@@ -227,76 +226,22 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
if err := bs.blockstore.Put(blk); err != nil {
return err
}
bs.wantlist.Remove(blk.Key())
bs.notifications.Publish(blk)
select {
case bs.newBlocks <- blk:
// send block off to be reprovided
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
set := pset.New()
wg := sync.WaitGroup{}
loop:
for {
select {
case peerToQuery, ok := <-peers:
if !ok {
break loop
}
if !set.TryAdd(peerToQuery) { //Do once per peer
continue
}
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
if err := bs.send(ctx, p, m); err != nil {
log.Debug(err) // TODO remove if too verbose
}
}(peerToQuery)
case <-ctx.Done():
return nil
}
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
// NB: we may be abandoning goroutines here before they complete
// this shouldnt be an issue because they will complete soon anyways
// we just don't want their being slow to impact bitswap transfer speeds
}
return nil
}
func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
message := bsmsg.New()
message.SetFull(true)
for _, wanted := range bs.wantlist.Entries() {
message.AddEntry(wanted.Key, wanted.Priority)
}
return bs.sendWantlistMsgToPeers(ctx, message, peers)
}
func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) {
func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// prepare a channel to hand off to sendWantlistToPeers
sendToPeers := make(chan peer.ID)
// Get providers for all entries in wantlist (could take a while)
wg := sync.WaitGroup{}
for _, e := range entries {
......@@ -308,154 +253,76 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
defer cancel()
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers {
sendToPeers <- prov
go func(p peer.ID) {
bs.network.ConnectTo(ctx, p)
}(prov)
}
}(e.Key)
}
go func() {
wg.Wait() // make sure all our children do finish.
close(sendToPeers)
}()
err := bs.sendWantlistToPeers(ctx, sendToPeers)
if err != nil {
log.Debugf("sendWantlistToPeers error: %s", err)
}
wg.Wait() // make sure all our children do finish.
}
// TODO(brian): handle errors
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.engine.MessageReceived(p, incoming)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger
if len(incoming.Blocks()) == 0 {
return
}
// quickly send out cancels, reduces chances of duplicate block receives
var keys []u.Key
for _, block := range incoming.Blocks() {
keys = append(keys, block.Key())
}
bs.wm.CancelWants(keys)
for _, block := range incoming.Blocks() {
bs.counterLk.Lock()
bs.blocksRecvd++
if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
bs.dupBlocksRecvd++
}
log.Debugf("got block %s from %s", block, p)
bs.counterLk.Unlock()
log.Infof("got block %s from %s (%d,%d)", block, p, bs.blocksRecvd, bs.dupBlocksRecvd)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil {
return fmt.Errorf("ReceiveMessage HasBlock error: %s", err)
log.Warningf("ReceiveMessage HasBlock error: %s", err)
}
cancel()
keys = append(keys, block.Key())
}
bs.cancelBlocks(ctx, keys)
return nil
}
// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) {
// TODO: add to clientWorker??
peers := make(chan peer.ID, 1)
peers <- p
close(peers)
err := bs.sendWantlistToPeers(context.TODO(), peers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
}
bs.wm.Connected(p)
}
// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}
func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 {
return
}
message := bsmsg.New()
message.SetFull(false)
for _, k := range bkeys {
log.Debug("cancel block: %s", k)
message.Cancel(k)
}
wg := sync.WaitGroup{}
for _, p := range bs.engine.Peers() {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
err := bs.send(ctx, p, message)
if err != nil {
log.Warningf("Error sending message: %s", err)
return
}
}(p)
}
wg.Wait()
return
}
func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 {
return
}
message := bsmsg.New()
message.SetFull(false)
for i, k := range bkeys {
message.AddEntry(k, kMaxPriority-i)
}
wg := sync.WaitGroup{}
for _, p := range bs.engine.Peers() {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
err := bs.send(ctx, p, message)
if err != nil {
log.Debugf("Error sending message: %s", err)
}
}(p)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
// NB: we may be abandoning goroutines here before they complete
// this shouldnt be an issue because they will complete soon anyways
// we just don't want their being slow to impact bitswap transfer speeds
}
}
func (bs *Bitswap) ReceiveError(err error) {
log.Debugf("Bitswap ReceiveError: %s", err)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
defer log.EventBegin(ctx, "sendMessage", p, m).Done()
if err := bs.network.SendMessage(ctx, p, m); err != nil {
return err
}
return bs.engine.MessageSent(p, m)
}
func (bs *Bitswap) Close() error {
return bs.process.Close()
}
func (bs *Bitswap) GetWantlist() []u.Key {
var out []u.Key
for _, e := range bs.wantlist.Entries() {
for _, e := range bs.wm.wl.Entries() {
out = append(out, e.Key)
}
return out
......
......@@ -58,8 +58,6 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
}
}
// TestGetBlockAfterRequesting...
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
......@@ -67,14 +65,15 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
g := NewTestSessionGenerator(net)
defer g.Close()
hasBlock := g.Next()
peers := g.Instances(2)
hasBlock := peers[0]
defer hasBlock.Exchange.Close()
if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
wantsBlock := g.Next()
wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()
ctx, _ := context.WithTimeout(context.Background(), time.Second)
......@@ -93,7 +92,7 @@ func TestLargeSwarm(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
numInstances := 500
numInstances := 100
numBlocks := 2
if detectrace.WithRace() {
// when running with the race detector, 500 instances launches
......@@ -121,6 +120,27 @@ func TestLargeFile(t *testing.T) {
PerformDistributionTest(t, numInstances, numBlocks)
}
func TestLargeFileNoRebroadcast(t *testing.T) {
rbd := rebroadcastDelay.Get()
rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
if testing.Short() {
t.SkipNow()
}
numInstances := 10
numBlocks := 100
PerformDistributionTest(t, numInstances, numBlocks)
rebroadcastDelay.Set(rbd)
}
func TestLargeFileTwoPeers(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
numInstances := 2
numBlocks := 100
PerformDistributionTest(t, numInstances, numBlocks)
}
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
if testing.Short() {
t.SkipNow()
......@@ -130,8 +150,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
t.Log("Test a few nodes trying to get one file with a lot of blocks")
instances := sg.Instances(numInstances)
blocks := bg.Blocks(numBlocks)
......@@ -196,8 +214,9 @@ func TestSendToWantingPeer(t *testing.T) {
prev := rebroadcastDelay.Set(time.Second / 2)
defer func() { rebroadcastDelay.Set(prev) }()
peerA := sg.Next()
peerB := sg.Next()
peers := sg.Instances(2)
peerA := peers[0]
peerB := peers[1]
t.Logf("Session %v\n", peerA.Peer)
t.Logf("Session %v\n", peerB.Peer)
......@@ -238,7 +257,7 @@ func TestBasicBitswap(t *testing.T) {
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
t.Log("Test a few nodes trying to get one file with a lot of blocks")
t.Log("Test a one node trying to get one block from another")
instances := sg.Instances(2)
blocks := bg.Blocks(1)
......
......@@ -5,6 +5,7 @@ import (
"sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
......@@ -53,8 +54,9 @@ const (
type Envelope struct {
// Peer is the intended recipient
Peer peer.ID
// Message is the payload
Message bsmsg.BitSwapMessage
// Block is the payload
Block *blocks.Block
// A callback to notify the decision queue that the task is complete
Sent func()
......@@ -90,7 +92,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
bs: bs,
peerRequestQueue: newPRQ(),
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}),
workSignal: make(chan struct{}, 1),
}
go e.taskWorker(ctx)
return e
......@@ -151,12 +153,18 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
continue
}
m := bsmsg.New() // TODO: maybe add keys from our wantlist?
m.AddBlock(block)
return &Envelope{
Peer: nextTask.Target,
Message: m,
Sent: nextTask.Done,
Peer: nextTask.Target,
Block: block,
Sent: func() {
nextTask.Done()
select {
case e.workSignal <- struct{}{}:
// work completing may mean that our queue will provide new
// work to be done.
default:
}
},
}, nil
}
}
......@@ -185,7 +193,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
defer e.lock.Unlock()
if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
log.Debug("received empty message from", p)
log.Debugf("received empty message from %s", p)
}
newWorkExists := false
......@@ -202,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debug("cancel", entry.Key)
log.Debugf("cancel %s", entry.Key)
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
} else {
log.Debug("wants", entry.Key, entry.Priority)
log.Debugf("wants %s - %d", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
......@@ -216,7 +224,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
}
for _, block := range m.Blocks() {
log.Debug("got block %s %d bytes", block.Key(), len(block.Data))
log.Debugf("got block %s %d bytes", block.Key(), len(block.Data))
l.ReceivedBytes(len(block.Data))
for _, l := range e.ledgerMap {
if entry, ok := l.WantListContains(block.Key()); ok {
......
......@@ -41,7 +41,7 @@ func TestConsistentAccounting(t *testing.T) {
// Send messages from Ernie to Bert
for i := 0; i < 1000; i++ {
m := message.New()
m := message.New(false)
content := []string{"this", "is", "message", "i"}
m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
......@@ -73,7 +73,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
sanfrancisco := newEngine(ctx, "sf")
seattle := newEngine(ctx, "sea")
m := message.New()
m := message.New(true)
sanfrancisco.Engine.MessageSent(seattle.Peer, m)
seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
......@@ -164,7 +164,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
}
func partnerWants(e *Engine, keys []string, partner peer.ID) {
add := message.New()
add := message.New(false)
for i, letter := range keys {
block := blocks.NewBlock([]byte(letter))
add.AddEntry(block.Key(), math.MaxInt32-i)
......@@ -173,7 +173,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) {
}
func partnerCancels(e *Engine, keys []string, partner peer.ID) {
cancels := message.New()
cancels := message.New(false)
for _, k := range keys {
block := blocks.NewBlock([]byte(k))
cancels.Cancel(block.Key())
......@@ -185,7 +185,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
for _, k := range keys {
next := <-e.Outbox()
envelope := <-next
received := envelope.Message.Blocks()[0]
received := envelope.Block
expected := blocks.NewBlock([]byte(k))
if received.Key() != expected.Key() {
return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data)))
......
......@@ -51,12 +51,6 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
tl.partners[to] = partner
}
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
task.Entry.Priority = entry.Priority
partner.taskQueue.Update(task.index)
return
}
partner.activelk.Lock()
defer partner.activelk.Unlock()
_, ok = partner.activeBlocks[entry.Key]
......@@ -64,13 +58,19 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
return
}
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
task.Entry.Priority = entry.Priority
partner.taskQueue.Update(task.index)
return
}
task := &peerRequestTask{
Entry: entry,
Target: to,
created: time.Now(),
Done: func() {
partner.TaskDone(entry.Key)
tl.lock.Lock()
partner.TaskDone(entry.Key)
tl.pQueue.Update(partner.Index())
tl.lock.Unlock()
},
......@@ -156,7 +156,7 @@ func (t *peerRequestTask) SetIndex(i int) {
// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k u.Key) string {
return string(p.String() + k.String())
return string(p) + string(k)
}
// FIFO is a basic task comparator that returns tasks in the order created.
......@@ -220,6 +220,12 @@ func partnerCompare(a, b pq.Elem) bool {
if pb.requests == 0 {
return true
}
if pa.active == pb.active {
// sorting by taskQueue.Len() aids in cleaning out trash entries faster
// if we sorted instead by requests, one peer could potentially build up
// a huge number of cancelled entries in the queue resulting in a memory leak
return pa.taskQueue.Len() > pb.taskQueue.Len()
}
return pa.active < pb.active
}
......
......@@ -29,12 +29,9 @@ type BitSwapMessage interface {
Cancel(key u.Key)
// Sets whether or not the contained wantlist represents the entire wantlist
// true = full wantlist
// false = wantlist 'patch'
// default: true
SetFull(isFull bool)
Empty() bool
// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Full() bool
AddBlock(*blocks.Block)
......@@ -51,18 +48,18 @@ type Exportable interface {
type impl struct {
full bool
wantlist map[u.Key]Entry
blocks map[u.Key]*blocks.Block // map to detect duplicates
blocks map[u.Key]*blocks.Block
}
func New() BitSwapMessage {
return newMsg()
func New(full bool) BitSwapMessage {
return newMsg(full)
}
func newMsg() *impl {
func newMsg(full bool) *impl {
return &impl{
blocks: make(map[u.Key]*blocks.Block),
wantlist: make(map[u.Key]Entry),
full: true,
full: full,
}
}
......@@ -72,8 +69,7 @@ type Entry struct {
}
func newMessageFromProto(pbm pb.Message) BitSwapMessage {
m := newMsg()
m.SetFull(pbm.GetWantlist().GetFull())
m := newMsg(pbm.GetWantlist().GetFull())
for _, e := range pbm.GetWantlist().GetEntries() {
m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
}
......@@ -84,14 +80,14 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage {
return m
}
func (m *impl) SetFull(full bool) {
m.full = full
}
func (m *impl) Full() bool {
return m.full
}
func (m *impl) Empty() bool {
return len(m.blocks) == 0 && len(m.wantlist) == 0
}
func (m *impl) Wantlist() []Entry {
var out []Entry
for _, e := range m.wantlist {
......@@ -101,7 +97,7 @@ func (m *impl) Wantlist() []Entry {
}
func (m *impl) Blocks() []*blocks.Block {
bs := make([]*blocks.Block, 0)
bs := make([]*blocks.Block, 0, len(m.blocks))
for _, block := range m.blocks {
bs = append(bs, block)
}
......@@ -109,6 +105,7 @@ func (m *impl) Blocks() []*blocks.Block {
}
func (m *impl) Cancel(k u.Key) {
delete(m.wantlist, k)
m.addEntry(k, 0, true)
}
......@@ -155,7 +152,7 @@ func (m *impl) ToProto() *pb.Message {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
Block: proto.String(string(e.Key)),
Priority: proto.Int32(int32(e.Priority)),
Cancel: &e.Cancel,
Cancel: proto.Bool(e.Cancel),
})
}
for _, b := range m.Blocks() {
......
......@@ -13,7 +13,7 @@ import (
func TestAppendWanted(t *testing.T) {
const str = "foo"
m := New()
m := New(true)
m.AddEntry(u.Key(str), 1)
if !wantlistContains(m.ToProto().GetWantlist(), str) {
......@@ -44,7 +44,7 @@ func TestAppendBlock(t *testing.T) {
strs = append(strs, "Celeritas")
strs = append(strs, "Incendia")
m := New()
m := New(true)
for _, str := range strs {
block := blocks.NewBlock([]byte(str))
m.AddBlock(block)
......@@ -61,7 +61,7 @@ func TestAppendBlock(t *testing.T) {
func TestWantlist(t *testing.T) {
keystrs := []string{"foo", "bar", "baz", "bat"}
m := New()
m := New(true)
for _, s := range keystrs {
m.AddEntry(u.Key(s), 1)
}
......@@ -84,7 +84,7 @@ func TestWantlist(t *testing.T) {
func TestCopyProtoByValue(t *testing.T) {
const str = "foo"
m := New()
m := New(true)
protoBeforeAppend := m.ToProto()
m.AddEntry(u.Key(str), 1)
if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
......@@ -93,7 +93,7 @@ func TestCopyProtoByValue(t *testing.T) {
}
func TestToNetFromNetPreservesWantList(t *testing.T) {
original := New()
original := New(true)
original.AddEntry(u.Key("M"), 1)
original.AddEntry(u.Key("B"), 1)
original.AddEntry(u.Key("D"), 1)
......@@ -124,7 +124,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
func TestToAndFromNetMessage(t *testing.T) {
original := New()
original := New(true)
original.AddBlock(blocks.NewBlock([]byte("W")))
original.AddBlock(blocks.NewBlock([]byte("E")))
original.AddBlock(blocks.NewBlock([]byte("F")))
......@@ -172,7 +172,7 @@ func contains(strs []string, x string) bool {
func TestDuplicates(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
msg := New()
msg := New(true)
msg.AddEntry(b.Key(), 1)
msg.AddEntry(b.Key(), 1)
......
......@@ -23,6 +23,8 @@ type BitSwapNetwork interface {
// network.
SetDelegate(Receiver)
ConnectTo(context.Context, peer.ID) error
Routing
}
......@@ -31,7 +33,7 @@ type Receiver interface {
ReceiveMessage(
ctx context.Context,
sender peer.ID,
incoming bsmsg.BitSwapMessage) error
incoming bsmsg.BitSwapMessage)
ReceiveError(error)
......
......@@ -97,6 +97,10 @@ func (bsnet *impl) SetDelegate(r Receiver) {
bsnet.receiver = r
}
func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p})
}
// FindProvidersAsync returns a channel of providers for the given key
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
......
......@@ -17,8 +17,10 @@ func (bs *Bitswap) Stat() (*Stat, error) {
st := new(Stat)
st.ProvideBufLen = len(bs.newBlocks)
st.Wantlist = bs.GetWantlist()
bs.counterLk.Lock()
st.BlocksReceived = bs.blocksRecvd
st.DupBlksReceived = bs.dupBlocksRecvd
bs.counterLk.Unlock()
for _, p := range bs.engine.Peers() {
st.Peers = append(st.Peers, p.Pretty())
......
......@@ -29,19 +29,17 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
responder.SetDelegate(lambda(func(
ctx context.Context,
fromWaiter peer.ID,
msgFromWaiter bsmsg.BitSwapMessage) error {
msgFromWaiter bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New()
msgToWaiter := bsmsg.New(true)
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
return nil
}))
waiter.SetDelegate(lambda(func(
ctx context.Context,
fromResponder peer.ID,
msgFromResponder bsmsg.BitSwapMessage) error {
msgFromResponder bsmsg.BitSwapMessage) {
// TODO assert that this came from the correct peer and that the message contents are as expected
ok := false
......@@ -54,12 +52,10 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
if !ok {
t.Fatal("Message not received from the responder")
}
return nil
}))
messageSentAsync := bsmsg.New()
messageSentAsync := bsmsg.New(true)
messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
errSending := waiter.SendMessage(
context.Background(), responderPeer.ID(), messageSentAsync)
......@@ -71,7 +67,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}
type receiverFunc func(ctx context.Context, p peer.ID,
incoming bsmsg.BitSwapMessage) error
incoming bsmsg.BitSwapMessage)
// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
......@@ -81,12 +77,12 @@ func lambda(f receiverFunc) bsnet.Receiver {
}
type lambdaImpl struct {
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage)
}
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p peer.ID, incoming bsmsg.BitSwapMessage) error {
return lam.f(ctx, p, incoming)
p peer.ID, incoming bsmsg.BitSwapMessage) {
lam.f(ctx, p, incoming)
}
func (lam *lambdaImpl) ReceiveError(err error) {
......
......@@ -72,7 +72,8 @@ func (n *network) deliver(
n.delay.Wait()
return r.ReceiveMessage(context.TODO(), from, message)
r.ReceiveMessage(context.TODO(), from, message)
return nil
}
type networkClient struct {
......@@ -119,3 +120,12 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error {
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
nc.Receiver = r
}
func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
if !nc.network.HasPeer(p) {
return errors.New("no such peer in network")
}
nc.network.clients[p].PeerConnected(nc.local)
nc.Receiver.PeerConnected(p)
return nil
}
......@@ -7,7 +7,6 @@ import (
ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
peer "github.com/ipfs/go-ipfs/p2p/peer"
p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
......@@ -56,12 +55,18 @@ func (g *SessionGenerator) Instances(n int) []Instance {
inst := g.Next()
instances = append(instances, inst)
}
for i, inst := range instances {
for j := i + 1; j < len(instances); j++ {
oinst := instances[j]
inst.Exchange.PeerConnected(oinst.Peer)
}
}
return instances
}
type Instance struct {
Peer peer.ID
Exchange exchange.Interface
Exchange *Bitswap
blockstore blockstore.Blockstore
blockstoreDelay delay.D
......@@ -94,7 +99,7 @@ func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance
const alwaysSendToPeer = true
bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer)
bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer).(*Bitswap)
return Instance{
Peer: p.ID(),
......
package bitswap
import (
"sync"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/ipfs/go-ipfs/p2p/peer"
u "github.com/ipfs/go-ipfs/util"
)
type WantManager struct {
// sync channels for Run loop
incoming chan []*bsmsg.Entry
connect chan peer.ID // notification channel for new peers connecting
disconnect chan peer.ID // notification channel for peers disconnecting
// synchronized by Run loop, only touch inside there
peers map[peer.ID]*msgQueue
wl *wantlist.Wantlist
network bsnet.BitSwapNetwork
ctx context.Context
}
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
return &WantManager{
incoming: make(chan []*bsmsg.Entry, 10),
connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10),
peers: make(map[peer.ID]*msgQueue),
wl: wantlist.New(),
network: network,
ctx: ctx,
}
}
type msgPair struct {
to peer.ID
msg bsmsg.BitSwapMessage
}
type cancellation struct {
who peer.ID
blk u.Key
}
type msgQueue struct {
p peer.ID
outlk sync.Mutex
out bsmsg.BitSwapMessage
network bsnet.BitSwapNetwork
work chan struct{}
done chan struct{}
}
func (pm *WantManager) WantBlocks(ks []u.Key) {
log.Infof("want blocks: %s", ks)
pm.addEntries(ks, false)
}
func (pm *WantManager) CancelWants(ks []u.Key) {
pm.addEntries(ks, true)
}
func (pm *WantManager) addEntries(ks []u.Key, cancel bool) {
var entries []*bsmsg.Entry
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: wantlist.Entry{
Key: k,
Priority: kMaxPriority - i,
},
})
}
select {
case pm.incoming <- entries:
case <-pm.ctx.Done():
}
}
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()
msg := bsmsg.New(false)
msg.AddBlock(env.Block)
log.Infof("Sending block %s to %s", env.Peer, env.Block)
err := pm.network.SendMessage(ctx, env.Peer, msg)
if err != nil {
log.Noticef("sendblock error: %s", err)
}
}
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
_, ok := pm.peers[p]
if ok {
// TODO: log an error?
return nil
}
mq := pm.newMsgQueue(p)
// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
for _, e := range pm.wl.Entries() {
fullwantlist.AddEntry(e.Key, e.Priority)
}
mq.out = fullwantlist
mq.work <- struct{}{}
pm.peers[p] = mq
go mq.runQueue(pm.ctx)
return mq
}
func (pm *WantManager) stopPeerHandler(p peer.ID) {
pq, ok := pm.peers[p]
if !ok {
// TODO: log error?
return
}
close(pq.done)
delete(pm.peers, p)
}
func (mq *msgQueue) runQueue(ctx context.Context) {
for {
select {
case <-mq.work: // there is work to be done
err := mq.network.ConnectTo(ctx, mq.p)
if err != nil {
log.Errorf("cant connect to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
continue
}
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
if wlm == nil || wlm.Empty() {
mq.outlk.Unlock()
continue
}
mq.out = nil
mq.outlk.Unlock()
// send wantlist updates
err = mq.network.SendMessage(ctx, mq.p, wlm)
if err != nil {
log.Noticef("bitswap send error: %s", err)
// TODO: what do we do if this fails?
}
case <-mq.done:
return
}
}
}
func (pm *WantManager) Connected(p peer.ID) {
pm.connect <- p
}
func (pm *WantManager) Disconnected(p peer.ID) {
pm.disconnect <- p
}
// TODO: use goprocess here once i trust it
func (pm *WantManager) Run() {
tock := time.NewTicker(rebroadcastDelay.Get())
defer tock.Stop()
for {
select {
case entries := <-pm.incoming:
// add changes to our wantlist
for _, e := range entries {
if e.Cancel {
pm.wl.Remove(e.Key)
} else {
pm.wl.Add(e.Key, e.Priority)
}
}
// broadcast those wantlist changes
for _, p := range pm.peers {
p.addMessage(entries)
}
case <-tock.C:
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
var es []*bsmsg.Entry
for _, e := range pm.wl.Entries() {
es = append(es, &bsmsg.Entry{Entry: e})
}
for _, p := range pm.peers {
p.outlk.Lock()
p.out = bsmsg.New(true)
p.outlk.Unlock()
p.addMessage(es)
}
case p := <-pm.connect:
pm.startPeerHandler(p)
case p := <-pm.disconnect:
pm.stopPeerHandler(p)
case <-pm.ctx.Done():
return
}
}
}
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
mq := new(msgQueue)
mq.done = make(chan struct{})
mq.work = make(chan struct{}, 1)
mq.network = wm.network
mq.p = p
return mq
}
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
mq.outlk.Lock()
defer func() {
mq.outlk.Unlock()
select {
case mq.work <- struct{}{}:
default:
}
}()
// if we have no message held, or the one we are given is full
// overwrite the one we are holding
if mq.out == nil {
mq.out = bsmsg.New(false)
}
// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
for _, e := range entries {
if e.Cancel {
mq.out.Cancel(e.Key)
} else {
mq.out.AddEntry(e.Key, e.Priority)
}
}
}
......@@ -46,6 +46,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
bs.rebroadcastWorker(ctx)
})
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})
......@@ -70,9 +71,8 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
if !ok {
continue
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
envelope.Sent()
bs.wm.SendBlock(ctx, envelope)
case <-ctx.Done():
return
}
......@@ -146,30 +146,19 @@ func (bs *Bitswap) clientWorker(parent context.Context) {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
done := make(chan struct{})
go func() {
bs.wantNewBlocks(req.ctx, keys)
close(done)
}()
bs.wm.WantBlocks(keys)
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(req.ctx, providers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
for p := range providers {
go bs.network.ConnectTo(req.ctx, p)
}
cancel()
// Wait for wantNewBlocks to finish
<-done
case <-parent.Done():
return
}
......@@ -180,22 +169,24 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent)
defer cancel()
broadcastSignal := time.After(rebroadcastDelay.Get())
tick := time.Tick(10 * time.Second)
broadcastSignal := time.NewTicker(rebroadcastDelay.Get())
defer broadcastSignal.Stop()
tick := time.NewTicker(10 * time.Second)
defer tick.Stop()
for {
select {
case <-tick:
n := bs.wantlist.Len()
case <-tick.C:
n := bs.wm.wl.Len()
if n > 0 {
log.Debug(n, "keys in bitswap wantlist")
}
case <-broadcastSignal: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries()
case <-broadcastSignal.C: // resend unfulfilled wantlist keys
entries := bs.wm.wl.Entries()
if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries)
bs.connectToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case <-parent.Done():
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment