Commit 0cbfff77 authored by Steven Allen's avatar Steven Allen

contexts: make sure to abort when a context is canceled

Also, buffer single-use channels we may walk away from. This was showing
up (rarely) in a go-ipfs test.
parent fa9aec89
...@@ -59,9 +59,18 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { ...@@ -59,9 +59,18 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
// ConnectedPeers returns a list of peers this PeerManager is managing. // ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID { func (pm *PeerManager) ConnectedPeers() []peer.ID {
resp := make(chan []peer.ID) resp := make(chan []peer.ID, 1)
pm.peerMessages <- &getPeersMessage{resp} select {
return <-resp case pm.peerMessages <- &getPeersMessage{resp}:
case <-pm.ctx.Done():
return nil
}
select {
case peers := <-resp:
return peers
case <-pm.ctx.Done():
return nil
}
} }
// Connected is called to add a new peer to the pool, and send it an initial set // Connected is called to add a new peer to the pool, and send it an initial set
......
...@@ -82,7 +82,7 @@ func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) { ...@@ -82,7 +82,7 @@ func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID { func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// right now this just returns all peers, but soon we might return peers // right now this just returns all peers, but soon we might return peers
// ordered by optimization, or only a subset // ordered by optimization, or only a subset
resp := make(chan []peer.ID) resp := make(chan []peer.ID, 1)
select { select {
case spm.peerMessages <- &peerReqMessage{resp}: case spm.peerMessages <- &peerReqMessage{resp}:
case <-spm.ctx.Done(): case <-spm.ctx.Done():
...@@ -108,11 +108,16 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) { ...@@ -108,11 +108,16 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
// - share peers between sessions based on interest set // - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) { for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
go func(p peer.ID) { go func(p peer.ID) {
// TODO: Also use context from spm.
err := spm.network.ConnectTo(ctx, p) err := spm.network.ConnectTo(ctx, p)
if err != nil { if err != nil {
log.Debugf("failed to connect to provider %s: %s", p, err) log.Debugf("failed to connect to provider %s: %s", p, err)
} }
spm.peerMessages <- &peerFoundMessage{p} select {
case spm.peerMessages <- &peerFoundMessage{p}:
case <-ctx.Done():
case <-spm.ctx.Done():
}
}(p) }(p)
} }
}(c) }(c)
......
...@@ -51,7 +51,7 @@ func New(ctx context.Context) *SessionRequestSplitter { ...@@ -51,7 +51,7 @@ func New(ctx context.Context) *SessionRequestSplitter {
// SplitRequest splits a request for the given cids one or more times among the // SplitRequest splits a request for the given cids one or more times among the
// given peers. // given peers.
func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest { func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest {
resp := make(chan []*PartialRequest) resp := make(chan []*PartialRequest, 1)
select { select {
case srs.messages <- &splitRequestMessage{peers, ks, resp}: case srs.messages <- &splitRequestMessage{peers, ks, resp}:
......
...@@ -83,30 +83,66 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe ...@@ -83,30 +83,66 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe
// IsWanted returns whether a CID is currently wanted. // IsWanted returns whether a CID is currently wanted.
func (wm *WantManager) IsWanted(c cid.Cid) bool { func (wm *WantManager) IsWanted(c cid.Cid) bool {
resp := make(chan bool) resp := make(chan bool, 1)
wm.wantMessages <- &isWantedMessage{c, resp} select {
return <-resp case wm.wantMessages <- &isWantedMessage{c, resp}:
case <-wm.ctx.Done():
return false
}
select {
case wanted := <-resp:
return wanted
case <-wm.ctx.Done():
return false
}
} }
// CurrentWants returns the list of current wants. // CurrentWants returns the list of current wants.
func (wm *WantManager) CurrentWants() []*wantlist.Entry { func (wm *WantManager) CurrentWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry) resp := make(chan []*wantlist.Entry, 1)
wm.wantMessages <- &currentWantsMessage{resp} select {
return <-resp case wm.wantMessages <- &currentWantsMessage{resp}:
case <-wm.ctx.Done():
return nil
}
select {
case wantlist := <-resp:
return wantlist
case <-wm.ctx.Done():
return nil
}
} }
// CurrentBroadcastWants returns the current list of wants that are broadcasts. // CurrentBroadcastWants returns the current list of wants that are broadcasts.
func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry { func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry) resp := make(chan []*wantlist.Entry, 1)
wm.wantMessages <- &currentBroadcastWantsMessage{resp} select {
return <-resp case wm.wantMessages <- &currentBroadcastWantsMessage{resp}:
case <-wm.ctx.Done():
return nil
}
select {
case wl := <-resp:
return wl
case <-wm.ctx.Done():
return nil
}
} }
// WantCount returns the total count of wants. // WantCount returns the total count of wants.
func (wm *WantManager) WantCount() int { func (wm *WantManager) WantCount() int {
resp := make(chan int) resp := make(chan int, 1)
wm.wantMessages <- &wantCountMessage{resp} select {
return <-resp case wm.wantMessages <- &wantCountMessage{resp}:
case <-wm.ctx.Done():
return 0
}
select {
case count := <-resp:
return count
case <-wm.ctx.Done():
return 0
}
} }
// Startup starts processing for the WantManager. // Startup starts processing for the WantManager.
......
...@@ -217,11 +217,15 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ...@@ -217,11 +217,15 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
// TODO: come up with a better strategy for determining when to search // TODO: come up with a better strategy for determining when to search
// for new providers for blocks. // for new providers for blocks.
i := rand.Intn(len(entries)) i := rand.Intn(len(entries))
bs.findKeys <- &blockRequest{ select {
case bs.findKeys <- &blockRequest{
Cid: entries[i].Cid, Cid: entries[i].Cid,
Ctx: ctx, Ctx: ctx,
}:
case <-ctx.Done():
return
} }
case <-parent.Done(): case <-ctx.Done():
return return
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment