Unverified Commit cd14e70c authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #291 from ipfs/fix/session-broadcast-wants

Fix order of session broadcast wants
parents 5c18cf5d 73261ec7
......@@ -141,7 +141,7 @@ func New(ctx context.Context,
periodicSearchDelay delay.D,
self peer.ID) *Session {
s := &Session{
sw: newSessionWants(),
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
......@@ -433,7 +433,7 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
}
// No peers discovered yet, broadcast some want-haves
ks := s.sw.GetNextWants(broadcastLiveWantsLimit)
ks := s.sw.GetNextWants()
if len(ks) > 0 {
log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
s.wm.BroadcastWantHaves(ctx, s.id, ks)
......
......@@ -222,12 +222,19 @@ func TestSessionFindMorePeers(t *testing.T) {
t.Fatal("Did not make second want request ")
}
// Verify a broadcast was made
// The session should keep broadcasting periodically until it receives a response
select {
case receivedWantReq := <-fwm.wantReqs:
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
}
// Make sure the first block is not included because it has already
// been received
for _, c := range receivedWantReq.cids {
if c.Equals(cids[0]) {
t.Fatal("should not braodcast block that was already received")
}
}
case <-ctx.Done():
t.Fatal("Never rebroadcast want list")
}
......
......@@ -8,17 +8,29 @@ import (
cid "github.com/ipfs/go-cid"
)
// liveWantsOrder and liveWants will get out of sync as blocks are received.
// This constant is the maximum amount to allow them to be out of sync before
// cleaning up the ordering array.
const liveWantsOrderGCLimit = 32
// sessionWants keeps track of which cids are waiting to be sent out, and which
// peers are "live" - ie, we've sent a request but haven't received a block yet
type sessionWants struct {
toFetch *cidQueue
// The wants that have not yet been sent out
toFetch *cidQueue
// Wants that have been sent but have not received a response
liveWants map[cid.Cid]time.Time
// The order in which wants were requested
liveWantsOrder []cid.Cid
// The maximum number of want-haves to send in a broadcast
broadcastLimit int
}
func newSessionWants() sessionWants {
func newSessionWants(broadcastLimit int) sessionWants {
return sessionWants{
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
broadcastLimit: broadcastLimit,
}
}
......@@ -33,19 +45,23 @@ func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
}
}
// GetNextWants moves as many CIDs from the fetch queue to the live wants
// list as possible (given the limit). Returns the newly live wants.
func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
// GetNextWants is called when the session has not yet discovered peers with
// the blocks that it wants. It moves as many CIDs from the fetch queue to
// the live wants queue as possible (given the broadcast limit).
// Returns the newly live wants.
func (sw *sessionWants) GetNextWants() []cid.Cid {
now := time.Now()
// Move CIDs from fetch queue to the live wants queue (up to the limit)
// Move CIDs from fetch queue to the live wants queue (up to the broadcast
// limit)
currentLiveCount := len(sw.liveWants)
toAdd := limit - currentLiveCount
toAdd := sw.broadcastLimit - currentLiveCount
var live []cid.Cid
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}
......@@ -58,6 +74,7 @@ func (sw *sessionWants) WantsSent(ks []cid.Cid) {
for _, c := range ks {
if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
sw.toFetch.Remove(c)
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}
}
......@@ -73,11 +90,13 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
return wanted, totalLatency
}
// Filter for blocks that were actually wanted (as opposed to duplicates)
now := time.Now()
for _, c := range ks {
if sw.isWanted(c) {
wanted = append(wanted, c)
// Measure latency
sentAt, ok := sw.liveWants[c]
if ok && !sentAt.IsZero() {
totalLatency += now.Sub(sentAt)
......@@ -89,21 +108,39 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
}
}
// If the live wants ordering array is a long way out of sync with the
// live wants map, clean up the ordering array
if len(sw.liveWantsOrder)-len(sw.liveWants) > liveWantsOrderGCLimit {
cleaned := sw.liveWantsOrder[:0]
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
cleaned = append(cleaned, c)
}
}
sw.liveWantsOrder = cleaned
}
return wanted, totalLatency
}
// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs.
// live want CIDs up to the broadcast limit.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
// TODO: Change this to return wants in order so that the session will
// send out Find Providers request for the first want
// (Note that maps return keys in random order)
now := time.Now()
live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants {
live = append(live, c)
sw.liveWants[c] = now
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
// No response was received for the want, so reset the sent time
// to now as we're about to broadcast
sw.liveWants[c] = now
live = append(live, c)
if len(live) == sw.broadcastLimit {
break
}
}
}
return live
}
......@@ -120,9 +157,11 @@ func (sw *sessionWants) LiveWants() []cid.Cid {
for c := range sw.liveWants {
live = append(live, c)
}
return live
}
// RandomLiveWant returns a randomly selected live want
func (sw *sessionWants) RandomLiveWant() cid.Cid {
if len(sw.liveWants) == 0 {
return cid.Cid{}
......
......@@ -8,7 +8,7 @@ import (
)
func TestEmptySessionWants(t *testing.T) {
sw := newSessionWants()
sw := newSessionWants(broadcastLiveWantsLimit)
// Expect these functions to return nothing on a new sessionWants
lws := sw.PrepareBroadcast()
......@@ -29,7 +29,7 @@ func TestEmptySessionWants(t *testing.T) {
}
func TestSessionWants(t *testing.T) {
sw := newSessionWants()
sw := newSessionWants(5)
cids := testutil.GenerateCids(10)
others := testutil.GenerateCids(1)
......@@ -42,7 +42,7 @@ func TestSessionWants(t *testing.T) {
// The first 5 cids should go move into the live queue
// toFetch Live
// 98765 43210
nextw := sw.GetNextWants(5)
nextw := sw.GetNextWants()
if len(nextw) != 5 {
t.Fatal("expected 5 next wants")
}
......@@ -78,7 +78,7 @@ func TestSessionWants(t *testing.T) {
// Should move 2 wants from toFetch queue to live wants
// toFetch Live
// 987__ 65432
nextw = sw.GetNextWants(5)
nextw = sw.GetNextWants()
if len(nextw) != 2 {
t.Fatal("expected 2 next wants")
}
......@@ -108,3 +108,82 @@ func TestSessionWants(t *testing.T) {
t.Fatal("expected 4 live wants")
}
}
func TestPrepareBroadcast(t *testing.T) {
sw := newSessionWants(3)
cids := testutil.GenerateCids(10)
// Add 6 new wants
// toFetch Live
// 543210
sw.BlocksRequested(cids[:6])
// Get next wants with a limit of 3
// The first 3 cids should go move into the live queue
// toFetch Live
// 543 210
sw.GetNextWants()
// Broadcast should contain wants in order
for i := 0; i < 10; i++ {
ws := sw.PrepareBroadcast()
if len(ws) != 3 {
t.Fatal("should broadcast all live wants")
}
for idx, c := range ws {
if !c.Equals(cids[idx]) {
t.Fatal("broadcast should always return wants in order")
}
}
}
// One block received
// Remove a cid from the live queue
sw.BlocksReceived(cids[:1])
// toFetch Live
// 543 21_
// Add 4 new wants
// toFetch Live
// 9876543 21
sw.BlocksRequested(cids[6:])
// 2 Wants sent
// toFetch Live
// 98765 4321
sw.WantsSent(cids[3:5])
// Broadcast should contain wants in order
cids = cids[1:]
for i := 0; i < 10; i++ {
ws := sw.PrepareBroadcast()
if len(ws) != 3 {
t.Fatal("should broadcast live wants up to limit", len(ws), len(cids))
}
for idx, c := range ws {
if !c.Equals(cids[idx]) {
t.Fatal("broadcast should always return wants in order")
}
}
}
}
// Test that even after GC broadcast returns correct wants
func TestPrepareBroadcastAfterGC(t *testing.T) {
sw := newSessionWants(5)
cids := testutil.GenerateCids(liveWantsOrderGCLimit * 2)
sw.BlocksRequested(cids)
// Trigger a sessionWants internal GC of the live wants
sw.BlocksReceived(cids[:liveWantsOrderGCLimit+1])
cids = cids[:liveWantsOrderGCLimit+1]
// Broadcast should contain wants in order
ws := sw.PrepareBroadcast()
for i, c := range ws {
if !c.Equals(cids[i]) {
t.Fatal("broadcast should always return wants in order")
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment