virtual.go 7.74 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
4
	"context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
5
	"errors"
6
	"sort"
Steven Allen's avatar
Steven Allen committed
7
	"sync"
8
	"sync/atomic"
9
	"time"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
10

Jeromy's avatar
Jeromy committed
11 12
	bsmsg "github.com/ipfs/go-bitswap/message"
	bsnet "github.com/ipfs/go-bitswap/network"
13

Jeromy's avatar
Jeromy committed
14 15 16 17 18 19 20
	cid "github.com/ipfs/go-cid"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
	logging "github.com/ipfs/go-log"
	ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
	peer "github.com/libp2p/go-libp2p-peer"
	routing "github.com/libp2p/go-libp2p-routing"
21
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
Jeromy's avatar
Jeromy committed
22
	testutil "github.com/libp2p/go-testutil"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
23 24
)

25 26
// log is the package-level logger, registered under the name "bstestnet".
var log = logging.Logger("bstestnet")

27
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
28
	return &network{
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
		latencies:          make(map[peer.ID]map[peer.ID]time.Duration),
		clients:            make(map[peer.ID]*receiverQueue),
		delay:              d,
		routingserver:      rs,
		isRateLimited:      false,
		rateLimitGenerator: nil,
		conns:              make(map[string]struct{}),
	}
}

// RateLimitGenerator supplies the rate limit used when a rate limiter is
// created for a sender/recipient pair that has not exchanged messages before.
type RateLimitGenerator interface {
	// NextRateLimit returns the limit for the next new link. The value is
	// passed unchanged to mocknet.NewRateLimiter, so its unit is whatever
	// that constructor expects.
	NextRateLimit() float64
}

func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network {
	return &network{
		latencies:          make(map[peer.ID]map[peer.ID]time.Duration),
46
		rateLimiters:       make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter),
47 48 49 50 51 52
		clients:            make(map[peer.ID]*receiverQueue),
		delay:              d,
		routingserver:      rs,
		isRateLimited:      true,
		rateLimitGenerator: rateLimitGenerator,
		conns:              make(map[string]struct{}),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
53 54 55 56
	}
}

type network struct {
57 58
	mu                 sync.Mutex
	latencies          map[peer.ID]map[peer.ID]time.Duration
59
	rateLimiters       map[peer.ID]map[peer.ID]*mocknet.RateLimiter
60 61 62 63 64 65
	clients            map[peer.ID]*receiverQueue
	routingserver      mockrouting.Server
	delay              delay.D
	isRateLimited      bool
	rateLimitGenerator RateLimitGenerator
	conns              map[string]struct{}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66 67
}

68 69 70 71 72 73 74 75 76 77
// message is a single queued delivery: the payload, who sent it, and the
// earliest time at which it should be handed to the recipient.
type message struct {
	from       peer.ID
	msg        bsmsg.BitSwapMessage
	shouldSend time.Time
}

// receiverQueue queues up a set of messages to be sent, and sends them *in
// order* with their delays respected as much as sending them in order allows
// for
type receiverQueue struct {
78
	receiver *networkClient
79 80 81 82 83
	queue    []*message
	active   bool
	lk       sync.Mutex
}

84
func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
Steven Allen's avatar
Steven Allen committed
85 86 87
	n.mu.Lock()
	defer n.mu.Unlock()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
88
	client := &networkClient{
89
		local:   p.ID(),
90
		network: n,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
91
		routing: n.routingserver.Client(p),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
92
	}
93
	n.clients[p.ID()] = &receiverQueue{receiver: client}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94 95 96
	return client
}

97
func (n *network) HasPeer(p peer.ID) bool {
Steven Allen's avatar
Steven Allen committed
98 99 100
	n.mu.Lock()
	defer n.mu.Unlock()

101
	_, found := n.clients[p]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102 103 104
	return found
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
105 106 107 108
// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
	ctx context.Context,
109 110
	from peer.ID,
	to peer.ID,
111
	mes bsmsg.BitSwapMessage) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
112

Steven Allen's avatar
Steven Allen committed
113 114 115
	n.mu.Lock()
	defer n.mu.Unlock()

116 117 118 119 120 121 122 123 124 125 126 127
	latencies, ok := n.latencies[from]
	if !ok {
		latencies = make(map[peer.ID]time.Duration)
		n.latencies[from] = latencies
	}

	latency, ok := latencies[to]
	if !ok {
		latency = n.delay.NextWaitTime()
		latencies[to] = latency
	}

128 129 130 131
	var bandwidthDelay time.Duration
	if n.isRateLimited {
		rateLimiters, ok := n.rateLimiters[from]
		if !ok {
132
			rateLimiters = make(map[peer.ID]*mocknet.RateLimiter)
133 134 135
			n.rateLimiters[from] = rateLimiters
		}

136
		rateLimiter, ok := rateLimiters[to]
137
		if !ok {
138 139
			rateLimiter = mocknet.NewRateLimiter(n.rateLimitGenerator.NextRateLimit())
			rateLimiters[to] = rateLimiter
140 141 142
		}

		size := mes.ToProtoV1().Size()
143
		bandwidthDelay = rateLimiter.Limit(size)
144 145 146 147
	} else {
		bandwidthDelay = 0
	}

148
	receiver, ok := n.clients[to]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
149
	if !ok {
Łukasz Magiera's avatar
Łukasz Magiera committed
150
		return errors.New("cannot locate peer on network")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
151 152 153 154 155
	}

	// nb: terminate the context since the context wouldn't actually be passed
	// over the network in a real scenario

156 157 158
	msg := &message{
		from:       from,
		msg:        mes,
159
		shouldSend: time.Now().Add(latency).Add(bandwidthDelay),
160 161
	}
	receiver.enqueue(msg)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
162 163 164 165 166

	return nil
}

type networkClient struct {
167
	local peer.ID
Brian Tiger Chow's avatar
Brian Tiger Chow committed
168
	bsnet.Receiver
169
	network *network
170
	routing routing.IpfsRouting
171
	stats   bsnet.NetworkStats
Brian Tiger Chow's avatar
Brian Tiger Chow committed
172 173 174 175
}

func (nc *networkClient) SendMessage(
	ctx context.Context,
176
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
177
	message bsmsg.BitSwapMessage) error {
178 179 180 181 182 183 184 185 186 187 188 189
	if err := nc.network.SendMessage(ctx, nc.local, to, message); err != nil {
		return err
	}
	atomic.AddUint64(&nc.stats.MessagesSent, 1)
	return nil
}

func (nc *networkClient) Stats() bsnet.NetworkStats {
	return bsnet.NetworkStats{
		MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd),
		MessagesSent:  atomic.LoadUint64(&nc.stats.MessagesSent),
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
190 191
}

192
// FindProvidersAsync returns a channel of providers for the given key.
193
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID {
194 195 196 197 198 199 200 201 202

	// NB: this function duplicates the PeerInfo -> ID transformation in the
	// bitswap network adapter. Not to worry. This network client will be
	// deprecated once the ipfsnet.Mock is added. The code below is only
	// temporary.

	out := make(chan peer.ID)
	go func() {
		defer close(out)
203
		providers := nc.routing.FindProvidersAsync(ctx, k, max)
204 205 206 207 208 209 210 211
		for info := range providers {
			select {
			case <-ctx.Done():
			case out <- info.ID:
			}
		}
	}()
	return out
212 213
}

214 215 216 217
// ConnectionManager returns a new NullConnMgr on every call; the virtual
// network keeps no connection-manager state of its own.
func (nc *networkClient) ConnectionManager() ifconnmgr.ConnManager {
	return new(ifconnmgr.NullConnMgr)
}

Jeromy's avatar
Jeromy committed
218
type messagePasser struct {
219
	net    *networkClient
Jeromy's avatar
Jeromy committed
220 221 222 223 224
	target peer.ID
	local  peer.ID
	ctx    context.Context
}

225
func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error {
226
	return mp.net.SendMessage(ctx, mp.target, m)
Jeromy's avatar
Jeromy committed
227 228 229 230 231 232
}

// Close is a no-op; it always returns nil.
func (mp *messagePasser) Close() error {
	return nil
}

233 234 235 236
// Reset is a no-op; it always returns nil.
func (mp *messagePasser) Reset() error {
	return nil
}

Jeromy's avatar
Jeromy committed
237 238
func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
	return &messagePasser{
239
		net:    n,
Jeromy's avatar
Jeromy committed
240 241 242 243 244 245
		target: p,
		local:  n.local,
		ctx:    ctx,
	}, nil
}

246
// Provide provides the key to the network.
247
func (nc *networkClient) Provide(ctx context.Context, k cid.Cid) error {
248
	return nc.routing.Provide(ctx, k, true)
249 250
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
251 252 253
// SetDelegate installs r as the embedded Receiver, i.e. the target of the
// ReceiveMessage and PeerConnected calls made on this client.
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
	nc.Receiver = r
}
254 255

func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
Steven Allen's avatar
Steven Allen committed
256 257 258 259 260
	nc.network.mu.Lock()

	otherClient, ok := nc.network.clients[p]
	if !ok {
		nc.network.mu.Unlock()
261 262
		return errors.New("no such peer in network")
	}
Steven Allen's avatar
Steven Allen committed
263

264 265
	tag := tagForPeers(nc.local, p)
	if _, ok := nc.network.conns[tag]; ok {
Steven Allen's avatar
Steven Allen committed
266
		nc.network.mu.Unlock()
267 268 269 270
		log.Warning("ALREADY CONNECTED TO PEER (is this a reconnect? test lib needs fixing)")
		return nil
	}
	nc.network.conns[tag] = struct{}{}
Steven Allen's avatar
Steven Allen committed
271 272
	nc.network.mu.Unlock()

273 274
	// TODO: add handling for disconnects

275
	otherClient.receiver.PeerConnected(nc.local)
276 277 278
	nc.Receiver.PeerConnected(p)
	return nil
}
279

280 281 282 283 284 285 286 287 288 289
// enqueue appends m to the queue and, when no process goroutine is currently
// draining it, marks the queue active and starts one.
func (rq *receiverQueue) enqueue(m *message) {
	rq.lk.Lock()
	defer rq.lk.Unlock()

	rq.queue = append(rq.queue, m)
	if rq.active {
		// A running process goroutine will pick the message up.
		return
	}
	rq.active = true
	go rq.process()
}

290 291 292 293 294 295 296 297 298 299 300 301
// Swap exchanges the queued messages at positions i and j. It is part of the
// sort.Interface implementation used by process and takes no lock itself.
func (rq *receiverQueue) Swap(i, j int) {
	rq.queue[i], rq.queue[j] = rq.queue[j], rq.queue[i]
}

// Len returns the number of queued messages. It is part of the
// sort.Interface implementation used by process and takes no lock itself.
func (rq *receiverQueue) Len() int {
	return len(rq.queue)
}

// Less reports whether the message at position i is due before the one at
// position j. It is part of the sort.Interface implementation used by
// process and takes no lock itself.
//
// The comparison uses Time.Before rather than UnixNano so that it honours
// the monotonic clock reading carried by times derived from time.Now, the
// same basis time.Until uses when process decides how long to wait.
func (rq *receiverQueue) Less(i, j int) bool {
	return rq.queue[i].shouldSend.Before(rq.queue[j].shouldSend)
}

302 303 304
func (rq *receiverQueue) process() {
	for {
		rq.lk.Lock()
305
		sort.Sort(rq)
306 307 308 309 310 311
		if len(rq.queue) == 0 {
			rq.active = false
			rq.lk.Unlock()
			return
		}
		m := rq.queue[0]
312 313 314 315 316 317 318 319 320 321
		if time.Until(m.shouldSend).Seconds() < 0.1 {
			rq.queue = rq.queue[1:]
			rq.lk.Unlock()
			time.Sleep(time.Until(m.shouldSend))
			atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
			rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
		} else {
			rq.lk.Unlock()
			time.Sleep(100 * time.Millisecond)
		}
322 323 324
	}
}

325 326 327 328 329 330
// tagForPeers returns the key under which a connection between a and b is
// recorded. The smaller ID is always placed first, so the result does not
// depend on argument order.
func tagForPeers(a, b peer.ID) string {
	first, second := a, b
	if second < first {
		first, second = second, first
	}
	return string(first + second)
}