Commit 70fd0fd9 authored by Steven Allen's avatar Steven Allen

aggressively free memory

This ensures we don't keep large buffers allocated.
parent 480b3f75
...@@ -185,8 +185,8 @@ github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0 ...@@ -185,8 +185,8 @@ github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0
github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU= github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU=
github.com/libp2p/go-msgio v0.0.2 h1:ivPvEKHxmVkTClHzg6RXTYHqaJQ0V9cDbq+6lKb3UV0= github.com/libp2p/go-msgio v0.0.2 h1:ivPvEKHxmVkTClHzg6RXTYHqaJQ0V9cDbq+6lKb3UV0=
github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-msgio v0.0.3 h1:VsOlWispTivSsOMg70e0W77y6oiSBSRCyP6URrWvE04= github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA=
github.com/libp2p/go-msgio v0.0.3/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-nat v0.0.3 h1:l6fKV+p0Xa354EqQOQP+d8CivdLM4kl5GxC1hSc/UeI= github.com/libp2p/go-nat v0.0.3 h1:l6fKV+p0Xa354EqQOQP+d8CivdLM4kl5GxC1hSc/UeI=
github.com/libp2p/go-nat v0.0.3/go.mod h1:88nUEt0k0JD45Bk93NIwDqjlhiOwOoV36GchpcVc1yI= github.com/libp2p/go-nat v0.0.3/go.mod h1:88nUEt0k0JD45Bk93NIwDqjlhiOwOoV36GchpcVc1yI=
github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw= github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw=
......
package message package message
import ( import (
"encoding/binary"
"fmt" "fmt"
"io" "io"
...@@ -8,8 +9,9 @@ import ( ...@@ -8,8 +9,9 @@ import (
wantlist "github.com/ipfs/go-bitswap/wantlist" wantlist "github.com/ipfs/go-bitswap/wantlist"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
pool "github.com/libp2p/go-buffer-pool"
msgio "github.com/libp2p/go-msgio"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
) )
...@@ -170,18 +172,22 @@ func (m *impl) AddBlock(b blocks.Block) { ...@@ -170,18 +172,22 @@ func (m *impl) AddBlock(b blocks.Block) {
// FromNet generates a new BitswapMessage from incoming data on an io.Reader. // FromNet generates a new BitswapMessage from incoming data on an io.Reader.
func FromNet(r io.Reader) (BitSwapMessage, error) { func FromNet(r io.Reader) (BitSwapMessage, error) {
pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax) reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
return FromPBReader(pbr) return FromMsgReader(reader)
} }
// FromPBReader generates a new Bitswap message from a gogo-protobuf reader // FromPBReader generates a new Bitswap message from a gogo-protobuf reader
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) { func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) {
pb := new(pb.Message) msg, err := r.ReadMsg()
if err := pbr.ReadMsg(pb); err != nil { if err != nil {
return nil, err return nil, err
} }
var pb pb.Message
return newMessageFromProto(*pb) if err := pb.Unmarshal(msg); err != nil {
return nil, err
}
r.ReleaseMsg(msg)
return newMessageFromProto(pb)
} }
func (m *impl) ToProtoV0() *pb.Message { func (m *impl) ToProtoV0() *pb.Message {
...@@ -228,15 +234,25 @@ func (m *impl) ToProtoV1() *pb.Message { ...@@ -228,15 +234,25 @@ func (m *impl) ToProtoV1() *pb.Message {
} }
func (m *impl) ToNetV0(w io.Writer) error { func (m *impl) ToNetV0(w io.Writer) error {
pbw := ggio.NewDelimitedWriter(w) return write(w, m.ToProtoV0())
return pbw.WriteMsg(m.ToProtoV0())
} }
func (m *impl) ToNetV1(w io.Writer) error { func (m *impl) ToNetV1(w io.Writer) error {
pbw := ggio.NewDelimitedWriter(w) return write(w, m.ToProtoV1())
}
return pbw.WriteMsg(m.ToProtoV1()) func write(w io.Writer, m *pb.Message) error {
size := m.Size()
buf := pool.Get(size + binary.MaxVarintLen64)
defer pool.Put(buf)
n := binary.PutUvarint(buf, uint64(size))
if written, err := m.MarshalTo(buf[n:]); err != nil {
return err
} else {
n += written
}
_, err := w.Write(buf[:n])
return err
} }
func (m *impl) Loggable() map[string]interface{} { func (m *impl) Loggable() map[string]interface{} {
......
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
bsmsg "github.com/ipfs/go-bitswap/message" bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/libp2p/go-libp2p-core/helpers" "github.com/libp2p/go-libp2p-core/helpers"
ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/connmgr"
...@@ -19,6 +18,7 @@ import ( ...@@ -19,6 +18,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore" peerstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-core/routing"
msgio "github.com/libp2p/go-msgio"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
...@@ -178,9 +178,9 @@ func (bsnet *impl) handleNewStream(s network.Stream) { ...@@ -178,9 +178,9 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
return return
} }
reader := ggio.NewDelimitedReader(s, network.MessageSizeMax) reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
for { for {
received, err := bsmsg.FromPBReader(reader) received, err := bsmsg.FromMsgReader(reader)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
s.Reset() s.Reset()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment