sessioninterestmanager.go 5.61 KB
Newer Older
dirkmc's avatar
dirkmc committed
1 2 3
package sessioninterestmanager

import (
4 5
	"sync"

6
	blocks "gitlab.dms3.io/dms3/go-block-format"
dirkmc's avatar
dirkmc committed
7

8
	cid "gitlab.dms3.io/dms3/go-cid"
dirkmc's avatar
dirkmc committed
9 10
)

11
// SessionInterestManager records the CIDs that each session is interested in.
dirkmc's avatar
dirkmc committed
12
type SessionInterestManager struct {
13 14
	lk    sync.RWMutex
	wants map[cid.Cid]map[uint64]bool
dirkmc's avatar
dirkmc committed
15 16 17 18 19
}

// New initializes a new SessionInterestManager.
func New() *SessionInterestManager {
	return &SessionInterestManager{
20 21 22 23 24 25 26 27 28
		// Map of cids -> sessions -> bool
		//
		// The boolean indicates whether the session still wants the block
		// or is just interested in receiving messages about it.
		//
		// Note that once the block is received the session no longer wants
		// the block, but still wants to receive messages from peers who have
		// the block as they may have other blocks the session is interested in.
		wants: make(map[cid.Cid]map[uint64]bool),
dirkmc's avatar
dirkmc committed
29 30 31
	}
}

32 33
// When the client asks the session for blocks, the session calls
// RecordSessionInterest() with those cids.
dirkmc's avatar
dirkmc committed
34
func (sim *SessionInterestManager) RecordSessionInterest(ses uint64, ks []cid.Cid) {
35 36 37
	sim.lk.Lock()
	defer sim.lk.Unlock()

38 39 40 41 42 43 44 45 46
	// For each key
	for _, c := range ks {
		// Record that the session wants the blocks
		if want, ok := sim.wants[c]; ok {
			want[ses] = true
		} else {
			sim.wants[c] = map[uint64]bool{ses: true}
		}
	}
dirkmc's avatar
dirkmc committed
47 48
}

49
// When the session shuts down it calls RemoveSessionInterest().
50 51
// Returns the keys that no session is interested in any more.
func (sim *SessionInterestManager) RemoveSession(ses uint64) []cid.Cid {
52 53 54
	sim.lk.Lock()
	defer sim.lk.Unlock()

55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
	// The keys that no session is interested in
	deletedKs := make([]cid.Cid, 0)

	// For each known key
	for c := range sim.wants {
		// Remove the session from the list of sessions that want the key
		delete(sim.wants[c], ses)

		// If there are no more sessions that want the key
		if len(sim.wants[c]) == 0 {
			// Clean up the list memory
			delete(sim.wants, c)
			// Add the key to the list of keys that no session is interested in
			deletedKs = append(deletedKs, c)
		}
	}

	return deletedKs
dirkmc's avatar
dirkmc committed
73 74
}

75
// When the session receives blocks, it calls RemoveSessionWants().
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
func (sim *SessionInterestManager) RemoveSessionWants(ses uint64, ks []cid.Cid) {
	sim.lk.Lock()
	defer sim.lk.Unlock()

	// For each key
	for _, c := range ks {
		// If the session wanted the block
		if wanted, ok := sim.wants[c][ses]; ok && wanted {
			// Mark the block as unwanted
			sim.wants[c][ses] = false
		}
	}
}

// When a request is cancelled, the session calls RemoveSessionInterested().
// Returns the keys that no session is interested in any more.
func (sim *SessionInterestManager) RemoveSessionInterested(ses uint64, ks []cid.Cid) []cid.Cid {
93 94 95
	sim.lk.Lock()
	defer sim.lk.Unlock()

96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
	// The keys that no session is interested in
	deletedKs := make([]cid.Cid, 0, len(ks))

	// For each key
	for _, c := range ks {
		// If there is a list of sessions that want the key
		if _, ok := sim.wants[c]; ok {
			// Remove the session from the list of sessions that want the key
			delete(sim.wants[c], ses)

			// If there are no more sessions that want the key
			if len(sim.wants[c]) == 0 {
				// Clean up the list memory
				delete(sim.wants, c)
				// Add the key to the list of keys that no session is interested in
				deletedKs = append(deletedKs, c)
			}
		}
	}

	return deletedKs
dirkmc's avatar
dirkmc committed
117 118
}

119 120
// The session calls FilterSessionInterested() to filter the sets of keys for
// those that the session is interested in
dirkmc's avatar
dirkmc committed
121
func (sim *SessionInterestManager) FilterSessionInterested(ses uint64, ksets ...[]cid.Cid) [][]cid.Cid {
122 123 124
	sim.lk.RLock()
	defer sim.lk.RUnlock()

125
	// For each set of keys
dirkmc's avatar
dirkmc committed
126 127
	kres := make([][]cid.Cid, len(ksets))
	for i, ks := range ksets {
128 129 130 131 132 133 134 135 136 137 138
		// The set of keys that at least one session is interested in
		has := make([]cid.Cid, 0, len(ks))

		// For each key in the list
		for _, c := range ks {
			// If there is a session that's interested, add the key to the set
			if _, ok := sim.wants[c][ses]; ok {
				has = append(has, c)
			}
		}
		kres[i] = has
dirkmc's avatar
dirkmc committed
139 140 141 142
	}
	return kres
}

143 144
// When bitswap receives blocks it calls SplitWantedUnwanted() to discard
// unwanted blocks
dirkmc's avatar
dirkmc committed
145
func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]blocks.Block, []blocks.Block) {
146 147 148
	sim.lk.RLock()
	defer sim.lk.RUnlock()

149 150
	// Get the wanted block keys as a set
	wantedKs := cid.NewSet()
dirkmc's avatar
dirkmc committed
151
	for _, b := range blks {
152 153 154 155 156 157 158 159 160
		c := b.Cid()
		// For each session that is interested in the key
		for ses := range sim.wants[c] {
			// If the session wants the key (rather than just being interested)
			if wanted, ok := sim.wants[c][ses]; ok && wanted {
				// Add the key to the set
				wantedKs.Add(c)
			}
		}
dirkmc's avatar
dirkmc committed
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
	}

	// Separate the blocks into wanted and unwanted
	wantedBlks := make([]blocks.Block, 0, len(blks))
	notWantedBlks := make([]blocks.Block, 0)
	for _, b := range blks {
		if wantedKs.Has(b.Cid()) {
			wantedBlks = append(wantedBlks, b)
		} else {
			notWantedBlks = append(notWantedBlks, b)
		}
	}
	return wantedBlks, notWantedBlks
}

Dirk McCormick's avatar
Dirk McCormick committed
176
// When the SessionManager receives a message it calls InterestedSessions() to
177
// find out which sessions are interested in the message.
dirkmc's avatar
dirkmc committed
178
func (sim *SessionInterestManager) InterestedSessions(blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []uint64 {
179 180 181
	sim.lk.RLock()
	defer sim.lk.RUnlock()

dirkmc's avatar
dirkmc committed
182 183 184 185 186
	ks := make([]cid.Cid, 0, len(blks)+len(haves)+len(dontHaves))
	ks = append(ks, blks...)
	ks = append(ks, haves...)
	ks = append(ks, dontHaves...)

187 188 189 190 191 192 193 194 195 196 197 198 199 200
	// Create a set of sessions that are interested in the keys
	sesSet := make(map[uint64]struct{})
	for _, c := range ks {
		for s := range sim.wants[c] {
			sesSet[s] = struct{}{}
		}
	}

	// Convert the set into a list
	ses := make([]uint64, 0, len(sesSet))
	for s := range sesSet {
		ses = append(ses, s)
	}
	return ses
dirkmc's avatar
dirkmc committed
201
}