Commit 9652ada0 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

implement publisher for ipns to wait until moments of rapid churn die down

parent 54142b21
...@@ -76,6 +76,9 @@ func CreateRoot(n *core.IpfsNode, keys []ci.PrivKey, ipfsroot string) (*Root, er ...@@ -76,6 +76,9 @@ func CreateRoot(n *core.IpfsNode, keys []ci.PrivKey, ipfsroot string) (*Root, er
nd := new(Node) nd := new(Node)
nd.Ipfs = n nd.Ipfs = n
nd.key = k nd.key = k
nd.repub = NewRepublisher(nd, time.Millisecond*10)
go nd.repub.Run()
pointsTo, err := n.Namesys.Resolve(name) pointsTo, err := n.Namesys.Resolve(name)
if err != nil { if err != nil {
...@@ -185,6 +188,8 @@ func (r *Root) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) { ...@@ -185,6 +188,8 @@ func (r *Root) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
type Node struct { type Node struct {
nsRoot *Node nsRoot *Node
repub *Republisher
// Name really only for logging purposes // Name really only for logging purposes
name string name string
...@@ -316,14 +321,18 @@ func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error { ...@@ -316,14 +321,18 @@ func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error {
return fuse.ENODATA return fuse.ENODATA
} }
err = n.updateTree() n.sendPublishSignal()
if err != nil {
log.Error("updateTree failed: %s", err)
return fuse.ENODATA
} }
return nil
}
func (n *Node) sendPublishSignal() {
root := n.nsRoot
if root == nil {
root = n
} }
return nil
root.repub.Publish <- struct{}{}
} }
func (n *Node) updateTree() error { func (n *Node) updateTree() error {
...@@ -387,7 +396,7 @@ func (n *Node) Mkdir(req *fuse.MkdirRequest, intr fs.Intr) (fs.Node, fuse.Error) ...@@ -387,7 +396,7 @@ func (n *Node) Mkdir(req *fuse.MkdirRequest, intr fs.Intr) (fs.Node, fuse.Error)
} }
n.changed = true n.changed = true
n.updateTree() n.sendPublishSignal()
return child, nil return child, nil
} }
...@@ -425,6 +434,8 @@ func (n *Node) Remove(req *fuse.RemoveRequest, intr fs.Intr) fuse.Error { ...@@ -425,6 +434,8 @@ func (n *Node) Remove(req *fuse.RemoveRequest, intr fs.Intr) fuse.Error {
log.Error("Remove: No such file.") log.Error("Remove: No such file.")
return fuse.ENOENT return fuse.ENOENT
} }
n.changed = true
n.sendPublishSignal()
return nil return nil
} }
...@@ -471,7 +482,7 @@ func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) error { ...@@ -471,7 +482,7 @@ func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) error {
if err == nil { if err == nil {
return return
} }
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 100)
} }
ipfs.Network.Close() ipfs.Network.Close()
}() }()
...@@ -563,12 +574,13 @@ func NewRepublisher(n *Node, tout time.Duration) *Republisher { ...@@ -563,12 +574,13 @@ func NewRepublisher(n *Node, tout time.Duration) *Republisher {
} }
func (np *Republisher) Run() { func (np *Republisher) Run() {
for _ := range np.Publish { for _ = range np.Publish {
timer := time.After(np.Timeout) timer := time.After(np.Timeout)
for { for {
select { select {
case <-timer: case <-timer:
//Do the publish! //Do the publish!
log.Info("Publishing Changes!")
err := np.node.updateTree() err := np.node.updateTree()
if err != nil { if err != nil {
log.Critical("updateTree error: %s", err) log.Critical("updateTree error: %s", err)
......
...@@ -177,8 +177,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message ...@@ -177,8 +177,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message
start := time.Now() start := time.Now()
// Print out diagnostic // Print out diagnostic
log.Debug("[peer: %s] Sent message type: '%s' [to = %s]\n", log.Debug("Sent message type: '%s' [to = %s]",
dht.self.ID.Pretty(),
Message_MessageType_name[int32(pmes.GetType())], p.ID.Pretty()) Message_MessageType_name[int32(pmes.GetType())], p.ID.Pretty())
rmes, err := dht.sender.SendRequest(ctx, mes) rmes, err := dht.sender.SendRequest(ctx, mes)
...@@ -281,7 +280,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer, ...@@ -281,7 +280,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
} }
if len(peers) > 0 { if len(peers) > 0 {
u.DOut("getValueOrPeers: peers") log.Debug("getValueOrPeers: peers")
return nil, peers, nil return nil, peers, nil
} }
...@@ -400,7 +399,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer ...@@ -400,7 +399,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer
for _, prov := range peers { for _, prov := range peers {
p, err := dht.peerFromInfo(prov) p, err := dht.peerFromInfo(prov)
if err != nil { if err != nil {
u.PErr("error getting peer from info: %v\n", err) log.Error("error getting peer from info: %v", err)
continue continue
} }
......
...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
// PutValue adds value corresponding to given Key. // PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT // This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error { func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error {
log.Debug("PutValue %s %v", key.Pretty(), value) log.Debug("PutValue %s", key.Pretty())
err := dht.putLocal(key, value) err := dht.putLocal(key, value)
if err != nil { if err != nil {
return err return err
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment