Commit 77ea854e authored by Steven Allen's avatar Steven Allen

use CIDs directly as map keys

1. Use a `taskEntryKey` *type* instead of a string (now that both peer IDs and
CIDs are hashable).
2. Get rid of all uses of `cid.KeyString` (mostly just for type safety). This
also means we don't need to parse the CID and allocate to convert it *back* from
a string.
parent cbd7eb7c
...@@ -23,7 +23,7 @@ type peerRequestQueue interface { ...@@ -23,7 +23,7 @@ type peerRequestQueue interface {
func newPRQ() *prq { func newPRQ() *prq {
return &prq{ return &prq{
taskMap: make(map[string]*peerRequestTask), taskMap: make(map[taskEntryKey]*peerRequestTask),
partners: make(map[peer.ID]*activePartner), partners: make(map[peer.ID]*activePartner),
frozen: make(map[peer.ID]*activePartner), frozen: make(map[peer.ID]*activePartner),
pQueue: pq.New(partnerCompare), pQueue: pq.New(partnerCompare),
...@@ -39,7 +39,7 @@ var _ peerRequestQueue = &prq{} ...@@ -39,7 +39,7 @@ var _ peerRequestQueue = &prq{}
type prq struct { type prq struct {
lock sync.Mutex lock sync.Mutex
pQueue pq.PQ pQueue pq.PQ
taskMap map[string]*peerRequestTask taskMap map[taskEntryKey]*peerRequestTask
partners map[peer.ID]*activePartner partners map[peer.ID]*activePartner
frozen map[peer.ID]*activePartner frozen map[peer.ID]*activePartner
...@@ -65,7 +65,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) { ...@@ -65,7 +65,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
if partner.activeBlocks.Has(entry.Cid) { if partner.activeBlocks.Has(entry.Cid) {
continue continue
} }
if task, ok := tl.taskMap[taskEntryKey(to, entry.Cid)]; ok { if task, ok := tl.taskMap[taskEntryKey{to, entry.Cid}]; ok {
if entry.Priority > task.Priority { if entry.Priority > task.Priority {
task.Priority = entry.Priority task.Priority = entry.Priority
partner.taskQueue.Update(task.index) partner.taskQueue.Update(task.index)
...@@ -98,7 +98,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) { ...@@ -98,7 +98,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
task.Priority = priority task.Priority = priority
partner.taskQueue.Push(task) partner.taskQueue.Push(task)
for _, entry := range newEntries { for _, entry := range newEntries {
tl.taskMap[taskEntryKey(to, entry.Cid)] = task tl.taskMap[taskEntryKey{to, entry.Cid}] = task
} }
partner.requests += len(newEntries) partner.requests += len(newEntries)
tl.pQueue.Update(partner.Index()) tl.pQueue.Update(partner.Index())
...@@ -119,7 +119,7 @@ func (tl *prq) Pop() *peerRequestTask { ...@@ -119,7 +119,7 @@ func (tl *prq) Pop() *peerRequestTask {
newEntries := make([]*wantlist.Entry, 0, len(out.Entries)) newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
for _, entry := range out.Entries { for _, entry := range out.Entries {
delete(tl.taskMap, taskEntryKey(out.Target, entry.Cid)) delete(tl.taskMap, taskEntryKey{out.Target, entry.Cid})
if entry.Trash { if entry.Trash {
continue continue
} }
...@@ -143,7 +143,7 @@ func (tl *prq) Pop() *peerRequestTask { ...@@ -143,7 +143,7 @@ func (tl *prq) Pop() *peerRequestTask {
// Remove removes a task from the queue // Remove removes a task from the queue
func (tl *prq) Remove(k cid.Cid, p peer.ID) { func (tl *prq) Remove(k cid.Cid, p peer.ID) {
tl.lock.Lock() tl.lock.Lock()
t, ok := tl.taskMap[taskEntryKey(p, k)] t, ok := tl.taskMap[taskEntryKey{p, k}]
if ok { if ok {
for _, entry := range t.Entries { for _, entry := range t.Entries {
if entry.Cid.Equals(k) { if entry.Cid.Equals(k) {
...@@ -220,9 +220,10 @@ func (t *peerRequestTask) SetIndex(i int) { ...@@ -220,9 +220,10 @@ func (t *peerRequestTask) SetIndex(i int) {
t.index = i t.index = i
} }
// taskEntryKey returns a key that uniquely identifies a task. // taskEntryKey is a key identifying a task.
func taskEntryKey(p peer.ID, k cid.Cid) string { type taskEntryKey struct {
return string(p) + k.KeyString() p peer.ID
k cid.Cid
} }
// FIFO is a basic task comparator that returns tasks in the order created. // FIFO is a basic task comparator that returns tasks in the order created.
......
...@@ -49,8 +49,8 @@ type Exportable interface { ...@@ -49,8 +49,8 @@ type Exportable interface {
type impl struct { type impl struct {
full bool full bool
wantlist map[string]*Entry wantlist map[cid.Cid]*Entry
blocks map[string]blocks.Block blocks map[cid.Cid]blocks.Block
} }
func New(full bool) BitSwapMessage { func New(full bool) BitSwapMessage {
...@@ -59,8 +59,8 @@ func New(full bool) BitSwapMessage { ...@@ -59,8 +59,8 @@ func New(full bool) BitSwapMessage {
func newMsg(full bool) *impl { func newMsg(full bool) *impl {
return &impl{ return &impl{
blocks: make(map[string]blocks.Block), blocks: make(map[cid.Cid]blocks.Block),
wantlist: make(map[string]*Entry), wantlist: make(map[cid.Cid]*Entry),
full: full, full: full,
} }
} }
...@@ -135,7 +135,7 @@ func (m *impl) Blocks() []blocks.Block { ...@@ -135,7 +135,7 @@ func (m *impl) Blocks() []blocks.Block {
} }
func (m *impl) Cancel(k cid.Cid) { func (m *impl) Cancel(k cid.Cid) {
delete(m.wantlist, k.KeyString()) delete(m.wantlist, k)
m.addEntry(k, 0, true) m.addEntry(k, 0, true)
} }
...@@ -144,13 +144,12 @@ func (m *impl) AddEntry(k cid.Cid, priority int) { ...@@ -144,13 +144,12 @@ func (m *impl) AddEntry(k cid.Cid, priority int) {
} }
func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) { func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
k := c.KeyString() e, exists := m.wantlist[c]
e, exists := m.wantlist[k]
if exists { if exists {
e.Priority = priority e.Priority = priority
e.Cancel = cancel e.Cancel = cancel
} else { } else {
m.wantlist[k] = &Entry{ m.wantlist[c] = &Entry{
Entry: &wantlist.Entry{ Entry: &wantlist.Entry{
Cid: c, Cid: c,
Priority: priority, Priority: priority,
...@@ -161,7 +160,7 @@ func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) { ...@@ -161,7 +160,7 @@ func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
} }
func (m *impl) AddBlock(b blocks.Block) { func (m *impl) AddBlock(b blocks.Block) {
m.blocks[b.Cid().KeyString()] = b m.blocks[b.Cid()] = b
} }
func FromNet(r io.Reader) (BitSwapMessage, error) { func FromNet(r io.Reader) (BitSwapMessage, error) {
......
...@@ -121,13 +121,13 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { ...@@ -121,13 +121,13 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
t.Fatal("fullness attribute got dropped on marshal") t.Fatal("fullness attribute got dropped on marshal")
} }
keys := make(map[string]bool) keys := make(map[cid.Cid]bool)
for _, k := range copied.Wantlist() { for _, k := range copied.Wantlist() {
keys[k.Cid.KeyString()] = true keys[k.Cid] = true
} }
for _, k := range original.Wantlist() { for _, k := range original.Wantlist() {
if _, ok := keys[k.Cid.KeyString()]; !ok { if _, ok := keys[k.Cid]; !ok {
t.Fatalf("Key Missing: \"%v\"", k) t.Fatalf("Key Missing: \"%v\"", k)
} }
} }
...@@ -151,13 +151,13 @@ func TestToAndFromNetMessage(t *testing.T) { ...@@ -151,13 +151,13 @@ func TestToAndFromNetMessage(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
keys := make(map[string]bool) keys := make(map[cid.Cid]bool)
for _, b := range m2.Blocks() { for _, b := range m2.Blocks() {
keys[b.Cid().KeyString()] = true keys[b.Cid()] = true
} }
for _, b := range original.Blocks() { for _, b := range original.Blocks() {
if _, ok := keys[b.Cid().KeyString()]; !ok { if _, ok := keys[b.Cid()]; !ok {
t.Fail() t.Fail()
} }
} }
......
...@@ -33,7 +33,7 @@ type Session struct { ...@@ -33,7 +33,7 @@ type Session struct {
interestReqs chan interestReq interestReqs chan interestReq
interest *lru.Cache interest *lru.Cache
liveWants map[string]time.Time liveWants map[cid.Cid]time.Time
tick *time.Timer tick *time.Timer
baseTickDelay time.Duration baseTickDelay time.Duration
...@@ -54,7 +54,7 @@ type Session struct { ...@@ -54,7 +54,7 @@ type Session struct {
func (bs *Bitswap) NewSession(ctx context.Context) *Session { func (bs *Bitswap) NewSession(ctx context.Context) *Session {
s := &Session{ s := &Session{
activePeers: make(map[peer.ID]struct{}), activePeers: make(map[peer.ID]struct{}),
liveWants: make(map[string]time.Time), liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid), newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid), cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(), tofetch: newCidQueue(),
...@@ -87,8 +87,7 @@ func (bs *Bitswap) removeSession(s *Session) { ...@@ -87,8 +87,7 @@ func (bs *Bitswap) removeSession(s *Session) {
live := make([]cid.Cid, 0, len(s.liveWants)) live := make([]cid.Cid, 0, len(s.liveWants))
for c := range s.liveWants { for c := range s.liveWants {
cs, _ := cid.Cast([]byte(c)) live = append(live, c)
live = append(live, cs)
} }
bs.CancelWants(live, s.id) bs.CancelWants(live, s.id)
...@@ -147,7 +146,7 @@ func (s *Session) isLiveWant(c cid.Cid) bool { ...@@ -147,7 +146,7 @@ func (s *Session) isLiveWant(c cid.Cid) bool {
} }
func (s *Session) interestedIn(c cid.Cid) bool { func (s *Session) interestedIn(c cid.Cid) bool {
return s.interest.Contains(c.KeyString()) || s.isLiveWant(c) return s.interest.Contains(c) || s.isLiveWant(c)
} }
const provSearchDelay = time.Second * 10 const provSearchDelay = time.Second * 10
...@@ -188,7 +187,7 @@ func (s *Session) run(ctx context.Context) { ...@@ -188,7 +187,7 @@ func (s *Session) run(ctx context.Context) {
s.resetTick() s.resetTick()
case keys := <-s.newReqs: case keys := <-s.newReqs:
for _, k := range keys { for _, k := range keys {
s.interest.Add(k.KeyString(), nil) s.interest.Add(k, nil)
} }
if len(s.liveWants) < activeWantsLimit { if len(s.liveWants) < activeWantsLimit {
toadd := activeWantsLimit - len(s.liveWants) toadd := activeWantsLimit - len(s.liveWants)
...@@ -211,8 +210,7 @@ func (s *Session) run(ctx context.Context) { ...@@ -211,8 +210,7 @@ func (s *Session) run(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants)) live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now() now := time.Now()
for c := range s.liveWants { for c := range s.liveWants {
cs, _ := cid.Cast([]byte(c)) live = append(live, c)
live = append(live, cs)
s.liveWants[c] = now s.liveWants[c] = now
} }
...@@ -250,7 +248,7 @@ func (s *Session) run(ctx context.Context) { ...@@ -250,7 +248,7 @@ func (s *Session) run(ctx context.Context) {
} }
func (s *Session) cidIsWanted(c cid.Cid) bool { func (s *Session) cidIsWanted(c cid.Cid) bool {
_, ok := s.liveWants[c.KeyString()] _, ok := s.liveWants[c]
if !ok { if !ok {
ok = s.tofetch.Has(c) ok = s.tofetch.Has(c)
} }
...@@ -261,11 +259,10 @@ func (s *Session) cidIsWanted(c cid.Cid) bool { ...@@ -261,11 +259,10 @@ func (s *Session) cidIsWanted(c cid.Cid) bool {
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
c := blk.Cid() c := blk.Cid()
if s.cidIsWanted(c) { if s.cidIsWanted(c) {
ks := c.KeyString() tval, ok := s.liveWants[c]
tval, ok := s.liveWants[ks]
if ok { if ok {
s.latTotal += time.Since(tval) s.latTotal += time.Since(tval)
delete(s.liveWants, ks) delete(s.liveWants, c)
} else { } else {
s.tofetch.Remove(c) s.tofetch.Remove(c)
} }
...@@ -281,7 +278,7 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { ...@@ -281,7 +278,7 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) { func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
now := time.Now() now := time.Now()
for _, c := range ks { for _, c := range ks {
s.liveWants[c.KeyString()] = now s.liveWants[c] = now
} }
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
} }
......
...@@ -11,12 +11,12 @@ import ( ...@@ -11,12 +11,12 @@ import (
type ThreadSafe struct { type ThreadSafe struct {
lk sync.RWMutex lk sync.RWMutex
set map[string]*Entry set map[cid.Cid]*Entry
} }
// not threadsafe // not threadsafe
type Wantlist struct { type Wantlist struct {
set map[string]*Entry set map[cid.Cid]*Entry
} }
type Entry struct { type Entry struct {
...@@ -45,13 +45,13 @@ func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priorit ...@@ -45,13 +45,13 @@ func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priorit
func NewThreadSafe() *ThreadSafe { func NewThreadSafe() *ThreadSafe {
return &ThreadSafe{ return &ThreadSafe{
set: make(map[string]*Entry), set: make(map[cid.Cid]*Entry),
} }
} }
func New() *Wantlist { func New() *Wantlist {
return &Wantlist{ return &Wantlist{
set: make(map[string]*Entry), set: make(map[cid.Cid]*Entry),
} }
} }
...@@ -66,13 +66,12 @@ func New() *Wantlist { ...@@ -66,13 +66,12 @@ func New() *Wantlist {
func (w *ThreadSafe) Add(c cid.Cid, priority int, ses uint64) bool { func (w *ThreadSafe) Add(c cid.Cid, priority int, ses uint64) bool {
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
k := c.KeyString() if e, ok := w.set[c]; ok {
if e, ok := w.set[k]; ok {
e.SesTrk[ses] = struct{}{} e.SesTrk[ses] = struct{}{}
return false return false
} }
w.set[k] = &Entry{ w.set[c] = &Entry{
Cid: c, Cid: c,
Priority: priority, Priority: priority,
SesTrk: map[uint64]struct{}{ses: struct{}{}}, SesTrk: map[uint64]struct{}{ses: struct{}{}},
...@@ -85,12 +84,11 @@ func (w *ThreadSafe) Add(c cid.Cid, priority int, ses uint64) bool { ...@@ -85,12 +84,11 @@ func (w *ThreadSafe) Add(c cid.Cid, priority int, ses uint64) bool {
func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool { func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool {
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
k := e.Cid.KeyString() if ex, ok := w.set[e.Cid]; ok {
if ex, ok := w.set[k]; ok {
ex.SesTrk[ses] = struct{}{} ex.SesTrk[ses] = struct{}{}
return false return false
} }
w.set[k] = e w.set[e.Cid] = e
e.SesTrk[ses] = struct{}{} e.SesTrk[ses] = struct{}{}
return true return true
} }
...@@ -102,15 +100,14 @@ func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool { ...@@ -102,15 +100,14 @@ func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool {
func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool { func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool {
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
k := c.KeyString() e, ok := w.set[c]
e, ok := w.set[k]
if !ok { if !ok {
return false return false
} }
delete(e.SesTrk, ses) delete(e.SesTrk, ses)
if len(e.SesTrk) == 0 { if len(e.SesTrk) == 0 {
delete(w.set, k) delete(w.set, c)
return true return true
} }
return false return false
...@@ -121,7 +118,7 @@ func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool { ...@@ -121,7 +118,7 @@ func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool {
func (w *ThreadSafe) Contains(k cid.Cid) (*Entry, bool) { func (w *ThreadSafe) Contains(k cid.Cid) (*Entry, bool) {
w.lk.RLock() w.lk.RLock()
defer w.lk.RUnlock() defer w.lk.RUnlock()
e, ok := w.set[k.KeyString()] e, ok := w.set[k]
return e, ok return e, ok
} }
...@@ -152,12 +149,11 @@ func (w *Wantlist) Len() int { ...@@ -152,12 +149,11 @@ func (w *Wantlist) Len() int {
} }
func (w *Wantlist) Add(c cid.Cid, priority int) bool { func (w *Wantlist) Add(c cid.Cid, priority int) bool {
k := c.KeyString() if _, ok := w.set[c]; ok {
if _, ok := w.set[k]; ok {
return false return false
} }
w.set[k] = &Entry{ w.set[c] = &Entry{
Cid: c, Cid: c,
Priority: priority, Priority: priority,
} }
...@@ -166,27 +162,25 @@ func (w *Wantlist) Add(c cid.Cid, priority int) bool { ...@@ -166,27 +162,25 @@ func (w *Wantlist) Add(c cid.Cid, priority int) bool {
} }
func (w *Wantlist) AddEntry(e *Entry) bool { func (w *Wantlist) AddEntry(e *Entry) bool {
k := e.Cid.KeyString() if _, ok := w.set[e.Cid]; ok {
if _, ok := w.set[k]; ok {
return false return false
} }
w.set[k] = e w.set[e.Cid] = e
return true return true
} }
func (w *Wantlist) Remove(c cid.Cid) bool { func (w *Wantlist) Remove(c cid.Cid) bool {
k := c.KeyString() _, ok := w.set[c]
_, ok := w.set[k]
if !ok { if !ok {
return false return false
} }
delete(w.set, k) delete(w.set, c)
return true return true
} }
func (w *Wantlist) Contains(k cid.Cid) (*Entry, bool) { func (w *Wantlist) Contains(c cid.Cid) (*Entry, bool) {
e, ok := w.set[k.KeyString()] e, ok := w.set[c]
return e, ok return e, ok
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment