Commit 26b8a09f authored by hannahhoward's avatar hannahhoward

feat(messagequeue): Send changes on startup

If wantlist changes are present, send them immediately on startup, rather than as a seperate message
parent fd3edeac
...@@ -77,7 +77,7 @@ func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) { ...@@ -77,7 +77,7 @@ func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
// Startup starts the processing of messages, and creates an initial message // Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist. // based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) { func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, ses uint64) {
// new peer, we will want to give them our full wantlist // new peer, we will want to give them our full wantlist
if len(initialEntries) > 0 { if len(initialEntries) > 0 {
...@@ -89,8 +89,16 @@ func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist. ...@@ -89,8 +89,16 @@ func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.
fullwantlist.AddEntry(e.Cid, e.Priority) fullwantlist.AddEntry(e.Cid, e.Priority)
} }
mq.out = fullwantlist mq.out = fullwantlist
mq.work <- struct{}{}
} }
if len(initialEntries) > 0 || mq.addEntries(entries, ses) {
select {
case <-ctx.Done():
return
case mq.work <- struct{}{}:
}
}
go mq.runQueue(ctx) go mq.runQueue(ctx)
} }
......
...@@ -25,9 +25,9 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error { ...@@ -25,9 +25,9 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error {
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) { func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) {
if fmn.messageSenderError == nil { if fmn.messageSenderError == nil {
return fmn.messageSender, nil return fmn.messageSender, nil
} else {
return nil, fmn.messageSenderError
} }
return nil, fmn.messageSenderError
} }
type fakeMessageSender struct { type fakeMessageSender struct {
...@@ -81,7 +81,7 @@ func TestStartupAndShutdown(t *testing.T) { ...@@ -81,7 +81,7 @@ func TestStartupAndShutdown(t *testing.T) {
ses := testutil.GenerateSessionID() ses := testutil.GenerateSessionID()
wl := testutil.GenerateWantlist(10, ses) wl := testutil.GenerateWantlist(10, ses)
messageQueue.Startup(ctx, wl.Entries()) messageQueue.Startup(ctx, wl.Entries(), nil, 0)
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond) messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if len(messages) != 1 { if len(messages) != 1 {
...@@ -123,9 +123,8 @@ func TestSendingMessagesDeduped(t *testing.T) { ...@@ -123,9 +123,8 @@ func TestSendingMessagesDeduped(t *testing.T) {
ses1 := testutil.GenerateSessionID() ses1 := testutil.GenerateSessionID()
ses2 := testutil.GenerateSessionID() ses2 := testutil.GenerateSessionID()
entries := testutil.GenerateMessageEntries(10, false) entries := testutil.GenerateMessageEntries(10, false)
messageQueue.Startup(ctx, nil) messageQueue.Startup(ctx, nil, entries, ses1)
messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(entries, ses2) messageQueue.AddMessage(entries, ses2)
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond) messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
...@@ -148,9 +147,8 @@ func TestSendingMessagesPartialDupe(t *testing.T) { ...@@ -148,9 +147,8 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
entries := testutil.GenerateMessageEntries(10, false) entries := testutil.GenerateMessageEntries(10, false)
moreEntries := testutil.GenerateMessageEntries(5, false) moreEntries := testutil.GenerateMessageEntries(5, false)
secondEntries := append(entries[5:], moreEntries...) secondEntries := append(entries[5:], moreEntries...)
messageQueue.Startup(ctx, nil) messageQueue.Startup(ctx, nil, entries, ses1)
messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(secondEntries, ses2) messageQueue.AddMessage(secondEntries, ses2)
messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond) messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond)
......
...@@ -22,7 +22,7 @@ type PeerQueue interface { ...@@ -22,7 +22,7 @@ type PeerQueue interface {
RefIncrement() RefIncrement()
RefDecrement() bool RefDecrement() bool
AddMessage(entries []*bsmsg.Entry, ses uint64) AddMessage(entries []*bsmsg.Entry, ses uint64)
Startup(ctx context.Context, initialEntries []*wantlist.Entry) Startup(ctx context.Context, initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, ses uint64)
Shutdown() Shutdown()
} }
...@@ -77,7 +77,7 @@ func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) { ...@@ -77,7 +77,7 @@ func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
mq = pm.createPeerQueue(p) mq = pm.createPeerQueue(p)
pm.peerQueues[p] = mq pm.peerQueues[p] = mq
mq.Startup(pm.ctx, initialEntries) mq.Startup(pm.ctx, initialEntries, nil, 0)
} }
// Disconnected is called to remove a peer from the pool. // Disconnected is called to remove a peer from the pool.
...@@ -115,12 +115,13 @@ func (pm *PeerManager) SendMessage(initialEntries []*wantlist.Entry, entries []* ...@@ -115,12 +115,13 @@ func (pm *PeerManager) SendMessage(initialEntries []*wantlist.Entry, entries []*
if !ok { if !ok {
p = pm.createPeerQueue(t) p = pm.createPeerQueue(t)
pm.peerQueues[t] = p pm.peerQueues[t] = p
p.Startup(pm.ctx, initialEntries) p.Startup(pm.ctx, initialEntries, entries, from)
// this is a "0 reference" queue because we haven't actually connected to it // this is a "0 reference" queue because we haven't actually connected to it
// sending the first message will cause it to connect // sending the first message will cause it to connect
p.RefDecrement() p.RefDecrement()
} else {
p.AddMessage(entries, from)
} }
p.AddMessage(entries, from)
} }
} }
} }
...@@ -25,9 +25,13 @@ type fakePeer struct { ...@@ -25,9 +25,13 @@ type fakePeer struct {
messagesSent chan messageSent messagesSent chan messageSent
} }
func (fp *fakePeer) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {} func (fp *fakePeer) Startup(ctx context.Context, initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, ses uint64) {
func (fp *fakePeer) Shutdown() {} if entries != nil {
func (fp *fakePeer) RefIncrement() { fp.refcnt++ } fp.AddMessage(entries, ses)
}
}
func (fp *fakePeer) Shutdown() {}
func (fp *fakePeer) RefIncrement() { fp.refcnt++ }
func (fp *fakePeer) RefDecrement() bool { func (fp *fakePeer) RefDecrement() bool {
fp.refcnt-- fp.refcnt--
return fp.refcnt > 0 return fp.refcnt > 0
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment