sessionwants.go 3.59 KB
Newer Older
1 2 3
package session

import (
dirkmc's avatar
dirkmc committed
4
	"fmt"
5 6 7 8 9 10
	"math/rand"
	"time"

	cid "github.com/ipfs/go-cid"
)

dirkmc's avatar
dirkmc committed
11 12
// sessionWants keeps track of which cids are waiting to be sent out, and which
// peers are "live" - ie, we've sent a request but haven't received a block yet
13 14 15 16 17
type sessionWants struct {
	toFetch   *cidQueue
	liveWants map[cid.Cid]time.Time
}

dirkmc's avatar
dirkmc committed
18 19 20 21 22 23 24 25 26 27
func newSessionWants() sessionWants {
	return sessionWants{
		toFetch:   newCidQueue(),
		liveWants: make(map[cid.Cid]time.Time),
	}
}

func (sw *sessionWants) String() string {
	return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), len(sw.liveWants))
}
28

dirkmc's avatar
dirkmc committed
29 30 31 32
// BlocksRequested is called when the client makes a request for blocks
func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
	for _, k := range newWants {
		sw.toFetch.Push(k)
33 34 35
	}
}

dirkmc's avatar
dirkmc committed
36 37 38
// GetNextWants moves as many CIDs from the fetch queue to the live wants
// list as possible (given the limit). Returns the newly live wants.
func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
	now := time.Now()

	// Move CIDs from fetch queue to the live wants queue (up to the limit)
	currentLiveCount := len(sw.liveWants)
	toAdd := limit - currentLiveCount

	var live []cid.Cid
	for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
		c := sw.toFetch.Pop()
		live = append(live, c)
		sw.liveWants[c] = now
	}

	return live
}

dirkmc's avatar
dirkmc committed
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
// WantsSent is called when wants are sent to a peer
func (sw *sessionWants) WantsSent(ks []cid.Cid) {
	now := time.Now()
	for _, c := range ks {
		if _, ok := sw.liveWants[c]; !ok {
			sw.toFetch.Remove(c)
			sw.liveWants[c] = now
		}
	}
}

// BlocksReceived removes received block CIDs from the live wants list and
// measures latency. It returns the CIDs of blocks that were actually
// wanted (as opposed to duplicates) and the total latency for all incoming blocks.
func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) {
	wanted := make([]cid.Cid, 0, len(ks))
	totalLatency := time.Duration(0)
	if len(ks) == 0 {
		return wanted, totalLatency
	}

	now := time.Now()
	for _, c := range ks {
78
		if sw.isWanted(c) {
dirkmc's avatar
dirkmc committed
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
			wanted = append(wanted, c)

			sentAt, ok := sw.liveWants[c]
			if ok && !sentAt.IsZero() {
				totalLatency += now.Sub(sentAt)
			}

			// Remove the CID from the live wants / toFetch queue and add it
			// to the past wants
			delete(sw.liveWants, c)
			sw.toFetch.Remove(c)
		}
	}

	return wanted, totalLatency
}

96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
	now := time.Now()
	live := make([]cid.Cid, 0, len(sw.liveWants))
	for c := range sw.liveWants {
		live = append(live, c)
		sw.liveWants[c] = now
	}
	return live
}

// CancelPending removes the given CIDs from the fetch queue.
func (sw *sessionWants) CancelPending(keys []cid.Cid) {
	for _, k := range keys {
		sw.toFetch.Remove(k)
	}
}

// LiveWants returns a list of live wants
func (sw *sessionWants) LiveWants() []cid.Cid {
	live := make([]cid.Cid, 0, len(sw.liveWants))
	for c := range sw.liveWants {
		live = append(live, c)
	}
	return live
}

func (sw *sessionWants) RandomLiveWant() cid.Cid {
	if len(sw.liveWants) == 0 {
		return cid.Cid{}
	}
128

129
	// picking a random live want
130
	i := rand.Intn(len(sw.liveWants))
131 132 133 134 135 136 137 138 139 140 141 142 143 144
	for k := range sw.liveWants {
		if i == 0 {
			return k
		}
		i--
	}
	return cid.Cid{}
}

// Has live wants indicates if there are any live wants
func (sw *sessionWants) HasLiveWants() bool {
	return len(sw.liveWants) > 0
}

145 146
// Indicates whether the want is in either of the fetch or live queues
func (sw *sessionWants) isWanted(c cid.Cid) bool {
147 148 149 150 151 152
	_, ok := sw.liveWants[c]
	if !ok {
		ok = sw.toFetch.Has(c)
	}
	return ok
}