tracer.go 6.26 KB
Newer Older
1 2 3
package pubsub

import (
vyzo's avatar
vyzo committed
4 5
	"compress/gzip"
	"context"
6 7 8 9
	"encoding/json"
	"io"
	"os"
	"sync"
vyzo's avatar
vyzo committed
10
	"time"
vyzo's avatar
vyzo committed
11 12

	pb "github.com/libp2p/go-libp2p-pubsub/pb"
vyzo's avatar
vyzo committed
13 14 15 16 17

	"github.com/libp2p/go-libp2p-core/helpers"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
18
	"github.com/libp2p/go-libp2p-core/peerstore"
vyzo's avatar
vyzo committed
19 20
	"github.com/libp2p/go-libp2p-core/protocol"

21
	"github.com/libp2p/go-msgio/protoio"
22 23
)

vyzo's avatar
vyzo committed
24
var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words.
25
var MinTraceBatchSize = 16
vyzo's avatar
vyzo committed
26

27 28 29 30 31
// rejection reasons
const (
	rejectBlacklstedPeer      = "blacklisted peer"
	rejectBlacklistedSource   = "blacklisted source"
	rejectMissingSignature    = "missing signature"
32 33
	rejectUnexpectedSignature = "unexpected signature"
	rejectUnexpectedAuthInfo  = "unexpected auth info"
34 35 36 37
	rejectInvalidSignature    = "invalid signature"
	rejectValidationQueueFull = "validation queue full"
	rejectValidationThrottled = "validation throttled"
	rejectValidationFailed    = "validation failed"
vyzo's avatar
vyzo committed
38
	rejectValidationIgnored   = "validation ignored"
39
	rejectSelfOrigin          = "self originated message"
40 41
)

vyzo's avatar
vyzo committed
42
type basicTracer struct {
vyzo's avatar
vyzo committed
43 44 45 46 47
	ch     chan struct{}
	mx     sync.Mutex
	buf    []*pb.TraceEvent
	lossy  bool
	closed bool
48 49
}

50
func (t *basicTracer) Trace(evt *pb.TraceEvent) {
vyzo's avatar
vyzo committed
51
	t.mx.Lock()
vyzo's avatar
vyzo committed
52 53
	defer t.mx.Unlock()

vyzo's avatar
vyzo committed
54 55 56 57
	if t.closed {
		return
	}

vyzo's avatar
vyzo committed
58
	if t.lossy && len(t.buf) > TraceBufferSize {
59
		log.Debug("trace buffer overflow; dropping trace event")
vyzo's avatar
vyzo committed
60 61 62
	} else {
		t.buf = append(t.buf, evt)
	}
vyzo's avatar
vyzo committed
63 64 65 66 67 68 69 70

	select {
	case t.ch <- struct{}{}:
	default:
	}
}

func (t *basicTracer) Close() {
vyzo's avatar
vyzo committed
71 72 73 74 75 76
	t.mx.Lock()
	defer t.mx.Unlock()
	if !t.closed {
		t.closed = true
		close(t.ch)
	}
vyzo's avatar
vyzo committed
77 78 79 80 81 82 83 84 85
}

// JSONTracer is a tracer that writes events to a file, encoded in ndjson.
type JSONTracer struct {
	basicTracer
	w io.WriteCloser
}

// NewJsonTracer creates a new JSONTracer writing traces to file.
86 87 88 89
func NewJSONTracer(file string) (*JSONTracer, error) {
	return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}

vyzo's avatar
vyzo committed
90
// OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.
91 92 93 94 95 96
func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) {
	f, err := os.OpenFile(file, flags, perm)
	if err != nil {
		return nil, err
	}

vyzo's avatar
vyzo committed
97
	tr := &JSONTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
98 99 100 101 102
	go tr.doWrite()

	return tr, nil
}

vyzo's avatar
vyzo committed
103
func (t *JSONTracer) doWrite() {
104
	var buf []*pb.TraceEvent
vyzo's avatar
vyzo committed
105 106 107
	enc := json.NewEncoder(t.w)
	for {
		_, ok := <-t.ch
108

vyzo's avatar
vyzo committed
109 110 111 112 113 114 115 116 117
		t.mx.Lock()
		tmp := t.buf
		t.buf = buf[:0]
		buf = tmp
		t.mx.Unlock()

		for i, evt := range buf {
			err := enc.Encode(evt)
			if err != nil {
vyzo's avatar
vyzo committed
118
				log.Warnf("error writing event trace: %s", err.Error())
vyzo's avatar
vyzo committed
119 120 121 122 123 124 125 126
			}
			buf[i] = nil
		}

		if !ok {
			t.w.Close()
			return
		}
127 128 129
	}
}

vyzo's avatar
vyzo committed
130 131 132 133 134 135
var _ EventTracer = (*JSONTracer)(nil)

// PBTracer is a tracer that writes events to a file, as delimited protobufs.
type PBTracer struct {
	basicTracer
	w io.WriteCloser
136 137
}

vyzo's avatar
vyzo committed
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
func NewPBTracer(file string) (*PBTracer, error) {
	return OpenPBTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}

// OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.
func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) {
	f, err := os.OpenFile(file, flags, perm)
	if err != nil {
		return nil, err
	}

	tr := &PBTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
	go tr.doWrite()

	return tr, nil
}

func (t *PBTracer) doWrite() {
156
	var buf []*pb.TraceEvent
157
	w := protoio.NewDelimitedWriter(t.w)
158 159 160 161 162 163 164 165 166 167
	for {
		_, ok := <-t.ch

		t.mx.Lock()
		tmp := t.buf
		t.buf = buf[:0]
		buf = tmp
		t.mx.Unlock()

		for i, evt := range buf {
168
			err := w.WriteMsg(evt)
169
			if err != nil {
vyzo's avatar
vyzo committed
170
				log.Warnf("error writing event trace: %s", err.Error())
171 172 173 174 175 176 177 178 179 180
			}
			buf[i] = nil
		}

		if !ok {
			t.w.Close()
			return
		}
	}
}
vyzo's avatar
vyzo committed
181 182

var _ EventTracer = (*PBTracer)(nil)
vyzo's avatar
vyzo committed
183 184 185 186 187 188 189 190

const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")

// RemoteTracer is a tracer that sends trace events to a remote peer
type RemoteTracer struct {
	basicTracer
	ctx  context.Context
	host host.Host
191
	peer peer.ID
vyzo's avatar
vyzo committed
192 193 194 195
}

// NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi
func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error) {
196
	tr := &RemoteTracer{ctx: ctx, host: host, peer: pi.ID, basicTracer: basicTracer{ch: make(chan struct{}, 1), lossy: true}}
vyzo's avatar
vyzo committed
197
	host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
vyzo's avatar
vyzo committed
198 199 200 201 202 203 204 205 206
	go tr.doWrite()
	return tr, nil
}

func (t *RemoteTracer) doWrite() {
	var buf []*pb.TraceEvent

	s, err := t.openStream()
	if err != nil {
vyzo's avatar
vyzo committed
207
		log.Warnf("error opening remote tracer stream: %s", err.Error())
vyzo's avatar
vyzo committed
208 209 210
		return
	}

vyzo's avatar
vyzo committed
211 212 213
	var batch pb.TraceEventBatch

	gzipW := gzip.NewWriter(s)
214
	w := protoio.NewDelimitedWriter(gzipW)
vyzo's avatar
vyzo committed
215 216 217 218

	for {
		_, ok := <-t.ch

219 220
		// deadline for batch accumulation
		deadline := time.Now().Add(time.Second)
221

vyzo's avatar
vyzo committed
222
		t.mx.Lock()
vyzo's avatar
vyzo committed
223
		for len(t.buf) < MinTraceBatchSize && time.Now().Before(deadline) {
224 225
			t.mx.Unlock()
			time.Sleep(100 * time.Millisecond)
vyzo's avatar
vyzo committed
226
			t.mx.Lock()
227 228
		}

vyzo's avatar
vyzo committed
229 230 231 232 233 234 235 236 237
		tmp := t.buf
		t.buf = buf[:0]
		buf = tmp
		t.mx.Unlock()

		if len(buf) == 0 {
			goto end
		}

vyzo's avatar
vyzo committed
238
		batch.Batch = buf
vyzo's avatar
vyzo committed
239

vyzo's avatar
vyzo committed
240 241
		err = w.WriteMsg(&batch)
		if err != nil {
vyzo's avatar
vyzo committed
242
			log.Warnf("error writing trace event batch: %s", err)
vyzo's avatar
vyzo committed
243 244
			goto end
		}
vyzo's avatar
vyzo committed
245

vyzo's avatar
vyzo committed
246 247
		err = gzipW.Flush()
		if err != nil {
vyzo's avatar
vyzo committed
248
			log.Warnf("error flushin gzip stream: %s", err)
vyzo's avatar
vyzo committed
249
			goto end
vyzo's avatar
vyzo committed
250 251 252
		}

	end:
vyzo's avatar
vyzo committed
253 254 255 256 257
		// nil out the buffer to gc consumed events
		for i := range buf {
			buf[i] = nil
		}

vyzo's avatar
vyzo committed
258
		if !ok {
vyzo's avatar
vyzo committed
259 260 261 262 263 264
			if err != nil {
				s.Reset()
			} else {
				gzipW.Close()
				helpers.FullClose(s)
			}
vyzo's avatar
vyzo committed
265 266
			return
		}
vyzo's avatar
vyzo committed
267 268

		if err != nil {
vyzo's avatar
vyzo committed
269
			s.Reset()
vyzo's avatar
vyzo committed
270 271
			s, err = t.openStream()
			if err != nil {
vyzo's avatar
vyzo committed
272
				log.Warnf("error opening remote tracer stream: %s", err.Error())
vyzo's avatar
vyzo committed
273 274 275 276 277
				return
			}

			gzipW.Reset(s)
		}
vyzo's avatar
vyzo committed
278 279 280 281 282 283
	}
}

func (t *RemoteTracer) openStream() (network.Stream, error) {
	for {
		ctx, cancel := context.WithTimeout(t.ctx, time.Minute)
284
		s, err := t.host.NewStream(ctx, t.peer, RemoteTracerProtoID)
vyzo's avatar
vyzo committed
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
		cancel()
		if err != nil {
			if t.ctx.Err() != nil {
				return nil, err
			}

			// wait a minute and try again, to account for transient server downtime
			select {
			case <-time.After(time.Minute):
				continue
			case <-t.ctx.Done():
				return nil, t.ctx.Err()
			}
		}

		return s, nil
	}
}

var _ EventTracer = (*RemoteTracer)(nil)