Commit 7c0c3c45 authored by Jeromy's avatar Jeromy

add put and get dht commands to cli

parent 5ea2afc4
......@@ -26,6 +26,8 @@ var DhtCmd = &cmds.Command{
"query": queryDhtCmd,
"findprovs": findProvidersDhtCmd,
"findpeer": findPeerDhtCmd,
"get": getValueDhtCmd,
"put": putValueDhtCmd,
},
}
......@@ -352,3 +354,224 @@ var findPeerDhtCmd = &cmds.Command{
},
Type: notif.QueryEvent{},
}
var getValueDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Run a 'GetValue' query through the DHT",
ShortDescription: `
GetValue will return the value stored in the dht at the given key.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("key", true, true, "The key to find a value for"),
},
Options: []cmds.Option{
cmds.BoolOption("verbose", "v", "Write extra information"),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.Context().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
res.SetError(ErrNotDHT, cmds.ErrNormal)
return
}
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))
events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context().Context, events)
go func() {
defer close(outChan)
for e := range events {
outChan <- e
}
}()
go func() {
defer close(events)
val, err := dht.GetValue(ctx, u.B58KeyDecode(req.Arguments()[0]))
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
} else {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.Value,
Extra: string(val),
})
}
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}
verbose, _, _ := res.Request().Option("v").Bool()
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, u.ErrCast()
}
buf := new(bytes.Buffer)
if verbose {
fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000"))
}
switch obj.Type {
case notif.FinalPeer:
if verbose {
fmt.Fprintf(buf, "* closest peer %s\n", obj.ID)
}
case notif.PeerResponse:
if verbose {
fmt.Fprintf(buf, "* %s says use ", obj.ID)
for _, p := range obj.Responses {
fmt.Fprintf(buf, "%s ", p.ID)
}
fmt.Fprintln(buf)
}
case notif.SendingQuery:
if verbose {
fmt.Fprintf(buf, "* querying %s\n", obj.ID)
}
case notif.Value:
fmt.Fprintf(buf, "got value: '%s'\n", obj.Extra)
case notif.QueryError:
fmt.Fprintf(buf, "error: %s\n", obj.Extra)
default:
fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type)
}
return buf, nil
}
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
}, nil
},
},
Type: notif.QueryEvent{},
}
var putValueDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Run a 'PutValue' query through the DHT",
ShortDescription: `
PutValue will store the given key value pair in the dht.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("key", true, false, "The key to store the value at"),
cmds.StringArg("value", true, false, "The value to store").EnableStdin(),
},
Options: []cmds.Option{
cmds.BoolOption("verbose", "v", "Write extra information"),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.Context().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
res.SetError(ErrNotDHT, cmds.ErrNormal)
return
}
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))
events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context().Context, events)
key := u.B58KeyDecode(req.Arguments()[0])
data := req.Arguments()[1]
go func() {
defer close(outChan)
for e := range events {
outChan <- e
}
}()
go func() {
defer close(events)
err := dht.PutValue(ctx, key, []byte(data))
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
}
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}
verbose, _, _ := res.Request().Option("v").Bool()
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, u.ErrCast()
}
buf := new(bytes.Buffer)
if verbose {
fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000"))
}
switch obj.Type {
case notif.FinalPeer:
if verbose {
fmt.Fprintf(buf, "* closest peer %s\n", obj.ID)
}
case notif.PeerResponse:
if verbose {
fmt.Fprintf(buf, "* %s says use ", obj.ID)
for _, p := range obj.Responses {
fmt.Fprintf(buf, "%s ", p.ID)
}
fmt.Fprintln(buf)
}
case notif.SendingQuery:
if verbose {
fmt.Fprintf(buf, "* querying %s\n", obj.ID)
}
case notif.QueryError:
fmt.Fprintf(buf, "error: %s\n", obj.Extra)
case notif.Value:
fmt.Fprintf(buf, "storing value at %s\n", obj.ID)
default:
fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type)
}
return buf, nil
}
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
}, nil
},
},
Type: notif.QueryEvent{},
}
......@@ -17,6 +17,7 @@ const (
FinalPeer
QueryError
Provider
Value
)
type QueryEvent struct {
......
......@@ -322,7 +322,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
// == to self? thats bad
for _, p := range closer {
if p == dht.self {
log.Debug("Attempted to return self! this shouldnt happen...")
log.Error("Attempted to return self! this shouldnt happen...")
return nil
}
}
......
......@@ -83,7 +83,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// Find closest peer on given cluster to desired key and reply with that info
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
if closer != nil {
if len(closer) > 0 {
closerinfos := peer.PeerInfos(dht.peerstore, closer)
for _, pi := range closerinfos {
log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
......
......@@ -59,6 +59,11 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.Value,
ID: p,
})
err := dht.putValueToPeer(ctx, p, key, rec)
if err != nil {
log.Debugf("failed putting value to peer: %s", err)
......@@ -92,6 +97,11 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
// setup the Query
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})
val, peers, err := dht.getValueOrPeers(ctx, p, key)
if err != nil {
return nil, err
......@@ -102,6 +112,12 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
res.success = true
}
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.PeerResponse,
ID: p,
Responses: pointerizePeerInfos(peers),
})
return res, nil
})
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment