From 0df75e410b8387b27a723081418ad622bab83fd9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 12 Feb 2018 23:40:15 -0800 Subject: [PATCH] bitswap virtual test net code should send messages in order License: MIT Signed-off-by: Jeromy --- testnet/virtual.go | 61 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/testnet/virtual.go b/testnet/virtual.go index c5ba6e0..0524d17 100644 --- a/testnet/virtual.go +++ b/testnet/virtual.go @@ -4,6 +4,7 @@ import ( "context" "errors" "sync" + "time" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" @@ -22,7 +23,7 @@ var log = logging.Logger("bstestnet") func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { return &network{ - clients: make(map[peer.ID]bsnet.Receiver), + clients: make(map[peer.ID]*receiverQueue), delay: d, routingserver: rs, conns: make(map[string]struct{}), @@ -31,12 +32,28 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { type network struct { mu sync.Mutex - clients map[peer.ID]bsnet.Receiver + clients map[peer.ID]*receiverQueue routingserver mockrouting.Server delay delay.D conns map[string]struct{} } +type message struct { + from peer.ID + msg bsmsg.BitSwapMessage + shouldSend time.Time +} + +// receiverQueue queues up a set of messages to be sent, and sends them *in +// order* with their delays respected as much as sending them in order allows +// for +type receiverQueue struct { + receiver bsnet.Receiver + queue []*message + active bool + lk sync.Mutex +} + func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork { n.mu.Lock() defer n.mu.Unlock() @@ -46,7 +63,7 @@ func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork { network: n, routing: n.routingserver.Client(p), } - n.clients[p.ID()] = client + n.clients[p.ID()] = &receiverQueue{receiver: client} return client } @@ -64,7 +81,7 @@ func (n *network) SendMessage( ctx context.Context, from peer.ID, to peer.ID, - message bsmsg.BitSwapMessage) error { + mes bsmsg.BitSwapMessage) error { n.mu.Lock() defer n.mu.Unlock() @@ -77,7 +94,12 @@ func (n *network) SendMessage( // nb: terminate the context since the context wouldn't actually be passed // over the network in a real scenario - go n.deliver(receiver, from, message) + msg := &message{ + from: from, + msg: mes, + shouldSend: time.Now().Add(n.delay.Get()), + } + receiver.enqueue(msg) return nil } @@ -191,11 +213,38 @@ func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { // TODO: add handling for disconnects - otherClient.PeerConnected(nc.local) + otherClient.receiver.PeerConnected(nc.local) nc.Receiver.PeerConnected(p) return nil } +func (rq *receiverQueue) enqueue(m *message) { + rq.lk.Lock() + defer rq.lk.Unlock() + rq.queue = append(rq.queue, m) + if !rq.active { + rq.active = true + go rq.process() + } +} + +func (rq *receiverQueue) process() { + for { + rq.lk.Lock() + if len(rq.queue) == 0 { + rq.active = false + rq.lk.Unlock() + return + } + m := rq.queue[0] + rq.queue = rq.queue[1:] + rq.lk.Unlock() + + time.Sleep(time.Until(m.shouldSend)) + rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg) + } +} + func tagForPeers(a, b peer.ID) string { if a < b { return string(a + b) -- GitLab