Commit b16adfea authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #672 from jbenet/clear-addrs

AddrSplosion -- Partie Deux Avec Vengeance
parents 4940c3e0 71f2c4de
// Package addr provides utility functions to handle peer addresses.
package addr
import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// AddrSource is a source of addresses. It allows clients to retrieve
// a set of addresses at a last possible moment in time. It is used
// to query a set of addresses that may change over time, as a result
// of the network changing interfaces or mappings.
type Source interface {
Addrs() []ma.Multiaddr
}
// CombineSources returns a new AddrSource which is the
// concatenation of all input AddrSources:
//
// combined := CombinedSources(a, b)
// combined.Addrs() // append(a.Addrs(), b.Addrs()...)
//
func CombineSources(srcs ...Source) Source {
return combinedAS(srcs)
}
type combinedAS []Source
func (cas combinedAS) Addrs() []ma.Multiaddr {
var addrs []ma.Multiaddr
for _, s := range cas {
addrs = append(addrs, s.Addrs()...)
}
return addrs
}
// UniqueSource returns a new AddrSource which omits duplicate
// addresses from the inputs:
//
// unique := UniqueSource(a, b)
// unique.Addrs() // append(a.Addrs(), b.Addrs()...)
// // but only adds each addr once.
//
func UniqueSource(srcs ...Source) Source {
return uniqueAS(srcs)
}
type uniqueAS []Source
func (uas uniqueAS) Addrs() []ma.Multiaddr {
seen := make(map[string]struct{})
var addrs []ma.Multiaddr
for _, s := range uas {
for _, a := range s.Addrs() {
s := a.String()
if _, found := seen[s]; !found {
addrs = append(addrs, a)
seen[s] = struct{}{}
}
}
}
return addrs
}
// Slice is a simple slice of addresses that implements
// the AddrSource interface.
type Slice []ma.Multiaddr
func (as Slice) Addrs() []ma.Multiaddr {
return as
}
package addr
import (
"fmt"
"testing"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func newAddrOrFatal(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal("error parsing multiaddr", err)
}
return a
}
func newAddrs(t *testing.T, n int) []ma.Multiaddr {
addrs := make([]ma.Multiaddr, n)
for i := 0; i < n; i++ {
s := fmt.Sprintf("/ip4/1.2.3.4/tcp/%d", i)
addrs[i] = newAddrOrFatal(t, s)
}
return addrs
}
func addrSetsSame(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}
for i, aa := range a {
bb := b[i]
if !aa.Equal(bb) {
return false
}
}
return true
}
func addrSourcesSame(a, b Source) bool {
return addrSetsSame(a.Addrs(), b.Addrs())
}
func TestAddrCombine(t *testing.T) {
addrs := newAddrs(t, 30)
a := Slice(addrs[0:10])
b := Slice(addrs[10:20])
c := Slice(addrs[20:30])
d := CombineSources(a, b, c)
if !addrSetsSame(addrs, d.Addrs()) {
t.Error("addrs differ")
}
if !addrSourcesSame(Slice(addrs), d) {
t.Error("addrs differ")
}
}
func TestAddrUnique(t *testing.T) {
addrs := newAddrs(t, 40)
a := Slice(addrs[0:20])
b := Slice(addrs[10:30])
c := Slice(addrs[20:40])
d := CombineSources(a, b, c)
e := UniqueSource(a, b, c)
if addrSetsSame(addrs, d.Addrs()) {
t.Error("addrs same")
}
if addrSourcesSame(Slice(addrs), d) {
t.Error("addrs same")
}
if !addrSetsSame(addrs, e.Addrs()) {
t.Error("addrs differ", addrs, "\n\n", e.Addrs(), "\n\n")
}
if !addrSourcesSame(Slice(addrs), e) {
t.Error("addrs differ", addrs, "\n\n", e.Addrs(), "\n\n")
}
}
......@@ -3,6 +3,7 @@ package peer
import (
"errors"
"sync"
"time"
ic "github.com/jbenet/go-ipfs/p2p/crypto"
......@@ -11,6 +12,11 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
const (
// AddressTTL is the expiration time of addresses.
AddressTTL = time.Hour
)
// Peerstore provides a threadsafe store of Peer related
// information.
type Peerstore interface {
......@@ -38,20 +44,35 @@ type Peerstore interface {
// AddressBook tracks the addresses of Peers
type AddressBook interface {
Addresses(ID) []ma.Multiaddr
AddAddress(ID, ma.Multiaddr)
AddAddresses(ID, []ma.Multiaddr)
Addresses(ID) []ma.Multiaddr // returns addresses for ID
AddAddress(ID, ma.Multiaddr) // Adds given addr for ID
AddAddresses(ID, []ma.Multiaddr) // Adds given addrs for ID
SetAddresses(ID, []ma.Multiaddr) // Sets given addrs for ID (clears previously stored)
}
type expiringAddr struct {
Addr ma.Multiaddr
TTL time.Time
}
func (e *expiringAddr) Expired() bool {
return time.Now().After(e.TTL)
}
type addressMap map[string]ma.Multiaddr
type addressMap map[string]expiringAddr
type addressbook struct {
sync.RWMutex // guards all fields
addrs map[ID]addressMap
sync.RWMutex
ttl time.Duration // initial ttl
}
func newAddressbook() *addressbook {
return &addressbook{addrs: map[ID]addressMap{}}
return &addressbook{
addrs: map[ID]addressMap{},
ttl: AddressTTL,
}
}
func (ab *addressbook) Peers() []ID {
......@@ -65,43 +86,65 @@ func (ab *addressbook) Peers() []ID {
}
func (ab *addressbook) Addresses(p ID) []ma.Multiaddr {
ab.RLock()
defer ab.RUnlock()
ab.Lock()
defer ab.Unlock()
maddrs, found := ab.addrs[p]
if !found {
return nil
}
maddrs2 := make([]ma.Multiaddr, 0, len(maddrs))
for _, m := range maddrs {
maddrs2 = append(maddrs2, m)
good := make([]ma.Multiaddr, 0, len(maddrs))
var expired []string
for s, m := range maddrs {
if m.Expired() {
expired = append(expired, s)
} else {
good = append(good, m.Addr)
}
}
return maddrs2
// clean up the expired ones.
for _, s := range expired {
delete(ab.addrs[p], s)
}
return good
}
func (ab *addressbook) AddAddress(p ID, m ma.Multiaddr) {
ab.AddAddresses(p, []ma.Multiaddr{m})
}
func (ab *addressbook) AddAddresses(p ID, ms []ma.Multiaddr) {
ab.Lock()
defer ab.Unlock()
_, found := ab.addrs[p]
amap, found := ab.addrs[p]
if !found {
ab.addrs[p] = addressMap{}
amap = addressMap{}
ab.addrs[p] = amap
}
ttl := time.Now().Add(ab.ttl)
for _, m := range ms {
// re-set all of them for new ttl.
amap[m.String()] = expiringAddr{
Addr: m,
TTL: ttl,
}
}
ab.addrs[p][m.String()] = m
}
func (ab *addressbook) AddAddresses(p ID, ms []ma.Multiaddr) {
func (ab *addressbook) SetAddresses(p ID, ms []ma.Multiaddr) {
ab.Lock()
defer ab.Unlock()
amap := addressMap{}
ttl := time.Now().Add(ab.ttl)
for _, m := range ms {
_, found := ab.addrs[p]
if !found {
ab.addrs[p] = addressMap{}
}
ab.addrs[p][m.String()] = m
amap[m.String()] = expiringAddr{Addr: m, TTL: ttl}
}
ab.addrs[p] = amap // clear what was there before
}
// KeyBook tracks the Public keys of Peers.
......
......@@ -2,6 +2,7 @@ package peer
import (
"testing"
"time"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
......@@ -29,20 +30,37 @@ func TestAddresses(t *testing.T) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ")
id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn")
id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn")
id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
ma21 := MA(t, "/ip4/1.2.3.2/tcp/1111")
ma22 := MA(t, "/ip4/1.2.3.2/tcp/2222")
ma31 := MA(t, "/ip4/1.2.3.3/tcp/1111")
ma32 := MA(t, "/ip4/1.2.3.3/tcp/2222")
ma33 := MA(t, "/ip4/1.2.3.3/tcp/3333")
ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111")
ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222")
ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111")
ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222")
ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333")
ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111")
ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222")
ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333")
ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111")
ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222")
ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333")
ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444")
ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555")
ps.AddAddress(id1, ma11)
ps.AddAddress(id2, ma21)
ps.AddAddress(id2, ma22)
ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22})
ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22}) // idempotency
ps.AddAddress(id3, ma31)
ps.AddAddress(id3, ma32)
ps.AddAddress(id3, ma33)
ps.AddAddress(id3, ma33) // idempotency
ps.AddAddress(id3, ma33)
ps.AddAddresses(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // multiple
ps.AddAddresses(id5, []ma.Multiaddr{ma21, ma22}) // clearing
ps.AddAddresses(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // clearing
ps.SetAddresses(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}) // clearing
test := func(exp, act []ma.Multiaddr) {
if len(exp) != len(act) {
......@@ -69,9 +87,99 @@ func TestAddresses(t *testing.T) {
test([]ma.Multiaddr{ma11}, ps.Addresses(id1))
test([]ma.Multiaddr{ma21, ma22}, ps.Addresses(id2))
test([]ma.Multiaddr{ma31, ma32, ma33}, ps.Addresses(id3))
test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.Addresses(id4))
test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.Addresses(id5))
// test also the PeerInfo return
test([]ma.Multiaddr{ma11}, ps.PeerInfo(id1).Addrs)
test([]ma.Multiaddr{ma21, ma22}, ps.PeerInfo(id2).Addrs)
test([]ma.Multiaddr{ma31, ma32, ma33}, ps.PeerInfo(id3).Addrs)
test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.PeerInfo(id4).Addrs)
test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.PeerInfo(id5).Addrs)
}
func TestAddressTTL(t *testing.T) {
ps := NewPeerstore()
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
ma1 := MA(t, "/ip4/1.2.3.1/tcp/1111")
ma2 := MA(t, "/ip4/2.2.3.2/tcp/2222")
ma3 := MA(t, "/ip4/3.2.3.3/tcp/3333")
ma4 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma5 := MA(t, "/ip4/5.2.3.3/tcp/5555")
ps.AddAddress(id1, ma1)
ps.AddAddress(id1, ma2)
ps.AddAddress(id1, ma3)
ps.AddAddress(id1, ma4)
ps.AddAddress(id1, ma5)
test := func(exp, act []ma.Multiaddr) {
if len(exp) != len(act) {
t.Fatal("lengths not the same")
}
for _, a := range exp {
found := false
for _, b := range act {
if a.Equal(b) {
found = true
break
}
}
if !found {
t.Fatal("expected address %s not found", a)
}
}
}
testTTL := func(ttle time.Duration, id ID, addr ma.Multiaddr) {
ab := ps.(*peerstore).addressbook
ttlat := ab.addrs[id][addr.String()].TTL
ttla := ttlat.Sub(time.Now())
if ttla > ttle {
t.Error("ttl is greater than expected", ttle, ttla)
}
if ttla < (ttle / 2) {
t.Error("ttl is smaller than expected", ttle/2, ttla)
}
}
// should they are there
ab := ps.(*peerstore).addressbook
if len(ab.addrs[id1]) != 5 {
t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1])
}
// test the Addresses return value
test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.Addresses(id1))
test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.PeerInfo(id1).Addrs)
// check the addr TTL is a bit smaller than the init TTL
testTTL(AddressTTL, id1, ma1)
testTTL(AddressTTL, id1, ma2)
testTTL(AddressTTL, id1, ma3)
testTTL(AddressTTL, id1, ma4)
testTTL(AddressTTL, id1, ma5)
// change the TTL
setTTL := func(id ID, addr ma.Multiaddr, ttl time.Time) {
a := ab.addrs[id][addr.String()]
a.TTL = ttl
ab.addrs[id][addr.String()] = a
}
setTTL(id1, ma1, time.Now().Add(-1*time.Second))
setTTL(id1, ma2, time.Now().Add(-1*time.Hour))
setTTL(id1, ma3, time.Now().Add(-1*AddressTTL))
// should no longer list those
test([]ma.Multiaddr{ma4, ma5}, ps.Addresses(id1))
test([]ma.Multiaddr{ma4, ma5}, ps.PeerInfo(id1).Addrs)
// should no longer be there
if len(ab.addrs[id1]) != 2 {
t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1])
}
}
......@@ -176,8 +176,9 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) {
lmaddrs = append(lmaddrs, maddr)
}
// update our peerstore with the addresses.
ids.Host.Peerstore().AddAddresses(p, lmaddrs)
// update our peerstore with the addresses. here, we SET the addresses, clearing old ones.
// We are receiving from the peer itself. this is current address ground truth.
ids.Host.Peerstore().SetAddresses(p, lmaddrs)
log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs)
// get protocol versions
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment