Commit 795e2422 authored by Juan Benet's avatar Juan Benet

Merge pull request #1867 from ipfs/fix/mocknet-race

fix a few race conditions in mocknet
parents 27eece3e def46978
...@@ -34,7 +34,7 @@ type mocknet struct { ...@@ -34,7 +34,7 @@ type mocknet struct {
proc goprocess.Process // for Context closing proc goprocess.Process // for Context closing
ctx context.Context ctx context.Context
sync.RWMutex sync.Mutex
} }
func New(ctx context.Context) Mocknet { func New(ctx context.Context) Mocknet {
...@@ -95,8 +95,8 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peer.Peerstore) (host.Host ...@@ -95,8 +95,8 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peer.Peerstore) (host.Host
} }
func (mn *mocknet) Peers() []peer.ID { func (mn *mocknet) Peers() []peer.ID {
mn.RLock() mn.Lock()
defer mn.RUnlock() defer mn.Unlock()
cp := make([]peer.ID, 0, len(mn.nets)) cp := make([]peer.ID, 0, len(mn.nets))
for _, n := range mn.nets { for _, n := range mn.nets {
...@@ -107,22 +107,22 @@ func (mn *mocknet) Peers() []peer.ID { ...@@ -107,22 +107,22 @@ func (mn *mocknet) Peers() []peer.ID {
} }
func (mn *mocknet) Host(pid peer.ID) host.Host { func (mn *mocknet) Host(pid peer.ID) host.Host {
mn.RLock() mn.Lock()
host := mn.hosts[pid] host := mn.hosts[pid]
mn.RUnlock() mn.Unlock()
return host return host
} }
func (mn *mocknet) Net(pid peer.ID) inet.Network { func (mn *mocknet) Net(pid peer.ID) inet.Network {
mn.RLock() mn.Lock()
n := mn.nets[pid] n := mn.nets[pid]
mn.RUnlock() mn.Unlock()
return n return n
} }
func (mn *mocknet) Hosts() []host.Host { func (mn *mocknet) Hosts() []host.Host {
mn.RLock() mn.Lock()
defer mn.RUnlock() defer mn.Unlock()
cp := make([]host.Host, 0, len(mn.hosts)) cp := make([]host.Host, 0, len(mn.hosts))
for _, h := range mn.hosts { for _, h := range mn.hosts {
...@@ -134,8 +134,8 @@ func (mn *mocknet) Hosts() []host.Host { ...@@ -134,8 +134,8 @@ func (mn *mocknet) Hosts() []host.Host {
} }
func (mn *mocknet) Nets() []inet.Network { func (mn *mocknet) Nets() []inet.Network {
mn.RLock() mn.Lock()
defer mn.RUnlock() defer mn.Unlock()
cp := make([]inet.Network, 0, len(mn.nets)) cp := make([]inet.Network, 0, len(mn.nets))
for _, n := range mn.nets { for _, n := range mn.nets {
...@@ -148,8 +148,8 @@ func (mn *mocknet) Nets() []inet.Network { ...@@ -148,8 +148,8 @@ func (mn *mocknet) Nets() []inet.Network {
// Links returns a copy of the internal link state map. // Links returns a copy of the internal link state map.
// (wow, much map. so data structure. how compose. ahhh pointer) // (wow, much map. so data structure. how compose. ahhh pointer)
func (mn *mocknet) Links() LinkMap { func (mn *mocknet) Links() LinkMap {
mn.RLock() mn.Lock()
defer mn.RUnlock() defer mn.Unlock()
links := map[string]map[string]map[Link]struct{}{} links := map[string]map[string]map[Link]struct{}{}
for p1, lm := range mn.links { for p1, lm := range mn.links {
...@@ -179,10 +179,10 @@ func (mn *mocknet) LinkAll() error { ...@@ -179,10 +179,10 @@ func (mn *mocknet) LinkAll() error {
} }
func (mn *mocknet) LinkPeers(p1, p2 peer.ID) (Link, error) { func (mn *mocknet) LinkPeers(p1, p2 peer.ID) (Link, error) {
mn.RLock() mn.Lock()
n1 := mn.nets[p1] n1 := mn.nets[p1]
n2 := mn.nets[p2] n2 := mn.nets[p2]
mn.RUnlock() mn.Unlock()
if n1 == nil { if n1 == nil {
return nil, fmt.Errorf("network for p1 not in mocknet") return nil, fmt.Errorf("network for p1 not in mocknet")
...@@ -211,11 +211,11 @@ func (mn *mocknet) validate(n inet.Network) (*peernet, error) { ...@@ -211,11 +211,11 @@ func (mn *mocknet) validate(n inet.Network) (*peernet, error) {
} }
func (mn *mocknet) LinkNets(n1, n2 inet.Network) (Link, error) { func (mn *mocknet) LinkNets(n1, n2 inet.Network) (Link, error) {
mn.RLock() mn.Lock()
n1r, err1 := mn.validate(n1) n1r, err1 := mn.validate(n1)
n2r, err2 := mn.validate(n2) n2r, err2 := mn.validate(n2)
ld := mn.linkDefaults ld := mn.linkDefaults
mn.RUnlock() mn.Unlock()
if err1 != nil { if err1 != nil {
return nil, err1 return nil, err1
...@@ -260,7 +260,7 @@ func (mn *mocknet) UnlinkNets(n1, n2 inet.Network) error { ...@@ -260,7 +260,7 @@ func (mn *mocknet) UnlinkNets(n1, n2 inet.Network) error {
} }
// get from the links map. and lazily contruct. // get from the links map. and lazily contruct.
func (mn *mocknet) linksMapGet(p1, p2 peer.ID) *map[*link]struct{} { func (mn *mocknet) linksMapGet(p1, p2 peer.ID) map[*link]struct{} {
l1, found := mn.links[p1] l1, found := mn.links[p1]
if !found { if !found {
...@@ -275,7 +275,7 @@ func (mn *mocknet) linksMapGet(p1, p2 peer.ID) *map[*link]struct{} { ...@@ -275,7 +275,7 @@ func (mn *mocknet) linksMapGet(p1, p2 peer.ID) *map[*link]struct{} {
l2 = l1[p2] l2 = l1[p2]
} }
return &l2 return l2
} }
func (mn *mocknet) addLink(l *link) { func (mn *mocknet) addLink(l *link) {
...@@ -283,8 +283,8 @@ func (mn *mocknet) addLink(l *link) { ...@@ -283,8 +283,8 @@ func (mn *mocknet) addLink(l *link) {
defer mn.Unlock() defer mn.Unlock()
n1, n2 := l.nets[0], l.nets[1] n1, n2 := l.nets[0], l.nets[1]
(*mn.linksMapGet(n1.peer, n2.peer))[l] = struct{}{} mn.linksMapGet(n1.peer, n2.peer)[l] = struct{}{}
(*mn.linksMapGet(n2.peer, n1.peer))[l] = struct{}{} mn.linksMapGet(n2.peer, n1.peer)[l] = struct{}{}
} }
func (mn *mocknet) removeLink(l *link) { func (mn *mocknet) removeLink(l *link) {
...@@ -292,8 +292,8 @@ func (mn *mocknet) removeLink(l *link) { ...@@ -292,8 +292,8 @@ func (mn *mocknet) removeLink(l *link) {
defer mn.Unlock() defer mn.Unlock()
n1, n2 := l.nets[0], l.nets[1] n1, n2 := l.nets[0], l.nets[1]
delete(*mn.linksMapGet(n1.peer, n2.peer), l) delete(mn.linksMapGet(n1.peer, n2.peer), l)
delete(*mn.linksMapGet(n2.peer, n1.peer), l) delete(mn.linksMapGet(n2.peer, n1.peer), l)
} }
func (mn *mocknet) ConnectAllButSelf() error { func (mn *mocknet) ConnectAllButSelf() error {
...@@ -329,10 +329,10 @@ func (mn *mocknet) DisconnectNets(n1, n2 inet.Network) error { ...@@ -329,10 +329,10 @@ func (mn *mocknet) DisconnectNets(n1, n2 inet.Network) error {
} }
func (mn *mocknet) LinksBetweenPeers(p1, p2 peer.ID) []Link { func (mn *mocknet) LinksBetweenPeers(p1, p2 peer.ID) []Link {
mn.RLock() mn.Lock()
defer mn.RUnlock() defer mn.Unlock()
ls2 := *mn.linksMapGet(p1, p2) ls2 := mn.linksMapGet(p1, p2)
cp := make([]Link, 0, len(ls2)) cp := make([]Link, 0, len(ls2))
for l := range ls2 { for l := range ls2 {
cp = append(cp, l) cp = append(cp, l)
...@@ -351,8 +351,8 @@ func (mn *mocknet) SetLinkDefaults(o LinkOptions) { ...@@ -351,8 +351,8 @@ func (mn *mocknet) SetLinkDefaults(o LinkOptions) {
} }
func (mn *mocknet) LinkDefaults() LinkOptions { func (mn *mocknet) LinkDefaults() LinkOptions {
mn.RLock() mn.Lock()
defer mn.RUnlock() defer mn.Unlock()
return mn.linkDefaults return mn.linkDefaults
} }
......
package mocknet package mocknet
import ( import (
"sync"
"time" "time"
) )
// A ratelimiter is used by a link to determine how long to wait before sending // A ratelimiter is used by a link to determine how long to wait before sending
// data given a bandwidth cap. // data given a bandwidth cap.
type ratelimiter struct { type ratelimiter struct {
lock sync.Mutex
bandwidth float64 // bytes per nanosecond bandwidth float64 // bytes per nanosecond
allowance float64 // in bytes allowance float64 // in bytes
maxAllowance float64 // in bytes maxAllowance float64 // in bytes
...@@ -29,6 +31,8 @@ func NewRatelimiter(bandwidth float64) *ratelimiter { ...@@ -29,6 +31,8 @@ func NewRatelimiter(bandwidth float64) *ratelimiter {
// Changes bandwidth of a ratelimiter and resets its allowance // Changes bandwidth of a ratelimiter and resets its allowance
func (r *ratelimiter) UpdateBandwidth(bandwidth float64) { func (r *ratelimiter) UpdateBandwidth(bandwidth float64) {
r.lock.Lock()
defer r.lock.Unlock()
// Convert bandwidth from bytes/second to bytes/nanosecond // Convert bandwidth from bytes/second to bytes/nanosecond
b := bandwidth / float64(time.Second) b := bandwidth / float64(time.Second)
r.bandwidth = b r.bandwidth = b
...@@ -40,6 +44,8 @@ func (r *ratelimiter) UpdateBandwidth(bandwidth float64) { ...@@ -40,6 +44,8 @@ func (r *ratelimiter) UpdateBandwidth(bandwidth float64) {
// Returns how long to wait before sending data with length 'dataSize' bytes // Returns how long to wait before sending data with length 'dataSize' bytes
func (r *ratelimiter) Limit(dataSize int) time.Duration { func (r *ratelimiter) Limit(dataSize int) time.Duration {
r.lock.Lock()
defer r.lock.Unlock()
// update time // update time
var duration time.Duration = time.Duration(0) var duration time.Duration = time.Duration(0)
if r.bandwidth == 0 { if r.bandwidth == 0 {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment