package message import ( "fmt" "io" pb "github.com/ipfs/go-bitswap/message/pb" wantlist "github.com/ipfs/go-bitswap/wantlist" blocks "github.com/ipfs/go-block-format" ggio "github.com/gogo/protobuf/io" cid "github.com/ipfs/go-cid" inet "github.com/libp2p/go-libp2p-net" ) // TODO move message.go into the bitswap package // TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb type BitSwapMessage interface { // Wantlist returns a slice of unique keys that represent data wanted by // the sender. Wantlist() []Entry // Blocks returns a slice of unique blocks. Blocks() []blocks.Block // AddEntry adds an entry to the Wantlist. AddEntry(key cid.Cid, priority int) Cancel(key cid.Cid) Empty() bool // A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set Full() bool AddBlock(blocks.Block) Exportable Loggable() map[string]interface{} } type Exportable interface { ToProtoV0() *pb.Message ToProtoV1() *pb.Message ToNetV0(w io.Writer) error ToNetV1(w io.Writer) error } type impl struct { full bool wantlist map[cid.Cid]*Entry blocks map[cid.Cid]blocks.Block } func New(full bool) BitSwapMessage { return newMsg(full) } func newMsg(full bool) *impl { return &impl{ blocks: make(map[cid.Cid]blocks.Block), wantlist: make(map[cid.Cid]*Entry), full: full, } } type Entry struct { wantlist.Entry Cancel bool } func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) { m := newMsg(pbm.Wantlist.Full) for _, e := range pbm.Wantlist.Entries { c, err := cid.Cast([]byte(e.Block)) if err != nil { return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err) } m.addEntry(c, int(e.Priority), e.Cancel) } // deprecated for _, d := range pbm.Blocks { // CIDv0, sha256, protobuf only b := blocks.NewBlock(d) m.AddBlock(b) } // for _, b := range pbm.GetPayload() { pref, err := cid.PrefixFromBytes(b.GetPrefix()) if err != nil { return nil, err } c, err := pref.Sum(b.GetData()) if err != nil { return nil, err } blk, err := blocks.NewBlockWithCid(b.GetData(), c) if err != nil { return nil, err } m.AddBlock(blk) } return m, nil } func (m *impl) Full() bool { return m.full } func (m *impl) Empty() bool { return len(m.blocks) == 0 && len(m.wantlist) == 0 } func (m *impl) Wantlist() []Entry { out := make([]Entry, 0, len(m.wantlist)) for _, e := range m.wantlist { out = append(out, *e) } return out } func (m *impl) Blocks() []blocks.Block { bs := make([]blocks.Block, 0, len(m.blocks)) for _, block := range m.blocks { bs = append(bs, block) } return bs } func (m *impl) Cancel(k cid.Cid) { delete(m.wantlist, k) m.addEntry(k, 0, true) } func (m *impl) AddEntry(k cid.Cid, priority int) { m.addEntry(k, priority, false) } func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) { e, exists := m.wantlist[c] if exists { e.Priority = priority e.Cancel = cancel } else { m.wantlist[c] = &Entry{ Entry: wantlist.Entry{ Cid: c, Priority: priority, }, Cancel: cancel, } } } func (m *impl) AddBlock(b blocks.Block) { m.blocks[b.Cid()] = b } func FromNet(r io.Reader) (BitSwapMessage, error) { pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax) return FromPBReader(pbr) } func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) { pb := new(pb.Message) if err := pbr.ReadMsg(pb); err != nil { return nil, err } return newMessageFromProto(*pb) } func (m *impl) ToProtoV0() *pb.Message { pbm := new(pb.Message) pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist)) for _, e := range m.wantlist { pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{ Block: e.Cid.Bytes(), Priority: int32(e.Priority), Cancel: e.Cancel, }) } pbm.Wantlist.Full = m.full blocks := m.Blocks() pbm.Blocks = make([][]byte, 0, len(blocks)) for _, b := range blocks { pbm.Blocks = append(pbm.Blocks, b.RawData()) } return pbm } func (m *impl) ToProtoV1() *pb.Message { pbm := new(pb.Message) pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist)) for _, e := range m.wantlist { pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{ Block: e.Cid.Bytes(), Priority: int32(e.Priority), Cancel: e.Cancel, }) } pbm.Wantlist.Full = m.full blocks := m.Blocks() pbm.Payload = make([]pb.Message_Block, 0, len(blocks)) for _, b := range blocks { pbm.Payload = append(pbm.Payload, pb.Message_Block{ Data: b.RawData(), Prefix: b.Cid().Prefix().Bytes(), }) } return pbm } func (m *impl) ToNetV0(w io.Writer) error { pbw := ggio.NewDelimitedWriter(w) return pbw.WriteMsg(m.ToProtoV0()) } func (m *impl) ToNetV1(w io.Writer) error { pbw := ggio.NewDelimitedWriter(w) return pbw.WriteMsg(m.ToProtoV1()) } func (m *impl) Loggable() map[string]interface{} { blocks := make([]string, 0, len(m.blocks)) for _, v := range m.blocks { blocks = append(blocks, v.Cid().String()) } return map[string]interface{}{ "blocks": blocks, "wants": m.Wantlist(), } }