Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-dms3-provider
Commits
7bdb5546
Commit
7bdb5546
authored
Mar 08, 2019
by
Michael Avila
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Refactor per code climate rules
License: MIT Signed-off-by:
Michael Avila
<
davidmichaelavila@gmail.com
>
parent
90d30898
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
35 additions
and
32 deletions
+35
-32
offline.go
offline.go
+2
-1
provider.go
provider.go
+2
-2
queue.go
queue.go
+31
-29
No files found.
offline.go
View file @
7bdb5546
...
...
@@ -2,8 +2,9 @@ package provider
import
"github.com/ipfs/go-cid"
type
offlineProvider
struct
{}
type
offlineProvider
struct
{}
// NewOfflineProvider creates a Provider that does nothing
func
NewOfflineProvider
()
Provider
{
return
&
offlineProvider
{}
}
...
...
provider.go
View file @
7bdb5546
...
...
@@ -18,13 +18,12 @@ const (
provideOutgoingWorkerLimit
=
8
)
// Provider announces blocks to the network
type
Provider
interface
{
Run
()
Provide
(
cid
.
Cid
)
error
}
// Provider announces blocks to the network, tracks which blocks are
// being provided, and untracks blocks when they're no longer in the blockstore.
type
provider
struct
{
ctx
context
.
Context
// the CIDs for which provide announcements should be made
...
...
@@ -33,6 +32,7 @@ type provider struct {
contentRouting
routing
.
ContentRouting
}
// NewProvider creates a provider that announces blocks to the network using a content router
func
NewProvider
(
ctx
context
.
Context
,
queue
*
Queue
,
contentRouting
routing
.
ContentRouting
)
Provider
{
return
&
provider
{
ctx
:
ctx
,
...
...
queue.go
View file @
7bdb5546
...
...
@@ -17,11 +17,12 @@ import (
// not removed from the datastore until you call Complete() on the entry you
// receive.
type
Entry
struct
{
cid
cid
.
Cid
key
ds
.
Key
cid
cid
.
Cid
key
ds
.
Key
queue
*
Queue
}
// Complete the entry by removing it from the queue
func
(
e
*
Entry
)
Complete
()
error
{
return
e
.
queue
.
remove
(
e
.
key
)
}
...
...
@@ -41,36 +42,37 @@ type Queue struct {
tail
uint64
head
uint64
lock
sync
.
Mutex
lock
sync
.
Mutex
datastore
ds
.
Datastore
dequeue
chan
*
Entry
dequeue
chan
*
Entry
notEmpty
chan
struct
{}
isRunning
bool
}
func
NewQueue
(
name
string
,
ctx
context
.
Context
,
datastore
ds
.
Datastore
)
(
*
Queue
,
error
)
{
namespaced
:=
namespace
.
Wrap
(
datastore
,
ds
.
NewKey
(
"/"
+
name
+
"/queue/"
))
head
,
tail
,
err
:=
getQueueHeadTail
(
name
,
ctx
,
namespaced
)
// NewQueue creates a queue for cids
func
NewQueue
(
ctx
context
.
Context
,
name
string
,
datastore
ds
.
Datastore
)
(
*
Queue
,
error
)
{
namespaced
:=
namespace
.
Wrap
(
datastore
,
ds
.
NewKey
(
"/"
+
name
+
"/queue/"
))
head
,
tail
,
err
:=
getQueueHeadTail
(
ctx
,
name
,
namespaced
)
if
err
!=
nil
{
return
nil
,
err
}
q
:=
&
Queue
{
name
:
name
,
ctx
:
ctx
,
head
:
head
,
tail
:
tail
,
lock
:
sync
.
Mutex
{},
name
:
name
,
ctx
:
ctx
,
head
:
head
,
tail
:
tail
,
lock
:
sync
.
Mutex
{},
datastore
:
namespaced
,
dequeue
:
make
(
chan
*
Entry
),
notEmpty
:
make
(
chan
struct
{}),
dequeue
:
make
(
chan
*
Entry
),
notEmpty
:
make
(
chan
struct
{}),
isRunning
:
false
,
}
return
q
,
nil
}
//
P
ut a cid in the queue
//
Enqueue p
ut
s
a cid in the queue
func
(
q
*
Queue
)
Enqueue
(
cid
cid
.
Cid
)
error
{
q
.
lock
.
Lock
()
defer
q
.
lock
.
Unlock
()
...
...
@@ -95,21 +97,18 @@ func (q *Queue) Enqueue(cid cid.Cid) error {
return
nil
}
//
R
emove
an
entr
y
from the queue
.
//
Dequeue returns a channel that if listened to will r
emove entr
ies
from the queue
func
(
q
*
Queue
)
Dequeue
()
<-
chan
*
Entry
{
return
q
.
dequeue
}
// IsEmpty returns whether or not the queue has any items
func
(
q
*
Queue
)
IsEmpty
()
bool
{
return
(
q
.
tail
-
q
.
head
)
==
0
}
func
(
q
*
Queue
)
remove
(
key
ds
.
Key
)
error
{
return
q
.
datastore
.
Delete
(
key
)
}
// dequeue items when the dequeue channel is available to
// be written to
// Run dequeues items when the dequeue channel is available to
// be written to.
func
(
q
*
Queue
)
Run
()
{
q
.
isRunning
=
true
go
func
()
{
...
...
@@ -178,9 +177,9 @@ func (q *Queue) next() (*Entry, error) {
return
nil
,
err
}
entry
:=
&
Entry
{
cid
:
id
,
key
:
nextKey
,
entry
:=
&
Entry
{
cid
:
id
,
key
:
nextKey
,
queue
:
q
,
}
...
...
@@ -194,14 +193,14 @@ func (q *Queue) queueKey(id uint64) ds.Key {
}
// crawl over the queue entries to find the head and tail
func
getQueueHeadTail
(
name
string
,
ctx
context
.
Context
,
datastore
ds
.
Datastore
)
(
uint64
,
uint64
,
error
)
{
func
getQueueHeadTail
(
ctx
context
.
Context
,
name
string
,
datastore
ds
.
Datastore
)
(
uint64
,
uint64
,
error
)
{
query
:=
query
.
Query
{}
results
,
err
:=
datastore
.
Query
(
query
)
if
err
!=
nil
{
return
0
,
0
,
err
}
var
tail
uint64
=
0
var
tail
uint64
var
head
uint64
=
math
.
MaxUint64
for
entry
:=
range
results
.
Next
()
{
select
{
...
...
@@ -219,8 +218,8 @@ func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore)
head
=
id
}
if
(
id
+
1
)
>
tail
{
tail
=
(
id
+
1
)
if
(
id
+
1
)
>
tail
{
tail
=
(
id
+
1
)
}
}
if
err
:=
results
.
Close
();
err
!=
nil
{
...
...
@@ -233,3 +232,6 @@ func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore)
return
head
,
tail
,
nil
}
func
(
q
*
Queue
)
remove
(
key
ds
.
Key
)
error
{
return
q
.
datastore
.
Delete
(
key
)
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment