Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feat/libp2p-sharedtcp
Browse files Browse the repository at this point in the history
  • Loading branch information
lidel committed Dec 3, 2024
2 parents 6de3faa + 433444b commit 10eed69
Show file tree
Hide file tree
Showing 46 changed files with 613 additions and 688 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gotest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
make -j "$PARALLEL" test/unit/gotest.junit.xml &&
[[ ! $(jq -s -c 'map(select(.Action == "fail")) | .[]' test/unit/gotest.json) ]]
- name: Upload coverage to Codecov
uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
uses: codecov/codecov-action@015f24e6818733317a2da2edd6290ab26238649a # v5.0.7
if: failure() || success()
with:
name: unittests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/sharness.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
# increasing parallelism beyond 10 doesn't speed up the tests much
PARALLEL: ${{ github.repository == 'ipfs/kubo' && 10 || 3 }}
- name: Upload coverage report
uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
uses: codecov/codecov-action@015f24e6818733317a2da2edd6290ab26238649a # v5.0.7
if: failure() || success()
with:
name: sharness
Expand Down
64 changes: 25 additions & 39 deletions client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,59 +62,45 @@ type pinLsObject struct {
Type string
}

func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) {
func (api *PinAPI) Ls(ctx context.Context, pins chan<- iface.Pin, opts ...caopts.PinLsOption) error {
defer close(pins)

options, err := caopts.PinLsOptions(opts...)
if err != nil {
return nil, err
return err
}

res, err := api.core().Request("pin/ls").
Option("type", options.Type).
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
return err
}

pins := make(chan iface.Pin)
go func(ch chan<- iface.Pin) {
defer res.Output.Close()
defer close(ch)

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
switch err := dec.Decode(&out); err {
case nil:
case io.EOF:
return
default:
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
}
defer res.Output.Close()

dec := json.NewDecoder(res.Output)
var out pinLsObject
for {
err := dec.Decode(&out)
if err != nil {
if err != io.EOF {
return err
}
return nil
}

c, err := cid.Parse(out.Cid)
if err != nil {
select {
case ch <- pin{err: err}:
return
case <-ctx.Done():
return
}
}
c, err := cid.Parse(out.Cid)
if err != nil {
return err
}

select {
case ch <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return
}
select {
case pins <- pin{typ: out.Type, name: out.Name, path: path.FromCid(c)}:
case <-ctx.Done():
return ctx.Err()
}
}(pins)
return pins, nil
}
}

// IsPinned returns whether or not the given cid is pinned
Expand Down
116 changes: 48 additions & 68 deletions client/rpc/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ type lsOutput struct {
Objects []lsObject
}

func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, error) {
func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, out chan<- iface.DirEntry, opts ...caopts.UnixfsLsOption) error {
defer close(out)

options, err := caopts.UnixfsLsOptions(opts...)
if err != nil {
return nil, err
return err
}

resp, err := api.core().Request("ls", p.String()).
Expand All @@ -156,86 +158,64 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.Unixfs
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
return err
}
if resp.Error != nil {
return nil, resp.Error
return err
}
defer resp.Close()

dec := json.NewDecoder(resp.Output)
out := make(chan iface.DirEntry)

go func() {
defer resp.Close()
defer close(out)

for {
var link lsOutput
if err := dec.Decode(&link); err != nil {
if err == io.EOF {
return
}
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
}
return
}

if len(link.Objects) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Objects len")}:
case <-ctx.Done():
}
return
for {
var link lsOutput
if err = dec.Decode(&link); err != nil {
if err != io.EOF {
return err
}
return nil
}

if len(link.Objects[0].Links) != 1 {
select {
case out <- iface.DirEntry{Err: errors.New("unexpected Links len")}:
case <-ctx.Done():
}
return
}
if len(link.Objects) != 1 {
return errors.New("unexpected Objects len")
}

l0 := link.Objects[0].Links[0]
if len(link.Objects[0].Links) != 1 {
return errors.New("unexpected Links len")
}

c, err := cid.Decode(l0.Hash)
if err != nil {
select {
case out <- iface.DirEntry{Err: err}:
case <-ctx.Done():
}
return
}
l0 := link.Objects[0].Links[0]

var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}
c, err := cid.Decode(l0.Hash)
if err != nil {
return err
}

select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
}
var ftype iface.FileType
switch l0.Type {
case unixfs.TRaw, unixfs.TFile:
ftype = iface.TFile
case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata:
ftype = iface.TDirectory
case unixfs.TSymlink:
ftype = iface.TSymlink
}
}()

return out, nil
select {
case out <- iface.DirEntry{
Name: l0.Name,
Cid: c,
Size: l0.Size,
Type: ftype,
Target: l0.Target,

Mode: l0.Mode,
ModTime: l0.ModTime,
}:
case <-ctx.Done():
return ctx.Err()
}
}
}

func (api *UnixfsAPI) core() *HttpApi {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs/kubo/pinmfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func pinMFS(ctx context.Context, node pinMFSNode, cid cid.Cid, svcName string, s

// check if MFS pin exists (across all possible states) and inspect its CID
pinStatuses := []pinclient.Status{pinclient.StatusQueued, pinclient.StatusPinning, pinclient.StatusPinned, pinclient.StatusFailed}
lsPinCh, lsErrCh := c.Ls(ctx, pinclient.PinOpts.FilterName(pinName), pinclient.PinOpts.FilterStatus(pinStatuses...))
lsPinCh, lsErrCh := c.GoLs(ctx, pinclient.PinOpts.FilterName(pinName), pinclient.PinOpts.FilterStatus(pinStatuses...))
existingRequestID := "" // is there any pre-existing MFS pin with pinName (for any CID)?
pinning := false // is CID for current MFS already being pinned?
pinTime := time.Now().UTC()
Expand Down
7 changes: 4 additions & 3 deletions config/bootstrap_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
// import dependency issue. TODO: move this into a config/default/ package.
var DefaultBootstrapAddresses = []string{
"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", // rust-libp2p-server
"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
"/ip4/104.131.131.82/udp/4001/quic-v1/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
"/dnsaddr/va1.bootstrap.libp2p.io/p2p/12D3KooWKnDdG3iXw9eTFijk3EWSunZcFi54Zka4wmtqtt6rPxc8", // js-libp2p-amino-dht-bootstrapper
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
"/ip4/104.131.131.82/udp/4001/quic-v1/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
}

// ErrInvalidPeerAddr signals an address is not a valid peer address.
Expand Down
9 changes: 9 additions & 0 deletions core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ See 'dag export' and 'dag import' for more information.
return fmt.Errorf("%s and %s options are not compatible", onlyHashOptionName, toFilesOptionName)
}

if wrap && toFilesSet {
return fmt.Errorf("%s and %s options are not compatible", wrapOptionName, toFilesOptionName)
}

hashFunCode, ok := mh.Names[strings.ToLower(hashFunStr)]
if !ok {
return fmt.Errorf("unrecognized hash function: %q", strings.ToLower(hashFunStr))
Expand Down Expand Up @@ -373,6 +377,11 @@ See 'dag export' and 'dag import' for more information.

// creating MFS pointers when optional --to-files is set
if toFilesSet {
if addit.Name() == "" {
errCh <- fmt.Errorf("%s: cannot add unnamed files to MFS", toFilesOptionName)
return
}

if toFilesStr == "" {
toFilesStr = "/"
}
Expand Down
21 changes: 3 additions & 18 deletions core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"

cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
e "github.com/ipfs/kubo/core/commands/e"

humanize "github.com/dustin/go-humanize"
bitswap "github.com/ipfs/boxo/bitswap"
Expand Down Expand Up @@ -53,10 +52,7 @@ Print out all blocks currently on the bitswap wantlist for the local peer.`,
return ErrNotOnline
}

bs, ok := nd.Exchange.(*bitswap.Bitswap)
if !ok {
return e.TypeErr(bs, nd.Exchange)
}
bs := nd.Bitswap

pstr, found := req.Options[peerOptionName].(string)
if found {
Expand Down Expand Up @@ -112,12 +108,7 @@ var bitswapStatCmd = &cmds.Command{
return cmds.Errorf(cmds.ErrClient, "unable to run offline: %s", ErrNotOnline)
}

bs, ok := nd.Exchange.(*bitswap.Bitswap)
if !ok {
return e.TypeErr(bs, nd.Exchange)
}

st, err := bs.Stat()
st, err := nd.Bitswap.Stat()
if err != nil {
return err
}
Expand All @@ -134,7 +125,6 @@ var bitswapStatCmd = &cmds.Command{
human, _ := req.Options[bitswapHumanOptionName].(bool)

fmt.Fprintln(w, "bitswap status")
fmt.Fprintf(w, "\tprovides buffer: %d / %d\n", s.ProvideBufLen, bitswap.HasBlockBufferSize)
fmt.Fprintf(w, "\tblocks received: %d\n", s.BlocksReceived)
fmt.Fprintf(w, "\tblocks sent: %d\n", s.BlocksSent)
if human {
Expand Down Expand Up @@ -190,17 +180,12 @@ prints the ledger associated with a given peer.
return ErrNotOnline
}

bs, ok := nd.Exchange.(*bitswap.Bitswap)
if !ok {
return e.TypeErr(bs, nd.Exchange)
}

partner, err := peer.Decode(req.Arguments[0])
if err != nil {
return err
}

return cmds.EmitOnce(res, bs.LedgerForPeer(partner))
return cmds.EmitOnce(res, nd.Bitswap.LedgerForPeer(partner))
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *server.Receipt) error {
Expand Down
Loading

0 comments on commit 10eed69

Please sign in to comment.