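// Package sessionrequestsplitter tracks how many duplicate and unique blocks a
// session receives and uses that to decide how to split wants among peers.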
package sessionrequestsplitter

import (
	"context"

	bssd "github.com/ipfs/go-bitswap/sessiondata"

	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p-core/peer"
)

const (
	// receivedCount must exceed this before a duplicate block may raise the
	// split factor.
	minReceivedToAdjustSplit = 2
	// Upper bound on the split factor; it is never incremented past this.
	maxSplit                 = 16
	// A duplicate ratio above this, seen when recording a duplicate block,
	// raises the split factor.
	maxAcceptableDupes       = 0.4
	// A duplicate ratio below this, seen when recording a unique block,
	// lowers the split factor.
	minDuplesToTryLessSplits = 0.2
	// Split factor a new SessionRequestSplitter starts with.
	initialSplit             = 2
)

// srsMessage is a unit of work sent over the splitter's messages channel.
// The run loop receives each message and calls handle with the splitter, so
// handle implementations are the only code that touches the splitter's
// counters.
type srsMessage interface {
	handle(srs *SessionRequestSplitter)
}

// SessionRequestSplitter tracks how many duplicate and unique blocks come in and
// uses that to determine how much to split up each set of wants among peers.
//
// Public methods do not modify the counters directly; they enqueue a message
// that the run loop started by New processes.
type SessionRequestSplitter struct {
	// ctx bounds the lifetime of the run loop and unblocks callers waiting to
	// enqueue a message or receive a reply.
	ctx      context.Context
	// messages carries work items from the public methods to the run loop.
	messages chan srsMessage

	// data, do not touch outside run loop
	// receivedCount is the total number of blocks recorded (unique + duplicate).
	receivedCount          int
	// split is the current split factor used when dividing peers and keys.
	split                  int
	// duplicateReceivedCount is the number of recorded duplicate blocks.
	duplicateReceivedCount int
}

// New builds a SessionRequestSplitter bound to ctx, seeds it with the initial
// split factor, and starts its run loop in a new goroutine. The loop exits
// when ctx is cancelled.
func New(ctx context.Context) *SessionRequestSplitter {
	srs := new(SessionRequestSplitter)
	srs.ctx = ctx
	srs.messages = make(chan srsMessage, 10)
	srs.split = initialSplit

	go srs.run()
	return srs
}

// SplitRequest splits a request for the given cids one or more times among the
// given peers.
49 50
func (srs *SessionRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, ks []cid.Cid) []bssd.PartialRequest {
	resp := make(chan []bssd.PartialRequest, 1)
51 52

	select {
53
	case srs.messages <- &splitRequestMessage{optimizedPeers, ks, resp}:
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
	case <-srs.ctx.Done():
		return nil
	}
	select {
	case splitRequests := <-resp:
		return splitRequests
	case <-srs.ctx.Done():
		return nil
	}

}

// RecordDuplicateBlock records the fact that the session received a duplicate
// block and adjusts the split factor as necessary.
//
// The update is queued for the run loop; if the splitter's context is
// cancelled before it can be queued, the call returns without recording.
func (srs *SessionRequestSplitter) RecordDuplicateBlock() {
	var msg srsMessage = &recordDuplicateMessage{}

	select {
	case <-srs.ctx.Done():
	case srs.messages <- msg:
	}
}

// RecordUniqueBlock records the fact that the session received a unique block
// and adjusts the split factor as necessary.
//
// The update is queued for the run loop; if the splitter's context is
// cancelled before it can be queued, the call returns without recording.
func (srs *SessionRequestSplitter) RecordUniqueBlock() {
	var msg srsMessage = &recordUniqueMessage{}

	select {
	case <-srs.ctx.Done():
	case srs.messages <- msg:
	}
}

// run is the splitter's event loop. It receives messages from the messages
// channel and lets each one act on the splitter via handle, and it returns
// once the splitter's context is cancelled.
func (srs *SessionRequestSplitter) run() {
	done := srs.ctx.Done()
	for {
		select {
		case <-done:
			return
		case msg := <-srs.messages:
			msg.handle(srs)
		}
	}
}

// duplicateRatio returns the fraction of recorded blocks that were duplicates
// (duplicateReceivedCount / receivedCount).
//
// It returns 0 when no blocks have been recorded yet, rather than the NaN that
// a floating-point 0/0 would produce.
func (srs *SessionRequestSplitter) duplicateRatio() float64 {
	if srs.receivedCount == 0 {
		return 0
	}
	return float64(srs.duplicateReceivedCount) / float64(srs.receivedCount)
}

type splitRequestMessage struct {
100 101 102
	optimizedPeers []bssd.OptimizedPeer
	ks             []cid.Cid
	resp           chan []bssd.PartialRequest
103 104 105 106
}

func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
	split := srs.split
107 108 109 110 111
	// first iteration ignore optimization ratings
	peers := make([]peer.ID, len(s.optimizedPeers))
	for i, optimizedPeer := range s.optimizedPeers {
		peers[i] = optimizedPeer.Peer
	}
112 113 114 115 116 117 118 119 120
	ks := s.ks
	if len(peers) < split {
		split = len(peers)
	}
	peerSplits := splitPeers(peers, split)
	if len(ks) < split {
		split = len(ks)
	}
	keySplits := splitKeys(ks, split)
121 122 123
	splitRequests := make([]bssd.PartialRequest, 0, len(keySplits))
	for i, keySplit := range keySplits {
		splitRequests = append(splitRequests, bssd.PartialRequest{Peers: peerSplits[i], Keys: keySplit})
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
	}
	s.resp <- splitRequests
}

// recordDuplicateMessage tells the run loop that a duplicate block arrived.
type recordDuplicateMessage struct{}

// handle counts the duplicate block and, once more than
// minReceivedToAdjustSplit blocks have been recorded, raises the split factor
// by one if the duplicate ratio exceeds maxAcceptableDupes and the factor is
// still below maxSplit.
func (r *recordDuplicateMessage) handle(srs *SessionRequestSplitter) {
	srs.receivedCount++
	srs.duplicateReceivedCount++

	switch {
	case srs.receivedCount <= minReceivedToAdjustSplit:
		// too few samples to adjust yet
	case srs.split >= maxSplit:
		// already at the ceiling
	case srs.duplicateRatio() > maxAcceptableDupes:
		srs.split++
	}
}

// recordUniqueMessage tells the run loop that a unique block arrived.
type recordUniqueMessage struct{}

// handle counts the unique block and lowers the split factor by one when it
// is above 1 and the duplicate ratio is below minDuplesToTryLessSplits.
func (r *recordUniqueMessage) handle(srs *SessionRequestSplitter) {
	srs.receivedCount++

	if srs.split <= 1 {
		// never split into fewer than one group
		return
	}
	if srs.duplicateRatio() < minDuplesToTryLessSplits {
		srs.split--
	}
}
// splitKeys distributes ks round-robin into split groups: key i goes to group
// i % split. Groups that receive no keys are left nil.
//
// A non-positive split yields nil; without this guard a zero split with
// non-empty input would panic on the modulo and a negative split would panic
// in make.
func splitKeys(ks []cid.Cid, split int) [][]cid.Cid {
	if split <= 0 {
		return nil
	}
	splits := make([][]cid.Cid, split)
	for i, c := range ks {
		pos := i % split
		splits[pos] = append(splits[pos], c)
	}
	return splits
}

// splitPeers distributes peers round-robin into split groups: peer i goes to
// group i % split. Groups that receive no peers are left nil.
//
// A non-positive split yields nil; without this guard a zero split with
// non-empty input would panic on the modulo and a negative split would panic
// in make.
func splitPeers(peers []peer.ID, split int) [][]peer.ID {
	if split <= 0 {
		return nil
	}
	splits := make([][]peer.ID, split)
	for i, p := range peers {
		pos := i % split
		splits[pos] = append(splits[pos], p)
	}
	return splits
}