Commit 7821f576 authored by Brian Tiger Chow's avatar Brian Tiger Chow

Merge pull request #724 from jbenet/refactors/prep-for-gcr-2

More preparatory refactors/fixes/cleanups for GCR  
parents 69333e47 2d9550f2
......@@ -202,7 +202,6 @@ func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMe
set := pset.New()
wg := sync.WaitGroup{}
for peerToQuery := range peers {
log.Event(ctx, "PeerToQuery", peerToQuery)
if !set.TryAdd(peerToQuery) { //Do once per peer
continue
......@@ -263,75 +262,10 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
}
}
func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
case <-ctx.Done():
return
case nextEnvelope := <-bs.engine.Outbox():
select {
case <-ctx.Done():
return
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
}
// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
ctx, cancel := context.WithCancel(parent)
broadcastSignal := time.After(rebroadcastDelay.Get())
defer cancel()
for {
select {
case <-time.Tick(10 * time.Second):
n := bs.wantlist.Len()
if n > 0 {
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
}
case <-broadcastSignal: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries()
if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case keys := <-bs.batchRequests:
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
}
}
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {
defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
if p == "" {
log.Error("Received message from nil peer!")
......@@ -409,6 +343,7 @@ func (bs *bitswap) ReceiveError(err error) {
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
defer log.EventBegin(ctx, "sendMessage", p, m).Done()
if err := bs.network.SendMessage(ctx, p, m); err != nil {
return errors.Wrap(err)
}
......@@ -418,3 +353,70 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
func (bs *bitswap) Close() error {
return bs.process.Close()
}
func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
case <-ctx.Done():
return
case nextEnvelope := <-bs.engine.Outbox():
select {
case <-ctx.Done():
return
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
}
// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
ctx, cancel := context.WithCancel(parent)
broadcastSignal := time.After(rebroadcastDelay.Get())
defer cancel()
for {
select {
case <-time.Tick(10 * time.Second):
n := bs.wantlist.Len()
if n > 0 {
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
}
case <-broadcastSignal: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries()
if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case keys := <-bs.batchRequests:
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
}
}
......@@ -39,6 +39,8 @@ type BitSwapMessage interface {
AddBlock(*blocks.Block)
Exportable
Loggable() map[string]interface{}
}
type Exportable interface {
......@@ -170,3 +172,14 @@ func (m *impl) ToNet(w io.Writer) error {
}
return nil
}
func (m *impl) Loggable() map[string]interface{} {
var blocks []string
for _, v := range m.blocks {
blocks = append(blocks, v.Key().Pretty())
}
return map[string]interface{}{
"blocks": blocks,
"wants": m.Wantlist(),
}
}
......@@ -110,6 +110,9 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int)
connectedPeers := bsnet.host.Network().Peers()
out := make(chan peer.ID, len(connectedPeers)) // just enough buffer for these connectedPeers
for _, id := range connectedPeers {
if id == bsnet.host.ID() {
continue // ignore self as provider
}
out <- id
}
......@@ -117,9 +120,10 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int)
defer close(out)
providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
for info := range providers {
if info.ID != bsnet.host.ID() { // dont add addrs for ourselves.
bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs)
if info.ID == bsnet.host.ID() {
continue // ignore self as provider
}
bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs)
select {
case <-ctx.Done():
return
......
......@@ -46,6 +46,14 @@ type secureSession struct {
sharedSecret []byte
}
func (s *secureSession) Loggable() map[string]interface{} {
m := make(map[string]interface{})
m["localPeer"] = s.localPeer.Pretty()
m["remotePeer"] = s.remotePeer.Pretty()
m["established"] = (s.secure != nil)
return m
}
func newSecureSession(local peer.ID, key ci.PrivKey) (*secureSession, error) {
s := &secureSession{localPeer: local, localKey: key}
......@@ -80,7 +88,7 @@ func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) e
return err
}
defer log.EventBegin(ctx, "secureHandshake", s.localPeer).Done()
defer log.EventBegin(ctx, "secureHandshake", s).Done()
s.local.permanentPubKey = s.localKey.GetPublic()
myPubKeyBytes, err := s.local.permanentPubKey.Bytes()
......@@ -292,6 +300,5 @@ func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) e
}
// Whew! ok, that's all folks.
log.Event(ctx, "secureHandshakeFinish", s.localPeer, s.remotePeer)
return nil
}
......@@ -2,6 +2,7 @@ package conn
import (
"fmt"
"io"
"net"
"time"
......@@ -15,7 +16,7 @@ import (
peer "github.com/jbenet/go-ipfs/p2p/peer"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
u "github.com/jbenet/go-ipfs/util"
debugerr "github.com/jbenet/go-ipfs/util/debugerror"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
)
var log = eventlog.Logger("conn")
......@@ -33,16 +34,19 @@ type singleConn struct {
remote peer.ID
maconn manet.Conn
msgrw msgio.ReadWriteCloser
event io.Closer
}
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote peer.ID, maconn manet.Conn) (Conn, error) {
ml := lgbl.Dial("conn", local, remote, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
conn := &singleConn{
local: local,
remote: remote,
maconn: maconn,
msgrw: msgio.NewReadWriter(maconn),
event: log.EventBegin(ctx, "connLifetime", ml),
}
log.Debugf("newSingleConn %p: %v to %v", conn, local, remote)
......@@ -51,7 +55,13 @@ func newSingleConn(ctx context.Context, local, remote peer.ID, maconn manet.Conn
// close is the internal close function, called by ContextCloser.Close
func (c *singleConn) Close() error {
log.Debug(debugerr.Errorf("%s closing Conn with %s", c.local, c.remote))
defer func() {
if c.event != nil {
c.event.Close()
c.event = nil
}
}()
// close underlying connection
return c.msgrw.Close()
}
......
......@@ -6,6 +6,7 @@ import (
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
util "github.com/jbenet/go-ipfs/util"
)
var log = eventlog.Logger("dht.pb")
......@@ -142,6 +143,7 @@ func (m *Message) Loggable() map[string]interface{} {
return map[string]interface{}{
"message": map[string]string{
"type": m.Type.String(),
"key": util.Key(m.GetKey()).Pretty(),
},
}
}
......
......@@ -49,7 +49,7 @@ type EventLogger interface {
// the metadata is logged.
Event(ctx context.Context, event string, m ...Loggable)
EventBegin(ctx context.Context, event string, m ...Loggable) EventInProgress
EventBegin(ctx context.Context, event string, m ...Loggable) *EventInProgress
}
// Logger retrieves an event logger by name
......@@ -68,11 +68,11 @@ type eventLogger struct {
// TODO add log-level
}
func (el *eventLogger) EventBegin(ctx context.Context, event string, metadata ...Loggable) EventInProgress {
func (el *eventLogger) EventBegin(ctx context.Context, event string, metadata ...Loggable) *EventInProgress {
start := time.Now()
el.Event(ctx, fmt.Sprintf("%sBegin", event), metadata...)
eip := EventInProgress{}
eip := &EventInProgress{}
eip.doneFunc = func(additional []Loggable) {
metadata = append(metadata, additional...) // anything added during the operation
......@@ -116,12 +116,12 @@ type EventInProgress struct {
}
// Append adds loggables to be included in the call to Done
func (eip EventInProgress) Append(l Loggable) {
func (eip *EventInProgress) Append(l Loggable) {
eip.loggables = append(eip.loggables, l)
}
// SetError includes the provided error
func (eip EventInProgress) SetError(err error) {
func (eip *EventInProgress) SetError(err error) {
eip.loggables = append(eip.loggables, LoggableMap{
"error": err.Error(),
})
......@@ -129,12 +129,12 @@ func (eip EventInProgress) SetError(err error) {
// Done creates a new Event entry that includes the duration and appended
// loggables.
func (eip EventInProgress) Done() {
func (eip *EventInProgress) Done() {
eip.doneFunc(eip.loggables) // create final event with extra data
}
// Close is an alias for done
func (eip EventInProgress) Close() error {
func (eip *EventInProgress) Close() error {
eip.Done()
return nil
}
......@@ -26,3 +26,9 @@ func Deferred(key string, f func() string) Loggable {
}
return LoggableF(function)
}
func Pair(key string, l Loggable) Loggable {
return LoggableMap{
key: l,
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment