strategy.go 3.7 KB
Newer Older
1 2 3 4
package strategy

import (
	"errors"
5
	"sync"
Jeromy's avatar
Jeromy committed
6
	"time"
7 8

	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
9
	peer "github.com/jbenet/go-ipfs/peer"
10 11 12
	u "github.com/jbenet/go-ipfs/util"
)

13 14
const resendTimeoutPeriod = time.Minute

15 16
var log = u.Logger("strategy")

17 18 19 20 21 22 23 24 25 26
// TODO niceness should be on a per-peer basis. Use-case: Certain peers are
// "trusted" and/or controlled by a single human user. The user may want for
// these peers to exchange data freely
func New(nice bool) Strategy {
	var stratFunc strategyFunc
	if nice {
		stratFunc = yesManStrategy
	} else {
		stratFunc = standardStrategy
	}
27 28
	return &strategist{
		ledgerMap:    ledgerMap{},
29
		strategyFunc: stratFunc,
30 31 32 33
	}
}

type strategist struct {
34
	lock sync.RWMutex
35 36 37 38 39 40 41 42 43 44 45
	ledgerMap
	strategyFunc
}

// LedgerMap lists Ledgers by their Partner key.
type ledgerMap map[peerKey]*ledger

// FIXME share this externally
type peerKey u.Key

// Peers returns a list of peers
46
func (s *strategist) Peers() []peer.Peer {
47 48 49
	s.lock.RLock()
	defer s.lock.RUnlock()

50
	response := make([]peer.Peer, 0)
51 52 53 54 55 56
	for _, ledger := range s.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

57
func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
58 59 60
	s.lock.RLock()
	defer s.lock.RUnlock()

61 62 63 64
	ledger := s.ledger(p)
	return ledger.WantListContains(k)
}

65
func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
66 67 68
	s.lock.RLock()
	defer s.lock.RUnlock()

69
	ledger := s.ledger(p)
70

71 72 73
	// Dont resend blocks within a certain time period
	t, ok := ledger.sentToPeer[k]
	if ok && t.Add(resendTimeoutPeriod).After(time.Now()) {
74 75 76
		return false
	}

77 78 79
	return ledger.ShouldSend()
}

80 81 82 83 84
func (s *strategist) BlockSentToPeer(k u.Key, p peer.Peer) {
	s.lock.Lock()
	defer s.lock.Unlock()

	ledger := s.ledger(p)
85
	ledger.sentToPeer[k] = time.Now()
86 87
}

88
func (s *strategist) Seed(int64) {
89 90 91
	s.lock.Lock()
	defer s.lock.Unlock()

92 93 94
	// TODO
}

95 96
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
97
func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
98 99 100
	s.lock.Lock()
	defer s.lock.Unlock()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
101 102 103 104 105 106 107
	// TODO find a more elegant way to handle this check
	if p == nil {
		return errors.New("Strategy received nil peer")
	}
	if m == nil {
		return errors.New("Strategy received nil message")
	}
108 109 110 111 112 113 114 115
	l := s.ledger(p)
	for _, key := range m.Wantlist() {
		l.Wants(key)
	}
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
	}
116
	return nil
117 118 119 120 121 122 123 124
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

125
func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
126 127 128
	s.lock.Lock()
	defer s.lock.Unlock()

129 130 131 132
	l := s.ledger(p)
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
	}
133 134 135

	// TODO remove these blocks from peer's want list

136 137 138
	return nil
}

139
func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 {
140 141 142
	s.lock.RLock()
	defer s.lock.RUnlock()

143 144 145
	return s.ledger(p).Accounting.BytesSent
}

146
func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 {
147 148 149
	s.lock.RLock()
	defer s.lock.RUnlock()

150 151 152
	return s.ledger(p).Accounting.BytesRecv
}

153
// ledger lazily instantiates a ledger
154
func (s *strategist) ledger(p peer.Peer) *ledger {
155 156 157 158 159 160 161
	l, ok := s.ledgerMap[peerKey(p.Key())]
	if !ok {
		l = newLedger(p, s.strategyFunc)
		s.ledgerMap[peerKey(p.Key())] = l
	}
	return l
}
Jeromy's avatar
Jeromy committed
162 163 164 165 166 167

func (s *strategist) GetBatchSize() int {
	return 10
}

func (s *strategist) GetRebroadcastDelay() time.Duration {
Jeromy's avatar
Jeromy committed
168
	return time.Second * 5
Jeromy's avatar
Jeromy committed
169
}