Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kad-dht
Commits
c0aae658
Commit
c0aae658
authored
Jul 29, 2014
by
Jeromy
Committed by
Juan Batiz-Benet
Aug 07, 2014
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
work on framework for dht message handling
parent
23c26272
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
94 additions
and
35 deletions
+94
-35
dht.go
dht.go
+39
-17
messages.pb.go
messages.pb.go
+24
-10
messages.proto
messages.proto
+1
-1
routing.go
routing.go
+22
-7
table.go
table.go
+8
-0
No files found.
dht.go
View file @
c0aae658
...
...
@@ -2,6 +2,8 @@ package dht
import
(
swarm
"github.com/jbenet/go-ipfs/swarm"
u
"github.com/jbenet/go-ipfs/util"
"code.google.com/p/goprotobuf/proto"
"sync"
)
...
...
@@ -14,36 +16,56 @@ type IpfsDHT struct {
network
*
swarm
.
Swarm
listeners
map
[
uint64
]
chan
swarm
.
Message
// map of channels waiting for reply messages
listeners
map
[
uint64
]
chan
*
swarm
.
Message
listenLock
sync
.
RWMutex
// Signal to shutdown dht
shutdown
chan
struct
{}
}
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
func
(
dht
*
IpfsDHT
)
handleMessages
()
{
for
mes
:=
range
dht
.
network
.
Chan
.
Incoming
{
for
{
select
{
case
mes
:=
<-
dht
.
network
.
Chan
.
Incoming
:
// Unmarshal message
dht
.
listenLock
.
RLock
()
ch
,
ok
:=
dht
.
listeners
[
id
]
dht
.
listenLock
.
RUnlock
()
if
ok
{
// Send message to waiting goroutine
ch
<-
mes
}
//case closeChan: or something
for
{
select
{
case
mes
:=
<-
dht
.
network
.
Chan
.
Incoming
:
pmes
:=
new
(
DHTMessage
)
err
:=
proto
.
Unmarshal
(
mes
.
Data
,
pmes
)
if
err
!=
nil
{
u
.
PErr
(
"Failed to decode protobuf message: %s"
,
err
)
continue
}
// Note: not sure if this is the correct place for this
dht
.
listenLock
.
RLock
()
ch
,
ok
:=
dht
.
listeners
[
pmes
.
GetId
()]
dht
.
listenLock
.
RUnlock
()
if
ok
{
ch
<-
mes
}
//
// Do something else with the messages?
switch
pmes
.
GetType
()
{
case
DHTMessage_ADD_PROVIDER
:
case
DHTMessage_FIND_NODE
:
case
DHTMessage_GET_PROVIDERS
:
case
DHTMessage_GET_VALUE
:
case
DHTMessage_PING
:
case
DHTMessage_PUT_VALUE
:
}
case
<-
dht
.
shutdown
:
return
}
}
}
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
func
(
dht
*
IpfsDHT
)
ListenFor
(
mesid
uint64
)
<-
chan
swarm
.
Message
{
lchan
:=
make
(
chan
swarm
.
Message
)
func
(
dht
*
IpfsDHT
)
ListenFor
(
mesid
uint64
)
<-
chan
*
swarm
.
Message
{
lchan
:=
make
(
chan
*
swarm
.
Message
)
dht
.
listenLock
.
Lock
()
dht
.
listeners
[
mesid
]
=
lchan
dht
.
listenLock
.
Unlock
()
...
...
messages.pb.go
View file @
c0aae658
...
...
@@ -23,23 +23,29 @@ var _ = math.Inf
type
DHTMessage_MessageType
int32
const
(
DHTMessage_PUT_VALUE
DHTMessage_MessageType
=
0
DHTMessage_GET_VALUE
DHTMessage_MessageType
=
1
DHTMessage_PING
DHTMessage_MessageType
=
2
DHTMessage_FIND_NODE
DHTMessage_MessageType
=
3
DHTMessage_PUT_VALUE
DHTMessage_MessageType
=
0
DHTMessage_GET_VALUE
DHTMessage_MessageType
=
1
DHTMessage_ADD_PROVIDER
DHTMessage_MessageType
=
2
DHTMessage_GET_PROVIDERS
DHTMessage_MessageType
=
3
DHTMessage_FIND_NODE
DHTMessage_MessageType
=
4
DHTMessage_PING
DHTMessage_MessageType
=
5
)
var
DHTMessage_MessageType_name
=
map
[
int32
]
string
{
0
:
"PUT_VALUE"
,
1
:
"GET_VALUE"
,
2
:
"PING"
,
3
:
"FIND_NODE"
,
2
:
"ADD_PROVIDER"
,
3
:
"GET_PROVIDERS"
,
4
:
"FIND_NODE"
,
5
:
"PING"
,
}
var
DHTMessage_MessageType_value
=
map
[
string
]
int32
{
"PUT_VALUE"
:
0
,
"GET_VALUE"
:
1
,
"PING"
:
2
,
"FIND_NODE"
:
3
,
"PUT_VALUE"
:
0
,
"GET_VALUE"
:
1
,
"ADD_PROVIDER"
:
2
,
"GET_PROVIDERS"
:
3
,
"FIND_NODE"
:
4
,
"PING"
:
5
,
}
func
(
x
DHTMessage_MessageType
)
Enum
()
*
DHTMessage_MessageType
{
...
...
@@ -63,6 +69,7 @@ type DHTMessage struct {
Type
*
DHTMessage_MessageType
`protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
Key
*
string
`protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value
[]
byte
`protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Id
*
uint64
`protobuf:"varint,4,req,name=id" json:"id,omitempty"`
XXX_unrecognized
[]
byte
`json:"-"`
}
...
...
@@ -91,6 +98,13 @@ func (m *DHTMessage) GetValue() []byte {
return
nil
}
func
(
m
*
DHTMessage
)
GetId
()
uint64
{
if
m
!=
nil
&&
m
.
Id
!=
nil
{
return
*
m
.
Id
}
return
0
}
func
init
()
{
proto
.
RegisterEnum
(
"dht.DHTMessage_MessageType"
,
DHTMessage_MessageType_name
,
DHTMessage_MessageType_value
)
}
messages.proto
View file @
c0aae658
...
...
@@ -15,5 +15,5 @@ message DHTMessage {
required
MessageType
type
=
1
;
optional
string
key
=
2
;
optional
bytes
value
=
3
;
required
int64
id
=
4
;
required
u
int64
id
=
4
;
}
routing.go
View file @
c0aae658
...
...
@@ -7,6 +7,11 @@ import (
"time"
)
// TODO: determine a way of creating and managing message IDs
func
GenerateMessageID
()
uint64
{
return
4
}
// This file implements the Routing interface for the IpfsDHT struct.
// Basic Put/Get
...
...
@@ -16,9 +21,15 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var
p
*
peer
.
Peer
p
=
s
.
routes
.
NearestNode
(
key
)
pmes
:=
new
(
PutValue
)
pmes
.
Key
=
&
key
pmes_type
:=
DHTMessage_PUT_VALUE
str_key
:=
string
(
key
)
mes_id
:=
GenerateMessageID
()
pmes
:=
new
(
DHTMessage
)
pmes
.
Type
=
&
pmes_type
pmes
.
Key
=
&
str_key
pmes
.
Value
=
value
pmes
.
Id
=
&
mes_id
mes
:=
new
(
swarm
.
Message
)
mes
.
Data
=
[]
byte
(
pmes
.
String
())
...
...
@@ -33,23 +44,27 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
var
p
*
peer
.
Peer
p
=
s
.
routes
.
NearestNode
(
key
)
str_key
:=
string
(
key
)
mes_type
:=
DHTMessage_GET_VALUE
mes_id
:=
GenerateMessageID
()
// protobuf structure
pmes
:=
new
(
GetValue
)
pmes
.
Key
=
&
key
pmes
.
Id
=
GenerateMessageID
()
pmes
:=
new
(
DHTMessage
)
pmes
.
Type
=
&
mes_type
pmes
.
Key
=
&
str_key
pmes
.
Id
=
&
mes_id
mes
:=
new
(
swarm
.
Message
)
mes
.
Data
=
[]
byte
(
pmes
.
String
())
mes
.
Peer
=
p
response_chan
:=
s
.
ListenFor
(
pmes
.
Id
)
response_chan
:=
s
.
ListenFor
(
*
pmes
.
Id
)
// Wait for either the response or a timeout
timeup
:=
time
.
After
(
timeout
)
select
{
case
<-
timeup
:
// TODO: unregister listener
return
nil
,
t
imeout
Error
return
nil
,
u
.
ErrT
imeout
case
resp
:=
<-
response_chan
:
return
resp
.
Data
,
nil
}
...
...
table.go
View file @
c0aae658
...
...
@@ -3,6 +3,9 @@ package dht
import
(
"bytes"
"container/list"
peer
"github.com/jbenet/go-ipfs/peer"
u
"github.com/jbenet/go-ipfs/util"
)
// ID for IpfsDHT should be a byte slice, to allow for simpler operations
...
...
@@ -22,6 +25,11 @@ type RoutingTable struct {
Buckets
[]
Bucket
}
//TODO: make this accept an ID, requires method of converting keys to IDs
func
(
rt
*
RoutingTable
)
NearestNode
(
key
u
.
Key
)
*
peer
.
Peer
{
panic
(
"Function not implemented."
)
}
func
(
id
ID
)
Equal
(
other
ID
)
bool
{
return
bytes
.
Equal
(
id
,
other
)
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment