sessionrequestsplitter.go 3.82 KB
Newer Older
1 2 3 4 5 6
package sessionrequestsplitter

import (
	"context"

	"github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
7
	"github.com/libp2p/go-libp2p-core/peer"
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
)

const (
	minReceivedToAdjustSplit = 2
	maxSplit                 = 16
	maxAcceptableDupes       = 0.4
	minDuplesToTryLessSplits = 0.2
	initialSplit             = 2
)

// PartialRequest is represents one slice of an over request split among peers
type PartialRequest struct {
	Peers []peer.ID
	Keys  []cid.Cid
}

type srsMessage interface {
	handle(srs *SessionRequestSplitter)
}

// SessionRequestSplitter track how many duplicate and unique blocks come in and
// uses that to determine how much to split up each set of wants among peers.
type SessionRequestSplitter struct {
	ctx      context.Context
	messages chan srsMessage

	// data, do not touch outside run loop
	receivedCount          int
	split                  int
	duplicateReceivedCount int
}

// New returns a new SessionRequestSplitter.
func New(ctx context.Context) *SessionRequestSplitter {
	srs := &SessionRequestSplitter{
		ctx:      ctx,
		messages: make(chan srsMessage, 10),
		split:    initialSplit,
	}
	go srs.run()
	return srs
}

// SplitRequest splits a request for the given cids one or more times among the
// given peers.
func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest {
54
	resp := make(chan []*PartialRequest, 1)
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163

	select {
	case srs.messages <- &splitRequestMessage{peers, ks, resp}:
	case <-srs.ctx.Done():
		return nil
	}
	select {
	case splitRequests := <-resp:
		return splitRequests
	case <-srs.ctx.Done():
		return nil
	}

}

// RecordDuplicateBlock records the fact that the session received a duplicate
// block and adjusts split factor as neccesary.
func (srs *SessionRequestSplitter) RecordDuplicateBlock() {
	select {
	case srs.messages <- &recordDuplicateMessage{}:
	case <-srs.ctx.Done():
	}
}

// RecordUniqueBlock records the fact that the session received unique block
// and adjusts the split factor as neccesary.
func (srs *SessionRequestSplitter) RecordUniqueBlock() {
	select {
	case srs.messages <- &recordUniqueMessage{}:
	case <-srs.ctx.Done():
	}
}

func (srs *SessionRequestSplitter) run() {
	for {
		select {
		case message := <-srs.messages:
			message.handle(srs)
		case <-srs.ctx.Done():
			return
		}
	}
}

func (srs *SessionRequestSplitter) duplicateRatio() float64 {
	return float64(srs.duplicateReceivedCount) / float64(srs.receivedCount)
}

type splitRequestMessage struct {
	peers []peer.ID
	ks    []cid.Cid
	resp  chan []*PartialRequest
}

func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
	split := srs.split
	peers := s.peers
	ks := s.ks
	if len(peers) < split {
		split = len(peers)
	}
	peerSplits := splitPeers(peers, split)
	if len(ks) < split {
		split = len(ks)
	}
	keySplits := splitKeys(ks, split)
	splitRequests := make([]*PartialRequest, len(keySplits))
	for i := range splitRequests {
		splitRequests[i] = &PartialRequest{peerSplits[i], keySplits[i]}
	}
	s.resp <- splitRequests
}

type recordDuplicateMessage struct{}

func (r *recordDuplicateMessage) handle(srs *SessionRequestSplitter) {
	srs.receivedCount++
	srs.duplicateReceivedCount++
	if (srs.receivedCount > minReceivedToAdjustSplit) && (srs.duplicateRatio() > maxAcceptableDupes) && (srs.split < maxSplit) {
		srs.split++
	}
}

type recordUniqueMessage struct{}

func (r *recordUniqueMessage) handle(srs *SessionRequestSplitter) {
	srs.receivedCount++
	if (srs.split > 1) && (srs.duplicateRatio() < minDuplesToTryLessSplits) {
		srs.split--
	}

}
func splitKeys(ks []cid.Cid, split int) [][]cid.Cid {
	splits := make([][]cid.Cid, split)
	for i, c := range ks {
		pos := i % split
		splits[pos] = append(splits[pos], c)
	}
	return splits
}

func splitPeers(peers []peer.ID, split int) [][]peer.ID {
	splits := make([][]peer.ID, split)
	for i, p := range peers {
		pos := i % split
		splits[pos] = append(splits[pos], p)
	}
	return splits
}