virtual.go 8.03 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
5
	"errors"
6
	"sort"
Steven Allen's avatar
Steven Allen committed
7
	"sync"
8
	"sync/atomic"
9
	"time"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
10

Jeromy's avatar
Jeromy committed
11 12
	bsmsg "github.com/ipfs/go-bitswap/message"
	bsnet "github.com/ipfs/go-bitswap/network"
13

Jeromy's avatar
Jeromy committed
14 15 16 17 18 19 20
	cid "github.com/ipfs/go-cid"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
	logging "github.com/ipfs/go-log"
	ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
	peer "github.com/libp2p/go-libp2p-peer"
	routing "github.com/libp2p/go-libp2p-routing"
21
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
Jeromy's avatar
Jeromy committed
22
	testutil "github.com/libp2p/go-testutil"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
23 24
)

25 26
var log = logging.Logger("bstestnet")

27 28
// VirtualNetwork generates a new testnet instance - a fake network that
// is used to simulate sending messages.
29
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30
	return &network{
31 32 33 34 35 36 37 38 39 40
		latencies:          make(map[peer.ID]map[peer.ID]time.Duration),
		clients:            make(map[peer.ID]*receiverQueue),
		delay:              d,
		routingserver:      rs,
		isRateLimited:      false,
		rateLimitGenerator: nil,
		conns:              make(map[string]struct{}),
	}
}

41
// RateLimitGenerator is an interface for generating rate limits across peers
42 43 44 45
type RateLimitGenerator interface {
	NextRateLimit() float64
}

46 47
// RateLimitedVirtualNetwork generates a testnet instance where nodes are rate
// limited in the upload/download speed.
48 49 50
func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network {
	return &network{
		latencies:          make(map[peer.ID]map[peer.ID]time.Duration),
51
		rateLimiters:       make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter),
52 53 54 55 56 57
		clients:            make(map[peer.ID]*receiverQueue),
		delay:              d,
		routingserver:      rs,
		isRateLimited:      true,
		rateLimitGenerator: rateLimitGenerator,
		conns:              make(map[string]struct{}),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59 60 61
	}
}

type network struct {
62 63
	mu                 sync.Mutex
	latencies          map[peer.ID]map[peer.ID]time.Duration
64
	rateLimiters       map[peer.ID]map[peer.ID]*mocknet.RateLimiter
65 66 67 68 69 70
	clients            map[peer.ID]*receiverQueue
	routingserver      mockrouting.Server
	delay              delay.D
	isRateLimited      bool
	rateLimitGenerator RateLimitGenerator
	conns              map[string]struct{}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71 72
}

73 74 75 76 77 78 79 80 81 82
type message struct {
	from       peer.ID
	msg        bsmsg.BitSwapMessage
	shouldSend time.Time
}

// receiverQueue queues up a set of messages to be sent, and sends them *in
// order* with their delays respected as much as sending them in order allows
// for
type receiverQueue struct {
83
	receiver *networkClient
84 85 86 87 88
	queue    []*message
	active   bool
	lk       sync.Mutex
}

89
func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
Steven Allen's avatar
Steven Allen committed
90 91 92
	n.mu.Lock()
	defer n.mu.Unlock()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
93
	client := &networkClient{
94
		local:   p.ID(),
95
		network: n,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
96
		routing: n.routingserver.Client(p),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97
	}
98
	n.clients[p.ID()] = &receiverQueue{receiver: client}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
99 100 101
	return client
}

102
func (n *network) HasPeer(p peer.ID) bool {
Steven Allen's avatar
Steven Allen committed
103 104 105
	n.mu.Lock()
	defer n.mu.Unlock()

106
	_, found := n.clients[p]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107 108 109
	return found
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
110 111 112 113
// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
	ctx context.Context,
114 115
	from peer.ID,
	to peer.ID,
116
	mes bsmsg.BitSwapMessage) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
117

Steven Allen's avatar
Steven Allen committed
118 119 120
	n.mu.Lock()
	defer n.mu.Unlock()

121 122 123 124 125 126 127 128 129 130 131 132
	latencies, ok := n.latencies[from]
	if !ok {
		latencies = make(map[peer.ID]time.Duration)
		n.latencies[from] = latencies
	}

	latency, ok := latencies[to]
	if !ok {
		latency = n.delay.NextWaitTime()
		latencies[to] = latency
	}

133 134 135 136
	var bandwidthDelay time.Duration
	if n.isRateLimited {
		rateLimiters, ok := n.rateLimiters[from]
		if !ok {
137
			rateLimiters = make(map[peer.ID]*mocknet.RateLimiter)
138 139 140
			n.rateLimiters[from] = rateLimiters
		}

141
		rateLimiter, ok := rateLimiters[to]
142
		if !ok {
143 144
			rateLimiter = mocknet.NewRateLimiter(n.rateLimitGenerator.NextRateLimit())
			rateLimiters[to] = rateLimiter
145 146 147
		}

		size := mes.ToProtoV1().Size()
148
		bandwidthDelay = rateLimiter.Limit(size)
149 150 151 152
	} else {
		bandwidthDelay = 0
	}

153
	receiver, ok := n.clients[to]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
154
	if !ok {
Łukasz Magiera's avatar
Łukasz Magiera committed
155
		return errors.New("cannot locate peer on network")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
156 157 158 159 160
	}

	// nb: terminate the context since the context wouldn't actually be passed
	// over the network in a real scenario

161 162 163
	msg := &message{
		from:       from,
		msg:        mes,
164
		shouldSend: time.Now().Add(latency).Add(bandwidthDelay),
165 166
	}
	receiver.enqueue(msg)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
167 168 169 170 171

	return nil
}

type networkClient struct {
172
	local peer.ID
Brian Tiger Chow's avatar
Brian Tiger Chow committed
173
	bsnet.Receiver
174
	network *network
175
	routing routing.IpfsRouting
176
	stats   bsnet.Stats
Brian Tiger Chow's avatar
Brian Tiger Chow committed
177 178 179 180
}

func (nc *networkClient) SendMessage(
	ctx context.Context,
181
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
182
	message bsmsg.BitSwapMessage) error {
183 184 185 186 187 188 189
	if err := nc.network.SendMessage(ctx, nc.local, to, message); err != nil {
		return err
	}
	atomic.AddUint64(&nc.stats.MessagesSent, 1)
	return nil
}

190 191
func (nc *networkClient) Stats() bsnet.Stats {
	return bsnet.Stats{
192 193 194
		MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd),
		MessagesSent:  atomic.LoadUint64(&nc.stats.MessagesSent),
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
195 196
}

197
// FindProvidersAsync returns a channel of providers for the given key.
198
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID {
199 200 201 202 203 204 205 206 207

	// NB: this function duplicates the PeerInfo -> ID transformation in the
	// bitswap network adapter. Not to worry. This network client will be
	// deprecated once the ipfsnet.Mock is added. The code below is only
	// temporary.

	out := make(chan peer.ID)
	go func() {
		defer close(out)
208
		providers := nc.routing.FindProvidersAsync(ctx, k, max)
209 210 211 212 213 214 215 216
		for info := range providers {
			select {
			case <-ctx.Done():
			case out <- info.ID:
			}
		}
	}()
	return out
217 218
}

219 220 221 222
func (nc *networkClient) ConnectionManager() ifconnmgr.ConnManager {
	return &ifconnmgr.NullConnMgr{}
}

Jeromy's avatar
Jeromy committed
223
type messagePasser struct {
224
	net    *networkClient
Jeromy's avatar
Jeromy committed
225 226 227 228 229
	target peer.ID
	local  peer.ID
	ctx    context.Context
}

230
func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error {
231
	return mp.net.SendMessage(ctx, mp.target, m)
Jeromy's avatar
Jeromy committed
232 233 234 235 236 237
}

func (mp *messagePasser) Close() error {
	return nil
}

238 239 240 241
func (mp *messagePasser) Reset() error {
	return nil
}

242
func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
Jeromy's avatar
Jeromy committed
243
	return &messagePasser{
244
		net:    nc,
Jeromy's avatar
Jeromy committed
245
		target: p,
246
		local:  nc.local,
Jeromy's avatar
Jeromy committed
247 248 249 250
		ctx:    ctx,
	}, nil
}

251
// Provide provides the key to the network.
252
func (nc *networkClient) Provide(ctx context.Context, k cid.Cid) error {
253
	return nc.routing.Provide(ctx, k, true)
254 255
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
256 257 258
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
	nc.Receiver = r
}
259 260

func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
Steven Allen's avatar
Steven Allen committed
261 262 263 264 265
	nc.network.mu.Lock()

	otherClient, ok := nc.network.clients[p]
	if !ok {
		nc.network.mu.Unlock()
266 267
		return errors.New("no such peer in network")
	}
Steven Allen's avatar
Steven Allen committed
268

269 270
	tag := tagForPeers(nc.local, p)
	if _, ok := nc.network.conns[tag]; ok {
Steven Allen's avatar
Steven Allen committed
271
		nc.network.mu.Unlock()
272 273 274 275
		log.Warning("ALREADY CONNECTED TO PEER (is this a reconnect? test lib needs fixing)")
		return nil
	}
	nc.network.conns[tag] = struct{}{}
Steven Allen's avatar
Steven Allen committed
276 277
	nc.network.mu.Unlock()

278 279
	// TODO: add handling for disconnects

280
	otherClient.receiver.PeerConnected(nc.local)
281 282 283
	nc.Receiver.PeerConnected(p)
	return nil
}
284

285 286 287 288 289 290 291 292 293 294
func (rq *receiverQueue) enqueue(m *message) {
	rq.lk.Lock()
	defer rq.lk.Unlock()
	rq.queue = append(rq.queue, m)
	if !rq.active {
		rq.active = true
		go rq.process()
	}
}

295 296 297 298 299 300 301 302 303 304 305 306
func (rq *receiverQueue) Swap(i, j int) {
	rq.queue[i], rq.queue[j] = rq.queue[j], rq.queue[i]
}

func (rq *receiverQueue) Len() int {
	return len(rq.queue)
}

func (rq *receiverQueue) Less(i, j int) bool {
	return rq.queue[i].shouldSend.UnixNano() < rq.queue[j].shouldSend.UnixNano()
}

307 308 309
func (rq *receiverQueue) process() {
	for {
		rq.lk.Lock()
310
		sort.Sort(rq)
311 312 313 314 315 316
		if len(rq.queue) == 0 {
			rq.active = false
			rq.lk.Unlock()
			return
		}
		m := rq.queue[0]
317 318 319 320 321 322 323 324 325 326
		if time.Until(m.shouldSend).Seconds() < 0.1 {
			rq.queue = rq.queue[1:]
			rq.lk.Unlock()
			time.Sleep(time.Until(m.shouldSend))
			atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
			rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
		} else {
			rq.lk.Unlock()
			time.Sleep(100 * time.Millisecond)
		}
327 328 329
	}
}

330 331 332 333 334 335
func tagForPeers(a, b peer.ID) string {
	if a < b {
		return string(a + b)
	}
	return string(b + a)
}