From 375a38c5f77aea82ef5677a8bae5e7d1d7e4118b Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Thu, 25 Sep 2014 15:10:57 -0700 Subject: [PATCH] add basic publish command, needs polish --- cmd/ipfs/ipfs.go | 2 ++ cmd/ipfs/run.go | 2 +- daemon/daemon.go | 2 ++ namesys/publisher.go | 16 ++++++++++++++-- routing/dht/dht.go | 3 +++ routing/dht/query.go | 7 ++++++- routing/dht/routing.go | 4 ++-- 7 files changed, 30 insertions(+), 6 deletions(-) diff --git a/cmd/ipfs/ipfs.go b/cmd/ipfs/ipfs.go index ed8c26a58..69e5ee5df 100644 --- a/cmd/ipfs/ipfs.go +++ b/cmd/ipfs/ipfs.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "os" + "runtime/pprof" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/gonuts/flag" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/commander" @@ -51,6 +52,7 @@ Use "ipfs help <command>" for more information about a command. cmdIpfsInit, cmdIpfsServe, cmdIpfsRun, + cmdIpfsPub, }, Flag: *flag.NewFlagSet("ipfs", flag.ExitOnError), } diff --git a/cmd/ipfs/run.go b/cmd/ipfs/run.go index 803647c77..a3010cb07 100644 --- a/cmd/ipfs/run.go +++ b/cmd/ipfs/run.go @@ -41,7 +41,7 @@ func runCmd(c *commander.Command, inp []string) error { return err } - dl, err := daemon.NewDaemonListener(n, maddr) + dl, err := daemon.NewDaemonListener(n, maddr, conf) if err != nil { return err } diff --git a/daemon/daemon.go b/daemon/daemon.go index 8180131ba..755de7df9 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -131,6 +131,8 @@ func (dl *DaemonListener) handleConnection(conn net.Conn) { err = commands.Ls(dl.node, command.Args, command.Opts, conn) case "pin": err = commands.Pin(dl.node, command.Args, command.Opts, conn) + case "publish": + err = commands.Publish(dl.node, command.Args, command.Opts, conn) default: err = fmt.Errorf("Invalid Command: '%s'", command.Command) } diff --git a/namesys/publisher.go b/namesys/publisher.go index 83f418a57..67830eb6b 100644 --- a/namesys/publisher.go +++ b/namesys/publisher.go @@ -1,6 +1,8 @@ package namesys import ( + "time" + "code.google.com/p/go.net/context" "code.google.com/p/goprotobuf/proto" @@ -15,8 +17,16 @@ type IpnsPublisher struct { routing routing.IpfsRouting } +func NewPublisher(dag *mdag.DAGService, route routing.IpfsRouting) *IpnsPublisher { + return &IpnsPublisher{ + dag: dag, + routing: route, + } +} + // Publish accepts a keypair and a value, func (p *IpnsPublisher) Publish(k ci.PrivKey, value u.Key) error { + log.Debug("namesys: Publish %s", value.Pretty()) ctx := context.TODO() data, err := CreateEntryData(k, value) if err != nil { @@ -40,13 +50,15 @@ func (p *IpnsPublisher) Publish(k ci.PrivKey, value u.Key) error { } // Store associated public key - err = p.routing.PutValue(ctx, u.Key(nameb), pkbytes) + timectx, _ := context.WithDeadline(ctx, time.Now().Add(time.Second*4)) + err = p.routing.PutValue(timectx, u.Key(nameb), pkbytes) if err != nil { return err } // Store ipns entry at h("/ipns/"+b58(h(pubkey))) - err = p.routing.PutValue(ctx, u.Key(ipnskey), data) + timectx, _ = context.WithDeadline(ctx, time.Now().Add(time.Second*4)) + err = p.routing.PutValue(timectx, u.Key(ipnskey), data) if err != nil { return err } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 11712338b..f9f52b108 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -13,6 +13,7 @@ import ( peer "github.com/jbenet/go-ipfs/peer" kb "github.com/jbenet/go-ipfs/routing/kbucket" u "github.com/jbenet/go-ipfs/util" + "github.com/op/go-logging" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" @@ -21,6 +22,8 @@ import ( "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ) +var log = logging.MustGetLogger("dht") + // TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js // IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications. diff --git a/routing/dht/query.go b/routing/dht/query.go index 4db3f70e7..cc709d7e9 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -101,6 +101,11 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner { } func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) { + log.Debug("Run query with %d peers.", len(peers)) + if len(peers) == 0 { + log.Warning("Running query with no peers!") + return nil, nil + } // setup concurrency rate limiting for i := 0; i < r.query.concurrency; i++ { r.rateLimit <- struct{}{} @@ -164,7 +169,7 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { r.peersSeen[next.Key()] = next r.Unlock() - u.DOut("adding peer to query: %v\n", next.ID.Pretty()) + log.Debug("adding peer to query: %v\n", next.ID.Pretty()) // do this after unlocking to prevent possible deadlocks. r.peersRemaining.Increment(1) diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 778aaba75..6a2a0cdcb 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -18,6 +18,7 @@ import ( // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error { + log.Debug("[%s] PutValue %v %v", dht.self.ID.Pretty(), key, value) err := dht.putLocal(key, value) if err != nil { return err @@ -30,7 +31,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error } query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) { - u.DOut("[%s] PutValue qry part %v\n", dht.self.ID.Pretty(), p.ID.Pretty()) + log.Debug("[%s] PutValue qry part %v", dht.self.ID.Pretty(), p.ID.Pretty()) err := dht.putValueToNetwork(ctx, p, string(key), value) if err != nil { return nil, err @@ -39,7 +40,6 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error }) _, err = query.Run(ctx, peers) - u.DOut("[%s] PutValue %v %v\n", dht.self.ID.Pretty(), key, value) return err } -- GitLab