Commit dbfc1c39 authored by Michael Avila's avatar Michael Avila

Close provider on ipfs shutdown

License: MIT
Signed-off-by: default avatarMichael Avila <davidmichaelavila@gmail.com>
parent 00f05cb1
...@@ -14,3 +14,7 @@ func (op *offlineProvider) Run() {} ...@@ -14,3 +14,7 @@ func (op *offlineProvider) Run() {}
func (op *offlineProvider) Provide(cid cid.Cid) error { func (op *offlineProvider) Provide(cid cid.Cid) error {
return nil return nil
} }
func (op *offlineProvider) Close() error {
return nil
}
...@@ -20,6 +20,8 @@ type Provider interface { ...@@ -20,6 +20,8 @@ type Provider interface {
Run() Run()
// Provide takes a cid and makes an attempt to announce it to the network // Provide takes a cid and makes an attempt to announce it to the network
Provide(cid.Cid) error Provide(cid.Cid) error
// Close stops the provider
Close() error
} }
type provider struct { type provider struct {
...@@ -39,6 +41,12 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte ...@@ -39,6 +41,12 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte
} }
} }
// Close stops the provider
func (p *provider) Close() error {
p.queue.Close()
return nil
}
// Start workers to handle provide requests. // Start workers to handle provide requests.
func (p *provider) Run() { func (p *provider) Run() {
p.handleAnnouncements() p.handleAnnouncements()
......
...@@ -27,6 +27,8 @@ type Queue struct { ...@@ -27,6 +27,8 @@ type Queue struct {
ds datastore.Datastore // Must be threadsafe ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid dequeue chan cid.Cid
enqueue chan cid.Cid enqueue chan cid.Cid
close context.CancelFunc
closed chan struct{}
} }
// NewQueue creates a queue for cids // NewQueue creates a queue for cids
...@@ -36,19 +38,29 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, ...@@ -36,19 +38,29 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
if err != nil { if err != nil {
return nil, err return nil, err
} }
cancelCtx, cancel := context.WithCancel(ctx)
q := &Queue{ q := &Queue{
name: name, name: name,
ctx: ctx, ctx: cancelCtx,
head: head, head: head,
tail: tail, tail: tail,
ds: namespaced, ds: namespaced,
dequeue: make(chan cid.Cid), dequeue: make(chan cid.Cid),
enqueue: make(chan cid.Cid), enqueue: make(chan cid.Cid),
close: cancel,
closed: make(chan struct{}, 1),
} }
q.work() q.work()
return q, nil return q, nil
} }
// Close stops the queue
func (q *Queue) Close() error {
q.close()
<-q.closed
return nil
}
// Enqueue puts a cid in the queue // Enqueue puts a cid in the queue
func (q *Queue) Enqueue(cid cid.Cid) { func (q *Queue) Enqueue(cid cid.Cid) {
select { select {
...@@ -103,6 +115,10 @@ func (q *Queue) work() { ...@@ -103,6 +115,10 @@ func (q *Queue) work() {
var k datastore.Key = datastore.Key{} var k datastore.Key = datastore.Key{}
var c cid.Cid = cid.Undef var c cid.Cid = cid.Undef
defer func() {
close(q.closed)
}()
for { for {
if c == cid.Undef { if c == cid.Undef {
k, c = q.nextEntry() k, c = q.nextEntry()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment