virtual.go 9.72 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
5
	"errors"
6
	"sort"
Steven Allen's avatar
Steven Allen committed
7
	"sync"
8
	"sync/atomic"
9
	"time"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
10

11 12 13 14 15 16 17 18 19 20 21 22 23 24
	bsmsg "gitlab.dms3.io/dms3/go-bitswap/message"
	bsnet "gitlab.dms3.io/dms3/go-bitswap/network"

	cid "gitlab.dms3.io/dms3/go-cid"
	delay "gitlab.dms3.io/dms3/go-dms3-delay"
	mockrouting "gitlab.dms3.io/dms3/go-dms3-routing/mock"

	"gitlab.dms3.io/p2p/go-p2p-core/connmgr"
	"gitlab.dms3.io/p2p/go-p2p-core/peer"
	protocol "gitlab.dms3.io/p2p/go-p2p-core/protocol"
	"gitlab.dms3.io/p2p/go-p2p-core/routing"
	tnet "gitlab.dms3.io/p2p/go-p2p-testing/net"
	mocknet "gitlab.dms3.io/p2p/go-p2p/p2p/net/mock"
	"gitlab.dms3.io/p2p/go-p2p/p2p/protocol/ping"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
25 26
)

27 28
// VirtualNetwork generates a new testnet instance - a fake network that
// is used to simulate sending messages.
29
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30
	return &network{
31 32 33 34 35 36 37 38 39 40
		latencies:          make(map[peer.ID]map[peer.ID]time.Duration),
		clients:            make(map[peer.ID]*receiverQueue),
		delay:              d,
		routingserver:      rs,
		isRateLimited:      false,
		rateLimitGenerator: nil,
		conns:              make(map[string]struct{}),
	}
}

41
// RateLimitGenerator is an interface for generating rate limits across peers
42 43 44 45
type RateLimitGenerator interface {
	// NextRateLimit returns the rate for the next limiter to be created. The
	// rate-limited network calls it once per (sender, receiver) pair, the
	// first time that sender sends to that receiver, and passes the result
	// to mocknet.NewRateLimiter.
	// NOTE(review): the unit is whatever mocknet.NewRateLimiter expects
	// (presumably bytes per second) — confirm against mocknet.
	NextRateLimit() float64
}

46 47
// RateLimitedVirtualNetwork generates a testnet instance where nodes are rate
// limited in the upload/download speed.
48 49 50
func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network {
	return &network{
		latencies:          make(map[peer.ID]map[peer.ID]time.Duration),
51
		rateLimiters:       make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter),
52 53 54 55 56 57
		clients:            make(map[peer.ID]*receiverQueue),
		delay:              d,
		routingserver:      rs,
		isRateLimited:      true,
		rateLimitGenerator: rateLimitGenerator,
		conns:              make(map[string]struct{}),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59 60 61
	}
}

type network struct {
62 63
	mu                 sync.Mutex
	latencies          map[peer.ID]map[peer.ID]time.Duration
64
	rateLimiters       map[peer.ID]map[peer.ID]*mocknet.RateLimiter
65 66 67 68 69 70
	clients            map[peer.ID]*receiverQueue
	routingserver      mockrouting.Server
	delay              delay.D
	isRateLimited      bool
	rateLimitGenerator RateLimitGenerator
	conns              map[string]struct{}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71 72
}

73 74 75 76 77 78 79 80 81 82
// message is one queued delivery: a bitswap message together with its sender
// and the time at which it should be handed to the receiver.
type message struct {
	// from is the peer that sent the message.
	from       peer.ID
	// msg is the payload passed to the receiver's ReceiveMessage.
	msg        bsmsg.BitSwapMessage
	// shouldSend is the delivery time computed by network.SendMessage: the
	// enqueue time plus link latency plus any bandwidth delay.
	shouldSend time.Time
}

// receiverQueue queues up a set of messages to be sent, and sends them *in
// order* with their delays respected as much as sending them in order allows
// for
type receiverQueue struct {
83
	receiver *networkClient
84 85 86 87 88
	queue    []*message
	active   bool
	lk       sync.Mutex
}

dirkmc's avatar
dirkmc committed
89
func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork {
Steven Allen's avatar
Steven Allen committed
90 91 92
	n.mu.Lock()
	defer n.mu.Unlock()

93 94 95 96 97 98 99 100 101 102 103 104
	s := bsnet.Settings{
		SupportedProtocols: []protocol.ID{
			bsnet.ProtocolBitswap,
			bsnet.ProtocolBitswapOneOne,
			bsnet.ProtocolBitswapOneZero,
			bsnet.ProtocolBitswapNoVers,
		},
	}
	for _, opt := range opts {
		opt(&s)
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
105
	client := &networkClient{
106 107 108 109
		local:              p.ID(),
		network:            n,
		routing:            n.routingserver.Client(p),
		supportedProtocols: s.SupportedProtocols,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
110
	}
111
	n.clients[p.ID()] = &receiverQueue{receiver: client}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
112 113 114
	return client
}

115
func (n *network) HasPeer(p peer.ID) bool {
Steven Allen's avatar
Steven Allen committed
116 117 118
	n.mu.Lock()
	defer n.mu.Unlock()

119
	_, found := n.clients[p]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121 122
	return found
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
123 124 125 126
// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
	ctx context.Context,
127 128
	from peer.ID,
	to peer.ID,
129
	mes bsmsg.BitSwapMessage) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
130

Dirk McCormick's avatar
Dirk McCormick committed
131 132
	mes = mes.Clone()

Steven Allen's avatar
Steven Allen committed
133 134 135
	n.mu.Lock()
	defer n.mu.Unlock()

136 137 138 139 140 141 142 143 144 145 146 147
	latencies, ok := n.latencies[from]
	if !ok {
		latencies = make(map[peer.ID]time.Duration)
		n.latencies[from] = latencies
	}

	latency, ok := latencies[to]
	if !ok {
		latency = n.delay.NextWaitTime()
		latencies[to] = latency
	}

148 149 150 151
	var bandwidthDelay time.Duration
	if n.isRateLimited {
		rateLimiters, ok := n.rateLimiters[from]
		if !ok {
152
			rateLimiters = make(map[peer.ID]*mocknet.RateLimiter)
153 154 155
			n.rateLimiters[from] = rateLimiters
		}

156
		rateLimiter, ok := rateLimiters[to]
157
		if !ok {
158 159
			rateLimiter = mocknet.NewRateLimiter(n.rateLimitGenerator.NextRateLimit())
			rateLimiters[to] = rateLimiter
160 161 162
		}

		size := mes.ToProtoV1().Size()
163
		bandwidthDelay = rateLimiter.Limit(size)
164 165 166 167
	} else {
		bandwidthDelay = 0
	}

168
	receiver, ok := n.clients[to]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
169
	if !ok {
Łukasz Magiera's avatar
Łukasz Magiera committed
170
		return errors.New("cannot locate peer on network")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
171 172 173 174 175
	}

	// nb: terminate the context since the context wouldn't actually be passed
	// over the network in a real scenario

176 177 178
	msg := &message{
		from:       from,
		msg:        mes,
179
		shouldSend: time.Now().Add(latency).Add(bandwidthDelay),
180 181
	}
	receiver.enqueue(msg)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
182 183 184 185 186

	return nil
}

type networkClient struct {
187 188 189
	// These need to be at the top of the struct (allocated on the heap) for alignment on 32bit platforms.
	stats bsnet.Stats

190
	local peer.ID
Brian Tiger Chow's avatar
Brian Tiger Chow committed
191
	bsnet.Receiver
192 193 194
	network            *network
	routing            routing.Routing
	supportedProtocols []protocol.ID
Brian Tiger Chow's avatar
Brian Tiger Chow committed
195 196
}

dirkmc's avatar
dirkmc committed
197 198 199 200
// Self returns the peer ID this client was created with.
func (nc *networkClient) Self() peer.ID {
	return nc.local
}

201 202 203 204 205 206 207 208 209 210
// Ping exchanges no traffic. It reports the simulated latency from this
// client to p (see Latency) as the round-trip time. ctx is not consulted.
func (nc *networkClient) Ping(ctx context.Context, p peer.ID) ping.Result {
	rtt := nc.Latency(p)
	return ping.Result{RTT: rtt}
}

// Latency returns the simulated delay of the link from this client to p. The
// value is zero until this client has sent p a message, because latencies are
// only assigned in network.SendMessage.
func (nc *networkClient) Latency(p peer.ID) time.Duration {
	nw := nc.network
	nw.mu.Lock()
	defer nw.mu.Unlock()

	// Indexing a missing (nil) inner map simply yields the zero Duration.
	return nw.latencies[nc.local][p]
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
211 212
func (nc *networkClient) SendMessage(
	ctx context.Context,
213
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
214
	message bsmsg.BitSwapMessage) error {
215 216 217 218 219 220 221
	if err := nc.network.SendMessage(ctx, nc.local, to, message); err != nil {
		return err
	}
	atomic.AddUint64(&nc.stats.MessagesSent, 1)
	return nil
}

222 223
func (nc *networkClient) Stats() bsnet.Stats {
	return bsnet.Stats{
224 225 226
		MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd),
		MessagesSent:  atomic.LoadUint64(&nc.stats.MessagesSent),
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
227 228
}

229
// FindProvidersAsync returns a channel of providers for the given key.
230
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID {
Raúl Kripalani's avatar
Raúl Kripalani committed
231
	// NB: this function duplicates the AddrInfo -> ID transformation in the
232
	// bitswap network adapter. Not to worry. This network client will be
233
	// deprecated once the dms3net.Mock is added. The code below is only
234 235 236 237 238
	// temporary.

	out := make(chan peer.ID)
	go func() {
		defer close(out)
239
		providers := nc.routing.FindProvidersAsync(ctx, k, max)
240 241 242 243 244 245 246 247
		for info := range providers {
			select {
			case <-ctx.Done():
			case out <- info.ID:
			}
		}
	}()
	return out
248 249
}

Raúl Kripalani's avatar
Raúl Kripalani committed
250 251
func (nc *networkClient) ConnectionManager() connmgr.ConnManager {
	return &connmgr.NullConnMgr{}
252 253
}

Jeromy's avatar
Jeromy committed
254
type messagePasser struct {
255
	net    *networkClient
Jeromy's avatar
Jeromy committed
256 257 258 259 260
	target peer.ID
	local  peer.ID
	ctx    context.Context
}

261
func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error {
262
	return mp.net.SendMessage(ctx, mp.target, m)
Jeromy's avatar
Jeromy committed
263 264 265 266 267 268
}

// Close is a no-op; the sender holds nothing to release. It always returns
// nil.
func (mp *messagePasser) Close() error {
	return nil
}

269 270 271 272
// Reset is a no-op; the sender has no stream state to reset. It always
// returns nil.
func (mp *messagePasser) Reset() error {
	return nil
}

273 274 275 276 277 278
// oldProtos is the set of bitswap protocol versions that SupportsHave treats
// as lacking HAVE support; any protocol not listed here counts as supporting
// it.
var oldProtos = map[protocol.ID]struct{}{
	bsnet.ProtocolBitswapNoVers:  {},
	bsnet.ProtocolBitswapOneZero: {},
	bsnet.ProtocolBitswapOneOne:  {},
}

dirkmc's avatar
dirkmc committed
279
func (mp *messagePasser) SupportsHave() bool {
280 281 282 283 284 285 286
	protos := mp.net.network.clients[mp.target].receiver.supportedProtocols
	for _, proto := range protos {
		if _, ok := oldProtos[proto]; !ok {
			return true
		}
	}
	return false
dirkmc's avatar
dirkmc committed
287 288
}

289
func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) {
Jeromy's avatar
Jeromy committed
290
	return &messagePasser{
291
		net:    nc,
Jeromy's avatar
Jeromy committed
292
		target: p,
293
		local:  nc.local,
Jeromy's avatar
Jeromy committed
294 295 296 297
		ctx:    ctx,
	}, nil
}

298
// Provide provides the key to the network.
299
func (nc *networkClient) Provide(ctx context.Context, k cid.Cid) error {
300
	return nc.routing.Provide(ctx, k, true)
301 302
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
303 304 305
// SetDelegate installs r as the client's embedded Receiver. Delivered
// messages (receiverQueue.process) and connection events (ConnectTo,
// DisconnectFrom) are dispatched to it, so it has to be set before any of
// those occur. The assignment itself is not synchronized.
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
	nc.Receiver = r
}
306 307

func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
Steven Allen's avatar
Steven Allen committed
308 309 310 311
	nc.network.mu.Lock()
	otherClient, ok := nc.network.clients[p]
	if !ok {
		nc.network.mu.Unlock()
312 313
		return errors.New("no such peer in network")
	}
Steven Allen's avatar
Steven Allen committed
314

315 316
	tag := tagForPeers(nc.local, p)
	if _, ok := nc.network.conns[tag]; ok {
Steven Allen's avatar
Steven Allen committed
317
		nc.network.mu.Unlock()
dirkmc's avatar
dirkmc committed
318
		// log.Warning("ALREADY CONNECTED TO PEER (is this a reconnect? test lib needs fixing)")
319 320 321
		return nil
	}
	nc.network.conns[tag] = struct{}{}
Steven Allen's avatar
Steven Allen committed
322 323
	nc.network.mu.Unlock()

324
	otherClient.receiver.PeerConnected(nc.local)
325 326 327
	nc.Receiver.PeerConnected(p)
	return nil
}
328

dirkmc's avatar
dirkmc committed
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
// DisconnectFrom removes the recorded connection between this client and p
// and notifies both sides' receivers. It returns an error if p is not
// attached to the network. Disconnecting a pair that is not connected is a
// no-op that returns nil without notifying anyone.
//
// As in ConnectTo, the network lock is released before the receiver
// callbacks are invoked, so a callback may call back into the network
// (SendMessage, Latency, HasPeer, ...) without deadlocking on network.mu.
func (nc *networkClient) DisconnectFrom(_ context.Context, p peer.ID) error {
	nc.network.mu.Lock()
	otherClient, ok := nc.network.clients[p]
	if !ok {
		nc.network.mu.Unlock()
		return errors.New("no such peer in network")
	}

	tag := tagForPeers(nc.local, p)
	if _, ok := nc.network.conns[tag]; !ok {
		nc.network.mu.Unlock()
		// Already disconnected
		return nil
	}
	delete(nc.network.conns, tag)
	nc.network.mu.Unlock()

	otherClient.receiver.PeerDisconnected(nc.local)
	nc.Receiver.PeerDisconnected(p)
	return nil
}

350 351 352 353 354 355 356 357 358 359
// enqueue appends m to the backlog and, if no delivery goroutine is currently
// draining the queue, marks the queue active and starts one.
func (rq *receiverQueue) enqueue(m *message) {
	rq.lk.Lock()
	defer rq.lk.Unlock()

	rq.queue = append(rq.queue, m)
	if rq.active {
		// A running process goroutine will pick the message up.
		return
	}
	rq.active = true
	go rq.process()
}

360 361 362 363 364 365 366 367 368 369 370 371
// Swap exchanges the queued messages at positions i and j. It is part of the
// sort.Interface implementation used by process.
func (rq *receiverQueue) Swap(i, j int) {
	q := rq.queue
	q[i], q[j] = q[j], q[i]
}

// Len returns the number of queued messages. It is part of the
// sort.Interface implementation used by process; like Swap and Less it does
// not take rq.lk itself.
func (rq *receiverQueue) Len() int {
	return len(rq.queue)
}

// Less reports whether the message at position i is due strictly before the
// message at position j. It is part of the sort.Interface implementation
// used by process.
//
// The times are compared with Time.Before rather than through UnixNano, so
// the monotonic clock reading carried by values derived from time.Now is
// used and a wall-clock adjustment cannot change the ordering.
func (rq *receiverQueue) Less(i, j int) bool {
	return rq.queue[i].shouldSend.Before(rq.queue[j].shouldSend)
}

372 373 374
func (rq *receiverQueue) process() {
	for {
		rq.lk.Lock()
375
		sort.Sort(rq)
376 377 378 379 380 381
		if len(rq.queue) == 0 {
			rq.active = false
			rq.lk.Unlock()
			return
		}
		m := rq.queue[0]
382 383 384 385 386 387 388 389 390 391
		if time.Until(m.shouldSend).Seconds() < 0.1 {
			rq.queue = rq.queue[1:]
			rq.lk.Unlock()
			time.Sleep(time.Until(m.shouldSend))
			atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
			rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
		} else {
			rq.lk.Unlock()
			time.Sleep(100 * time.Millisecond)
		}
392 393 394
	}
}

395 396 397 398 399 400
// tagForPeers returns the key identifying the connection between a and b in
// network.conns. The result does not depend on argument order: the smaller
// ID always comes first in the concatenation.
func tagForPeers(a, b peer.ID) string {
	lo, hi := a, b
	if hi < lo {
		lo, hi = hi, lo
	}
	return string(lo + hi)
}