Skip to content

Commit 62a8ad4

Browse files
committed
*: refactor discovery
Signed-off-by: Manuel Carmona <[email protected]>
1 parent 1c7fcd7 commit 62a8ad4

File tree

15 files changed

+502
-317
lines changed

15 files changed

+502
-317
lines changed

cmd/gitcollector/subcmd/download.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/src-d/gitcollector/downloader"
1515
"github.com/src-d/gitcollector/library"
1616
"github.com/src-d/gitcollector/metrics"
17+
"github.com/src-d/gitcollector/provider"
1718
"github.com/src-d/go-borges/siva"
1819
"gopkg.in/src-d/go-billy.v4/osfs"
1920
"gopkg.in/src-d/go-cli.v0"
@@ -75,7 +76,7 @@ func (c *DownloadCmd) Execute(args []string) error {
7576
log.Debugf("temporal dir: %s", tmpPath)
7677
temp := osfs.New(tmpPath)
7778

78-
lib, err := siva.NewLibrary("test", fs, siva.LibraryOptions{
79+
lib, err := siva.NewLibrary("test", fs, &siva.LibraryOptions{
7980
Bucket: 2,
8081
Transactional: true,
8182
TempFS: temp,
@@ -190,15 +191,11 @@ func runGHOrgProviders(
190191
wg.Add(len(orgs))
191192
for _, o := range orgs {
192193
org := o
193-
p := discovery.NewGHProvider(
194+
p := provider.NewGitHubOrg(
195+
org,
196+
token,
194197
download,
195-
discovery.NewGHOrgReposIter(
196-
org,
197-
&discovery.GHReposIterOpts{
198-
AuthToken: token,
199-
},
200-
),
201-
&discovery.GHProviderOpts{
198+
&discovery.GitHubOpts{
202199
SkipForks: skipForks,
203200
},
204201
)
File renamed without changes.

discovery/github.go

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
package discovery
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"gopkg.in/src-d/go-errors.v1"
8+
9+
"github.com/google/go-github/github"
10+
"github.com/jpillora/backoff"
11+
)
12+
13+
var (
14+
// ErrEndpointsNotFound is the returned error when couldn't find
15+
// endpoints for a certain repository.
16+
ErrEndpointsNotFound = errors.NewKind("endpoinds not found for %s")
17+
18+
// ErrNewRepositoriesNotFound is returned when there aren't new
19+
// repositories in the organization.
20+
ErrNewRepositoriesNotFound = errors.NewKind(
21+
"couldn't find new repositories")
22+
23+
// ErrRateLimitExceeded is returned when the api rate limit is reached.
24+
ErrRateLimitExceeded = errors.NewKind("rate limit requests exceeded")
25+
26+
// ErrDiscoveryStopped is returned when a discovery has been stopped.
27+
ErrDiscoveryStopped = errors.NewKind("discovery stopped")
28+
29+
// ErrDiscoveryStop is returned when a discovery fails on Stop.
30+
ErrDiscoveryStop = errors.NewKind("discovery failed on stop")
31+
32+
// ErrAdvertiseTimeout is returned when an advertise functions exceeds
33+
// the timeout.
34+
ErrAdvertiseTimeout = errors.NewKind("advertise repositories timeout")
35+
)
36+
37+
// AdvertiseGHRepositoriesFn is used by a GitHub to notify that a new
38+
// repository has been discovered.
39+
type AdvertiseGHRepositoriesFn func(context.Context, []*github.Repository) error
40+
41+
// GitHubOpts represents configuration options for a GitHub discovery.
42+
type GitHubOpts struct {
43+
AdvertiseTimeout time.Duration
44+
SkipForks bool
45+
WaitNewRepos bool
46+
WaitOnRateLimit bool
47+
StopTimeout time.Duration
48+
MaxJobBuffer int
49+
BatchSize int
50+
}
51+
52+
// GitHub will retrieve the information for all the repositories for the
53+
// given GHRepositoriesIterator.
54+
type GitHub struct {
55+
advertiseRepos AdvertiseGHRepositoriesFn
56+
iter GHRepositoriesIter
57+
batch []*github.Repository
58+
retryJobs []*github.Repository
59+
cancel chan struct{}
60+
backoff *backoff.Backoff
61+
opts *GitHubOpts
62+
}
63+
64+
const (
65+
stopTimeout = 10 * time.Second
66+
batchSize = 1
67+
)
68+
69+
// NewGitHub builds a new GitHub.
70+
func NewGitHub(
71+
advertiseRepos AdvertiseGHRepositoriesFn,
72+
iter GHRepositoriesIter,
73+
opts *GitHubOpts,
74+
) *GitHub {
75+
if opts == nil {
76+
opts = &GitHubOpts{}
77+
}
78+
79+
if opts.StopTimeout <= 0 {
80+
opts.StopTimeout = stopTimeout
81+
}
82+
83+
if opts.BatchSize <= 0 {
84+
opts.BatchSize = batchSize
85+
}
86+
87+
if opts.MaxJobBuffer <= 0 {
88+
opts.MaxJobBuffer = opts.BatchSize * 2
89+
}
90+
91+
if opts.AdvertiseTimeout <= 0 {
92+
to := time.Duration(5*opts.BatchSize) * time.Second
93+
opts.AdvertiseTimeout = to
94+
}
95+
96+
if advertiseRepos == nil {
97+
advertiseRepos = func(
98+
context.Context,
99+
[]*github.Repository,
100+
) error {
101+
return nil
102+
}
103+
}
104+
105+
return &GitHub{
106+
advertiseRepos: advertiseRepos,
107+
iter: iter,
108+
batch: make([]*github.Repository, 0, opts.BatchSize),
109+
retryJobs: make([]*github.Repository, 0, opts.MaxJobBuffer),
110+
cancel: make(chan struct{}),
111+
backoff: newBackoff(),
112+
opts: opts,
113+
}
114+
}
115+
116+
func newBackoff() *backoff.Backoff {
117+
const (
118+
minDuration = 500 * time.Millisecond
119+
maxDuration = 5 * time.Second
120+
factor = 4
121+
)
122+
123+
return &backoff.Backoff{
124+
Min: minDuration,
125+
Max: maxDuration,
126+
Factor: factor,
127+
Jitter: true,
128+
}
129+
}
130+
131+
// Start starts the GitHub.
132+
func (p *GitHub) Start() error {
133+
ctx, cancel := context.WithCancel(context.Background())
134+
defer cancel()
135+
136+
for {
137+
var err error
138+
defer func() {
139+
if ErrDiscoveryStopped.Is(err) && len(p.batch) > 0 {
140+
if de := p.sendBatch(ctx); err != nil {
141+
err = de
142+
}
143+
}
144+
}()
145+
146+
done := make(chan struct{})
147+
go func() {
148+
err = p.discoverRepositories(ctx)
149+
close(done)
150+
}()
151+
152+
select {
153+
case <-done:
154+
if err != nil {
155+
return err
156+
}
157+
case <-p.cancel:
158+
return ErrDiscoveryStopped.New()
159+
}
160+
}
161+
}
162+
163+
func (p *GitHub) discoverRepositories(ctx context.Context) error {
164+
if len(p.retryJobs) > 0 {
165+
job := p.retryJobs[0]
166+
p.retryJobs = p.retryJobs[1:]
167+
p.batch = append(p.batch, job)
168+
} else {
169+
repo, retry, err := p.iter.Next(ctx)
170+
if err != nil {
171+
if ErrNewRepositoriesNotFound.Is(err) &&
172+
!p.opts.WaitNewRepos {
173+
return ErrDiscoveryStopped.Wrap(err)
174+
}
175+
176+
if ErrRateLimitExceeded.Is(err) &&
177+
!p.opts.WaitOnRateLimit {
178+
return ErrDiscoveryStopped.Wrap(err)
179+
}
180+
181+
if retry <= 0 {
182+
return err
183+
}
184+
185+
time.Sleep(retry)
186+
return nil
187+
}
188+
189+
if p.opts.SkipForks && repo.GetFork() {
190+
return nil
191+
}
192+
193+
p.batch = append(p.batch, repo)
194+
}
195+
196+
if len(p.batch) < p.opts.BatchSize {
197+
return nil
198+
}
199+
200+
ctxto, cancel := context.WithTimeout(ctx, p.opts.AdvertiseTimeout)
201+
defer cancel()
202+
203+
if err := p.sendBatch(ctxto); err != nil {
204+
if !ErrAdvertiseTimeout.Is(err) {
205+
return err
206+
}
207+
208+
time.Sleep(p.backoff.Duration())
209+
} else {
210+
p.backoff.Reset()
211+
}
212+
213+
return nil
214+
}
215+
216+
func (p *GitHub) sendBatch(ctx context.Context) error {
217+
if err := p.advertiseRepos(ctx, p.batch); err != nil {
218+
return err
219+
}
220+
221+
p.batch = make([]*github.Repository, 0, p.opts.BatchSize)
222+
return nil
223+
}
224+
225+
// GetGHEndpoint gets the enpoint for a github repository.
226+
func GetGHEndpoint(r *github.Repository) (string, error) {
227+
var endpoint string
228+
getURLs := []func() string{
229+
r.GetHTMLURL,
230+
r.GetGitURL,
231+
r.GetSSHURL,
232+
}
233+
234+
for _, getURL := range getURLs {
235+
ep := getURL()
236+
if ep != "" {
237+
endpoint = ep
238+
break
239+
}
240+
}
241+
242+
if endpoint == "" {
243+
return "", ErrEndpointsNotFound.New(r.GetFullName())
244+
}
245+
246+
return endpoint, nil
247+
}
248+
249+
// Stop stops the GitHub.
250+
func (p *GitHub) Stop() error {
251+
select {
252+
case p.cancel <- struct{}{}:
253+
return nil
254+
case <-time.After(p.opts.StopTimeout):
255+
return ErrDiscoveryStop.New()
256+
}
257+
}

0 commit comments

Comments
 (0)