diff --git a/blocks/blocks.go b/blocks/blocks.go index 696c774ab4d0e989c430684137ad510f38b0799e..9bf556f5a625677ef77e935beed4752c5eb5fea1 100644 --- a/blocks/blocks.go +++ b/blocks/blocks.go @@ -1,6 +1,8 @@ package blocks import ( + "fmt" + mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" u "github.com/jbenet/go-ipfs/util" ) @@ -20,3 +22,7 @@ func NewBlock(data []byte) *Block { func (b *Block) Key() u.Key { return u.Key(b.Multihash) } + +func (b *Block) String() string { + return fmt.Sprintf("[Block %s]", b.Key()) +} diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 0ca533b196cc7f553e16af8ade89e2290c8f9c6b..acb6564ed63968e0e47fafcecee62a32faa7d285 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -52,7 +52,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) { - log.Debug("BlockService GetBlock: '%s'", k) + log.Debugf("BlockService GetBlock: '%s'", k) datai, err := s.Datastore.Get(k.DsKey()) if err == nil { log.Debug("Blockservice: Got data in datastore.") diff --git a/crypto/spipe/handshake.go b/crypto/spipe/handshake.go index 687c6a541c9654c5ad758568974a40eba5310796..7a88665efb1f16c04bdc958503a3d7086338681c 100644 --- a/crypto/spipe/handshake.go +++ b/crypto/spipe/handshake.go @@ -53,7 +53,7 @@ func (s *SecurePipe) handshake() error { return err } - log.Debug("handshake: %s <--> %s", s.local, s.remote) + log.Debugf("handshake: %s <--> %s", s.local, s.remote) myPubKey, err := s.local.PubKey().Bytes() if err != nil { return err @@ -105,7 +105,7 @@ func (s *SecurePipe) handshake() error { if err != nil { return err } - log.Debug("%s Remote Peer Identified as %s", s.local, s.remote) + log.Debugf("%s Remote Peer Identified as %s", s.local, s.remote) exchange, err := selectBest(SupportedExchanges, proposeResp.GetExchanges()) if err != nil { @@ -209,7 +209,7 @@ func (s *SecurePipe) handshake() error { return fmt.Errorf("Negotiation failed, got: %s", resp2) } - log.Debug("%s handshake: Got node id: %s", s.local, s.remote) + log.Debugf("%s handshake: Got node id: %s", s.local, s.remote) return nil } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f631c651c160bbf3afc081c84b8dfd70bd7bd3bc..5e00a5888a2789bf46027fae477818577cadee85 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -135,7 +135,6 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm peer.Peer, bsmsg.BitSwapMessage) { log.Debugf("ReceiveMessage from %v", p.Key()) log.Debugf("Message wantlist: %v", incoming.Wantlist()) - log.Debugf("Message blockset: %v", incoming.Blocks()) if p == nil { log.Error("Received message from nil peer!") diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go index 9f51e90100f0edfcfccf251275f0568f918d7e84..c7e1a852d665357e8bab6255b28fd4e392558a95 100644 --- a/exchange/bitswap/network/net_message_adapter.go +++ b/exchange/bitswap/network/net_message_adapter.go @@ -4,6 +4,7 @@ import ( "errors" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + "github.com/jbenet/go-ipfs/util" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" inet "github.com/jbenet/go-ipfs/net" @@ -11,6 +12,8 @@ import ( peer "github.com/jbenet/go-ipfs/peer" ) +var log = util.Logger("net_message_adapter") + // NetMessageAdapter wraps a NetMessage network service func NetMessageAdapter(s inet.Service, n inet.Network, r Receiver) Adapter { adapter := impl{ @@ -60,6 +63,7 @@ func (adapter *impl) HandleMessage( return nil } + log.Debugf("Message size: %d", len(outgoing.Data())) return outgoing } diff --git a/net/conn/conn.go b/net/conn/conn.go index 444616b6ce059faa23d82260920627d7efc4166d..3df82d15d5215d09cbad15dc78f1fbfc8e73579a 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -21,7 +21,7 @@ const ( ChanBuffer = 10 // MaxMessageSize is the size of the largest single message - MaxMessageSize = 1 << 20 // 1 MB + MaxMessageSize = 1 << 22 // 4 MB // HandshakeTimeout for when nodes first connect HandshakeTimeout = time.Second * 5 @@ -97,6 +97,17 @@ func (c *singleConn) close() error { return err } +func (c *singleConn) GetError() error { + select { + case err := <-c.msgio.incoming.ErrChan: + return err + case err := <-c.msgio.outgoing.ErrChan: + return err + default: + return nil + } +} + // ID is an identifier unique to this connection. func (c *singleConn) ID() string { return ID(c) diff --git a/net/conn/handshake.go b/net/conn/handshake.go index f8c88a0b61791318f8837cf168730e2105b043c0..ab8cea8e33889075493c42524952b235eac66606 100644 --- a/net/conn/handshake.go +++ b/net/conn/handshake.go @@ -46,7 +46,7 @@ func Handshake1(ctx context.Context, c Conn) error { return fmt.Errorf("could not decode remote version: %q", err) } - log.Debug("Received remote version (%s) from %s", remoteH, rpeer) + log.Debugf("Received remote version (%s) from %s", remoteH, rpeer) } if err := handshake.Handshake1Compatible(localH, remoteH); err != nil { diff --git a/net/conn/interface.go b/net/conn/interface.go index 36ff4131ed288ded6c417eaace32c7b5ef36f74b..689b6c9d6a54e0eeaeb3ef2725744aac0b1754d6 100644 --- a/net/conn/interface.go +++ b/net/conn/interface.go @@ -37,6 +37,8 @@ type Conn interface { // Out returns a writable message channel Out() chan<- []byte + GetError() error + // Close ends the connection // Close() error -- already in ContextCloser } diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go index 13f9a585201bcfd24b9f93ee53b47d9405e19448..885ad60084a26b51aaafe4bfa242c291c4d40aa6 100644 --- a/net/conn/multiconn.go +++ b/net/conn/multiconn.go @@ -198,6 +198,10 @@ func (c *MultiConn) fanInSingle(child Conn) { case m, more := <-child.In(): // receiving data if !more { log.Infof("%s in channel closed", child) + err := c.GetError() + if err != nil { + log.Errorf("Found error on connection: %s", err) + } return // closed } i++ @@ -209,7 +213,7 @@ func (c *MultiConn) fanInSingle(child Conn) { // close is the internal close function, called by ContextCloser.Close func (c *MultiConn) close() error { - log.Debug("%s closing Conn with %s", c.local, c.remote) + log.Debugf("%s closing Conn with %s", c.local, c.remote) // get connections c.RLock() @@ -291,3 +295,13 @@ func (c *MultiConn) In() <-chan []byte { func (c *MultiConn) Out() chan<- []byte { return c.duplex.Out } + +func (c *MultiConn) GetError() error { + for _, sub := range c.conns { + err := sub.GetError() + if err != nil { + return err + } + } + return nil +} diff --git a/net/conn/secure_conn.go b/net/conn/secure_conn.go index 3c80f3273dda9e637cef7d5fcd38c183c1bdfcf8..128b8d2839731257b8d17720aacde4f41d096b62 100644 --- a/net/conn/secure_conn.go +++ b/net/conn/secure_conn.go @@ -134,3 +134,7 @@ func (c *secureConn) In() <-chan []byte { func (c *secureConn) Out() chan<- []byte { return c.secure.Out } + +func (c *secureConn) GetError() error { + return c.insecure.GetError() +} diff --git a/net/swarm/conn.go b/net/swarm/conn.go index 46b04309a8db4326bdf2196b5f1b3430a6ad42ff..1b0aa933be0a8603265d9b9c62581f473dab75d4 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -154,6 +154,9 @@ func (s *Swarm) fanOut() { log.Infof("%s outgoing channel closed", s) return } + if len(msg.Data()) >= conn.MaxMessageSize { + log.Critical("Attempted to send message bigger than max size.") + } s.connsLock.RLock() c, found := s.conns[msg.Peer().Key()] @@ -167,7 +170,7 @@ func (s *Swarm) fanOut() { } i++ - //log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i) + log.Debugf("%s sent message to %s (%d)", s.local, msg.Peer(), i) // queue it in the connection's buffer c.Out() <- msg.Data() } @@ -206,7 +209,7 @@ func (s *Swarm) fanInSingle(c conn.Conn) { return // channel closed. } i++ - //log.Debugf("%s received message from %s (%d)", s.local, c.RemotePeer(), i) + log.Debugf("%s received message from %s (%d)", s.local, c.RemotePeer(), i) s.Incoming <- msg.New(c.RemotePeer(), data) } } diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 35355b32ff468451cd7b0d4a670b1cf407675bc6..d5db8d1da9f9961d67e199c7d5cc35d29ea1a5f9 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -145,7 +145,7 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Messa resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) // check if we have this value, to add ourselves as provider. - log.Debugf("handling GetProviders: '%s'", pmes.GetKey()) + log.Debugf("handling GetProviders: '%s'", u.Key(pmes.GetKey())) dsk := u.Key(pmes.GetKey()).DsKey() has, err := dht.datastore.Has(dsk) if err != nil && err != ds.ErrNotFound {