sessionwants.go 3.75 KB
Newer Older
1 2 3
package session

import (
dirkmc's avatar
dirkmc committed
4
	"fmt"
5 6 7 8 9 10
	"math/rand"
	"time"

	cid "github.com/ipfs/go-cid"
)

dirkmc's avatar
dirkmc committed
11 12
// sessionWants keeps track of which cids are waiting to be sent out, and which
// peers are "live" - ie, we've sent a request but haven't received a block yet
13 14 15 16 17
type sessionWants struct {
	toFetch   *cidQueue
	liveWants map[cid.Cid]time.Time
}

dirkmc's avatar
dirkmc committed
18 19 20 21 22 23 24 25 26 27
func newSessionWants() sessionWants {
	return sessionWants{
		toFetch:   newCidQueue(),
		liveWants: make(map[cid.Cid]time.Time),
	}
}

func (sw *sessionWants) String() string {
	return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), len(sw.liveWants))
}
28

dirkmc's avatar
dirkmc committed
29 30 31 32
// BlocksRequested is called when the client makes a request for blocks
func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
	for _, k := range newWants {
		sw.toFetch.Push(k)
33 34 35
	}
}

dirkmc's avatar
dirkmc committed
36 37 38
// GetNextWants moves as many CIDs from the fetch queue to the live wants
// list as possible (given the limit). Returns the newly live wants.
func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
	now := time.Now()

	// Move CIDs from fetch queue to the live wants queue (up to the limit)
	currentLiveCount := len(sw.liveWants)
	toAdd := limit - currentLiveCount

	var live []cid.Cid
	for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
		c := sw.toFetch.Pop()
		live = append(live, c)
		sw.liveWants[c] = now
	}

	return live
}

dirkmc's avatar
dirkmc committed
55 56 57 58
// WantsSent is called when wants are sent to a peer
func (sw *sessionWants) WantsSent(ks []cid.Cid) {
	now := time.Now()
	for _, c := range ks {
59
		if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
dirkmc's avatar
dirkmc committed
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
			sw.toFetch.Remove(c)
			sw.liveWants[c] = now
		}
	}
}

// BlocksReceived removes received block CIDs from the live wants list and
// measures latency. It returns the CIDs of blocks that were actually
// wanted (as opposed to duplicates) and the total latency for all incoming blocks.
func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) {
	wanted := make([]cid.Cid, 0, len(ks))
	totalLatency := time.Duration(0)
	if len(ks) == 0 {
		return wanted, totalLatency
	}

	now := time.Now()
	for _, c := range ks {
78
		if sw.isWanted(c) {
dirkmc's avatar
dirkmc committed
79 80 81 82 83 84 85
			wanted = append(wanted, c)

			sentAt, ok := sw.liveWants[c]
			if ok && !sentAt.IsZero() {
				totalLatency += now.Sub(sentAt)
			}

86
			// Remove the CID from the live wants / toFetch queue
dirkmc's avatar
dirkmc committed
87 88 89 90 91 92 93 94
			delete(sw.liveWants, c)
			sw.toFetch.Remove(c)
		}
	}

	return wanted, totalLatency
}

95 96 97
// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
98 99 100
	// TODO: Change this to return wants in order so that the session will
	// send out Find Providers request for the first want
	// (Note that maps return keys in random order)
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
	now := time.Now()
	live := make([]cid.Cid, 0, len(sw.liveWants))
	for c := range sw.liveWants {
		live = append(live, c)
		sw.liveWants[c] = now
	}
	return live
}

// CancelPending removes the given CIDs from the fetch queue.
func (sw *sessionWants) CancelPending(keys []cid.Cid) {
	for _, k := range keys {
		sw.toFetch.Remove(k)
	}
}

// LiveWants returns a list of live wants
func (sw *sessionWants) LiveWants() []cid.Cid {
	live := make([]cid.Cid, 0, len(sw.liveWants))
	for c := range sw.liveWants {
		live = append(live, c)
	}
	return live
}

func (sw *sessionWants) RandomLiveWant() cid.Cid {
	if len(sw.liveWants) == 0 {
		return cid.Cid{}
	}
130

131
	// picking a random live want
132
	i := rand.Intn(len(sw.liveWants))
133 134 135 136 137 138 139 140 141 142 143 144 145 146
	for k := range sw.liveWants {
		if i == 0 {
			return k
		}
		i--
	}
	return cid.Cid{}
}

// Has live wants indicates if there are any live wants
func (sw *sessionWants) HasLiveWants() bool {
	return len(sw.liveWants) > 0
}

147 148
// Indicates whether the want is in either of the fetch or live queues
func (sw *sessionWants) isWanted(c cid.Cid) bool {
149 150 151 152 153 154
	_, ok := sw.liveWants[c]
	if !ok {
		ok = sw.toFetch.Has(c)
	}
	return ok
}