Skip to content

Commit d7b7dd0

Browse files
author
eric
committed
retrieve file from CAR
1 parent 15cb0ba commit d7b7dd0

File tree

6 files changed

+294
-5
lines changed

6 files changed

+294
-5
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,5 @@
1515

1616
# Dependency directories (remove the comment below to include it)
1717
# vendor/
18+
19+
.idea

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.PHONY: build
2+
build:
3+
go build -ldflags "-s -w" -o graphsplit graphsplit.go utils.go retrieve.go

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Graphsplit has solved the problem we faced above. It takes advantage of IPLD con
1111

1212
## Build
1313
```sh
14-
go build -o graphsplit graphsplit.go utils.go
14+
make
1515
```
1616

1717
## Usage
@@ -43,6 +43,17 @@ Import car file to IPFS:
4343
ipfs dag import /path/to/car-dir/car-file
4444
```
4545

46+
Retrieve files:
47+
```sh
48+
# car-path: directory or file, in form of .car
49+
# output-dir: usually just be the same as /path/to/output-dir
50+
# parallel: number goroutines run when retrieving
51+
./graphsplit retrieve \
52+
--car-path=/path/to/car-path \
53+
--output-dir=/path/to/output-dir \
54+
--parallel=2
55+
```
56+
4657
## Contribute
4758

4859
PRs are welcome!

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/ipfs/go-ipfs-blockstore v1.0.3
1313
github.com/ipfs/go-ipfs-chunker v0.0.5
1414
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
15+
github.com/ipfs/go-ipfs-files v0.0.8
1516
github.com/ipfs/go-ipld-cbor v0.0.5-0.20200204214505-252690b78669 // indirect
1617
github.com/ipfs/go-ipld-format v0.2.0
1718
github.com/ipfs/go-log/v2 v2.1.2

graphsplit.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"os"
7-
86
logging "github.com/ipfs/go-log/v2"
97
_ "github.com/jinzhu/gorm/dialects/mysql"
108
"github.com/urfave/cli/v2"
119
"golang.org/x/xerrors"
10+
"os"
1211
)
1312

1413
var log = logging.Logger("graphsplit")
@@ -17,6 +16,7 @@ func main() {
1716
logging.SetLogLevel("*", "INFO")
1817
local := []*cli.Command{
1918
chunkCmd,
19+
retrieveCmd,
2020
}
2121

2222
app := &cli.App{
@@ -33,7 +33,7 @@ func main() {
3333

3434
var chunkCmd = &cli.Command{
3535
Name: "chunk",
36-
Usage: "",
36+
Usage: "Generate CAR files of the specified size",
3737
Flags: []cli.Flag{
3838
&cli.Int64Flag{
3939
Name: "slice-size",
@@ -58,6 +58,7 @@ var chunkCmd = &cli.Command{
5858
&cli.StringFlag{
5959
Name: "car-dir",
6060
Required: true,
61+
Usage: fmt.Sprintf("specify output CAR directory"),
6162
},
6263
},
6364
Action: func(c *cli.Context) error {
@@ -154,7 +155,6 @@ var chunkCmd = &cli.Command{
154155
}
155156

156157
}
157-
158158
}
159159
if cumuSize > 0 {
160160
// todo build ipld from graphFiles
@@ -166,3 +166,36 @@ var chunkCmd = &cli.Command{
166166
return nil
167167
},
168168
}
169+
170+
var retrieveCmd = &cli.Command{
171+
Name: "retrieve",
172+
Usage: "Retrieve files from CAR files",
173+
Flags: []cli.Flag{
174+
&cli.StringFlag{
175+
Name: "car-path",
176+
Required: true,
177+
Usage: fmt.Sprintf("specify source car path, directory or file"),
178+
},
179+
&cli.StringFlag{
180+
Name: "output-dir",
181+
Required: true,
182+
Usage: fmt.Sprintf("specify output directory"),
183+
},
184+
&cli.IntFlag{
185+
Name: "parallel",
186+
Value: 4,
187+
Usage: fmt.Sprintf("specify how many number of goroutines runs when generate file node"),
188+
},
189+
},
190+
Action: func(c *cli.Context) error {
191+
parallel := c.Int("parallel")
192+
outputDir := c.String("output-dir")
193+
carPath := c.String("car-path")
194+
195+
CarTo(carPath, outputDir, parallel)
196+
Merge(outputDir, parallel)
197+
198+
fmt.Println("completed!")
199+
return nil
200+
},
201+
}

retrieve.go

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/ipfs/go-blockservice"
7+
"github.com/ipfs/go-cid"
8+
"github.com/ipfs/go-datastore"
9+
dss "github.com/ipfs/go-datastore/sync"
10+
bstore "github.com/ipfs/go-ipfs-blockstore"
11+
offline "github.com/ipfs/go-ipfs-exchange-offline"
12+
files "github.com/ipfs/go-ipfs-files"
13+
"github.com/ipfs/go-merkledag"
14+
unixfile "github.com/ipfs/go-unixfs/file"
15+
"github.com/ipld/go-car"
16+
"golang.org/x/xerrors"
17+
"io"
18+
"os"
19+
"path/filepath"
20+
"strings"
21+
"sync"
22+
)
23+
24+
func Import(path string, st car.Store) (cid.Cid, error) {
25+
f, err := os.Open(path)
26+
if err != nil {
27+
return cid.Undef, err
28+
}
29+
defer f.Close() //nolint:errcheck
30+
31+
stat, err := f.Stat()
32+
if err != nil {
33+
return cid.Undef, err
34+
}
35+
36+
file, err := files.NewReaderPathFile(path, f, stat)
37+
if err != nil {
38+
return cid.Undef, err
39+
}
40+
41+
result, err := car.LoadCar(st, file)
42+
if err != nil {
43+
return cid.Undef, err
44+
}
45+
46+
if len(result.Roots) != 1 {
47+
return cid.Undef, xerrors.New("cannot import car with more than one root")
48+
}
49+
50+
return result.Roots[0], nil
51+
}
52+
53+
func NodeWriteTo(nd files.Node, fpath string) error {
54+
switch nd := nd.(type) {
55+
case *files.Symlink:
56+
return os.Symlink(nd.Target, fpath)
57+
case files.File:
58+
f, err := os.Create(fpath)
59+
if err != nil {
60+
return err
61+
}
62+
defer f.Close()
63+
_, err = io.Copy(f, nd)
64+
if err != nil {
65+
return err
66+
}
67+
return nil
68+
case files.Directory:
69+
if !ExistDir(fpath) {
70+
err := os.Mkdir(fpath, 0777)
71+
if err != nil {
72+
return err
73+
}
74+
}
75+
76+
entries := nd.Entries()
77+
for entries.Next() {
78+
child := filepath.Join(fpath, entries.Name())
79+
if err := NodeWriteTo(entries.Node(), child); err != nil {
80+
return err
81+
}
82+
}
83+
return entries.Err()
84+
default:
85+
return fmt.Errorf("file type %T at %q is not supported", nd, fpath)
86+
}
87+
}
88+
89+
func ExistDir(path string) bool {
90+
s, err := os.Stat(path)
91+
if err != nil {
92+
return false
93+
}
94+
return s.IsDir()
95+
}
96+
97+
func CarTo(carPath, outputDir string, parallel int) {
98+
ctx := context.Background()
99+
bs2 := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))
100+
rdag := merkledag.NewDAGService(blockservice.New(bs2, offline.Exchange(bs2)))
101+
102+
workerCh := make(chan func())
103+
go func() {
104+
defer close(workerCh)
105+
err := filepath.Walk(carPath, func(path string, fi os.FileInfo, err error) error {
106+
if err != nil {
107+
return err
108+
}
109+
if fi.IsDir() {
110+
return nil
111+
}
112+
workerCh <- func() {
113+
log.Info(path)
114+
root, err := Import(path, bs2)
115+
if err != nil {
116+
log.Error("import error, ", err)
117+
return
118+
}
119+
nd, err := rdag.Get(ctx, root)
120+
if err != nil {
121+
log.Error("dagService.Get error, ", err)
122+
return
123+
}
124+
file, err := unixfile.NewUnixfsFile(ctx, rdag, nd)
125+
if err != nil {
126+
log.Error("NewUnixfsFile error, ", err)
127+
return
128+
}
129+
err = NodeWriteTo(file, outputDir)
130+
if err != nil {
131+
log.Error("NodeWriteTo error, ", err)
132+
}
133+
}
134+
return nil
135+
})
136+
if err != nil {
137+
log.Error("Walk path failed, ", err)
138+
}
139+
}()
140+
141+
limitCh := make(chan struct{}, parallel)
142+
wg := sync.WaitGroup{}
143+
func() {
144+
for {
145+
select {
146+
case taskFunc, ok := <-workerCh:
147+
if !ok {
148+
return
149+
}
150+
limitCh <- struct{}{}
151+
wg.Add(1)
152+
go func() {
153+
defer func() {
154+
<-limitCh
155+
wg.Done()
156+
}()
157+
taskFunc()
158+
}()
159+
}
160+
}
161+
}()
162+
wg.Wait()
163+
}
164+
165+
func Merge(dir string, parallel int) {
166+
wg := sync.WaitGroup{}
167+
limitCh := make(chan struct{}, parallel)
168+
mergeCh := make(chan string)
169+
wg.Add(1)
170+
go func() {
171+
defer wg.Done()
172+
for {
173+
select {
174+
case fpath, ok := <-mergeCh:
175+
if !ok {
176+
return
177+
}
178+
limitCh <- struct{}{}
179+
wg.Add(1)
180+
go func() {
181+
defer func() {
182+
<-limitCh
183+
wg.Done()
184+
}()
185+
log.Info("merge to ", fpath)
186+
f, err := os.Create(fpath)
187+
if err != nil {
188+
log.Error("Create file failed, ", err)
189+
return
190+
}
191+
defer f.Close()
192+
for i := 0; ; i++ {
193+
chunkPath := fmt.Sprintf("%s.%08d", fpath, i)
194+
err := func(path string) error {
195+
chunkF, err := os.Open(path)
196+
if err != nil {
197+
if os.IsExist(err) {
198+
log.Error("Open file failed, ", err)
199+
}
200+
return err
201+
}
202+
defer chunkF.Close()
203+
_, err = io.Copy(f, chunkF)
204+
if err != nil {
205+
log.Error("io.Copy failed, ", err)
206+
}
207+
return err
208+
}(chunkPath)
209+
os.Remove(chunkPath)
210+
if err != nil {
211+
break
212+
}
213+
}
214+
}()
215+
}
216+
}
217+
}()
218+
err := filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error {
219+
if err != nil {
220+
return err
221+
}
222+
if fi.IsDir() {
223+
return nil
224+
}
225+
matched, err := filepath.Match("*.00000000", fi.Name())
226+
if err != nil {
227+
log.Error("filepath.Match failed, ", err)
228+
return nil
229+
} else if matched {
230+
mergeCh <- strings.TrimSuffix(path, ".00000000")
231+
}
232+
return nil
233+
})
234+
if err != nil {
235+
log.Error("Walk path failed, ", err)
236+
}
237+
close(mergeCh)
238+
wg.Wait()
239+
}

0 commit comments

Comments
 (0)