Commit b4150aed authored by Daniel Martí's avatar Daniel Martí

decode directly with a []byte

IPLD's codec helper reader has a relatively high cost, unfortunately. It
was the main contributor to a slowdown in go-merkledag when moving from
the old protobuf gogo-generated decoder to go-codec-dagpb.

Using a []byte also means we can reuse protobuf's well-optimized "wire
encoding" helpers, which gets us extra speed and allows removing some
code.

This should not matter in practice for the time being, as the only
go-codec-dagpb user is go-merkledag and it uses bytes.Buffer everywhere.

In the future it would be nice for go-codec-dagpb to be just as
efficient with a stream decoder, but right now I don't have the extra
week to get into that. Plus, if the core protobuf implementation works
on []byte, one can assume it's reasonable for us to do the same.

Using the new BenchmarkRoundtrip in go-merkledag with go-codec-dagpb, we
get a significant uplift in performance:

	name         old time/op    new time/op    delta
	Roundtrip-8    6.49µs ± 1%    5.34µs ± 1%  -17.74%  (p=0.002 n=6+6)

	name         old alloc/op   new alloc/op   delta
	Roundtrip-8    8.07kB ± 0%    7.50kB ± 0%   -7.04%  (p=0.002 n=6+6)

	name         old allocs/op  new allocs/op  delta
	Roundtrip-8       171 ± 0%       148 ± 0%  -13.45%  (p=0.002 n=6+6)
parent d6e141f2
......@@ -5,5 +5,5 @@ go 1.15
require (
github.com/ipfs/go-cid v0.0.7
github.com/ipld/go-ipld-prime v0.9.0
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e
google.golang.org/protobuf v1.26.0
)
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M=
......@@ -38,9 +40,8 @@ github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJ
github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY=
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1 h1:CskT+S6Ay54OwxBGB0R3Rsx4Muto6UnEYTyKJbyRIAI=
github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e h1:ZOcivgkkFRnjfoTcGsDq3UQYiBmekwLA+qg0OjyB/ls=
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
......@@ -63,3 +64,6 @@ golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXR
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
......@@ -3,11 +3,12 @@ package dagpb
import (
"fmt"
"io"
"io/ioutil"
"github.com/ipfs/go-cid"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/polydawn/refmt/shared"
"google.golang.org/protobuf/encoding/protowire"
)
// ErrIntOverflow is returned a varint overflows during decode, it indicates
......@@ -19,12 +20,23 @@ var ErrIntOverflow = fmt.Errorf("protobuf: varint overflow")
// Node. Use the NodeAssembler from the PBNode type for safest construction
// (Type.PBNode.NewBuilder()). A Map assembler will also work.
func Unmarshal(na ipld.NodeAssembler, in io.Reader) error {
var remaining []byte
if buf, ok := in.(interface{ Bytes() []byte }); ok {
remaining = buf.Bytes()
} else {
var err error
remaining, err = ioutil.ReadAll(in)
if err != nil {
return err
}
}
ma, err := na.BeginMap(2)
if err != nil {
return err
}
// always make "Links", even if we don't use it
if err = ma.AssembleKey().AssignString("Links"); err != nil {
if err := ma.AssembleKey().AssignString("Links"); err != nil {
return err
}
links, err := ma.AssembleValue().BeginList(0)
......@@ -33,30 +45,33 @@ func Unmarshal(na ipld.NodeAssembler, in io.Reader) error {
}
haveData := false
reader := shared.NewReader(in)
for {
_, err := reader.Readn1()
if err == io.EOF {
if len(remaining) == 0 {
break
}
reader.Unreadn1()
fieldNum, wireType, err := decodeKey(reader)
if err != nil {
return err
fieldNum, wireType, n := protowire.ConsumeTag(remaining)
if n < 0 {
return protowire.ParseError(n)
}
remaining = remaining[n:]
if wireType != 2 {
return fmt.Errorf("protobuf: (PBNode) invalid wireType, expected 2, got %d", wireType)
}
if fieldNum == 1 {
switch fieldNum {
case 1:
if haveData {
return fmt.Errorf("protobuf: (PBNode) duplicate Data section")
}
var chunk []byte
if chunk, err = decodeBytes(reader); err != nil {
return err
chunk, n := protowire.ConsumeBytes(remaining)
if n < 0 {
return protowire.ParseError(n)
}
remaining = remaining[n:]
// Data must come after Links, so it's safe to close this here even if we
// didn't use it
if err := links.Finish(); err != nil {
......@@ -70,26 +85,31 @@ func Unmarshal(na ipld.NodeAssembler, in io.Reader) error {
return err
}
haveData = true
} else if fieldNum == 2 {
case 2:
if haveData {
return fmt.Errorf("protobuf: (PBNode) invalid order, found Data before Links content")
}
bytesLen, err := decodeVarint(reader)
if err != nil {
return err
bytesLen, n := protowire.ConsumeVarint(remaining)
if n < 0 {
return protowire.ParseError(n)
}
remaining = remaining[n:]
curLink, err := links.AssembleValue().BeginMap(3)
if err != nil {
return err
}
if err = unmarshalLink(reader, int(bytesLen), curLink); err != nil {
if err := unmarshalLink(remaining[:bytesLen], curLink); err != nil {
return err
}
remaining = remaining[bytesLen:]
if err := curLink.Finish(); err != nil {
return err
}
} else {
default:
return fmt.Errorf("protobuf: (PBNode) invalid fieldNumber, expected 1 or 2, got %d", fieldNum)
}
}
......@@ -102,24 +122,23 @@ func Unmarshal(na ipld.NodeAssembler, in io.Reader) error {
return ma.Finish()
}
func unmarshalLink(reader shared.SlickReader, length int, ma ipld.MapAssembler) error {
func unmarshalLink(remaining []byte, ma ipld.MapAssembler) error {
haveHash := false
haveName := false
haveTsize := false
startOffset := reader.NumRead()
for {
readBytes := reader.NumRead() - startOffset
if readBytes == length {
if len(remaining) == 0 {
break
} else if readBytes > length {
return fmt.Errorf("protobuf: (PBLink) bad length for link")
}
fieldNum, wireType, err := decodeKey(reader)
if err != nil {
return err
fieldNum, wireType, n := protowire.ConsumeTag(remaining)
if n < 0 {
return protowire.ParseError(n)
}
remaining = remaining[n:]
if fieldNum == 1 {
switch fieldNum {
case 1:
if haveHash {
return fmt.Errorf("protobuf: (PBLink) duplicate Hash section")
}
......@@ -133,12 +152,14 @@ func unmarshalLink(reader shared.SlickReader, length int, ma ipld.MapAssembler)
return fmt.Errorf("protobuf: (PBLink) wrong wireType (%d) for Hash", wireType)
}
var chunk []byte
if chunk, err = decodeBytes(reader); err != nil {
return err
chunk, n := protowire.ConsumeBytes(remaining)
if n < 0 {
return protowire.ParseError(n)
}
var c cid.Cid
if _, c, err = cid.CidFromBytes(chunk); err != nil {
remaining = remaining[n:]
_, c, err := cid.CidFromBytes(chunk)
if err != nil {
return fmt.Errorf("invalid Hash field found in link, expected CID (%v)", err)
}
if err := ma.AssembleKey().AssignString("Hash"); err != nil {
......@@ -148,7 +169,8 @@ func unmarshalLink(reader shared.SlickReader, length int, ma ipld.MapAssembler)
return err
}
haveHash = true
} else if fieldNum == 2 {
case 2:
if haveName {
return fmt.Errorf("protobuf: (PBLink) duplicate Name section")
}
......@@ -159,10 +181,12 @@ func unmarshalLink(reader shared.SlickReader, length int, ma ipld.MapAssembler)
return fmt.Errorf("protobuf: (PBLink) wrong wireType (%d) for Name", wireType)
}
var chunk []byte
if chunk, err = decodeBytes(reader); err != nil {
return err
chunk, n := protowire.ConsumeBytes(remaining)
if n < 0 {
return protowire.ParseError(n)
}
remaining = remaining[n:]
if err := ma.AssembleKey().AssignString("Name"); err != nil {
return err
}
......@@ -170,7 +194,8 @@ func unmarshalLink(reader shared.SlickReader, length int, ma ipld.MapAssembler)
return err
}
haveName = true
} else if fieldNum == 3 {
case 3:
if haveTsize {
return fmt.Errorf("protobuf: (PBLink) duplicate Tsize section")
}
......@@ -178,10 +203,12 @@ func unmarshalLink(reader shared.SlickReader, length int, ma ipld.MapAssembler)
return fmt.Errorf("protobuf: (PBLink) wrong wireType (%d) for Tsize", wireType)
}
var v uint64
if v, err = decodeVarint(reader); err != nil {
return err
v, n := protowire.ConsumeVarint(remaining)
if n < 0 {
return protowire.ParseError(n)
}
remaining = remaining[n:]
if err := ma.AssembleKey().AssignString("Tsize"); err != nil {
return err
}
......@@ -189,7 +216,8 @@ func unmarshalLink(reader shared.SlickReader, length int, ma ipld.MapAssembler)
return err
}
haveTsize = true
} else {
default:
return fmt.Errorf("protobuf: (PBLink) invalid fieldNumber, expected 1, 2 or 3, got %d", fieldNum)
}
}
......@@ -200,48 +228,3 @@ func unmarshalLink(reader shared.SlickReader, length int, ma ipld.MapAssembler)
return nil
}
// decode the lead for a PB chunk, fieldNum & wireType, that tells us which
// field in the schema we're looking at and what data type it is
func decodeKey(reader shared.SlickReader) (int, int, error) {
var wire uint64
var err error
if wire, err = decodeVarint(reader); err != nil {
return 0, 0, err
}
fieldNum := int(wire >> 3)
wireType := int(wire & 0x7)
return fieldNum, wireType, nil
}
// decode a byte string from PB
func decodeBytes(reader shared.SlickReader) ([]byte, error) {
bytesLen, err := decodeVarint(reader)
if err != nil {
return nil, err
}
byts, err := reader.Readn(int(bytesLen))
if err != nil {
return nil, fmt.Errorf("protobuf: unexpected read error: %w", err)
}
return byts, nil
}
// decode a varint from PB
func decodeVarint(reader shared.SlickReader) (uint64, error) {
var v uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflow
}
b, err := reader.Readn1()
if err != nil {
return 0, fmt.Errorf("protobuf: unexpected read error: %w", err)
}
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
return v, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment