diff --git a/core/commands/dht.go b/core/commands/dht.go index d7c710c89cd802606ae1c4cb4a81af8ebb8c5475..708ba2e6266b44426f14f979a519ac08c089999f 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -26,6 +26,8 @@ var DhtCmd = &cmds.Command{ "query": queryDhtCmd, "findprovs": findProvidersDhtCmd, "findpeer": findPeerDhtCmd, + "get": getValueDhtCmd, + "put": putValueDhtCmd, }, } @@ -352,3 +354,224 @@ var findPeerDhtCmd = &cmds.Command{ }, Type: notif.QueryEvent{}, } + +var getValueDhtCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Run a 'GetValue' query through the DHT", + ShortDescription: ` +GetValue will return the value stored in the dht at the given key. +`, + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("key", true, true, "The key to find a value for"), + }, + Options: []cmds.Option{ + cmds.BoolOption("verbose", "v", "Write extra information"), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.Context().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + dht, ok := n.Routing.(*ipdht.IpfsDHT) + if !ok { + res.SetError(ErrNotDHT, cmds.ErrNormal) + return + } + + outChan := make(chan interface{}) + res.SetOutput((<-chan interface{})(outChan)) + + events := make(chan *notif.QueryEvent) + ctx := notif.RegisterForQueryEvents(req.Context().Context, events) + + go func() { + defer close(outChan) + for e := range events { + outChan <- e + } + }() + + go func() { + defer close(events) + val, err := dht.GetValue(ctx, u.B58KeyDecode(req.Arguments()[0])) + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + } else { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.Value, + Extra: string(val), + }) + } + }() + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + return nil, u.ErrCast() + } + + verbose, _, _ := res.Request().Option("v").Bool() + + marshal := func(v interface{}) (io.Reader, error) { + obj, ok := v.(*notif.QueryEvent) + if !ok { + return nil, u.ErrCast() + } + + buf := new(bytes.Buffer) + if verbose { + fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000")) + } + switch obj.Type { + case notif.FinalPeer: + if verbose { + fmt.Fprintf(buf, "* closest peer %s\n", obj.ID) + } + case notif.PeerResponse: + if verbose { + fmt.Fprintf(buf, "* %s says use ", obj.ID) + for _, p := range obj.Responses { + fmt.Fprintf(buf, "%s ", p.ID) + } + fmt.Fprintln(buf) + } + case notif.SendingQuery: + if verbose { + fmt.Fprintf(buf, "* querying %s\n", obj.ID) + } + case notif.Value: + fmt.Fprintf(buf, "got value: '%s'\n", obj.Extra) + case notif.QueryError: + fmt.Fprintf(buf, "error: %s\n", obj.Extra) + default: + fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type) + } + return buf, nil + } + + return &cmds.ChannelMarshaler{ + Channel: outChan, + Marshaler: marshal, + }, nil + }, + }, + Type: notif.QueryEvent{}, +} + +var putValueDhtCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Run a 'PutValue' query through the DHT", + ShortDescription: ` +PutValue will store the given key value pair in the dht. +`, + }, + + Arguments: []cmds.Argument{ + cmds.StringArg("key", true, false, "The key to store the value at"), + cmds.StringArg("value", true, false, "The value to store").EnableStdin(), + }, + Options: []cmds.Option{ + cmds.BoolOption("verbose", "v", "Write extra information"), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.Context().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + dht, ok := n.Routing.(*ipdht.IpfsDHT) + if !ok { + res.SetError(ErrNotDHT, cmds.ErrNormal) + return + } + + outChan := make(chan interface{}) + res.SetOutput((<-chan interface{})(outChan)) + + events := make(chan *notif.QueryEvent) + ctx := notif.RegisterForQueryEvents(req.Context().Context, events) + + key := u.B58KeyDecode(req.Arguments()[0]) + data := req.Arguments()[1] + + go func() { + defer close(outChan) + for e := range events { + outChan <- e + } + }() + + go func() { + defer close(events) + err := dht.PutValue(ctx, key, []byte(data)) + if err != nil { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + } + }() + }, + Marshalers: cmds.MarshalerMap{ + cmds.Text: func(res cmds.Response) (io.Reader, error) { + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + return nil, u.ErrCast() + } + + verbose, _, _ := res.Request().Option("v").Bool() + + marshal := func(v interface{}) (io.Reader, error) { + obj, ok := v.(*notif.QueryEvent) + if !ok { + return nil, u.ErrCast() + } + + buf := new(bytes.Buffer) + if verbose { + fmt.Fprintf(buf, "%s: ", time.Now().Format("15:04:05.000")) + } + switch obj.Type { + case notif.FinalPeer: + if verbose { + fmt.Fprintf(buf, "* closest peer %s\n", obj.ID) + } + case notif.PeerResponse: + if verbose { + fmt.Fprintf(buf, "* %s says use ", obj.ID) + for _, p := range obj.Responses { + fmt.Fprintf(buf, "%s ", p.ID) + } + fmt.Fprintln(buf) + } + case notif.SendingQuery: + if verbose { + fmt.Fprintf(buf, "* querying %s\n", obj.ID) + } + case notif.QueryError: + fmt.Fprintf(buf, "error: %s\n", obj.Extra) + case notif.Value: + fmt.Fprintf(buf, "storing value at %s\n", obj.ID) + default: + fmt.Fprintf(buf, "unrecognized event type: %d\n", obj.Type) + } + return buf, nil + } + + return &cmds.ChannelMarshaler{ + Channel: outChan, + Marshaler: marshal, + }, nil + }, + }, + Type: notif.QueryEvent{}, +} diff --git a/notifications/query.go b/notifications/query.go index 77d6fcc586421d7c466ce8ab393ffbd2cf90795a..8f4b8119d54fac3b97ccd3fff155ac7217b1ad50 100644 --- a/notifications/query.go +++ b/notifications/query.go @@ -17,6 +17,7 @@ const ( FinalPeer QueryError Provider + Value ) type QueryEvent struct { diff --git a/routing/dht/dht.go b/routing/dht/dht.go index edd18ff11f62dcc61b0825b51604e1fb09268a0a..e3d05492513891100c972ae8447add2f4dbd5303 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -322,7 +322,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [ // == to self? thats bad for _, p := range closer { if p == dht.self { - log.Debug("Attempted to return self! this shouldnt happen...") + log.Error("Attempted to return self! this shouldnt happen...") return nil } } diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 03c3eda3ca5065ba1140c0dcfb2e50c44ae1e2be..defb7e488653dacfcb44be8fbf2129752a6fb2ac 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -83,7 +83,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess // Find closest peer on given cluster to desired key and reply with that info closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) - if closer != nil { + if len(closer) > 0 { closerinfos := peer.PeerInfos(dht.peerstore, closer) for _, pi := range closerinfos { log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID) diff --git a/routing/dht/routing.go b/routing/dht/routing.go index ade41b82e53f977abfdc3ee604e173fcb9947fef..49449eda7cc90c73fd66206a11b670ffbb109263 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -59,6 +59,11 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error wg.Add(1) go func(p peer.ID) { defer wg.Done() + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.Value, + ID: p, + }) + err := dht.putValueToPeer(ctx, p, key, rec) if err != nil { log.Debugf("failed putting value to peer: %s", err) @@ -92,6 +97,11 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // setup the Query query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.SendingQuery, + ID: p, + }) + val, peers, err := dht.getValueOrPeers(ctx, p, key) if err != nil { return nil, err @@ -102,6 +112,12 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { res.success = true } + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.PeerResponse, + ID: p, + Responses: pointerizePeerInfos(peers), + }) + return res, nil })