virtual.go 8.51 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
5
	"errors"
6
	"sort"
Steven Allen's avatar
Steven Allen committed
7
	"sync"
8
	"sync/atomic"
9
	"time"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
10

Jeromy's avatar
Jeromy committed
11 12
	bsmsg "github.com/ipfs/go-bitswap/message"
	bsnet "github.com/ipfs/go-bitswap/network"
13

Jeromy's avatar
Jeromy committed
14 15 16
	cid "github.com/ipfs/go-cid"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
Raúl Kripalani's avatar
Raúl Kripalani committed
17 18 19 20

	"github.com/libp2p/go-libp2p-core/connmgr"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/routing"
dirkmc's avatar
dirkmc committed
21
	tnet "github.com/libp2p/go-libp2p-testing/net"
22
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
23 24
)

25 26
// VirtualNetwork generates a new testnet instance - a fake network that
// is used to simulate sending messages.
27
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
28
	return &network{
29 30 31 32 33 34 35 36 37 38
		latencies:          make(map[peer.ID]map[peer.ID]time.Duration),
		clients:            make(map[peer.ID]*receiverQueue),
		delay:              d,
		routingserver:      rs,
		isRateLimited:      false,
		rateLimitGenerator: nil,
		conns:              make(map[string]struct{}),
	}
}

39
// RateLimitGenerator is an interface for generating rate limits across peers
40 41 42 43
type RateLimitGenerator interface {
	// NextRateLimit returns the rate used to build the rate limiter for a
	// newly seen (sender, receiver) pair; the value is passed directly to
	// mocknet.NewRateLimiter.
	// NOTE(review): units are presumably bytes per second — confirm against
	// mocknet.NewRateLimiter.
	NextRateLimit() float64
}

44 45
// RateLimitedVirtualNetwork generates a testnet instance where nodes are rate
// limited in the upload/download speed.
46 47 48
func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network {
	return &network{
		latencies:          make(map[peer.ID]map[peer.ID]time.Duration),
49
		rateLimiters:       make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter),
50 51 52 53 54 55
		clients:            make(map[peer.ID]*receiverQueue),
		delay:              d,
		routingserver:      rs,
		isRateLimited:      true,
		rateLimitGenerator: rateLimitGenerator,
		conns:              make(map[string]struct{}),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
56 57 58 59
	}
}

type network struct {
60 61
	mu                 sync.Mutex
	latencies          map[peer.ID]map[peer.ID]time.Duration
62
	rateLimiters       map[peer.ID]map[peer.ID]*mocknet.RateLimiter
63 64 65 66 67 68
	clients            map[peer.ID]*receiverQueue
	routingserver      mockrouting.Server
	delay              delay.D
	isRateLimited      bool
	rateLimitGenerator RateLimitGenerator
	conns              map[string]struct{}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69 70
}

71 72 73 74 75 76 77 78 79 80
// message is one bitswap message waiting in a receiverQueue for delivery.
type message struct {
	// from is the peer ID reported to the receiver as the sender.
	from       peer.ID
	// msg is the payload handed to the receiver's ReceiveMessage.
	msg        bsmsg.BitSwapMessage
	// shouldSend is the time at which the message becomes due for delivery;
	// the queue is sorted by this value.
	shouldSend time.Time
}

// receiverQueue queues up a set of messages to be sent, and sends them *in
// order* with their delays respected as much as sending them in order allows
// for
type receiverQueue struct {
81
	receiver *networkClient
82 83 84 85 86
	queue    []*message
	active   bool
	lk       sync.Mutex
}

dirkmc's avatar
dirkmc committed
87
func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork {
Steven Allen's avatar
Steven Allen committed
88 89 90
	n.mu.Lock()
	defer n.mu.Unlock()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
91
	client := &networkClient{
92
		local:   p.ID(),
93
		network: n,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94
		routing: n.routingserver.Client(p),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
95
	}
96
	n.clients[p.ID()] = &receiverQueue{receiver: client}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97 98 99
	return client
}

100
func (n *network) HasPeer(p peer.ID) bool {
Steven Allen's avatar
Steven Allen committed
101 102 103
	n.mu.Lock()
	defer n.mu.Unlock()

104
	_, found := n.clients[p]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105 106 107
	return found
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
108 109 110 111
// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
	ctx context.Context,
112 113
	from peer.ID,
	to peer.ID,
114
	mes bsmsg.BitSwapMessage) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
115

Steven Allen's avatar
Steven Allen committed
116 117 118
	n.mu.Lock()
	defer n.mu.Unlock()

119 120 121 122 123 124 125 126 127 128 129 130
	latencies, ok := n.latencies[from]
	if !ok {
		latencies = make(map[peer.ID]time.Duration)
		n.latencies[from] = latencies
	}

	latency, ok := latencies[to]
	if !ok {
		latency = n.delay.NextWaitTime()
		latencies[to] = latency
	}

131 132 133 134
	var bandwidthDelay time.Duration
	if n.isRateLimited {
		rateLimiters, ok := n.rateLimiters[from]
		if !ok {
135
			rateLimiters = make(map[peer.ID]*mocknet.RateLimiter)
136 137 138
			n.rateLimiters[from] = rateLimiters
		}

139
		rateLimiter, ok := rateLimiters[to]
140
		if !ok {
141 142
			rateLimiter = mocknet.NewRateLimiter(n.rateLimitGenerator.NextRateLimit())
			rateLimiters[to] = rateLimiter
143 144 145
		}

		size := mes.ToProtoV1().Size()
146
		bandwidthDelay = rateLimiter.Limit(size)
147 148 149 150
	} else {
		bandwidthDelay = 0
	}

151
	receiver, ok := n.clients[to]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
152
	if !ok {
Łukasz Magiera's avatar
Łukasz Magiera committed
153
		return errors.New("cannot locate peer on network")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
154 155 156 157 158
	}

	// nb: terminate the context since the context wouldn't actually be passed
	// over the network in a real scenario

159 160 161
	msg := &message{
		from:       from,
		msg:        mes,
162
		shouldSend: time.Now().Add(latency).Add(bandwidthDelay),
163 164
	}
	receiver.enqueue(msg)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
165 166 167 168 169

	return nil
}

type networkClient struct {
170
	local peer.ID
Brian Tiger Chow's avatar
Brian Tiger Chow committed
171
	bsnet.Receiver
172
	network *network
Raúl Kripalani's avatar
Raúl Kripalani committed
173
	routing routing.Routing
174
	stats   bsnet.Stats
Brian Tiger Chow's avatar
Brian Tiger Chow committed
175 176
}

dirkmc's avatar
dirkmc committed
177 178 179 180
// Self returns the peer ID this client was registered under.
func (nc *networkClient) Self() peer.ID {
	return nc.local
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
181 182
func (nc *networkClient) SendMessage(
	ctx context.Context,
183
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
184
	message bsmsg.BitSwapMessage) error {
185 186 187 188 189 190 191
	if err := nc.network.SendMessage(ctx, nc.local, to, message); err != nil {
		return err
	}
	atomic.AddUint64(&nc.stats.MessagesSent, 1)
	return nil
}

192 193
func (nc *networkClient) Stats() bsnet.Stats {
	return bsnet.Stats{
194 195 196
		MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd),
		MessagesSent:  atomic.LoadUint64(&nc.stats.MessagesSent),
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
197 198
}

199
// FindProvidersAsync returns a channel of providers for the given key.
200
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID {
Raúl Kripalani's avatar
Raúl Kripalani committed
201
	// NB: this function duplicates the AddrInfo -> ID transformation in the
202 203 204 205 206 207 208
	// bitswap network adapter. Not to worry. This network client will be
	// deprecated once the ipfsnet.Mock is added. The code below is only
	// temporary.

	out := make(chan peer.ID)
	go func() {
		defer close(out)
209
		providers := nc.routing.FindProvidersAsync(ctx, k, max)
210 211 212 213 214 215 216 217
		for info := range providers {
			select {
			case <-ctx.Done():
			case out <- info.ID:
			}
		}
	}()
	return out
218 219
}

Raúl Kripalani's avatar
Raúl Kripalani committed
220 221
func (nc *networkClient) ConnectionManager() connmgr.ConnManager {
	return &connmgr.NullConnMgr{}
222 223
}

Jeromy's avatar
Jeromy committed
224
type messagePasser struct {
225
	net    *networkClient
Jeromy's avatar
Jeromy committed
226 227 228 229 230
	target peer.ID
	local  peer.ID
	ctx    context.Context
}

231
func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error {
232
	return mp.net.SendMessage(ctx, mp.target, m)
Jeromy's avatar
Jeromy committed
233 234 235 236 237 238
}

// Close is a no-op: the sender holds no resources to release. It always
// returns nil.
func (mp *messagePasser) Close() error {
	return nil
}

239 240 241 242
// Reset is a no-op: the sender keeps no stream state to reset. It always
// returns nil.
func (mp *messagePasser) Reset() error {
	return nil
}

dirkmc's avatar
dirkmc committed
243 244 245 246
// SupportsHave always reports true for senders on the virtual network.
func (mp *messagePasser) SupportsHave() bool {
	return true
}

247
func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
Jeromy's avatar
Jeromy committed
248
	return &messagePasser{
249
		net:    nc,
Jeromy's avatar
Jeromy committed
250
		target: p,
251
		local:  nc.local,
Jeromy's avatar
Jeromy committed
252 253 254 255
		ctx:    ctx,
	}, nil
}

256
// Provide provides the key to the network.
257
func (nc *networkClient) Provide(ctx context.Context, k cid.Cid) error {
258
	return nc.routing.Provide(ctx, k, true)
259 260
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
261 262 263
// SetDelegate installs r as the receiver that is notified of inbound
// messages and of peer connect/disconnect events for this client.
// The assignment is not synchronized with the delivery goroutine.
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
	nc.Receiver = r
}
264 265

func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
Steven Allen's avatar
Steven Allen committed
266 267 268 269
	nc.network.mu.Lock()
	otherClient, ok := nc.network.clients[p]
	if !ok {
		nc.network.mu.Unlock()
270 271
		return errors.New("no such peer in network")
	}
Steven Allen's avatar
Steven Allen committed
272

273 274
	tag := tagForPeers(nc.local, p)
	if _, ok := nc.network.conns[tag]; ok {
Steven Allen's avatar
Steven Allen committed
275
		nc.network.mu.Unlock()
dirkmc's avatar
dirkmc committed
276
		// log.Warning("ALREADY CONNECTED TO PEER (is this a reconnect? test lib needs fixing)")
277 278 279
		return nil
	}
	nc.network.conns[tag] = struct{}{}
Steven Allen's avatar
Steven Allen committed
280 281
	nc.network.mu.Unlock()

282
	otherClient.receiver.PeerConnected(nc.local)
283 284 285
	nc.Receiver.PeerConnected(p)
	return nil
}
286

dirkmc's avatar
dirkmc committed
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
// DisconnectFrom marks this client and peer p as disconnected and notifies
// both sides' receivers.
//
// It returns an error when p is not registered on the network. Disconnecting
// a pair that is not connected is a silent no-op: no notifications are sent
// and nil is returned.
//
// The network lock is released before the receivers are notified, matching
// ConnectTo. The mutex is not reentrant, so a PeerDisconnected callback that
// called back into the network (SendMessage, HasPeer, ConnectTo, ...) while
// the lock was still held would deadlock.
func (nc *networkClient) DisconnectFrom(_ context.Context, p peer.ID) error {
	nc.network.mu.Lock()
	otherClient, ok := nc.network.clients[p]
	if !ok {
		nc.network.mu.Unlock()
		return errors.New("no such peer in network")
	}

	tag := tagForPeers(nc.local, p)
	if _, ok := nc.network.conns[tag]; !ok {
		nc.network.mu.Unlock()
		// Already disconnected
		return nil
	}
	delete(nc.network.conns, tag)
	nc.network.mu.Unlock()

	otherClient.receiver.PeerDisconnected(nc.local)
	nc.Receiver.PeerDisconnected(p)
	return nil
}

308 309 310 311 312 313 314 315 316 317
// enqueue appends m to the pending messages and, if no delivery goroutine is
// currently running for this queue, starts one.
func (rq *receiverQueue) enqueue(m *message) {
	rq.lk.Lock()
	defer rq.lk.Unlock()

	rq.queue = append(rq.queue, m)
	if rq.active {
		// A process goroutine is already draining the queue.
		return
	}
	rq.active = true
	go rq.process()
}

318 319 320 321 322 323 324 325 326 327 328 329
// Swap exchanges the queued messages at positions i and j. Together with Len
// and Less it lets sort.Sort operate on the queue; the caller must hold lk.
func (rq *receiverQueue) Swap(i, j int) {
	q := rq.queue
	q[i], q[j] = q[j], q[i]
}

// Len returns the number of messages currently pending in the queue.
func (rq *receiverQueue) Len() int {
	return len(rq.queue)
}

// Less reports whether the message at position i is due strictly before the
// message at position j, comparing their due times as Unix nanoseconds.
func (rq *receiverQueue) Less(i, j int) bool {
	dueI := rq.queue[i].shouldSend.UnixNano()
	dueJ := rq.queue[j].shouldSend.UnixNano()
	return dueI < dueJ
}

330 331 332
func (rq *receiverQueue) process() {
	for {
		rq.lk.Lock()
333
		sort.Sort(rq)
334 335 336 337 338 339
		if len(rq.queue) == 0 {
			rq.active = false
			rq.lk.Unlock()
			return
		}
		m := rq.queue[0]
340 341 342 343 344 345 346 347 348 349
		if time.Until(m.shouldSend).Seconds() < 0.1 {
			rq.queue = rq.queue[1:]
			rq.lk.Unlock()
			time.Sleep(time.Until(m.shouldSend))
			atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
			rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
		} else {
			rq.lk.Unlock()
			time.Sleep(100 * time.Millisecond)
		}
350 351 352
	}
}

353 354 355 356 357 358
// tagForPeers returns the key under which the connection between a and b is
// stored. The two IDs are concatenated with the smaller one first, so the
// result is the same regardless of argument order.
func tagForPeers(a, b peer.ID) string {
	first, second := a, b
	if !(a < b) {
		first, second = b, a
	}
	return string(first + second)
}