Skip to content

Commit da0e3e1

Browse files
authored
Merge pull request #41 from mcarmonaa/improvement/support-multi-org
Improvement/support multi org
2 parents 0190e27 + a07c014 commit da0e3e1

File tree

10 files changed

+424
-246
lines changed

10 files changed

+424
-246
lines changed

cmd/gitcollector/subcmd/download.go

Lines changed: 80 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"io/ioutil"
66
"os"
77
"runtime"
8+
"strings"
9+
"sync"
810
"time"
911

1012
"github.com/src-d/gitcollector"
@@ -27,7 +29,7 @@ type DownloadCmd struct {
2729
TmpPath string `long:"tmp" description:"directory to place generated temporal files" default:"/tmp" env:"GITCOLLECTOR_TMP"`
2830
Workers int `long:"workers" description:"number of workers, default to GOMAXPROCS" env:"GITCOLLECTOR_WORKERS"`
2931
NotAllowUpdates bool `long:"no-updates" description:"don't allow updates on already downloaded repositories" env:"GITCOLLECTOR_NO_UPDATES"`
30-
Org string `long:"org" env:"GITHUB_ORGANIZATION" description:"github organization" required:"true"`
32+
Orgs string `long:"orgs" env:"GITHUB_ORGANIZATIONS" description:"list of github organization names separated by comma" required:"true"`
3133
Token string `long:"token" env:"GITHUB_TOKEN" description:"github token"`
3234
MetricsDBURI string `long:"metrics-db" env:"GITCOLLECTOR_METRICS_DB_URI" description:"uri to a database where metrics will be sent"`
3335
MetricsDBTable string `long:"metrics-db-table" env:"GITCOLLECTOR_METRICS_DB_TABLE" default:"gitcollector_metrics" description:"table name where the metrics will be added"`
@@ -36,6 +38,10 @@ type DownloadCmd struct {
3638

3739
// Execute runs the command.
3840
func (c *DownloadCmd) Execute(args []string) error {
41+
start := time.Now()
42+
43+
orgs := strings.Split(c.Orgs, ",")
44+
3945
info, err := os.Stat(c.LibPath)
4046
check(err, "wrong path to locate the library")
4147

@@ -73,7 +79,9 @@ func (c *DownloadCmd) Execute(args []string) error {
7379
authTokens := map[string]string{}
7480
if c.Token != "" {
7581
log.Debugf("acces token found")
76-
authTokens[c.Org] = c.Token
82+
for _, org := range orgs {
83+
authTokens[org] = c.Token
84+
}
7785
}
7886

7987
workers := c.Workers
@@ -103,16 +111,12 @@ func (c *DownloadCmd) Execute(args []string) error {
103111

104112
var mc gitcollector.MetricsCollector
105113
if c.MetricsDBURI != "" {
106-
db, err := metrics.PrepareDB(
107-
c.MetricsDBURI, c.MetricsDBTable, c.Org,
114+
mc = setupMetrics(
115+
c.MetricsDBURI,
116+
c.MetricsDBTable,
117+
orgs,
118+
c.MetricsSync,
108119
)
109-
check(err, "metrics database")
110-
111-
mc = metrics.NewCollector(&metrics.CollectorOpts{
112-
Log: log.New(nil),
113-
Send: metrics.SendToDB(db, c.MetricsDBTable, c.Org),
114-
SyncTime: time.Duration(c.MetricsSync) * time.Second,
115-
})
116120

117121
log.Debugf("metrics collection activated: sync timeout %d",
118122
c.MetricsSync)
@@ -125,23 +129,13 @@ func (c *DownloadCmd) Execute(args []string) error {
125129
wp.Run()
126130
log.Debugf("worker pool is running")
127131

128-
dp := discovery.NewGHProvider(
129-
c.Org,
130-
download,
131-
&discovery.GHProviderOpts{
132-
AuthToken: c.Token,
133-
},
134-
)
132+
go runGHOrgProviders(log.New(nil), orgs, c.Token, download)
135133

136-
log.Debugf("github provider started")
137-
if err := dp.Start(); err != nil &&
138-
!gitcollector.ErrProviderStopped.Is(err) {
139-
check(err, "github provider failed")
140-
}
141-
142-
close(download)
143134
wp.Wait()
144135
log.Debugf("worker pool stopped successfully")
136+
137+
elapsed := time.Since(start).String()
138+
log.Infof("collection finished in %s", elapsed)
145139
return nil
146140
}
147141

@@ -151,3 +145,64 @@ func check(err error, message string) {
151145
os.Exit(1)
152146
}
153147
}
148+
149+
func setupMetrics(
150+
uri, table string,
151+
orgs []string,
152+
metricSync int64,
153+
) gitcollector.MetricsCollector {
154+
db, err := metrics.PrepareDB(uri, table, orgs)
155+
check(err, "metrics database")
156+
157+
mcs := make(map[string]*metrics.Collector, len(orgs))
158+
for _, org := range orgs {
159+
mc := metrics.NewCollector(&metrics.CollectorOpts{
160+
Log: log.New(log.Fields{"org": org}),
161+
Send: metrics.SendToDB(db, table, org),
162+
SyncTime: time.Duration(metricSync) * time.Second,
163+
})
164+
165+
mcs[org] = mc
166+
}
167+
168+
return metrics.NewCollectorByOrg(mcs)
169+
}
170+
171+
func runGHOrgProviders(
172+
logger log.Logger,
173+
orgs []string,
174+
token string,
175+
download chan gitcollector.Job,
176+
) {
177+
var wg sync.WaitGroup
178+
wg.Add(len(orgs))
179+
for _, o := range orgs {
180+
org := o
181+
p := discovery.NewGHProvider(
182+
download,
183+
discovery.NewGHOrgReposIter(
184+
org,
185+
&discovery.GHReposIterOpts{
186+
AuthToken: token,
187+
},
188+
),
189+
&discovery.GHProviderOpts{},
190+
)
191+
192+
go func() {
193+
err := p.Start()
194+
if err != nil &&
195+
!discovery.ErrNewRepositoriesNotFound.Is(err) {
196+
logger.Warningf(err.Error())
197+
}
198+
199+
logger.Debugf("%s organization provider stopped", org)
200+
wg.Done()
201+
}()
202+
203+
logger.Debugf("%s organization provider started", org)
204+
}
205+
206+
wg.Wait()
207+
close(download)
208+
}

discovery/iterator.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package discovery
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"time"
7+
8+
"github.com/google/go-github/github"
9+
"golang.org/x/oauth2"
10+
)
11+
12+
// GHReposIterOpts represents configuration options for a GHReposIter.
13+
type GHReposIterOpts struct {
14+
HTTPTimeout time.Duration
15+
ResultsPerPage int
16+
TimeNewRepos time.Duration
17+
AuthToken string
18+
}
19+
20+
const (
21+
httpTimeout = 30 * time.Second
22+
resultsPerPage = 100
23+
waitNewRepos = 24 * time.Hour
24+
)
25+
26+
// GHOrgReposIter is a GHRepositoriesIter by organization name.
27+
type GHOrgReposIter struct {
28+
org string
29+
client *github.Client
30+
repos []*github.Repository
31+
checkpoint int
32+
opts *github.RepositoryListByOrgOptions
33+
waitNewRepos time.Duration
34+
}
35+
36+
var _ GHRepositoriesIter = (*GHOrgReposIter)(nil)
37+
38+
// NewGHOrgReposIter builds a new GHOrgReposIter.
39+
func NewGHOrgReposIter(org string, opts *GHReposIterOpts) *GHOrgReposIter {
40+
if opts == nil {
41+
opts = &GHReposIterOpts{}
42+
}
43+
44+
to := opts.HTTPTimeout
45+
if to <= 0 {
46+
to = httpTimeout
47+
}
48+
49+
rpp := opts.ResultsPerPage
50+
if rpp <= 0 || rpp > 100 {
51+
rpp = resultsPerPage
52+
}
53+
54+
wnr := opts.TimeNewRepos
55+
if wnr <= 0 {
56+
wnr = waitNewRepos
57+
}
58+
59+
return &GHOrgReposIter{
60+
org: org,
61+
client: newGithubClient(opts.AuthToken, to),
62+
opts: &github.RepositoryListByOrgOptions{
63+
ListOptions: github.ListOptions{PerPage: rpp},
64+
},
65+
waitNewRepos: wnr,
66+
}
67+
}
68+
69+
func newGithubClient(token string, timeout time.Duration) *github.Client {
70+
var client *http.Client
71+
if token == "" {
72+
client = &http.Client{}
73+
} else {
74+
client = oauth2.NewClient(
75+
context.Background(),
76+
oauth2.StaticTokenSource(
77+
&oauth2.Token{AccessToken: token},
78+
),
79+
)
80+
}
81+
82+
client.Timeout = timeout
83+
return github.NewClient(client)
84+
}
85+
86+
// Next implements the GHRepositoriesIter interface.
87+
func (p *GHOrgReposIter) Next() (*github.Repository, time.Duration, error) {
88+
if len(p.repos) == 0 {
89+
retry, err := p.requestRepos()
90+
if err != nil && len(p.repos) == 0 {
91+
return nil, retry, err
92+
}
93+
}
94+
95+
var next *github.Repository
96+
next, p.repos = p.repos[0], p.repos[1:]
97+
return next, 0, nil
98+
}
99+
100+
func (p *GHOrgReposIter) requestRepos() (time.Duration, error) {
101+
repos, res, err := p.client.Repositories.ListByOrg(
102+
context.Background(),
103+
p.org,
104+
p.opts,
105+
)
106+
107+
if err != nil {
108+
if _, ok := err.(*github.RateLimitError); !ok {
109+
return -1, err
110+
}
111+
112+
return timeToRetry(res), ErrRateLimitExceeded.Wrap(err)
113+
}
114+
115+
bufRepos := repos
116+
if p.checkpoint > 0 {
117+
i := p.checkpoint
118+
if len(repos) < p.checkpoint {
119+
// return err?
120+
i = 0
121+
}
122+
123+
bufRepos = repos[i:]
124+
}
125+
126+
if len(repos) < p.opts.PerPage {
127+
p.checkpoint = len(repos)
128+
}
129+
130+
err = nil
131+
if res.NextPage == 0 {
132+
if len(repos) == p.opts.PerPage {
133+
p.opts.Page++
134+
}
135+
136+
err = ErrNewRepositoriesNotFound.New()
137+
} else {
138+
p.opts.Page = res.NextPage
139+
}
140+
141+
p.repos = bufRepos
142+
return p.waitNewRepos, err
143+
}
144+
145+
func timeToRetry(res *github.Response) time.Duration {
146+
now := time.Now().UTC().Unix()
147+
resetTime := res.Rate.Reset.UTC().Unix()
148+
timeToReset := time.Duration(resetTime-now) * time.Second
149+
remaining := res.Rate.Remaining
150+
if timeToReset < 0 || timeToReset > 1*time.Hour {
151+
// If this happens, the system clock is probably wrong, so we
152+
// assume we are at the beginning of the window and consider
153+
// only total requests per hour.
154+
timeToReset = 1 * time.Hour
155+
remaining = res.Rate.Limit
156+
}
157+
158+
return timeToReset / time.Duration(remaining+1)
159+
}

0 commit comments

Comments
 (0)