Commit ade18f0c authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

implement bitswap roundWorker

make vendor
parent e92ee20e
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications" notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog" eventlog "github.com/jbenet/go-ipfs/util/eventlog"
...@@ -29,6 +30,8 @@ const maxProvidersPerRequest = 3 ...@@ -29,6 +30,8 @@ const maxProvidersPerRequest = 3
const providerRequestTimeout = time.Second * 10 const providerRequestTimeout = time.Second * 10
const hasBlockTimeout = time.Second * 15 const hasBlockTimeout = time.Second * 15
const roundTime = time.Second / 2
// New initializes a BitSwap instance that communicates over the // New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as // provided BitSwapNetwork. This function registers the returned instance as
// the network delegate. // the network delegate.
...@@ -41,6 +44,7 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout ...@@ -41,6 +44,7 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
notif := notifications.New() notif := notifications.New()
go func() { go func() {
<-ctx.Done() <-ctx.Done()
cancelFunc()
notif.Shutdown() notif.Shutdown()
}() }()
...@@ -51,11 +55,12 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout ...@@ -51,11 +55,12 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
strategy: strategy.New(nice), strategy: strategy.New(nice),
routing: routing, routing: routing,
sender: network, sender: network,
wantlist: u.NewKeySet(), wantlist: wl.NewWantlist(),
batchRequests: make(chan []u.Key, 32), batchRequests: make(chan []u.Key, 32),
} }
network.SetDelegate(bs) network.SetDelegate(bs)
go bs.loop(ctx) go bs.loop(ctx)
go bs.roundWorker(ctx)
return bs return bs
} }
...@@ -85,7 +90,7 @@ type bitswap struct { ...@@ -85,7 +90,7 @@ type bitswap struct {
// TODO(brian): save the strategy's state to the datastore // TODO(brian): save the strategy's state to the datastore
strategy strategy.Strategy strategy strategy.Strategy
wantlist u.KeySet wantlist *wl.Wantlist
// cancelFunc signals cancellation to the bitswap event loop // cancelFunc signals cancellation to the bitswap event loop
cancelFunc func() cancelFunc func()
...@@ -166,8 +171,8 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e ...@@ -166,8 +171,8 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
panic("Cant send wantlist to nil peerchan") panic("Cant send wantlist to nil peerchan")
} }
message := bsmsg.New() message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() { for _, wanted := range bs.wantlist.Entries() {
message.AddWanted(wanted) message.AddEntry(wanted.Value, wanted.Priority, false)
} }
for peerToQuery := range peers { for peerToQuery := range peers {
log.Debug("sending query to: %s", peerToQuery) log.Debug("sending query to: %s", peerToQuery)
...@@ -195,9 +200,9 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e ...@@ -195,9 +200,9 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
return nil return nil
} }
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, k := range ks { for _, e := range wantlist.Entries() {
wg.Add(1) wg.Add(1)
go func(k u.Key) { go func(k u.Key) {
child, _ := context.WithTimeout(ctx, providerRequestTimeout) child, _ := context.WithTimeout(ctx, providerRequestTimeout)
...@@ -208,11 +213,44 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { ...@@ -208,11 +213,44 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
log.Errorf("error sending wantlist: %s", err) log.Errorf("error sending wantlist: %s", err)
} }
wg.Done() wg.Done()
}(k) }(e.Value)
} }
wg.Wait() wg.Wait()
} }
func (bs *bitswap) roundWorker(ctx context.Context) {
roundTicker := time.NewTicker(roundTime)
bandwidthPerRound := 500000
for {
select {
case <-ctx.Done():
return
case <-roundTicker.C:
alloc, err := bs.strategy.GetAllocation(bandwidthPerRound, bs.blockstore)
if err != nil {
log.Critical("%s", err)
}
//log.Errorf("Allocation: %v", alloc)
bs.processStrategyAllocation(ctx, alloc)
}
}
}
func (bs *bitswap) processStrategyAllocation(ctx context.Context, alloc []*strategy.Task) {
for _, t := range alloc {
for _, block := range t.Blocks {
message := bsmsg.New()
message.AddBlock(block)
for _, wanted := range bs.wantlist.Entries() {
message.AddEntry(wanted.Value, wanted.Priority, false)
}
if err := bs.send(ctx, t.Peer, message); err != nil {
log.Errorf("Message Send Failed: %s", err)
}
}
}
}
// TODO ensure only one active request per key // TODO ensure only one active request per key
func (bs *bitswap) loop(parent context.Context) { func (bs *bitswap) loop(parent context.Context) {
...@@ -228,7 +266,7 @@ func (bs *bitswap) loop(parent context.Context) { ...@@ -228,7 +266,7 @@ func (bs *bitswap) loop(parent context.Context) {
select { select {
case <-broadcastSignal.C: case <-broadcastSignal.C:
// Resend unfulfilled wantlist keys // Resend unfulfilled wantlist keys
bs.sendWantlistToProviders(ctx, bs.wantlist.Keys()) bs.sendWantlistToProviders(ctx, bs.wantlist)
case ks := <-bs.batchRequests: case ks := <-bs.batchRequests:
// TODO: implement batching on len(ks) > X for some X // TODO: implement batching on len(ks) > X for some X
// i.e. if given 20 keys, fetch first five, then next // i.e. if given 20 keys, fetch first five, then next
...@@ -239,7 +277,7 @@ func (bs *bitswap) loop(parent context.Context) { ...@@ -239,7 +277,7 @@ func (bs *bitswap) loop(parent context.Context) {
continue continue
} }
for _, k := range ks { for _, k := range ks {
bs.wantlist.Add(k) bs.wantlist.Add(k, 1)
} }
// NB: send want list to providers for the first peer in this list. // NB: send want list to providers for the first peer in this list.
// the assumption is made that the providers of the first key in // the assumption is made that the providers of the first key in
...@@ -277,45 +315,41 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm ...@@ -277,45 +315,41 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
return nil, nil return nil, nil
} }
// Record message bytes in ledger
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger
// This call records changes to wantlists, blocks received, // This call records changes to wantlists, blocks received,
// and number of bytes transfered. // and number of bytes transfered.
bs.strategy.MessageReceived(p, incoming) bs.strategy.MessageReceived(p, incoming)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger
var blkeys []u.Key
for _, block := range incoming.Blocks() { for _, block := range incoming.Blocks() {
blkeys = append(blkeys, block.Key())
if err := bs.HasBlock(ctx, block); err != nil { if err := bs.HasBlock(ctx, block); err != nil {
log.Error(err) log.Error(err)
} }
} }
if len(blkeys) > 0 {
for _, key := range incoming.Wantlist() { bs.cancelBlocks(ctx, blkeys)
if bs.strategy.ShouldSendBlockToPeer(key, p) {
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue
} else {
// Create a separate message to send this block in
blkmsg := bsmsg.New()
// TODO: only send this the first time
// no sense in sending our wantlist to the
// same peer multiple times
for _, k := range bs.wantlist.Keys() {
blkmsg.AddWanted(k)
}
blkmsg.AddBlock(block)
bs.send(ctx, p, blkmsg)
bs.strategy.BlockSentToPeer(block.Key(), p)
}
}
} }
// TODO: consider changing this function to not return anything // TODO: consider changing this function to not return anything
return nil, nil return nil, nil
} }
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
message := bsmsg.New()
message.SetFull(false)
for _, k := range bkeys {
message.AddEntry(k, 0, true)
}
for _, p := range bs.strategy.Peers() {
err := bs.send(ctx, p, message)
if err != nil {
log.Errorf("Error sending message: %s", err)
}
}
}
func (bs *bitswap) ReceiveError(err error) { func (bs *bitswap) ReceiveError(err error) {
log.Errorf("Bitswap ReceiveError: %s", err) log.Errorf("Bitswap ReceiveError: %s", err)
// TODO log the network error // TODO log the network error
...@@ -337,8 +371,8 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) ...@@ -337,8 +371,8 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block)
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New() message := bsmsg.New()
message.AddBlock(block) message.AddBlock(block)
for _, wanted := range bs.wantlist.Keys() { for _, wanted := range bs.wantlist.Entries() {
message.AddWanted(wanted) message.AddEntry(wanted.Value, wanted.Priority, false)
} }
if err := bs.send(ctx, p, message); err != nil { if err := bs.send(ctx, p, message); err != nil {
return err return err
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
mockrouting "github.com/jbenet/go-ipfs/routing/mock" mockrouting "github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay" delay "github.com/jbenet/go-ipfs/util/delay"
testutil "github.com/jbenet/go-ipfs/util/testutil" testutil "github.com/jbenet/go-ipfs/util/testutil"
) )
...@@ -25,6 +26,7 @@ func TestClose(t *testing.T) { ...@@ -25,6 +26,7 @@ func TestClose(t *testing.T) {
vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rout := mockrouting.NewServer() rout := mockrouting.NewServer()
sesgen := NewSessionGenerator(vnet, rout) sesgen := NewSessionGenerator(vnet, rout)
defer sesgen.Stop()
bgen := blocksutil.NewBlockGenerator() bgen := blocksutil.NewBlockGenerator()
block := bgen.Next() block := bgen.Next()
...@@ -39,6 +41,7 @@ func TestGetBlockTimeout(t *testing.T) { ...@@ -39,6 +41,7 @@ func TestGetBlockTimeout(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer() rs := mockrouting.NewServer()
g := NewSessionGenerator(net, rs) g := NewSessionGenerator(net, rs)
defer g.Stop()
self := g.Next() self := g.Next()
...@@ -56,11 +59,13 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { ...@@ -56,11 +59,13 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer() rs := mockrouting.NewServer()
g := NewSessionGenerator(net, rs) g := NewSessionGenerator(net, rs)
defer g.Stop()
block := blocks.NewBlock([]byte("block")) block := blocks.NewBlock([]byte("block"))
rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network
solo := g.Next() solo := g.Next()
defer solo.Exchange.Close()
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
_, err := solo.Exchange.GetBlock(ctx, block.Key()) _, err := solo.Exchange.GetBlock(ctx, block.Key())
...@@ -78,8 +83,10 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { ...@@ -78,8 +83,10 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
rs := mockrouting.NewServer() rs := mockrouting.NewServer()
block := blocks.NewBlock([]byte("block")) block := blocks.NewBlock([]byte("block"))
g := NewSessionGenerator(net, rs) g := NewSessionGenerator(net, rs)
defer g.Stop()
hasBlock := g.Next() hasBlock := g.Next()
defer hasBlock.Exchange.Close()
if err := hasBlock.Blockstore().Put(block); err != nil { if err := hasBlock.Blockstore().Put(block); err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -89,6 +96,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { ...@@ -89,6 +96,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
} }
wantsBlock := g.Next() wantsBlock := g.Next()
defer wantsBlock.Exchange.Close()
ctx, _ := context.WithTimeout(context.Background(), time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Second)
received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key()) received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
...@@ -107,7 +115,7 @@ func TestLargeSwarm(t *testing.T) { ...@@ -107,7 +115,7 @@ func TestLargeSwarm(t *testing.T) {
t.SkipNow() t.SkipNow()
} }
t.Parallel() t.Parallel()
numInstances := 5 numInstances := 500
numBlocks := 2 numBlocks := 2
PerformDistributionTest(t, numInstances, numBlocks) PerformDistributionTest(t, numInstances, numBlocks)
} }
...@@ -129,6 +137,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -129,6 +137,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer() rs := mockrouting.NewServer()
sg := NewSessionGenerator(net, rs) sg := NewSessionGenerator(net, rs)
defer sg.Stop()
bg := blocksutil.NewBlockGenerator() bg := blocksutil.NewBlockGenerator()
t.Log("Test a few nodes trying to get one file with a lot of blocks") t.Log("Test a few nodes trying to get one file with a lot of blocks")
...@@ -138,24 +147,29 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -138,24 +147,29 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.Log("Give the blocks to the first instance") t.Log("Give the blocks to the first instance")
var blkeys []u.Key
first := instances[0] first := instances[0]
for _, b := range blocks { for _, b := range blocks {
first.Blockstore().Put(b) first.Blockstore().Put(b)
blkeys = append(blkeys, b.Key())
first.Exchange.HasBlock(context.Background(), b) first.Exchange.HasBlock(context.Background(), b)
rs.Client(first.Peer).Provide(context.Background(), b.Key()) rs.Client(first.Peer).Provide(context.Background(), b.Key())
} }
t.Log("Distribute!") t.Log("Distribute!")
var wg sync.WaitGroup wg := sync.WaitGroup{}
for _, inst := range instances { for _, inst := range instances {
for _, b := range blocks { wg.Add(1)
wg.Add(1) go func(inst Instance) {
// NB: executing getOrFail concurrently puts tremendous pressure on defer wg.Done()
// the goroutine scheduler outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
getOrFail(inst, b, t, &wg) if err != nil {
} t.Fatal(err)
}
for _ = range outch {
}
}(inst)
} }
wg.Wait() wg.Wait()
...@@ -189,6 +203,7 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -189,6 +203,7 @@ func TestSendToWantingPeer(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer() rs := mockrouting.NewServer()
sg := NewSessionGenerator(net, rs) sg := NewSessionGenerator(net, rs)
defer sg.Stop()
bg := blocksutil.NewBlockGenerator() bg := blocksutil.NewBlockGenerator()
me := sg.Next() me := sg.Next()
...@@ -201,7 +216,7 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -201,7 +216,7 @@ func TestSendToWantingPeer(t *testing.T) {
alpha := bg.Next() alpha := bg.Next()
const timeout = 100 * time.Millisecond // FIXME don't depend on time const timeout = 1000 * time.Millisecond // FIXME don't depend on time
t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key()) t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key())
ctx, _ := context.WithTimeout(context.Background(), timeout) ctx, _ := context.WithTimeout(context.Background(), timeout)
...@@ -246,3 +261,33 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -246,3 +261,33 @@ func TestSendToWantingPeer(t *testing.T) {
t.Fatal("Expected to receive alpha from me") t.Fatal("Expected to receive alpha from me")
} }
} }
func TestBasicBitswap(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer()
sg := NewSessionGenerator(net, rs)
bg := blocksutil.NewBlockGenerator()
t.Log("Test a few nodes trying to get one file with a lot of blocks")
instances := sg.Instances(2)
blocks := bg.Blocks(1)
err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
if err != nil {
t.Fatal(err)
}
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
if err != nil {
t.Fatal(err)
}
t.Log(blk)
for _, inst := range instances {
err := inst.Exchange.Close()
if err != nil {
t.Fatal(err)
}
}
}
...@@ -21,16 +21,16 @@ var _ = proto.Marshal ...@@ -21,16 +21,16 @@ var _ = proto.Marshal
var _ = math.Inf var _ = math.Inf
type Message struct { type Message struct {
Wantlist []string `protobuf:"bytes,1,rep,name=wantlist" json:"wantlist,omitempty"` Wantlist *Message_Wantlist `protobuf:"bytes,1,opt,name=wantlist" json:"wantlist,omitempty"`
Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"` Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
} }
func (m *Message) Reset() { *m = Message{} } func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) } func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {} func (*Message) ProtoMessage() {}
func (m *Message) GetWantlist() []string { func (m *Message) GetWantlist() *Message_Wantlist {
if m != nil { if m != nil {
return m.Wantlist return m.Wantlist
} }
...@@ -44,5 +44,61 @@ func (m *Message) GetBlocks() [][]byte { ...@@ -44,5 +44,61 @@ func (m *Message) GetBlocks() [][]byte {
return nil return nil
} }
type Message_Wantlist struct {
Entries []*Message_Wantlist_Entry `protobuf:"bytes,1,rep,name=entries" json:"entries,omitempty"`
Full *bool `protobuf:"varint,2,opt,name=full" json:"full,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Message_Wantlist) Reset() { *m = Message_Wantlist{} }
func (m *Message_Wantlist) String() string { return proto.CompactTextString(m) }
func (*Message_Wantlist) ProtoMessage() {}
func (m *Message_Wantlist) GetEntries() []*Message_Wantlist_Entry {
if m != nil {
return m.Entries
}
return nil
}
func (m *Message_Wantlist) GetFull() bool {
if m != nil && m.Full != nil {
return *m.Full
}
return false
}
type Message_Wantlist_Entry struct {
Block *string `protobuf:"bytes,1,opt,name=block" json:"block,omitempty"`
Priority *int32 `protobuf:"varint,2,opt,name=priority" json:"priority,omitempty"`
Cancel *bool `protobuf:"varint,3,opt,name=cancel" json:"cancel,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Message_Wantlist_Entry) Reset() { *m = Message_Wantlist_Entry{} }
func (m *Message_Wantlist_Entry) String() string { return proto.CompactTextString(m) }
func (*Message_Wantlist_Entry) ProtoMessage() {}
func (m *Message_Wantlist_Entry) GetBlock() string {
if m != nil && m.Block != nil {
return *m.Block
}
return ""
}
func (m *Message_Wantlist_Entry) GetPriority() int32 {
if m != nil && m.Priority != nil {
return *m.Priority
}
return 0
}
func (m *Message_Wantlist_Entry) GetCancel() bool {
if m != nil && m.Cancel != nil {
return *m.Cancel
}
return false
}
func init() { func init() {
} }
package bitswap.message.pb; package bitswap.message.pb;
message Message { message Message {
repeated string wantlist = 1;
repeated bytes blocks = 2; message Wantlist {
message Entry {
optional string block = 1; // the block key
optional int32 priority = 2; // the priority (normalized). default to 1
optional bool cancel = 3; // whether this revokes an entry
}
repeated Entry entries = 1; // a list of wantlist entries
optional bool full = 2; // whether this is the full wantlist. default to false
}
optional Wantlist wantlist = 1;
repeated bytes blocks = 2;
} }
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
) )
// TODO move message.go into the bitswap package // TODO move message.go into the bitswap package
...@@ -17,21 +18,21 @@ import ( ...@@ -17,21 +18,21 @@ import (
type BitSwapMessage interface { type BitSwapMessage interface {
// Wantlist returns a slice of unique keys that represent data wanted by // Wantlist returns a slice of unique keys that represent data wanted by
// the sender. // the sender.
Wantlist() []u.Key Wantlist() []*Entry
// Blocks returns a slice of unique blocks // Blocks returns a slice of unique blocks
Blocks() []*blocks.Block Blocks() []*blocks.Block
// AddWanted adds the key to the Wantlist. // AddEntry adds an entry to the Wantlist.
// AddEntry(u.Key, int, bool)
// Insertion order determines priority. That is, earlier insertions are
// deemed higher priority than keys inserted later. // Sets whether or not the contained wantlist represents the entire wantlist
// // true = full wantlist
// t = 0, msg.AddWanted(A) // false = wantlist 'patch'
// t = 1, msg.AddWanted(B) // default: true
// SetFull(bool)
// implies Priority(A) > Priority(B)
AddWanted(u.Key) Full() bool
AddBlock(*blocks.Block) AddBlock(*blocks.Block)
Exportable Exportable
...@@ -43,23 +44,30 @@ type Exportable interface { ...@@ -43,23 +44,30 @@ type Exportable interface {
} }
type impl struct { type impl struct {
existsInWantlist map[u.Key]struct{} // map to detect duplicates full bool
wantlist []u.Key // slice to preserve ordering wantlist map[u.Key]*Entry
blocks map[u.Key]*blocks.Block // map to detect duplicates blocks map[u.Key]*blocks.Block // map to detect duplicates
} }
func New() BitSwapMessage { func New() BitSwapMessage {
return &impl{ return &impl{
blocks: make(map[u.Key]*blocks.Block), blocks: make(map[u.Key]*blocks.Block),
existsInWantlist: make(map[u.Key]struct{}), wantlist: make(map[u.Key]*Entry),
wantlist: make([]u.Key, 0), full: true,
} }
} }
type Entry struct {
Key u.Key
Priority int
Cancel bool
}
func newMessageFromProto(pbm pb.Message) BitSwapMessage { func newMessageFromProto(pbm pb.Message) BitSwapMessage {
m := New() m := New()
for _, s := range pbm.GetWantlist() { m.SetFull(pbm.GetWantlist().GetFull())
m.AddWanted(u.Key(s)) for _, e := range pbm.GetWantlist().GetEntries() {
m.AddEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
} }
for _, d := range pbm.GetBlocks() { for _, d := range pbm.GetBlocks() {
b := blocks.NewBlock(d) b := blocks.NewBlock(d)
...@@ -68,8 +76,20 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { ...@@ -68,8 +76,20 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage {
return m return m
} }
func (m *impl) Wantlist() []u.Key { func (m *impl) SetFull(full bool) {
return m.wantlist m.full = full
}
func (m *impl) Full() bool {
return m.full
}
func (m *impl) Wantlist() []*Entry {
var out []*Entry
for _, e := range m.wantlist {
out = append(out, e)
}
return out
} }
func (m *impl) Blocks() []*blocks.Block { func (m *impl) Blocks() []*blocks.Block {
...@@ -80,13 +100,18 @@ func (m *impl) Blocks() []*blocks.Block { ...@@ -80,13 +100,18 @@ func (m *impl) Blocks() []*blocks.Block {
return bs return bs
} }
func (m *impl) AddWanted(k u.Key) { func (m *impl) AddEntry(k u.Key, priority int, cancel bool) {
_, exists := m.existsInWantlist[k] e, exists := m.wantlist[k]
if exists { if exists {
return e.Priority = priority
e.Cancel = cancel
} else {
m.wantlist[k] = &Entry{
Key: k,
Priority: priority,
Cancel: cancel,
}
} }
m.existsInWantlist[k] = struct{}{}
m.wantlist = append(m.wantlist, k)
} }
func (m *impl) AddBlock(b *blocks.Block) { func (m *impl) AddBlock(b *blocks.Block) {
...@@ -106,14 +131,19 @@ func FromNet(r io.Reader) (BitSwapMessage, error) { ...@@ -106,14 +131,19 @@ func FromNet(r io.Reader) (BitSwapMessage, error) {
} }
func (m *impl) ToProto() *pb.Message { func (m *impl) ToProto() *pb.Message {
pb := new(pb.Message) pbm := new(pb.Message)
for _, k := range m.Wantlist() { pbm.Wantlist = new(pb.Message_Wantlist)
pb.Wantlist = append(pb.Wantlist, string(k)) for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
Block: proto.String(string(e.Key)),
Priority: proto.Int32(int32(e.Priority)),
Cancel: &e.Cancel,
})
} }
for _, b := range m.Blocks() { for _, b := range m.Blocks() {
pb.Blocks = append(pb.Blocks, b.Data) pbm.Blocks = append(pbm.Blocks, b.Data)
} }
return pb return pbm
} }
func (m *impl) ToNet(w io.Writer) error { func (m *impl) ToNet(w io.Writer) error {
......
...@@ -4,6 +4,8 @@ import ( ...@@ -4,6 +4,8 @@ import (
"bytes" "bytes"
"testing" "testing"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb" pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
...@@ -12,22 +14,26 @@ import ( ...@@ -12,22 +14,26 @@ import (
func TestAppendWanted(t *testing.T) { func TestAppendWanted(t *testing.T) {
const str = "foo" const str = "foo"
m := New() m := New()
m.AddWanted(u.Key(str)) m.AddEntry(u.Key(str), 1, false)
if !contains(m.ToProto().GetWantlist(), str) { if !wantlistContains(m.ToProto().GetWantlist(), str) {
t.Fail() t.Fail()
} }
m.ToProto().GetWantlist().GetEntries()
} }
func TestNewMessageFromProto(t *testing.T) { func TestNewMessageFromProto(t *testing.T) {
const str = "a_key" const str = "a_key"
protoMessage := new(pb.Message) protoMessage := new(pb.Message)
protoMessage.Wantlist = []string{string(str)} protoMessage.Wantlist = new(pb.Message_Wantlist)
if !contains(protoMessage.Wantlist, str) { protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{
&pb.Message_Wantlist_Entry{Block: proto.String(str)},
}
if !wantlistContains(protoMessage.Wantlist, str) {
t.Fail() t.Fail()
} }
m := newMessageFromProto(*protoMessage) m := newMessageFromProto(*protoMessage)
if !contains(m.ToProto().GetWantlist(), str) { if !wantlistContains(m.ToProto().GetWantlist(), str) {
t.Fail() t.Fail()
} }
} }
...@@ -57,7 +63,7 @@ func TestWantlist(t *testing.T) { ...@@ -57,7 +63,7 @@ func TestWantlist(t *testing.T) {
keystrs := []string{"foo", "bar", "baz", "bat"} keystrs := []string{"foo", "bar", "baz", "bat"}
m := New() m := New()
for _, s := range keystrs { for _, s := range keystrs {
m.AddWanted(u.Key(s)) m.AddEntry(u.Key(s), 1, false)
} }
exported := m.Wantlist() exported := m.Wantlist()
...@@ -65,12 +71,12 @@ func TestWantlist(t *testing.T) { ...@@ -65,12 +71,12 @@ func TestWantlist(t *testing.T) {
present := false present := false
for _, s := range keystrs { for _, s := range keystrs {
if s == string(k) { if s == string(k.Key) {
present = true present = true
} }
} }
if !present { if !present {
t.Logf("%v isn't in original list", string(k)) t.Logf("%v isn't in original list", k.Key)
t.Fail() t.Fail()
} }
} }
...@@ -80,19 +86,19 @@ func TestCopyProtoByValue(t *testing.T) { ...@@ -80,19 +86,19 @@ func TestCopyProtoByValue(t *testing.T) {
const str = "foo" const str = "foo"
m := New() m := New()
protoBeforeAppend := m.ToProto() protoBeforeAppend := m.ToProto()
m.AddWanted(u.Key(str)) m.AddEntry(u.Key(str), 1, false)
if contains(protoBeforeAppend.GetWantlist(), str) { if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
t.Fail() t.Fail()
} }
} }
func TestToNetFromNetPreservesWantList(t *testing.T) { func TestToNetFromNetPreservesWantList(t *testing.T) {
original := New() original := New()
original.AddWanted(u.Key("M")) original.AddEntry(u.Key("M"), 1, false)
original.AddWanted(u.Key("B")) original.AddEntry(u.Key("B"), 1, false)
original.AddWanted(u.Key("D")) original.AddEntry(u.Key("D"), 1, false)
original.AddWanted(u.Key("T")) original.AddEntry(u.Key("T"), 1, false)
original.AddWanted(u.Key("F")) original.AddEntry(u.Key("F"), 1, false)
var buf bytes.Buffer var buf bytes.Buffer
if err := original.ToNet(&buf); err != nil { if err := original.ToNet(&buf); err != nil {
...@@ -106,11 +112,11 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { ...@@ -106,11 +112,11 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
keys := make(map[u.Key]bool) keys := make(map[u.Key]bool)
for _, k := range copied.Wantlist() { for _, k := range copied.Wantlist() {
keys[k] = true keys[k.Key] = true
} }
for _, k := range original.Wantlist() { for _, k := range original.Wantlist() {
if _, ok := keys[k]; !ok { if _, ok := keys[k.Key]; !ok {
t.Fatalf("Key Missing: \"%v\"", k) t.Fatalf("Key Missing: \"%v\"", k)
} }
} }
...@@ -146,9 +152,18 @@ func TestToAndFromNetMessage(t *testing.T) { ...@@ -146,9 +152,18 @@ func TestToAndFromNetMessage(t *testing.T) {
} }
} }
func contains(s []string, x string) bool { func wantlistContains(wantlist *pb.Message_Wantlist, x string) bool {
for _, a := range s { for _, e := range wantlist.GetEntries() {
if a == x { if e.GetBlock() == x {
return true
}
}
return false
}
func contains(strs []string, x string) bool {
for _, s := range strs {
if s == x {
return true return true
} }
} }
...@@ -159,8 +174,8 @@ func TestDuplicates(t *testing.T) { ...@@ -159,8 +174,8 @@ func TestDuplicates(t *testing.T) {
b := blocks.NewBlock([]byte("foo")) b := blocks.NewBlock([]byte("foo"))
msg := New() msg := New()
msg.AddWanted(b.Key()) msg.AddEntry(b.Key(), 1, false)
msg.AddWanted(b.Key()) msg.AddEntry(b.Key(), 1, false)
if len(msg.Wantlist()) != 1 { if len(msg.Wantlist()) != 1 {
t.Fatal("Duplicate in BitSwapMessage") t.Fatal("Duplicate in BitSwapMessage")
} }
......
...@@ -3,6 +3,7 @@ package strategy ...@@ -3,6 +3,7 @@ package strategy
import ( import (
"time" "time"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
...@@ -34,6 +35,8 @@ type Strategy interface { ...@@ -34,6 +35,8 @@ type Strategy interface {
BlockSentToPeer(u.Key, peer.Peer) BlockSentToPeer(u.Key, peer.Peer)
GetAllocation(int, bstore.Blockstore) ([]*Task, error)
// Values determining bitswap behavioural patterns // Values determining bitswap behavioural patterns
GetBatchSize() int GetBatchSize() int
GetRebroadcastDelay() time.Duration GetRebroadcastDelay() time.Duration
......
...@@ -3,6 +3,7 @@ package strategy ...@@ -3,6 +3,7 @@ package strategy
import ( import (
"time" "time"
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
...@@ -13,7 +14,7 @@ type keySet map[u.Key]struct{} ...@@ -13,7 +14,7 @@ type keySet map[u.Key]struct{}
func newLedger(p peer.Peer, strategy strategyFunc) *ledger { func newLedger(p peer.Peer, strategy strategyFunc) *ledger {
return &ledger{ return &ledger{
wantList: keySet{}, wantList: wl.NewWantlist(),
Strategy: strategy, Strategy: strategy,
Partner: p, Partner: p,
sentToPeer: make(map[u.Key]time.Time), sentToPeer: make(map[u.Key]time.Time),
...@@ -39,7 +40,7 @@ type ledger struct { ...@@ -39,7 +40,7 @@ type ledger struct {
exchangeCount uint64 exchangeCount uint64
// wantList is a (bounded, small) set of keys that Partner desires. // wantList is a (bounded, small) set of keys that Partner desires.
wantList keySet wantList *wl.Wantlist
// sentToPeer is a set of keys to ensure we dont send duplicate blocks // sentToPeer is a set of keys to ensure we dont send duplicate blocks
// to a given peer // to a given peer
...@@ -65,14 +66,17 @@ func (l *ledger) ReceivedBytes(n int) { ...@@ -65,14 +66,17 @@ func (l *ledger) ReceivedBytes(n int) {
} }
// TODO: this needs to be different. We need timeouts. // TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k u.Key) { func (l *ledger) Wants(k u.Key, priority int) {
log.Debugf("peer %s wants %s", l.Partner, k) log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList[k] = struct{}{} l.wantList.Add(k, priority)
}
func (l *ledger) CancelWant(k u.Key) {
l.wantList.Remove(k)
} }
func (l *ledger) WantListContains(k u.Key) bool { func (l *ledger) WantListContains(k u.Key) bool {
_, ok := l.wantList[k] return l.wantList.Contains(k)
return ok
} }
func (l *ledger) ExchangeCount() uint64 { func (l *ledger) ExchangeCount() uint64 {
......
...@@ -5,7 +5,10 @@ import ( ...@@ -5,7 +5,10 @@ import (
"sync" "sync"
"time" "time"
blocks "github.com/jbenet/go-ipfs/blocks"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
...@@ -77,6 +80,60 @@ func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool { ...@@ -77,6 +80,60 @@ func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
return ledger.ShouldSend() return ledger.ShouldSend()
} }
type Task struct {
Peer peer.Peer
Blocks []*blocks.Block
}
func (s *strategist) GetAllocation(bandwidth int, bs bstore.Blockstore) ([]*Task, error) {
var tasks []*Task
s.lock.RLock()
defer s.lock.RUnlock()
var partners []peer.Peer
for _, ledger := range s.ledgerMap {
if ledger.ShouldSend() {
partners = append(partners, ledger.Partner)
}
}
if len(partners) == 0 {
return nil, nil
}
bandwidthPerPeer := bandwidth / len(partners)
for _, p := range partners {
blksForPeer, err := s.getSendableBlocks(s.ledger(p).wantList, bs, bandwidthPerPeer)
if err != nil {
return nil, err
}
tasks = append(tasks, &Task{
Peer: p,
Blocks: blksForPeer,
})
}
return tasks, nil
}
func (s *strategist) getSendableBlocks(wantlist *wl.Wantlist, bs bstore.Blockstore, bw int) ([]*blocks.Block, error) {
var outblocks []*blocks.Block
for _, e := range wantlist.Entries() {
block, err := bs.Get(e.Value)
if err == u.ErrNotFound {
continue
}
if err != nil {
return nil, err
}
outblocks = append(outblocks, block)
bw -= len(block.Data)
if bw <= 0 {
break
}
}
return outblocks, nil
}
func (s *strategist) BlockSentToPeer(k u.Key, p peer.Peer) { func (s *strategist) BlockSentToPeer(k u.Key, p peer.Peer) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
...@@ -106,8 +163,15 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error ...@@ -106,8 +163,15 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error
return errors.New("Strategy received nil message") return errors.New("Strategy received nil message")
} }
l := s.ledger(p) l := s.ledger(p)
for _, key := range m.Wantlist() { if m.Full() {
l.Wants(key) l.wantList = wl.NewWantlist()
}
for _, e := range m.Wantlist() {
if e.Cancel {
l.CancelWant(e.Key)
} else {
l.Wants(e.Key, e.Priority)
}
} }
for _, block := range m.Blocks() { for _, block := range m.Blocks() {
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method // FIXME extract blocks.NumBytes(block) or block.NumBytes() method
...@@ -165,5 +229,5 @@ func (s *strategist) GetBatchSize() int { ...@@ -165,5 +229,5 @@ func (s *strategist) GetBatchSize() int {
} }
func (s *strategist) GetRebroadcastDelay() time.Duration { func (s *strategist) GetRebroadcastDelay() time.Duration {
return time.Second * 5 return time.Second * 10
} }
...@@ -61,7 +61,7 @@ func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) { ...@@ -61,7 +61,7 @@ func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
block := blocks.NewBlock([]byte("data wanted by beggar")) block := blocks.NewBlock([]byte("data wanted by beggar"))
messageFromBeggarToChooser := message.New() messageFromBeggarToChooser := message.New()
messageFromBeggarToChooser.AddWanted(block.Key()) messageFromBeggarToChooser.AddEntry(block.Key(), 1, false)
chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser) chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser)
// for this test, doesn't matter if you record that beggar sent // for this test, doesn't matter if you record that beggar sent
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment