Commit fefe7d37 authored by Jeromy's avatar Jeromy

beginnings of a bitswap refactor

parent b0e60a69
......@@ -42,8 +42,10 @@ func New(ctx context.Context, p peer.Peer,
routing: routing,
sender: network,
wantlist: u.NewKeySet(),
blockReq: make(chan u.Key, 32),
}
network.SetDelegate(bs)
go bs.run(ctx)
return bs
}
......@@ -63,6 +65,8 @@ type bitswap struct {
notifications notifications.PubSub
blockReq chan u.Key
// strategy listens to network traffic and makes decisions about how to
// interact with partners.
// TODO(brian): save the strategy's state to the datastore
......@@ -75,7 +79,7 @@ type bitswap struct {
// deadline enforced by the context
//
// TODO ensure only one active request per key
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
log.Debugf("Get Block %v", k)
now := time.Now()
defer func() {
......@@ -88,42 +92,11 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
bs.wantlist.Add(k)
promise := bs.notifications.Subscribe(ctx, k)
const maxProviders = 20
peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders)
go func() {
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AddWanted(wanted)
}
for peerToQuery := range peersToQuery {
log.Debugf("bitswap got peersToQuery: %s", peerToQuery)
go func(p peer.Peer) {
log.Debugf("bitswap dialing peer: %s", p)
err := bs.sender.DialPeer(ctx, p)
if err != nil {
log.Errorf("Error sender.DialPeer(%s)", p)
return
}
response, err := bs.sender.SendRequest(ctx, p, message)
if err != nil {
log.Errorf("Error sender.SendRequest(%s) = %s", p, err)
return
}
// FIXME ensure accounting is handled correctly when
// communication fails. May require slightly different API to
// get better guarantees. May need shared sequence numbers.
bs.strategy.MessageSent(p, message)
if response == nil {
return
}
bs.ReceiveMessage(ctx, p, response)
}(peerToQuery)
}
}()
select {
case bs.blockReq <- k:
case <-parent.Done():
return nil, parent.Err()
}
select {
case block := <-promise:
......@@ -134,6 +107,96 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
}
}
func (bs *bitswap) GetBlocks(parent context.Context, ks []u.Key) (*blocks.Block, error) {
// TODO: something smart
return nil, nil
}
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AddWanted(wanted)
}
for peerToQuery := range peers {
log.Debugf("bitswap got peersToQuery: %s", peerToQuery)
go func(p peer.Peer) {
log.Debugf("bitswap dialing peer: %s", p)
err := bs.sender.DialPeer(ctx, p)
if err != nil {
log.Errorf("Error sender.DialPeer(%s)", p)
return
}
response, err := bs.sender.SendRequest(ctx, p, message)
if err != nil {
log.Errorf("Error sender.SendRequest(%s) = %s", p, err)
return
}
// FIXME ensure accounting is handled correctly when
// communication fails. May require slightly different API to
// get better guarantees. May need shared sequence numbers.
bs.strategy.MessageSent(p, message)
if response == nil {
return
}
bs.ReceiveMessage(ctx, p, response)
}(peerToQuery)
}
return nil
}
func (bs *bitswap) run(ctx context.Context) {
var sendlist <-chan peer.Peer
// Every so often, we should resend out our current want list
rebroadcastTime := time.Second * 5
// Time to wait before sending out wantlists to better batch up requests
bufferTime := time.Millisecond * 3
peersPerSend := 6
timeout := time.After(rebroadcastTime)
threshold := 10
unsent := 0
for {
select {
case <-timeout:
if sendlist == nil {
// rely on semi randomness of maps
firstKey := bs.wantlist.Keys()[0]
sendlist = bs.routing.FindProvidersAsync(ctx, firstKey, 6)
}
err := bs.sendWantListTo(ctx, sendlist)
if err != nil {
log.Error("error sending wantlist: %s", err)
}
sendlist = nil
timeout = time.After(rebroadcastTime)
case k := <-bs.blockReq:
if unsent == 0 {
sendlist = bs.routing.FindProvidersAsync(ctx, k, peersPerSend)
}
unsent++
if unsent >= threshold {
// send wantlist to sendlist
bs.sendWantListTo(ctx, sendlist)
unsent = 0
timeout = time.After(rebroadcastTime)
sendlist = nil
} else {
// set a timeout to wait for more blocks or send current wantlist
timeout = time.After(bufferTime)
}
case <-ctx.Done():
return
}
}
}
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
......@@ -192,8 +255,8 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
}
}
}
defer bs.strategy.MessageSent(p, message)
bs.strategy.MessageSent(p, message)
log.Debug("Returning message.")
return p, message
}
......
......@@ -31,7 +31,7 @@ func TestGetBlockTimeout(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
block := blocks.NewBlock([]byte("block"))
_, err := self.exchange.Block(ctx, block.Key())
_, err := self.exchange.GetBlock(ctx, block.Key())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
......@@ -50,7 +50,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
solo := g.Next()
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
_, err := solo.exchange.Block(ctx, block.Key())
_, err := solo.exchange.GetBlock(ctx, block.Key())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
......@@ -78,7 +78,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
wantsBlock := g.Next()
ctx, _ := context.WithTimeout(context.Background(), time.Second)
received, err := wantsBlock.exchange.Block(ctx, block.Key())
received, err := wantsBlock.exchange.GetBlock(ctx, block.Key())
if err != nil {
t.Log(err)
t.Fatal("Expected to succeed")
......@@ -100,7 +100,7 @@ func TestSwarm(t *testing.T) {
t.Log("Create a ton of instances, and just a few blocks")
numInstances := 500
numInstances := 5
numBlocks := 2
instances := sg.Instances(numInstances)
......@@ -142,7 +142,7 @@ func TestSwarm(t *testing.T) {
func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) {
if _, err := bitswap.blockstore.Get(b.Key()); err != nil {
_, err := bitswap.exchange.Block(context.Background(), b.Key())
_, err := bitswap.exchange.GetBlock(context.Background(), b.Key())
if err != nil {
t.Fatal(err)
}
......@@ -171,7 +171,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer, alpha.Key())
ctx, _ := context.WithTimeout(context.Background(), timeout)
_, err := w.exchange.Block(ctx, alpha.Key())
_, err := w.exchange.GetBlock(ctx, alpha.Key())
if err == nil {
t.Fatalf("Expected %v to NOT be available", alpha.Key())
}
......@@ -186,7 +186,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer, beta.Key(), w.peer, alpha.Key())
ctx, _ = context.WithTimeout(context.Background(), timeout)
if _, err := me.exchange.Block(ctx, beta.Key()); err != nil {
if _, err := me.exchange.GetBlock(ctx, beta.Key()); err != nil {
t.Fatal(err)
}
......@@ -199,7 +199,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.Logf("%v requests %v\n", me.peer, alpha.Key())
ctx, _ = context.WithTimeout(context.Background(), timeout)
if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil {
if _, err := me.exchange.GetBlock(ctx, alpha.Key()); err != nil {
t.Fatal(err)
}
......@@ -290,8 +290,10 @@ func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
routing: htc,
sender: adapter,
wantlist: util.NewKeySet(),
blockReq: make(chan util.Key, 32),
}
adapter.SetDelegate(bs)
go bs.run(context.TODO())
return instance{
peer: p,
exchange: bs,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment