Commit 0df75e41 authored by Jeromy's avatar Jeromy

bitswap virtual test net code should send messages in order

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 30cc892d
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"sync"
"time"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
......@@ -22,7 +23,7 @@ var log = logging.Logger("bstestnet")
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
return &network{
clients: make(map[peer.ID]bsnet.Receiver),
clients: make(map[peer.ID]*receiverQueue),
delay: d,
routingserver: rs,
conns: make(map[string]struct{}),
......@@ -31,12 +32,28 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
type network struct {
mu sync.Mutex
clients map[peer.ID]bsnet.Receiver
clients map[peer.ID]*receiverQueue
routingserver mockrouting.Server
delay delay.D
conns map[string]struct{}
}
type message struct {
from peer.ID
msg bsmsg.BitSwapMessage
shouldSend time.Time
}
// receiverQueue queues up a set of messages to be sent, and sends them *in
// order* with their delays respected as much as sending them in order allows
// for
type receiverQueue struct {
receiver bsnet.Receiver
queue []*message
active bool
lk sync.Mutex
}
func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
n.mu.Lock()
defer n.mu.Unlock()
......@@ -46,7 +63,7 @@ func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
network: n,
routing: n.routingserver.Client(p),
}
n.clients[p.ID()] = client
n.clients[p.ID()] = &receiverQueue{receiver: client}
return client
}
......@@ -64,7 +81,7 @@ func (n *network) SendMessage(
ctx context.Context,
from peer.ID,
to peer.ID,
message bsmsg.BitSwapMessage) error {
mes bsmsg.BitSwapMessage) error {
n.mu.Lock()
defer n.mu.Unlock()
......@@ -77,7 +94,12 @@ func (n *network) SendMessage(
// nb: terminate the context since the context wouldn't actually be passed
// over the network in a real scenario
go n.deliver(receiver, from, message)
msg := &message{
from: from,
msg: mes,
shouldSend: time.Now().Add(n.delay.Get()),
}
receiver.enqueue(msg)
return nil
}
......@@ -191,11 +213,38 @@ func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
// TODO: add handling for disconnects
otherClient.PeerConnected(nc.local)
otherClient.receiver.PeerConnected(nc.local)
nc.Receiver.PeerConnected(p)
return nil
}
func (rq *receiverQueue) enqueue(m *message) {
rq.lk.Lock()
defer rq.lk.Unlock()
rq.queue = append(rq.queue, m)
if !rq.active {
rq.active = true
go rq.process()
}
}
func (rq *receiverQueue) process() {
for {
rq.lk.Lock()
if len(rq.queue) == 0 {
rq.active = false
rq.lk.Unlock()
return
}
m := rq.queue[0]
rq.queue = rq.queue[1:]
rq.lk.Unlock()
time.Sleep(time.Until(m.shouldSend))
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
}
}
func tagForPeers(a, b peer.ID) string {
if a < b {
return string(a + b)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment