From ae6285e5a3253a9f8e989e2b11f67d755d3f4db0 Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Fri, 8 Aug 2014 18:09:21 -0700 Subject: [PATCH] address issues from code review (issue #25) --- peer/peer.go | 23 +- routing/dht/{pDHTMessage.go => DHTMessage.go} | 16 +- routing/dht/bucket.go | 2 +- routing/dht/dht.go | 276 +++++++++--------- routing/dht/dht_test.go | 42 +-- routing/dht/diag.go | 2 +- routing/dht/messages.pb.go | 106 ++++--- routing/dht/messages.proto | 10 +- routing/dht/routing.go | 85 +++--- routing/dht/table.go | 2 +- routing/dht/util.go | 4 + swarm/swarm.go | 74 ++++- util/util.go | 16 +- 13 files changed, 379 insertions(+), 279 deletions(-) rename routing/dht/{pDHTMessage.go => DHTMessage.go} (56%) diff --git a/peer/peer.go b/peer/peer.go index 8b6ff4d90..f09c4c03d 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -1,13 +1,13 @@ package peer import ( - "encoding/hex" "time" "sync" u "github.com/jbenet/go-ipfs/util" ma "github.com/jbenet/go-multiaddr" mh "github.com/jbenet/go-multihash" + b58 "github.com/jbenet/go-base58" "bytes" ) @@ -21,7 +21,7 @@ func (id ID) Equal(other ID) bool { } func (id ID) Pretty() string { - return hex.EncodeToString(id) + return b58.Encode(id) } // Map maps Key (string) : *Peer (slices are not comparable). @@ -33,8 +33,8 @@ type Peer struct { ID ID Addresses []*ma.Multiaddr - distance time.Duration - distLock sync.RWMutex + latency time.Duration + latenLock sync.RWMutex } // Key returns the ID as a Key (string) for maps. @@ -64,12 +64,15 @@ func (p *Peer) NetAddress(n string) *ma.Multiaddr { return nil } -func (p *Peer) GetDistance() time.Duration { - return p.distance +func (p *Peer) GetLatency() (out time.Duration) { + p.latenLock.RLock() + out = p.latency + p.latenLock.RUnlock() + return } -func (p *Peer) SetDistance(dist time.Duration) { - p.distLock.Lock() - p.distance = dist - p.distLock.Unlock() +func (p *Peer) SetLatency(laten time.Duration) { + p.latenLock.Lock() + p.latency = laten + p.latenLock.Unlock() } diff --git a/routing/dht/pDHTMessage.go b/routing/dht/DHTMessage.go similarity index 56% rename from routing/dht/pDHTMessage.go rename to routing/dht/DHTMessage.go index bfe37d35f..64ca5bcab 100644 --- a/routing/dht/pDHTMessage.go +++ b/routing/dht/DHTMessage.go @@ -1,17 +1,17 @@ package dht // A helper struct to make working with protbuf types easier -type pDHTMessage struct { - Type DHTMessage_MessageType - Key string - Value []byte +type DHTMessage struct { + Type PBDHTMessage_MessageType + Key string + Value []byte Response bool - Id uint64 - Success bool + Id uint64 + Success bool } -func (m *pDHTMessage) ToProtobuf() *DHTMessage { - pmes := new(DHTMessage) +func (m *DHTMessage) ToProtobuf() *PBDHTMessage { + pmes := new(PBDHTMessage) if m.Value != nil { pmes.Value = m.Value } diff --git a/routing/dht/bucket.go b/routing/dht/bucket.go index 996d299d9..7aa8d0a94 100644 --- a/routing/dht/bucket.go +++ b/routing/dht/bucket.go @@ -49,7 +49,7 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket { e := bucket_list.Front() for e != nil { peer_id := convertPeerID(e.Value.(*peer.Peer).ID) - peer_cpl := xor(peer_id, target).commonPrefixLen() + peer_cpl := prefLen(peer_id, target) if peer_cpl > cpl { cur := e out.PushBack(e.Value) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 47f93e65f..8fbd5c092 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -1,15 +1,15 @@ package dht import ( - "sync" - "time" "bytes" "encoding/json" + "errors" + "sync" + "time" - peer "github.com/jbenet/go-ipfs/peer" - swarm 
"github.com/jbenet/go-ipfs/swarm" - u "github.com/jbenet/go-ipfs/util" - identify "github.com/jbenet/go-ipfs/identify" + peer "github.com/jbenet/go-ipfs/peer" + swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" ma "github.com/jbenet/go-multiaddr" @@ -37,7 +37,7 @@ type IpfsDHT struct { // Map keys to peers that can provide their value // TODO: implement a TTL on each of these keys - providers map[u.Key][]*providerInfo + providers map[u.Key][]*providerInfo providerLock sync.RWMutex // map of channels waiting for reply messages @@ -54,21 +54,27 @@ type IpfsDHT struct { diaglock sync.Mutex } +// The listen info struct holds information about a message that is being waited for type listenInfo struct { + // Responses matching the listen ID will be sent through resp resp chan *swarm.Message + + // count is the number of responses to listen for count int + + // eol is the time at which this listener will expire eol time.Time } // Create a new DHT object with the given peer as the 'local' host func NewDHT(p *peer.Peer) (*IpfsDHT, error) { if p == nil { - panic("Tried to create new dht with nil peer") + return nil, errors.New("nil peer passed to NewDHT()") } network := swarm.NewSwarm(p) err := network.Listen() if err != nil { - return nil,err + return nil, err } dht := new(IpfsDHT) @@ -90,50 +96,24 @@ func (dht *IpfsDHT) Start() { } // Connect to a new peer at the given address -// TODO: move this into swarm func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { - maddrstr,_ := addr.String() + maddrstr, _ := addr.String() u.DOut("Connect to new peer: %s", maddrstr) - if addr == nil { - panic("addr was nil!") - } - peer := new(peer.Peer) - peer.AddAddress(addr) - - conn,err := swarm.Dial("tcp", peer) - if err != nil { - return nil, err - } - - err = identify.Handshake(dht.self, peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) + npeer, err := dht.network.Connect(addr) if err != nil { return nil, err } - // Send node an address that you can be reached on - myaddr := dht.self.NetAddress("tcp") - mastr,err := myaddr.String() - if err != nil { - panic("No local address to send") - } - - conn.Outgoing.MsgChan <- []byte(mastr) - - dht.network.StartConn(conn) - - removed := dht.routes[0].Update(peer) - if removed != nil { - panic("need to remove this peer.") - } + dht.Update(npeer) // Ping new peer to register in their routing table // NOTE: this should be done better... 
- err = dht.Ping(peer, time.Second * 2) + err = dht.Ping(npeer, time.Second*2) if err != nil { - panic("Failed to ping new peer.") + return nil, errors.New("Failed to ping newly connected peer.") } - return peer, nil + return npeer, nil } // Read in all messages from swarm and handle them appropriately @@ -144,23 +124,19 @@ func (dht *IpfsDHT) handleMessages() { checkTimeouts := time.NewTicker(time.Minute * 5) for { select { - case mes,ok := <-dht.network.Chan.Incoming: + case mes, ok := <-dht.network.Chan.Incoming: if !ok { u.DOut("handleMessages closing, bad recv on incoming") return } - pmes := new(DHTMessage) + pmes := new(PBDHTMessage) err := proto.Unmarshal(mes.Data, pmes) if err != nil { u.PErr("Failed to decode protobuf message: %s", err) continue } - // Update peers latest visit in routing table - removed := dht.routes[0].Update(mes.Peer) - if removed != nil { - panic("Need to handle removed peer.") - } + dht.Update(mes.Peer) // Note: not sure if this is the correct place for this if pmes.GetResponse() { @@ -180,7 +156,6 @@ func (dht *IpfsDHT) handleMessages() { dht.Unlisten(pmes.GetId()) } } else { - // this is expected behaviour during a timeout u.DOut("Received response with nobody listening...") } @@ -190,65 +165,73 @@ func (dht *IpfsDHT) handleMessages() { u.DOut("[peer: %s]", dht.self.ID.Pretty()) u.DOut("Got message type: '%s' [id = %x, from = %s]", - DHTMessage_MessageType_name[int32(pmes.GetType())], + PBDHTMessage_MessageType_name[int32(pmes.GetType())], pmes.GetId(), mes.Peer.ID.Pretty()) switch pmes.GetType() { - case DHTMessage_GET_VALUE: + case PBDHTMessage_GET_VALUE: dht.handleGetValue(mes.Peer, pmes) - case DHTMessage_PUT_VALUE: + case PBDHTMessage_PUT_VALUE: dht.handlePutValue(mes.Peer, pmes) - case DHTMessage_FIND_NODE: + case PBDHTMessage_FIND_NODE: dht.handleFindPeer(mes.Peer, pmes) - case DHTMessage_ADD_PROVIDER: + case PBDHTMessage_ADD_PROVIDER: dht.handleAddProvider(mes.Peer, pmes) - case DHTMessage_GET_PROVIDERS: + case PBDHTMessage_GET_PROVIDERS: dht.handleGetProviders(mes.Peer, pmes) - case DHTMessage_PING: + case PBDHTMessage_PING: dht.handlePing(mes.Peer, pmes) - case DHTMessage_DIAGNOSTIC: + case PBDHTMessage_DIAGNOSTIC: dht.handleDiagnostic(mes.Peer, pmes) } case err := <-dht.network.Chan.Errors: u.DErr("dht err: %s", err) - panic(err) case <-dht.shutdown: checkTimeouts.Stop() return case <-checkTimeouts.C: - dht.providerLock.Lock() - for k,parr := range dht.providers { - var cleaned []*providerInfo - for _,v := range parr { - if time.Since(v.Creation) < time.Hour { - cleaned = append(cleaned, v) - } - } - dht.providers[k] = cleaned - } - dht.providerLock.Unlock() - dht.listenLock.Lock() - var remove []uint64 - now := time.Now() - for k,v := range dht.listeners { - if now.After(v.eol) { - remove = append(remove, k) - } - } - for _,k := range remove { - delete(dht.listeners, k) + // Time to collect some garbage! 
+ dht.cleanExpiredProviders() + dht.cleanExpiredListeners() + } + } +} + +func (dht *IpfsDHT) cleanExpiredProviders() { + dht.providerLock.Lock() + for k, parr := range dht.providers { + var cleaned []*providerInfo + for _, v := range parr { + if time.Since(v.Creation) < time.Hour { + cleaned = append(cleaned, v) } - dht.listenLock.Unlock() } + dht.providers[k] = cleaned } + dht.providerLock.Unlock() +} + +func (dht *IpfsDHT) cleanExpiredListeners() { + dht.listenLock.Lock() + var remove []uint64 + now := time.Now() + for k, v := range dht.listeners { + if now.After(v.eol) { + remove = append(remove, k) + } + } + for _, k := range remove { + delete(dht.listeners, k) + } + dht.listenLock.Unlock() } func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error { - pmes := pDHTMessage{ - Type: DHTMessage_PUT_VALUE, - Key: key, + pmes := DHTMessage{ + Type: PBDHTMessage_PUT_VALUE, + Key: key, Value: value, - Id: GenerateMessageID(), + Id: GenerateMessageID(), } mes := swarm.NewMessage(p, pmes.ToProtobuf()) @@ -256,27 +239,27 @@ func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error return nil } -func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) { +func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { dskey := ds.NewKey(pmes.GetKey()) - var resp *pDHTMessage + var resp *DHTMessage i_val, err := dht.datastore.Get(dskey) if err == nil { - resp = &pDHTMessage{ + resp = &DHTMessage{ Response: true, - Id: *pmes.Id, - Key: *pmes.Key, - Value: i_val.([]byte), - Success: true, + Id: *pmes.Id, + Key: *pmes.Key, + Value: i_val.([]byte), + Success: true, } } else if err == ds.ErrNotFound { // Find closest peer(s) to desired key and reply with that info closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey()))) - resp = &pDHTMessage{ + resp = &DHTMessage{ Response: true, - Id: *pmes.Id, - Key: *pmes.Key, - Value: closer.ID, - Success: false, + Id: *pmes.Id, + Key: *pmes.Key, + Value: closer.ID, + Success: false, } } @@ -285,7 +268,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) { } // Store a value in this peer local storage -func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { +func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) { dskey := ds.NewKey(pmes.GetKey()) err := dht.datastore.Put(dskey, pmes.GetValue()) if err != nil { @@ -294,46 +277,51 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) { } } -func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { - resp := pDHTMessage{ - Type: pmes.GetType(), +func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) { + resp := DHTMessage{ + Type: pmes.GetType(), Response: true, - Id: pmes.GetId(), + Id: pmes.GetId(), } - dht.network.Chan.Outgoing <-swarm.NewMessage(p, resp.ToProtobuf()) + dht.network.Chan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf()) } -func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) { +func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { + success := true u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty()) closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey()))) if closest == nil { - panic("could not find anything.") + u.PErr("handleFindPeer: could not find anything.") + success = false } if len(closest.Addresses) == 0 { - panic("no addresses for connected peer...") + u.PErr("handleFindPeer: no addresses for connected peer...") + success = false } u.POut("handleFindPeer: sending 
back '%s'", closest.ID.Pretty()) - addr,err := closest.Addresses[0].String() + addr, err := closest.Addresses[0].String() if err != nil { - panic(err) + u.PErr(err.Error()) + success = false } - resp := pDHTMessage{ - Type: pmes.GetType(), + resp := DHTMessage{ + Type: pmes.GetType(), Response: true, - Id: pmes.GetId(), - Value: []byte(addr), + Id: pmes.GetId(), + Value: []byte(addr), + Success: success, } mes := swarm.NewMessage(p, resp.ToProtobuf()) - dht.network.Chan.Outgoing <-mes + dht.network.Chan.Outgoing <- mes } -func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { +func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { dht.providerLock.RLock() providers := dht.providers[u.Key(pmes.GetKey())] dht.providerLock.RUnlock() @@ -344,9 +332,9 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { // This is just a quick hack, formalize method of sending addrs later addrs := make(map[u.Key]string) - for _,prov := range providers { + for _, prov := range providers { ma := prov.Value.NetAddress("tcp") - str,err := ma.String() + str, err := ma.String() if err != nil { u.PErr("Error: %s", err) continue @@ -355,35 +343,38 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *DHTMessage) { addrs[prov.Value.Key()] = str } - data,err := json.Marshal(addrs) + success := true + data, err := json.Marshal(addrs) if err != nil { - panic(err) + u.POut("handleGetProviders: error marshalling struct to JSON: %s", err) + data = nil + success = false } - resp := pDHTMessage{ - Type: DHTMessage_GET_PROVIDERS, - Key: pmes.GetKey(), - Value: data, - Id: pmes.GetId(), + resp := DHTMessage{ + Type: PBDHTMessage_GET_PROVIDERS, + Key: pmes.GetKey(), + Value: data, + Id: pmes.GetId(), Response: true, + Success: success, } mes := swarm.NewMessage(p, resp.ToProtobuf()) - dht.network.Chan.Outgoing <-mes + dht.network.Chan.Outgoing <- mes } type providerInfo struct { Creation time.Time - Value *peer.Peer + Value *peer.Peer } -func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) { +func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) { //TODO: need to implement TTLs on providers key := u.Key(pmes.GetKey()) dht.addProviderEntry(key, p) } - // Register a handler for a specific message ID, used for getting replies // to certain messages (i.e. response to a GET_VALUE message) func (dht *IpfsDHT) ListenFor(mesid uint64, count int, timeout time.Duration) <-chan *swarm.Message { @@ -407,7 +398,7 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) { func (dht *IpfsDHT) IsListening(mesid uint64) bool { dht.listenLock.RLock() - li,ok := dht.listeners[mesid] + li, ok := dht.listeners[mesid] dht.listenLock.RUnlock() if time.Now().After(li.eol) { dht.listenLock.Lock() @@ -432,7 +423,7 @@ func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) { dht.providerLock.Unlock() } -func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) { +func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) { dht.diaglock.Lock() if dht.IsListening(pmes.GetId()) { //TODO: ehhh.......... 
@@ -442,15 +433,13 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) { dht.diaglock.Unlock() seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10) - listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second * 30) + listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30) - for _,ps := range seq { + for _, ps := range seq { mes := swarm.NewMessage(ps, pmes) - dht.network.Chan.Outgoing <-mes + dht.network.Chan.Outgoing <- mes } - - buf := new(bytes.Buffer) di := dht.getDiagInfo() buf.Write(di.Marshal()) @@ -464,7 +453,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) { //Timeout, return what we have goto out case req_resp := <-listen_chan: - pmes_out := new(DHTMessage) + pmes_out := new(PBDHTMessage) err := proto.Unmarshal(req_resp.Data, pmes_out) if err != nil { // It broke? eh, whatever, keep going @@ -476,19 +465,19 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) { } out: - resp := pDHTMessage{ - Type: DHTMessage_DIAGNOSTIC, - Id: pmes.GetId(), - Value: buf.Bytes(), + resp := DHTMessage{ + Type: PBDHTMessage_DIAGNOSTIC, + Id: pmes.GetId(), + Value: buf.Bytes(), Response: true, } mes := swarm.NewMessage(p, resp.ToProtobuf()) - dht.network.Chan.Outgoing <-mes + dht.network.Chan.Outgoing <- mes } func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) { - v,err := dht.datastore.Get(ds.NewKey(string(key))) + v, err := dht.datastore.Get(ds.NewKey(string(key))) if err != nil { return nil, err } @@ -498,3 +487,10 @@ func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) { func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error { return dht.datastore.Put(ds.NewKey(string(key)), value) } + +func (dht *IpfsDHT) Update(p *peer.Peer) { + removed := dht.routes[0].Update(p) + if removed != nil { + dht.network.Drop(removed) + } +} diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 2de027a3d..e24ada020 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -2,21 +2,22 @@ package dht import ( "testing" + peer "github.com/jbenet/go-ipfs/peer" - ma "github.com/jbenet/go-multiaddr" u "github.com/jbenet/go-ipfs/util" + ma "github.com/jbenet/go-multiaddr" - "time" "fmt" + "time" ) func TestPing(t *testing.T) { u.Debug = false - addr_a,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234") + addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234") if err != nil { t.Fatal(err) } - addr_b,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678") + addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678") if err != nil { t.Fatal(err) } @@ -29,12 +30,12 @@ func TestPing(t *testing.T) { peer_b.AddAddress(addr_b) peer_b.ID = peer.ID([]byte("peer_b")) - dht_a,err := NewDHT(peer_a) + dht_a, err := NewDHT(peer_a) if err != nil { t.Fatal(err) } - dht_b,err := NewDHT(peer_b) + dht_b, err := NewDHT(peer_b) if err != nil { t.Fatal(err) } @@ -42,13 +43,13 @@ func TestPing(t *testing.T) { dht_a.Start() dht_b.Start() - _,err = dht_a.Connect(addr_b) + _, err = dht_a.Connect(addr_b) if err != nil { t.Fatal(err) } //Test that we can ping the node - err = dht_a.Ping(peer_b, time.Second * 2) + err = dht_a.Ping(peer_b, time.Second*2) if err != nil { t.Fatal(err) } @@ -59,11 +60,11 @@ func TestPing(t *testing.T) { func TestValueGetSet(t *testing.T) { u.Debug = false - addr_a,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235") + addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235") if err != nil { t.Fatal(err) } - addr_b,err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679") + addr_b, err := 
ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679") if err != nil { t.Fatal(err) } @@ -76,12 +77,12 @@ func TestValueGetSet(t *testing.T) { peer_b.AddAddress(addr_b) peer_b.ID = peer.ID([]byte("peer_b")) - dht_a,err := NewDHT(peer_a) + dht_a, err := NewDHT(peer_a) if err != nil { t.Fatal(err) } - dht_b,err := NewDHT(peer_b) + dht_b, err := NewDHT(peer_b) if err != nil { t.Fatal(err) } @@ -89,7 +90,7 @@ func TestValueGetSet(t *testing.T) { dht_a.Start() dht_b.Start() - _,err = dht_a.Connect(addr_b) + _, err = dht_a.Connect(addr_b) if err != nil { t.Fatal(err) } @@ -99,7 +100,7 @@ func TestValueGetSet(t *testing.T) { t.Fatal(err) } - val, err := dht_a.GetValue("hello", time.Second * 2) + val, err := dht_a.GetValue("hello", time.Second*2) if err != nil { t.Fatal(err) } @@ -113,14 +114,13 @@ func TestProvides(t *testing.T) { u.Debug = false var addrs []*ma.Multiaddr for i := 0; i < 4; i++ { - a,err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000 + i)) + a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i)) if err != nil { t.Fatal(err) } addrs = append(addrs, a) } - var peers []*peer.Peer for i := 0; i < 4; i++ { p := new(peer.Peer) @@ -131,7 +131,7 @@ func TestProvides(t *testing.T) { var dhts []*IpfsDHT for i := 0; i < 4; i++ { - d,err := NewDHT(peers[i]) + d, err := NewDHT(peers[i]) if err != nil { t.Fatal(err) } @@ -166,7 +166,7 @@ func TestProvides(t *testing.T) { time.Sleep(time.Millisecond * 60) - provs,err := dhts[0].FindProviders(u.Key("hello"), time.Second) + provs, err := dhts[0].FindProviders(u.Key("hello"), time.Second) if err != nil { t.Fatal(err) } @@ -174,6 +174,8 @@ func TestProvides(t *testing.T) { if len(provs) != 1 { t.Fatal("Didnt get back providers") } -} - + for i := 0; i < 4; i++ { + dhts[i].Halt() + } +} diff --git a/routing/dht/diag.go b/routing/dht/diag.go index b8f211e40..4bc752f92 100644 --- a/routing/dht/diag.go +++ b/routing/dht/diag.go @@ -38,7 +38,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo { di.Keys = nil // Currently no way to query datastore for _,p := range dht.routes[0].listpeers() { - di.Connections = append(di.Connections, connDiagInfo{p.GetDistance(), p.ID}) + di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID}) } return di } diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index 4f427efa1..a852c5e1f 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -9,7 +9,7 @@ It is generated from these files: messages.proto It has these top-level messages: - DHTMessage + PBDHTMessage */ package dht @@ -20,19 +20,19 @@ import math "math" var _ = proto.Marshal var _ = math.Inf -type DHTMessage_MessageType int32 +type PBDHTMessage_MessageType int32 const ( - DHTMessage_PUT_VALUE DHTMessage_MessageType = 0 - DHTMessage_GET_VALUE DHTMessage_MessageType = 1 - DHTMessage_ADD_PROVIDER DHTMessage_MessageType = 2 - DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3 - DHTMessage_FIND_NODE DHTMessage_MessageType = 4 - DHTMessage_PING DHTMessage_MessageType = 5 - DHTMessage_DIAGNOSTIC DHTMessage_MessageType = 6 + PBDHTMessage_PUT_VALUE PBDHTMessage_MessageType = 0 + PBDHTMessage_GET_VALUE PBDHTMessage_MessageType = 1 + PBDHTMessage_ADD_PROVIDER PBDHTMessage_MessageType = 2 + PBDHTMessage_GET_PROVIDERS PBDHTMessage_MessageType = 3 + PBDHTMessage_FIND_NODE PBDHTMessage_MessageType = 4 + PBDHTMessage_PING PBDHTMessage_MessageType = 5 + PBDHTMessage_DIAGNOSTIC PBDHTMessage_MessageType = 6 ) -var DHTMessage_MessageType_name = map[int32]string{ +var PBDHTMessage_MessageType_name = 
map[int32]string{ 0: "PUT_VALUE", 1: "GET_VALUE", 2: "ADD_PROVIDER", @@ -41,7 +41,7 @@ var DHTMessage_MessageType_name = map[int32]string{ 5: "PING", 6: "DIAGNOSTIC", } -var DHTMessage_MessageType_value = map[string]int32{ +var PBDHTMessage_MessageType_value = map[string]int32{ "PUT_VALUE": 0, "GET_VALUE": 1, "ADD_PROVIDER": 2, @@ -51,79 +51,111 @@ var DHTMessage_MessageType_value = map[string]int32{ "DIAGNOSTIC": 6, } -func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType { - p := new(DHTMessage_MessageType) +func (x PBDHTMessage_MessageType) Enum() *PBDHTMessage_MessageType { + p := new(PBDHTMessage_MessageType) *p = x return p } -func (x DHTMessage_MessageType) String() string { - return proto.EnumName(DHTMessage_MessageType_name, int32(x)) +func (x PBDHTMessage_MessageType) String() string { + return proto.EnumName(PBDHTMessage_MessageType_name, int32(x)) } -func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error { - value, err := proto.UnmarshalJSONEnum(DHTMessage_MessageType_value, data, "DHTMessage_MessageType") +func (x *PBDHTMessage_MessageType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(PBDHTMessage_MessageType_value, data, "PBDHTMessage_MessageType") if err != nil { return err } - *x = DHTMessage_MessageType(value) + *x = PBDHTMessage_MessageType(value) return nil } -type DHTMessage struct { - Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"` - Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` - Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` - Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` - Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` - XXX_unrecognized []byte `json:"-"` +type PBDHTMessage struct { + Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"` + Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` + Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` + Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` + Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"` + XXX_unrecognized []byte `json:"-"` } -func (m *DHTMessage) Reset() { *m = DHTMessage{} } -func (m *DHTMessage) String() string { return proto.CompactTextString(m) } -func (*DHTMessage) ProtoMessage() {} +func (m *PBDHTMessage) Reset() { *m = PBDHTMessage{} } +func (m *PBDHTMessage) String() string { return proto.CompactTextString(m) } +func (*PBDHTMessage) ProtoMessage() {} -func (m *DHTMessage) GetType() DHTMessage_MessageType { +func (m *PBDHTMessage) GetType() PBDHTMessage_MessageType { if m != nil && m.Type != nil { return *m.Type } - return DHTMessage_PUT_VALUE + return PBDHTMessage_PUT_VALUE } -func (m *DHTMessage) GetKey() string { +func (m *PBDHTMessage) GetKey() string { if m != nil && m.Key != nil { return *m.Key } return "" } -func (m *DHTMessage) GetValue() []byte { +func (m *PBDHTMessage) GetValue() []byte { if m != nil { return m.Value } return nil } -func (m *DHTMessage) GetId() uint64 { +func (m *PBDHTMessage) GetId() uint64 { if m != nil && m.Id != nil { return *m.Id } return 0 } -func (m 
*DHTMessage) GetResponse() bool { +func (m *PBDHTMessage) GetResponse() bool { if m != nil && m.Response != nil { return *m.Response } return false } -func (m *DHTMessage) GetSuccess() bool { +func (m *PBDHTMessage) GetSuccess() bool { if m != nil && m.Success != nil { return *m.Success } return false } +func (m *PBDHTMessage) GetPeers() []*PBDHTMessage_PBPeer { + if m != nil { + return m.Peers + } + return nil +} + +type PBDHTMessage_PBPeer struct { + Id *string `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` + Addr *string `protobuf:"bytes,2,req,name=addr" json:"addr,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *PBDHTMessage_PBPeer) Reset() { *m = PBDHTMessage_PBPeer{} } +func (m *PBDHTMessage_PBPeer) String() string { return proto.CompactTextString(m) } +func (*PBDHTMessage_PBPeer) ProtoMessage() {} + +func (m *PBDHTMessage_PBPeer) GetId() string { + if m != nil && m.Id != nil { + return *m.Id + } + return "" +} + +func (m *PBDHTMessage_PBPeer) GetAddr() string { + if m != nil && m.Addr != nil { + return *m.Addr + } + return "" +} + func init() { - proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value) + proto.RegisterEnum("dht.PBDHTMessage_MessageType", PBDHTMessage_MessageType_name, PBDHTMessage_MessageType_value) } diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index 278a95202..4d4e8c61f 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -2,7 +2,7 @@ package dht; //run `protoc --go_out=. *.proto` to generate -message DHTMessage { +message PBDHTMessage { enum MessageType { PUT_VALUE = 0; GET_VALUE = 1; @@ -13,6 +13,11 @@ message DHTMessage { DIAGNOSTIC = 6; } + message PBPeer { + required string id = 1; + required string addr = 2; + } + required MessageType type = 1; optional string key = 2; optional bytes value = 3; @@ -23,4 +28,7 @@ message DHTMessage { // Signals whether or not this message is a response to another message optional bool response = 5; optional bool success = 6; + + // Used for returning peers from queries (normally, peers closer to X) + repeated PBPeer peers = 7; } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 1a90ce76b..57d087fdc 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -1,10 +1,11 @@ package dht import ( - "math/rand" - "time" "bytes" "encoding/json" + "errors" + "math/rand" + "time" proto "code.google.com/p/goprotobuf/proto" @@ -34,7 +35,7 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { var p *peer.Peer p = s.routes[0].NearestPeer(convertKey(key)) if p == nil { - panic("Table returned nil peer!") + return errors.New("Table returned nil peer!") } return s.putValueToPeer(p, string(key), value) @@ -47,13 +48,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { var p *peer.Peer p = s.routes[0].NearestPeer(convertKey(key)) if p == nil { - panic("Table returned nil peer!") + return nil, errors.New("Table returned nil peer!") } - pmes := pDHTMessage{ - Type: DHTMessage_GET_VALUE, - Key: string(key), - Id: GenerateMessageID(), + pmes := DHTMessage{ + Type: PBDHTMessage_GET_VALUE, + Key: string(key), + Id: GenerateMessageID(), } response_chan := s.ListenFor(pmes.Id, 1, time.Minute) @@ -68,15 +69,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { return nil, u.ErrTimeout case resp, ok := <-response_chan: if !ok { - panic("Channel was closed...") + u.PErr("response channel closed before timeout, please investigate.") + return 
nil, u.ErrTimeout } - if resp == nil { - panic("Why the hell is this response nil?") - } - pmes_out := new(DHTMessage) + pmes_out := new(PBDHTMessage) err := proto.Unmarshal(resp.Data, pmes_out) if err != nil { - return nil,err + return nil, err } if pmes_out.GetSuccess() { return pmes_out.GetValue(), nil @@ -96,15 +95,15 @@ func (s *IpfsDHT) Provide(key u.Key) error { //return an error } - pmes := pDHTMessage{ - Type: DHTMessage_ADD_PROVIDER, - Key: string(key), + pmes := DHTMessage{ + Type: PBDHTMessage_ADD_PROVIDER, + Key: string(key), } pbmes := pmes.ToProtobuf() - for _,p := range peers { + for _, p := range peers { mes := swarm.NewMessage(p, pbmes) - s.network.Chan.Outgoing <-mes + s.network.Chan.Outgoing <- mes } return nil } @@ -113,17 +112,17 @@ func (s *IpfsDHT) Provide(key u.Key) error { func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { p := s.routes[0].NearestPeer(convertKey(key)) - pmes := pDHTMessage{ - Type: DHTMessage_GET_PROVIDERS, - Key: string(key), - Id: GenerateMessageID(), + pmes := DHTMessage{ + Type: PBDHTMessage_GET_PROVIDERS, + Key: string(key), + Id: GenerateMessageID(), } mes := swarm.NewMessage(p, pmes.ToProtobuf()) listen_chan := s.ListenFor(pmes.Id, 1, time.Minute) u.DOut("Find providers for: '%s'", key) - s.network.Chan.Outgoing <-mes + s.network.Chan.Outgoing <- mes after := time.After(timeout) select { case <-after: @@ -131,7 +130,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, return nil, u.ErrTimeout case resp := <-listen_chan: u.DOut("FindProviders: got response.") - pmes_out := new(DHTMessage) + pmes_out := new(PBDHTMessage) err := proto.Unmarshal(resp.Data, pmes_out) if err != nil { return nil, err @@ -143,10 +142,10 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, } var prov_arr []*peer.Peer - for pid,addr := range addrs { + for pid, addr := range addrs { p := s.network.Find(pid) if p == nil { - maddr,err := ma.NewMultiaddr(addr) + maddr, err := ma.NewMultiaddr(addr) if err != nil { u.PErr("error connecting to new peer: %s", err) continue @@ -171,23 +170,23 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { p := s.routes[0].NearestPeer(convertPeerID(id)) - pmes := pDHTMessage{ - Type: DHTMessage_FIND_NODE, - Key: string(id), - Id: GenerateMessageID(), + pmes := DHTMessage{ + Type: PBDHTMessage_FIND_NODE, + Key: string(id), + Id: GenerateMessageID(), } mes := swarm.NewMessage(p, pmes.ToProtobuf()) listen_chan := s.ListenFor(pmes.Id, 1, time.Minute) - s.network.Chan.Outgoing <-mes + s.network.Chan.Outgoing <- mes after := time.After(timeout) select { case <-after: s.Unlisten(pmes.Id) return nil, u.ErrTimeout case resp := <-listen_chan: - pmes_out := new(DHTMessage) + pmes_out := new(PBDHTMessage) err := proto.Unmarshal(resp.Data, pmes_out) if err != nil { return nil, err @@ -218,7 +217,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { // Thoughts: maybe this should accept an ID and do a peer lookup? 
u.DOut("Enter Ping.") - pmes := pDHTMessage{Id: GenerateMessageID(), Type: DHTMessage_PING} + pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING} mes := swarm.NewMessage(p, pmes.ToProtobuf()) before := time.Now() @@ -229,7 +228,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { select { case <-response_chan: roundtrip := time.Since(before) - p.SetDistance(roundtrip) + p.SetLatency(roundtrip) u.POut("Ping took %s.", roundtrip.String()) return nil case <-tout: @@ -246,17 +245,17 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { targets := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10) // TODO: Add timeout to this struct so nodes know when to return - pmes := pDHTMessage{ - Type: DHTMessage_DIAGNOSTIC, - Id: GenerateMessageID(), + pmes := DHTMessage{ + Type: PBDHTMessage_DIAGNOSTIC, + Id: GenerateMessageID(), } - listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute * 2) + listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute*2) pbmes := pmes.ToProtobuf() - for _,p := range targets { + for _, p := range targets { mes := swarm.NewMessage(p, pbmes) - dht.network.Chan.Outgoing <-mes + dht.network.Chan.Outgoing <- mes } var out []*diagInfo @@ -267,7 +266,7 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { u.DOut("Diagnostic request timed out.") return out, u.ErrTimeout case resp := <-listen_chan: - pmes_out := new(DHTMessage) + pmes_out := new(PBDHTMessage) err := proto.Unmarshal(resp.Data, pmes_out) if err != nil { // NOTE: here and elsewhere, need to audit error handling, @@ -288,5 +287,5 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { } } - return nil,nil + return nil, nil } diff --git a/routing/dht/table.go b/routing/dht/table.go index be4a4b392..7a121234f 100644 --- a/routing/dht/table.go +++ b/routing/dht/table.go @@ -125,7 +125,7 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer { func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { rt.tabLock.RLock() defer rt.tabLock.RUnlock() - cpl := xor(id, rt.local).commonPrefixLen() + cpl := prefLen(id, rt.local) // Get bucket at cpl index or last bucket var bucket *Bucket diff --git a/routing/dht/util.go b/routing/dht/util.go index 2adc8b765..5961cd226 100644 --- a/routing/dht/util.go +++ b/routing/dht/util.go @@ -40,6 +40,10 @@ func (id ID) commonPrefixLen() int { return len(id)*8 - 1 } +func prefLen(a, b ID) int { + return xor(a, b).commonPrefixLen() +} + func xor(a, b ID) ID { a, b = equalizeSizes(a, b) diff --git a/swarm/swarm.go b/swarm/swarm.go index b286c3c82..93ad68755 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -1,15 +1,16 @@ package swarm import ( + "errors" "fmt" "net" "sync" + proto "code.google.com/p/goprotobuf/proto" + ident "github.com/jbenet/go-ipfs/identify" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ma "github.com/jbenet/go-multiaddr" - ident "github.com/jbenet/go-ipfs/identify" - proto "code.google.com/p/goprotobuf/proto" ) // Message represents a packet of information sent to or received from a @@ -24,9 +25,10 @@ type Message struct { // Cleaner looking helper function to make a new message struct func NewMessage(p *peer.Peer, data proto.Message) *Message { - bytes,err := proto.Marshal(data) + bytes, err := proto.Marshal(data) if err != nil { - panic(err) + u.PErr(err.Error()) + return nil } return &Message{ Peer: p, @@ -63,7 +65,7 @@ func (se *SwarmListenErr) Error() string { return "<nil error>" 
} var out string - for i,v := range se.Errors { + for i, v := range se.Errors { if v != nil { out += fmt.Sprintf("%d: %s\n", i, v) } @@ -80,7 +82,7 @@ type Swarm struct { conns ConnMap connsLock sync.RWMutex - local *peer.Peer + local *peer.Peer listeners []net.Listener } @@ -137,7 +139,7 @@ func (s *Swarm) connListen(maddr *ma.Multiaddr) error { if err != nil { e := fmt.Errorf("Failed to accept connection: %s - %s [%s]", netstr, addr, err) - go func() {s.Chan.Errors <- e}() + go func() { s.Chan.Errors <- e }() return } go s.handleNewConn(nconn) @@ -160,7 +162,9 @@ func (s *Swarm) handleNewConn(nconn net.Conn) { err := ident.Handshake(s.local, p, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) if err != nil { - panic(err) + u.PErr(err.Error()) + conn.Close() + return } // Get address to contact remote peer from @@ -186,7 +190,7 @@ func (s *Swarm) Close() { s.Chan.Close <- true // fan out s.Chan.Close <- true // listener - for _,list := range s.listeners { + for _, list := range s.listeners { list.Close() } } @@ -220,9 +224,9 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { return conn, nil } -func (s *Swarm) StartConn(conn *Conn) { +func (s *Swarm) StartConn(conn *Conn) error { if conn == nil { - panic("tried to start nil Conn!") + return errors.New("Tried to start nil connection.") } u.DOut("Starting connection: %s", conn.Peer.Key().Pretty()) @@ -233,6 +237,7 @@ func (s *Swarm) StartConn(conn *Conn) { // kick off reader goroutine go s.fanIn(conn) + return nil } // Handles the unwrapping + sending of messages to the right connection. @@ -303,3 +308,50 @@ func (s *Swarm) Find(key u.Key) *peer.Peer { } return conn.Peer } + +func (s *Swarm) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { + if addr == nil { + return nil, errors.New("nil Multiaddr passed to swarm.Connect()") + } + npeer := new(peer.Peer) + npeer.AddAddress(addr) + + conn, err := Dial("tcp", npeer) + if err != nil { + return nil, err + } + + err = ident.Handshake(s.local, npeer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) + if err != nil { + return nil, err + } + + // Send node an address that you can be reached on + myaddr := s.local.NetAddress("tcp") + mastr, err := myaddr.String() + if err != nil { + return nil, errors.New("No local address to send to peer.") + } + + conn.Outgoing.MsgChan <- []byte(mastr) + + s.StartConn(conn) + + return npeer, nil +} + +// Removes a given peer from the swarm and closes connections to it +func (s *Swarm) Drop(p *peer.Peer) error { + s.connsLock.RLock() + conn, found := s.conns[u.Key(p.ID)] + s.connsLock.RUnlock() + if !found { + return u.ErrNotFound + } + + s.connsLock.Lock() + delete(s.conns, u.Key(p.ID)) + s.connsLock.Unlock() + + return conn.Close() +} diff --git a/util/util.go b/util/util.go index ca9ab79d3..ac9ca8100 100644 --- a/util/util.go +++ b/util/util.go @@ -1,13 +1,14 @@ package util import ( - "fmt" "errors" - mh "github.com/jbenet/go-multihash" + "fmt" "os" "os/user" "strings" - "encoding/hex" + + b58 "github.com/jbenet/go-base58" + mh "github.com/jbenet/go-multihash" ) // Debug is a global flag for debugging. @@ -23,11 +24,14 @@ var ErrTimeout = errors.New("Error: Call timed out.") // find the expected node, but did find 'a' node. var ErrSearchIncomplete = errors.New("Error: Search Incomplete.") +// ErrNotFound is returned when a search fails to find anything +var ErrNotFound = errors.New("Error: Not Found.") + // Key is a string representation of multihash for use with maps. 
type Key string func (k Key) Pretty() string { - return hex.EncodeToString([]byte(k)) + return b58.Encode([]byte(k)) } // Hash is the global IPFS hash function. uses multihash SHA2_256, 256 bits @@ -51,12 +55,12 @@ func TildeExpansion(filename string) (string, error) { // PErr is a shorthand printing function to output to Stderr. func PErr(format string, a ...interface{}) { - fmt.Fprintf(os.Stderr, format + "\n", a...) + fmt.Fprintf(os.Stderr, format+"\n", a...) } // POut is a shorthand printing function to output to Stdout. func POut(format string, a ...interface{}) { - fmt.Fprintf(os.Stdout, format + "\n", a...) + fmt.Fprintf(os.Stdout, format+"\n", a...) } // DErr is a shorthand debug printing function to output to Stderr. -- GitLab
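
Editor's note (not part of the patch): the sketch below is illustrative only. It pieces together how the refactored connect path is used after this change, based on the new TestPing and the Connect/Update/Drop APIs introduced in the diff: swarm.Connect() now owns dialing and the identify handshake, dht.Connect() only updates the routing table and pings the new peer, and dht.Update() hands evicted peers to the swarm's Drop(). The addresses and peer ID below are made up for illustration; treat this as a sketch under those assumptions, not as code from the commit.

package main

import (
	"time"

	peer "github.com/jbenet/go-ipfs/peer"
	dht "github.com/jbenet/go-ipfs/routing/dht"
	ma "github.com/jbenet/go-multiaddr"
)

func main() {
	// Local peer; the address and ID here are hypothetical.
	local := new(peer.Peer)
	laddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
	if err != nil {
		panic(err)
	}
	local.AddAddress(laddr)
	local.ID = peer.ID([]byte("local_peer"))

	// NewDHT now returns an error instead of panicking on a nil peer;
	// it creates its own swarm and listens on it.
	d, err := dht.NewDHT(local)
	if err != nil {
		panic(err)
	}
	d.Start()

	// dht.Connect() delegates dial + identify handshake to swarm.Connect(),
	// then calls dht.Update() and pings the newly connected peer.
	raddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4002")
	if err != nil {
		panic(err)
	}
	remote, err := d.Connect(raddr)
	if err != nil {
		panic(err)
	}

	// As in TestPing, an explicit Ping also works; round-trip time is now
	// stored as "latency" behind a lock and read via GetLatency()
	// rather than the old GetDistance().
	if err := d.Ping(remote, time.Second*2); err == nil {
		_ = remote.GetLatency()
	}
}

Moving the dial and handshake into Swarm.Connect() (resolving the old "move this into swarm" TODO in dht.Connect) keeps transport details out of the DHT and lets Update() evict dead peers by handing them straight to Swarm.Drop().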