peerlinktracker.go 2.52 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
package responseassembler

import (
	"sync"

	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/linktracker"
	"github.com/ipld/go-ipld-prime"
)

// peerLinkTracker groups a default link tracker with a set of alternate
// trackers that are selected per request through a dedup key.
// NOTE(review): the name suggests one instance per peer — confirm with callers.
type peerLinkTracker struct {
	// linkTrackerLk is taken by DedupKey, IgnoreBlocks, FinishTracking and
	// RecordLinkTraversal before they read or modify the fields below.
	linkTrackerLk sync.RWMutex
	// linkTracker is the tracker used for requests with no entry in dedupKeys.
	linkTracker   *linktracker.LinkTracker
	// altTrackers maps a dedup key to the tracker shared by requests using that key.
	altTrackers   map[string]*linktracker.LinkTracker
	// dedupKeys maps a request ID to the dedup key assigned to it via DedupKey.
	dedupKeys     map[graphsync.RequestID]string
}

// newTracker builds a peerLinkTracker holding a fresh default link tracker and
// empty (non-nil) maps for dedup keys and alternate trackers.
func newTracker() *peerLinkTracker {
	tracker := new(peerLinkTracker)
	tracker.linkTracker = linktracker.New()
	tracker.altTrackers = make(map[string]*linktracker.LinkTracker)
	tracker.dedupKeys = make(map[graphsync.RequestID]string)
	return tracker
}

// getLinkTracker selects the tracker that applies to requestID: the alternate
// tracker stored under the request's dedup key when one has been assigned,
// otherwise the default tracker. It takes no lock itself; the methods in this
// file call it while holding linkTrackerLk.
func (prs *peerLinkTracker) getLinkTracker(requestID graphsync.RequestID) *linktracker.LinkTracker {
	dedupKey, hasKey := prs.dedupKeys[requestID]
	if !hasKey {
		return prs.linkTracker
	}
	return prs.altTrackers[dedupKey]
}

// DedupKey indicates that outgoing blocks for requestID should be deduplicated
// in a separate bucket, shared only with requests that supplied the same key
// string. The bucket's tracker is created the first time a key is seen.
func (prs *peerLinkTracker) DedupKey(requestID graphsync.RequestID, key string) {
	prs.linkTrackerLk.Lock()
	defer prs.linkTrackerLk.Unlock()

	// Make sure the bucket exists before pointing the request at it.
	if _, exists := prs.altTrackers[key]; !exists {
		prs.altTrackers[key] = linktracker.New()
	}
	prs.dedupKeys[requestID] = key
}

// IgnoreBlocks indicates that the given links should be ignored when sending
// blocks for requestID. Each link is recorded in the request's tracker as
// already traversed with its block present.
func (prs *peerLinkTracker) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
	prs.linkTrackerLk.Lock()
	// Deferred so the lock is released even if the tracker panics, consistent
	// with DedupKey and FinishTracking.
	defer prs.linkTrackerLk.Unlock()

	linkTracker := prs.getLinkTracker(requestID)
	for _, link := range links {
		linkTracker.RecordLinkTraversal(requestID, link, true)
	}
}

// FinishTracking clears link tracking data for the request and returns the
// value reported by the tracker's FinishRequest call. If the request had a
// dedup key, the key assignment is removed, and the alternate tracker for that
// key is dropped once no remaining request refers to it.
func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool {
	prs.linkTrackerLk.Lock()
	defer prs.linkTrackerLk.Unlock()

	// Resolve the tracker before the dedup key entry is removed below.
	result := prs.getLinkTracker(requestID).FinishRequest(requestID)

	finishedKey, hadKey := prs.dedupKeys[requestID]
	if !hadKey {
		return result
	}
	delete(prs.dedupKeys, requestID)

	// Keep the alternate tracker while any other request still uses the key.
	for _, remainingKey := range prs.dedupKeys {
		if remainingKey == finishedKey {
			return result
		}
	}
	delete(prs.altTrackers, finishedKey)
	return result
}

// RecordLinkTraversal records whether a link is found for a request, using the
// tracker that applies to requestID. It returns isUnique == true when that
// tracker's BlockRefCount for the link was zero before this traversal was
// recorded.
func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID,
	link ipld.Link, hasBlock bool) (isUnique bool) {
	prs.linkTrackerLk.Lock()
	// Deferred so the lock is released even if the tracker panics, consistent
	// with DedupKey and FinishTracking.
	defer prs.linkTrackerLk.Unlock()

	linkTracker := prs.getLinkTracker(requestID)
	// Sample the ref count first: recording the traversal may change it.
	isUnique = linkTracker.BlockRefCount(link) == 0
	linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
	return isUnique
}