strategy.go 5.08 KB
Newer Older
1 2 3 4
package strategy

import (
	"errors"
5
	"sync"
Jeromy's avatar
Jeromy committed
6
	"time"
7

Jeromy's avatar
Jeromy committed
8 9
	blocks "github.com/jbenet/go-ipfs/blocks"
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
10
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
Jeromy's avatar
Jeromy committed
11
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
12
	peer "github.com/jbenet/go-ipfs/peer"
13 14 15
	u "github.com/jbenet/go-ipfs/util"
)

16 17
const resendTimeoutPeriod = time.Minute

18 19
var log = u.Logger("strategy")

20 21 22 23 24 25 26 27 28 29
// TODO niceness should be on a per-peer basis. Use-case: Certain peers are
// "trusted" and/or controlled by a single human user. The user may want for
// these peers to exchange data freely
func New(nice bool) Strategy {
	var stratFunc strategyFunc
	if nice {
		stratFunc = yesManStrategy
	} else {
		stratFunc = standardStrategy
	}
30 31
	return &strategist{
		ledgerMap:    ledgerMap{},
32
		strategyFunc: stratFunc,
33 34 35 36
	}
}

type strategist struct {
37
	lock sync.RWMutex
38 39 40 41 42 43 44 45 46 47 48
	ledgerMap
	strategyFunc
}

// ledgerMap indexes ledgers by the key of their partner peer.
type ledgerMap map[peerKey]*ledger

// peerKey is the key type of ledgerMap; values are produced by converting a
// peer's Key().
//
// FIXME share this externally
type peerKey u.Key

// Peers returns a list of peers
49
func (s *strategist) Peers() []peer.Peer {
50 51 52
	s.lock.RLock()
	defer s.lock.RUnlock()

53
	response := make([]peer.Peer, 0)
54 55 56 57 58 59
	for _, ledger := range s.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

60
func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
61 62 63
	s.lock.RLock()
	defer s.lock.RUnlock()

64 65 66 67
	ledger := s.ledger(p)
	return ledger.WantListContains(k)
}

68
func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
69 70 71
	s.lock.RLock()
	defer s.lock.RUnlock()

72
	ledger := s.ledger(p)
73

74 75 76
	// Dont resend blocks within a certain time period
	t, ok := ledger.sentToPeer[k]
	if ok && t.Add(resendTimeoutPeriod).After(time.Now()) {
77 78 79
		return false
	}

80 81 82
	return ledger.ShouldSend()
}

Jeromy's avatar
Jeromy committed
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
// Task pairs a peer with the blocks selected for it. GetAllocation returns
// one Task per peer it decides to send to.
type Task struct {
	// Peer is the recipient the blocks were selected for.
	Peer   peer.Peer
	// Blocks holds the blocks selected for Peer.
	Blocks []*blocks.Block
}

// GetAllocation divides bandwidth evenly (integer division) among all peers
// whose ledger reports ShouldSend, and for each of them selects blocks from
// bs that appear on the peer's want list.
//
// It returns one Task per selected peer, or (nil, nil) when no peer should be
// sent to. The first error from reading the blockstore, other than
// u.ErrNotFound, aborts the allocation and is returned.
func (s *strategist) GetAllocation(bandwidth int, bs bstore.Blockstore) ([]*Task, error) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	// Keep the ledgers themselves rather than only their partners. Looking
	// them up again through s.ledger would go through a helper that can write
	// to ledgerMap, which must not happen under a read lock.
	var eligible []*ledger
	for _, l := range s.ledgerMap {
		if l.ShouldSend() {
			eligible = append(eligible, l)
		}
	}
	if len(eligible) == 0 {
		return nil, nil
	}

	bandwidthPerPeer := bandwidth / len(eligible)
	tasks := make([]*Task, 0, len(eligible))
	for _, l := range eligible {
		blksForPeer, err := s.getSendableBlocks(l.wantList, bs, bandwidthPerPeer)
		if err != nil {
			return nil, err
		}
		tasks = append(tasks, &Task{
			Peer:   l.Partner,
			Blocks: blksForPeer,
		})
	}

	return tasks, nil
}

// getSendableBlocks walks the entries of wantlist and collects the blocks
// that bs can supply, stopping once their combined data size has used up bw
// bytes.
//
// Entries that bs reports as u.ErrNotFound are skipped; any other blockstore
// error aborts the walk and is returned with a nil slice. The budget is
// checked only after a block has been appended, so the last block may exceed
// bw, and one available block is returned even when bw is zero or negative.
func (s *strategist) getSendableBlocks(wantlist *wl.Wantlist, bs bstore.Blockstore, bw int) ([]*blocks.Block, error) {
	var sendable []*blocks.Block
	remaining := bw
	for _, entry := range wantlist.Entries() {
		blk, err := bs.Get(entry.Value)
		switch {
		case err == u.ErrNotFound:
			// We don't hold this block; try the next entry.
			continue
		case err != nil:
			return nil, err
		}

		sendable = append(sendable, blk)
		if remaining -= len(blk.Data); remaining <= 0 {
			break
		}
	}
	return sendable, nil
}

137 138 139 140 141
func (s *strategist) BlockSentToPeer(k u.Key, p peer.Peer) {
	s.lock.Lock()
	defer s.lock.Unlock()

	ledger := s.ledger(p)
142
	ledger.sentToPeer[k] = time.Now()
143 144
}

145
func (s *strategist) Seed(int64) {
146 147 148
	s.lock.Lock()
	defer s.lock.Unlock()

149 150 151
	// TODO
}

152 153
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
154
func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
155 156 157
	s.lock.Lock()
	defer s.lock.Unlock()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
158 159 160 161 162 163 164
	// TODO find a more elegant way to handle this check
	if p == nil {
		return errors.New("Strategy received nil peer")
	}
	if m == nil {
		return errors.New("Strategy received nil message")
	}
165
	l := s.ledger(p)
Jeromy's avatar
Jeromy committed
166 167 168 169 170 171 172 173 174
	if m.Full() {
		l.wantList = wl.NewWantlist()
	}
	for _, e := range m.Wantlist() {
		if e.Cancel {
			l.CancelWant(e.Key)
		} else {
			l.Wants(e.Key, e.Priority)
		}
175 176 177 178 179
	}
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
	}
180
	return nil
181 182 183 184 185 186 187 188
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

189
func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
190 191 192
	s.lock.Lock()
	defer s.lock.Unlock()

193 194 195 196
	l := s.ledger(p)
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
	}
197 198 199

	// TODO remove these blocks from peer's want list

200 201 202
	return nil
}

203
func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 {
204 205 206
	s.lock.RLock()
	defer s.lock.RUnlock()

207 208 209
	return s.ledger(p).Accounting.BytesSent
}

210
func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 {
211 212 213
	s.lock.RLock()
	defer s.lock.RUnlock()

214 215 216
	return s.ledger(p).Accounting.BytesRecv
}

217
// ledger lazily instantiates a ledger
218
func (s *strategist) ledger(p peer.Peer) *ledger {
219 220 221 222 223 224 225
	l, ok := s.ledgerMap[peerKey(p.Key())]
	if !ok {
		l = newLedger(p, s.strategyFunc)
		s.ledgerMap[peerKey(p.Key())] = l
	}
	return l
}
Jeromy's avatar
Jeromy committed
226 227 228 229 230 231

// GetBatchSize returns the batch size, which is currently fixed at 10.
func (s *strategist) GetBatchSize() int {
	const batchSize = 10
	return batchSize
}

func (s *strategist) GetRebroadcastDelay() time.Duration {
Jeromy's avatar
Jeromy committed
232
	return time.Second * 10
Jeromy's avatar
Jeromy committed
233
}