Unverified Commit 97d408fd authored by Raúl Kripalani's avatar Raúl Kripalani Committed by GitHub

Buffer early tags in temporary entries (#39)

parent 22c4d8cb
......@@ -53,12 +53,12 @@ type segment struct {
type segments [256]*segment
func (s *segments) get(p peer.ID) *segment {
return s[byte(p[len(p)-1])]
func (ss *segments) get(p peer.ID) *segment {
return ss[byte(p[len(p)-1])]
}
func (s *segments) countPeers() (count int) {
for _, seg := range s {
func (ss *segments) countPeers() (count int) {
for _, seg := range ss {
seg.Lock()
count += len(seg.peers)
seg.Unlock()
......@@ -66,6 +66,23 @@ func (s *segments) countPeers() (count int) {
return count
}
func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
pi, ok := s.peers[p]
if ok {
return pi
}
// create a temporary peer to buffer early tags before the Connected notification arrives.
pi = &peerInfo{
id: p,
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
temp: true,
tags: make(map[string]int),
conns: make(map[inet.Conn]time.Time),
}
s.peers[p] = pi
return pi
}
// NewConnManager creates a new BasicConnMgr with the provided params:
// * lo and hi are watermarks governing the number of connections that'll be maintained.
// When the peer count exceeds the 'high watermark', as many peers will be pruned (and
......@@ -134,6 +151,7 @@ type peerInfo struct {
id peer.ID
tags map[string]int // value for each tag
value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections
conns map[inet.Conn]time.Time // start time of each connection
......@@ -218,7 +236,13 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
// Sort peers according to their value.
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].value < candidates[j].value
left, right := candidates[i], candidates[j]
// temporary peers are preferred for pruning.
if left.temp != right.temp {
return left.temp
}
// otherwise, compare by value.
return left.value < right.value
})
target := nconns - cm.lowWater
......@@ -227,6 +251,9 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
selected := make([]inet.Conn, 0, target+10)
for _, inf := range candidates {
if target <= 0 {
break
}
// TODO: should we be using firstSeen or the time associated with the connection itself?
if inf.firstSeen.Add(cm.gracePeriod).After(now) {
continue
......@@ -235,15 +262,18 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
// lock this to protect from concurrent modifications from connect/disconnect events
s := cm.segments.get(inf.id)
s.Lock()
for c := range inf.conns {
selected = append(selected, c)
if len(inf.conns) == 0 && inf.temp {
// handle temporary entries for early tags -- this entry has gone past the grace period
// and still holds no connections, so prune it.
delete(s.peers, inf.id)
} else {
for c := range inf.conns {
selected = append(selected, c)
}
}
target -= len(inf.conns)
s.Unlock()
if target <= 0 {
break
}
}
return selected
......@@ -284,14 +314,10 @@ func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
s.Lock()
defer s.Unlock()
pi, ok := s.peers[p]
if !ok {
log.Info("tried to tag conn from untracked peer: ", p)
return
}
pi := s.tagInfoFor(p)
// Update the total value of the peer.
pi.value += (val - pi.tags[tag])
pi.value += val - pi.tags[tag]
pi.tags[tag] = val
}
......@@ -318,15 +344,11 @@ func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
s.Lock()
defer s.Unlock()
pi, ok := s.peers[p]
if !ok {
log.Info("tried to upsert tag from untracked peer: ", p)
return
}
pi := s.tagInfoFor(p)
oldval := pi.tags[tag]
newval := upsert(oldval)
pi.value += (newval - oldval)
pi.value += newval - oldval
pi.tags[tag] = newval
}
......@@ -383,15 +405,22 @@ func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) {
s.Lock()
defer s.Unlock()
pinfo, ok := s.peers[p]
id := c.RemotePeer()
pinfo, ok := s.peers[id]
if !ok {
pinfo = &peerInfo{
id: p,
id: id,
firstSeen: time.Now(),
tags: make(map[string]int),
conns: make(map[inet.Conn]time.Time),
}
s.peers[p] = pinfo
s.peers[id] = pinfo
} else if pinfo.temp {
// we had created a temporary entry for this peer to buffer early tags before the
// Connected notification arrived: flip the temporary flag, and update the firstSeen
// timestamp to the real one.
pinfo.temp = false
pinfo.firstSeen = time.Now()
}
_, ok = pinfo.conns[c]
......
......@@ -190,8 +190,8 @@ func TestTagPeerNonExistant(t *testing.T) {
id := tu.RandPeerIDFatal(t)
cm.TagPeer(id, "test", 1)
if cm.segments.countPeers() != 0 {
t.Fatal("expected zero peers")
if !cm.segments.get(id).peers[id].temp {
t.Fatal("expected 1 temporary entry")
}
}
......@@ -525,9 +525,9 @@ func TestUpsertTag(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
rp := conn.RemotePeer()
// this is an early tag, before the Connected notification arrived.
cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 })
if len(cm.segments.get(rp).peers[rp].tags) != 1 {
t.Fatal("expected a tag")
......@@ -536,6 +536,9 @@ func TestUpsertTag(t *testing.T) {
t.Fatal("expected a tag value of 1")
}
// now let's notify the connection.
not.Connected(nil, conn)
cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 })
if len(cm.segments.get(rp).peers[rp].tags) != 1 {
t.Fatal("expected a tag")
......@@ -552,3 +555,44 @@ func TestUpsertTag(t *testing.T) {
t.Fatal("expected a tag value of 1")
}
}
func TestTemporaryEntriesClearedFirst(t *testing.T) {
cm := NewConnManager(1, 1, 0)
id := tu.RandPeerIDFatal(t)
cm.TagPeer(id, "test", 20)
if cm.GetTagInfo(id).Value != 20 {
t.Fatal("expected an early tag with value 20")
}
not := cm.Notifee()
conn1, conn2 := randConn(t, nil), randConn(t, nil)
not.Connected(nil, conn1)
not.Connected(nil, conn2)
cm.TrimOpenConns(context.Background())
if cm.GetTagInfo(id) != nil {
t.Fatal("expected no temporary tags after trimming")
}
}
func TestTemporaryEntryConvertedOnConnection(t *testing.T) {
cm := NewConnManager(1, 1, 0)
conn := randConn(t, nil)
cm.TagPeer(conn.RemotePeer(), "test", 20)
ti := cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()]
if ti.value != 20 || !ti.temp {
t.Fatal("expected a temporary tag with value 20")
}
not := cm.Notifee()
not.Connected(nil, conn)
if ti.value != 20 || ti.temp {
t.Fatal("expected a non-temporary tag with value 20")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment