Commit 73261ec7 authored by Dirk McCormick's avatar Dirk McCormick

refactor: improve sessionWants perf

parent b83a609c
......@@ -8,15 +8,20 @@ import (
cid "github.com/ipfs/go-cid"
)
// liveWantsOrder and liveWants will get out of sync as blocks are received.
// This constant is the maximum amount to allow them to be out of sync before
// cleaning up the ordering array.
const liveWantsOrderGCLimit = 32
// sessionWants keeps track of which cids are waiting to be sent out, and which
// peers are "live" - ie, we've sent a request but haven't received a block yet
type sessionWants struct {
// The wants that have not yet been sent out
toFetch *cidQueue
// Wants that have been sent but have not received a response
liveWants *cidQueue
// The time at which live wants were sent
sentAt map[cid.Cid]time.Time
liveWants map[cid.Cid]time.Time
// The order in which wants were requested
liveWantsOrder []cid.Cid
// The maximum number of want-haves to send in a broadcast
broadcastLimit int
}
......@@ -24,14 +29,13 @@ type sessionWants struct {
func newSessionWants(broadcastLimit int) sessionWants {
return sessionWants{
toFetch: newCidQueue(),
liveWants: newCidQueue(),
sentAt: make(map[cid.Cid]time.Time),
liveWants: make(map[cid.Cid]time.Time),
broadcastLimit: broadcastLimit,
}
}
func (sw *sessionWants) String() string {
return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), sw.liveWants.Len())
return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), len(sw.liveWants))
}
// BlocksRequested is called when the client makes a request for blocks
......@@ -48,16 +52,17 @@ func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
func (sw *sessionWants) GetNextWants() []cid.Cid {
now := time.Now()
// Move CIDs from fetch queue to the live wants queue (up to the limit)
currentLiveCount := sw.liveWants.Len()
// Move CIDs from fetch queue to the live wants queue (up to the broadcast
// limit)
currentLiveCount := len(sw.liveWants)
toAdd := sw.broadcastLimit - currentLiveCount
var live []cid.Cid
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
sw.liveWants.Push(c)
sw.sentAt[c] = now
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}
return live
......@@ -67,10 +72,10 @@ func (sw *sessionWants) GetNextWants() []cid.Cid {
func (sw *sessionWants) WantsSent(ks []cid.Cid) {
now := time.Now()
for _, c := range ks {
if _, ok := sw.sentAt[c]; !ok && sw.toFetch.Has(c) {
if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
sw.toFetch.Remove(c)
sw.liveWants.Push(c)
sw.sentAt[c] = now
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}
}
}
......@@ -85,24 +90,36 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
return wanted, totalLatency
}
// Filter for blocks that were actually wanted (as opposed to duplicates)
now := time.Now()
for _, c := range ks {
if sw.isWanted(c) {
wanted = append(wanted, c)
// Measure latency
sentAt, ok := sw.sentAt[c]
sentAt, ok := sw.liveWants[c]
if ok && !sentAt.IsZero() {
totalLatency += now.Sub(sentAt)
}
// Remove the CID from the live wants / toFetch queue
sw.liveWants.Remove(c)
delete(sw.sentAt, c)
delete(sw.liveWants, c)
sw.toFetch.Remove(c)
}
}
// If the live wants ordering array is a long way out of sync with the
// live wants map, clean up the ordering array
if len(sw.liveWantsOrder)-len(sw.liveWants) > liveWantsOrderGCLimit {
cleaned := sw.liveWantsOrder[:0]
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
cleaned = append(cleaned, c)
}
}
sw.liveWantsOrder = cleaned
}
return wanted, totalLatency
}
......@@ -110,13 +127,20 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
// live want CIDs up to the broadcast limit.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
now := time.Now()
live := sw.liveWants.Cids()
if len(live) > sw.broadcastLimit {
live = live[:sw.broadcastLimit]
}
for _, c := range live {
sw.sentAt[c] = now
live := make([]cid.Cid, 0, len(sw.liveWants))
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
// No response was received for the want, so reset the sent time
// to now as we're about to broadcast
sw.liveWants[c] = now
live = append(live, c)
if len(live) == sw.broadcastLimit {
break
}
}
}
return live
}
......@@ -129,18 +153,23 @@ func (sw *sessionWants) CancelPending(keys []cid.Cid) {
// LiveWants returns a list of live wants
func (sw *sessionWants) LiveWants() []cid.Cid {
return sw.liveWants.Cids()
live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants {
live = append(live, c)
}
return live
}
// RandomLiveWant returns a randomly selected live want
func (sw *sessionWants) RandomLiveWant() cid.Cid {
if len(sw.sentAt) == 0 {
if len(sw.liveWants) == 0 {
return cid.Cid{}
}
// picking a random live want
i := rand.Intn(len(sw.sentAt))
for k := range sw.sentAt {
i := rand.Intn(len(sw.liveWants))
for k := range sw.liveWants {
if i == 0 {
return k
}
......@@ -151,12 +180,12 @@ func (sw *sessionWants) RandomLiveWant() cid.Cid {
// Has live wants indicates if there are any live wants
func (sw *sessionWants) HasLiveWants() bool {
return sw.liveWants.Len() > 0
return len(sw.liveWants) > 0
}
// Indicates whether the want is in either of the fetch or live queues
func (sw *sessionWants) isWanted(c cid.Cid) bool {
ok := sw.liveWants.Has(c)
_, ok := sw.liveWants[c]
if !ok {
ok = sw.toFetch.Has(c)
}
......
......@@ -116,7 +116,7 @@ func TestPrepareBroadcast(t *testing.T) {
// Add 6 new wants
// toFetch Live
// 543210
sw.BlocksRequested(cids[0:6])
sw.BlocksRequested(cids[:6])
// Get next wants with a limit of 3
// The first 3 cids should go move into the live queue
......@@ -139,7 +139,7 @@ func TestPrepareBroadcast(t *testing.T) {
// One block received
// Remove a cid from the live queue
sw.BlocksReceived(cids[0:1])
sw.BlocksReceived(cids[:1])
// toFetch Live
// 543 21_
......@@ -167,3 +167,23 @@ func TestPrepareBroadcast(t *testing.T) {
}
}
}
// Test that even after GC broadcast returns correct wants
func TestPrepareBroadcastAfterGC(t *testing.T) {
sw := newSessionWants(5)
cids := testutil.GenerateCids(liveWantsOrderGCLimit * 2)
sw.BlocksRequested(cids)
// Trigger a sessionWants internal GC of the live wants
sw.BlocksReceived(cids[:liveWantsOrderGCLimit+1])
cids = cids[:liveWantsOrderGCLimit+1]
// Broadcast should contain wants in order
ws := sw.PrepareBroadcast()
for i, c := range ws {
if !c.Equals(cids[i]) {
t.Fatal("broadcast should always return wants in order")
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment