sessionwants.go 5.09 KB
Newer Older
1 2 3
package session

import (
dirkmc's avatar
dirkmc committed
4
	"fmt"
5 6 7
	"math/rand"
	"time"

8
	cid "gitlab.dms3.io/dms3/go-cid"
9 10
)

11 12 13 14 15
// liveWantsOrderGCLimit bounds how far the liveWantsOrder slice may drift from
// the liveWants map. Entries are not removed from the ordering slice when a
// block arrives, so it accumulates stale CIDs; once it holds more than this
// many entries beyond the size of the map, it is compacted.
const liveWantsOrderGCLimit = 32

dirkmc's avatar
dirkmc committed
16 17
// sessionWants keeps track of which cids are waiting to be sent out, and which
// peers are "live" - ie, we've sent a request but haven't received a block yet
18
type sessionWants struct {
19 20 21
	// The wants that have not yet been sent out
	toFetch *cidQueue
	// Wants that have been sent but have not received a response
22 23 24
	liveWants map[cid.Cid]time.Time
	// The order in which wants were requested
	liveWantsOrder []cid.Cid
25 26
	// The maximum number of want-haves to send in a broadcast
	broadcastLimit int
27 28
}

29
func newSessionWants(broadcastLimit int) sessionWants {
dirkmc's avatar
dirkmc committed
30
	return sessionWants{
31
		toFetch:        newCidQueue(),
32
		liveWants:      make(map[cid.Cid]time.Time),
33
		broadcastLimit: broadcastLimit,
dirkmc's avatar
dirkmc committed
34 35 36 37
	}
}

func (sw *sessionWants) String() string {
38
	return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), len(sw.liveWants))
dirkmc's avatar
dirkmc committed
39
}
40

dirkmc's avatar
dirkmc committed
41 42 43 44
// BlocksRequested is called when the client makes a request for blocks
func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
	for _, k := range newWants {
		sw.toFetch.Push(k)
45 46 47
	}
}

48 49 50 51 52
// GetNextWants is called when the session has not yet discovered peers with
// the blocks that it wants. It moves as many CIDs from the fetch queue to
// the live wants queue as possible (given the broadcast limit).
// Returns the newly live wants.
func (sw *sessionWants) GetNextWants() []cid.Cid {
53 54
	now := time.Now()

55 56 57
	// Move CIDs from fetch queue to the live wants queue (up to the broadcast
	// limit)
	currentLiveCount := len(sw.liveWants)
58
	toAdd := sw.broadcastLimit - currentLiveCount
59 60 61 62 63

	var live []cid.Cid
	for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
		c := sw.toFetch.Pop()
		live = append(live, c)
64 65
		sw.liveWantsOrder = append(sw.liveWantsOrder, c)
		sw.liveWants[c] = now
66 67 68 69 70
	}

	return live
}

dirkmc's avatar
dirkmc committed
71 72 73 74
// WantsSent is called when wants are sent to a peer
func (sw *sessionWants) WantsSent(ks []cid.Cid) {
	now := time.Now()
	for _, c := range ks {
75
		if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
dirkmc's avatar
dirkmc committed
76
			sw.toFetch.Remove(c)
77 78
			sw.liveWantsOrder = append(sw.liveWantsOrder, c)
			sw.liveWants[c] = now
dirkmc's avatar
dirkmc committed
79 80 81 82 83 84 85 86 87 88 89 90 91 92
		}
	}
}

// BlocksReceived removes received block CIDs from the live wants list and
// measures latency. It returns the CIDs of blocks that were actually
// wanted (as opposed to duplicates) and the total latency for all incoming blocks.
func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) {
	wanted := make([]cid.Cid, 0, len(ks))
	totalLatency := time.Duration(0)
	if len(ks) == 0 {
		return wanted, totalLatency
	}

93
	// Filter for blocks that were actually wanted (as opposed to duplicates)
dirkmc's avatar
dirkmc committed
94 95
	now := time.Now()
	for _, c := range ks {
96
		if sw.isWanted(c) {
dirkmc's avatar
dirkmc committed
97 98
			wanted = append(wanted, c)

99
			// Measure latency
100
			sentAt, ok := sw.liveWants[c]
dirkmc's avatar
dirkmc committed
101 102 103 104
			if ok && !sentAt.IsZero() {
				totalLatency += now.Sub(sentAt)
			}

105
			// Remove the CID from the live wants / toFetch queue
106
			delete(sw.liveWants, c)
dirkmc's avatar
dirkmc committed
107 108 109 110
			sw.toFetch.Remove(c)
		}
	}

111 112 113 114 115 116 117 118 119 120 121 122
	// If the live wants ordering array is a long way out of sync with the
	// live wants map, clean up the ordering array
	if len(sw.liveWantsOrder)-len(sw.liveWants) > liveWantsOrderGCLimit {
		cleaned := sw.liveWantsOrder[:0]
		for _, c := range sw.liveWantsOrder {
			if _, ok := sw.liveWants[c]; ok {
				cleaned = append(cleaned, c)
			}
		}
		sw.liveWantsOrder = cleaned
	}

dirkmc's avatar
dirkmc committed
123 124 125
	return wanted, totalLatency
}

126
// PrepareBroadcast saves the current time for each live want and returns the
127
// live want CIDs up to the broadcast limit.
128 129
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
	now := time.Now()
130 131 132 133 134 135 136 137 138 139 140 141
	live := make([]cid.Cid, 0, len(sw.liveWants))
	for _, c := range sw.liveWantsOrder {
		if _, ok := sw.liveWants[c]; ok {
			// No response was received for the want, so reset the sent time
			// to now as we're about to broadcast
			sw.liveWants[c] = now

			live = append(live, c)
			if len(live) == sw.broadcastLimit {
				break
			}
		}
142
	}
143

144 145 146 147 148 149 150 151 152 153 154 155
	return live
}

// CancelPending drops each of the given CIDs from the fetch queue. Live wants
// are left untouched.
func (sw *sessionWants) CancelPending(keys []cid.Cid) {
	for i := range keys {
		sw.toFetch.Remove(keys[i])
	}
}

// LiveWants returns a list of live wants
func (sw *sessionWants) LiveWants() []cid.Cid {
156 157 158 159 160 161
	live := make([]cid.Cid, 0, len(sw.liveWants))
	for c := range sw.liveWants {
		live = append(live, c)
	}

	return live
162 163
}

164
// RandomLiveWant returns a randomly selected live want
165
func (sw *sessionWants) RandomLiveWant() cid.Cid {
166
	if len(sw.liveWants) == 0 {
167 168
		return cid.Cid{}
	}
169

170
	// picking a random live want
171 172
	i := rand.Intn(len(sw.liveWants))
	for k := range sw.liveWants {
173 174 175 176 177 178 179 180 181 182
		if i == 0 {
			return k
		}
		i--
	}
	return cid.Cid{}
}

// Has live wants indicates if there are any live wants
func (sw *sessionWants) HasLiveWants() bool {
183
	return len(sw.liveWants) > 0
184 185
}

186 187
// Indicates whether the want is in either of the fetch or live queues
func (sw *sessionWants) isWanted(c cid.Cid) bool {
188
	_, ok := sw.liveWants[c]
189 190 191 192 193
	if !ok {
		ok = sw.toFetch.Has(c)
	}
	return ok
}