Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/push_pull.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,11 @@ jobs:
bash <(curl -s https://codecov.io/bash) -f coverage.txt -cF $os,$go
env:
CODECOV_TOKEN: be731f61-6ca9-42b0-8f1a-59f4a0b28c0d

- name: Test races
run: |
make test-race
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}


1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ CI_REPOSITORY ?= https://github.com/src-d/ci.git
CI_BRANCH ?= v1
CI_PATH ?= .ci
MAKEFILE := $(CI_PATH)/Makefile.main
TEST_RACE ?= true
$(MAKEFILE):
git clone --quiet --depth 1 -b $(CI_BRANCH) $(CI_REPOSITORY) $(CI_PATH);
-include $(MAKEFILE)
7 changes: 3 additions & 4 deletions downloader/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
func Download(ctx context.Context, job *library.Job) error {
logger := job.Logger.New(log.Fields{"job": "download", "id": job.ID})
if job.Type != library.JobDownload ||
len(job.Endpoints) == 0 ||
len(job.Endpoints()) == 0 ||
job.Lib == nil ||
job.TempFS == nil {
err := ErrNotDownloadJob.New()
Expand All @@ -46,7 +46,7 @@ func Download(ctx context.Context, job *library.Job) error {
return err
}

endpoint := job.Endpoints[0]
endpoint := job.Endpoints()[0]
logger = logger.New(log.Fields{"url": endpoint})

repoID, err := library.NewRepositoryID(endpoint)
Expand Down Expand Up @@ -113,8 +113,7 @@ func libHas(
select {
case <-done:
case <-ctx.Done():
ok = false
err = ctx.Err()
return false, "", ctx.Err()
}

return ok, locID, err
Expand Down
41 changes: 25 additions & 16 deletions downloader/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,16 @@ func testFSWithErrors(t *testing.T, h *testhelper.Helper, fsOpts testhelper.Brok
require.NoError(t, err)

testRepo := tests[0].repoIDs[0]
err = Download(context.Background(), &library.Job{
job := &library.Job{
Lib: lib,
Type: library.JobDownload,
Endpoints: []string{endPoint(gitProtocol, testRepo)},
TempFS: h.TempFS,
AuthToken: func(string) string { return "" },
Logger: log.New(nil),
})
}
job.SetEndpoints([]string{endPoint(gitProtocol, testRepo)})

err = Download(context.Background(), job)
require.Error(t, err)
require.Contains(t, err.Error(), errBrokenFS.Error())
}
Expand All @@ -195,14 +196,16 @@ func testAuthSuccess(t *testing.T, h *testhelper.Helper) {
t.Skip()
}

require.NoError(t, Download(context.Background(), &library.Job{
job := &library.Job{
Lib: h.Lib,
Type: library.JobDownload,
Endpoints: []string{endPoint(httpsProtocol, testPrivateRepo.repoIDs[0])},
TempFS: h.TempFS,
AuthToken: func(string) string { return token },
Logger: log.New(nil),
}))
}
job.SetEndpoints([]string{endPoint(httpsProtocol, testPrivateRepo.repoIDs[0])})

require.NoError(t, Download(context.Background(), job))
}

// testAuthErrors
Expand All @@ -212,14 +215,16 @@ func testAuthSuccess(t *testing.T, h *testhelper.Helper) {
// <expected> error: authentication required
func testAuthErrors(t *testing.T, h *testhelper.Helper) {
getJob := func(p protocol) *library.Job {
return &library.Job{
job := &library.Job{
Lib: h.Lib,
Type: library.JobDownload,
Endpoints: []string{endPoint(p, testPrivateRepo.repoIDs[0])},
TempFS: h.TempFS,
AuthToken: func(string) string { return "42" },
Logger: log.New(nil),
}
job.SetEndpoints([]string{endPoint(p, testPrivateRepo.repoIDs[0])})

return job
}

ctx := context.Background()
Expand All @@ -236,14 +241,16 @@ func testContextCancelledFail(t *testing.T, h *testhelper.Helper) {
cancel()

testRepo := tests[0].repoIDs[0]
require.Equal(t, fmt.Errorf("context canceled"), Download(ctx, &library.Job{
job := &library.Job{
Lib: h.Lib,
Type: library.JobDownload,
Endpoints: []string{endPoint(gitProtocol, testRepo)},
TempFS: h.TempFS,
AuthToken: func(string) string { return "" },
Logger: log.New(nil),
}))
}
job.SetEndpoints([]string{endPoint(gitProtocol, testRepo)})

require.Equal(t, fmt.Errorf("context canceled"), Download(ctx, job))
}

// testWrongEndpointFail
Expand All @@ -253,14 +260,16 @@ func testContextCancelledFail(t *testing.T, h *testhelper.Helper) {
func testWrongEndpointFail(t *testing.T, h *testhelper.Helper) {
const corruptedEndpoint = "git://42.git"

err := Download(context.Background(), &library.Job{
job := &library.Job{
Lib: h.Lib,
Type: library.JobDownload,
Endpoints: []string{corruptedEndpoint},
TempFS: h.TempFS,
AuthToken: func(string) string { return "" },
Logger: log.New(nil),
})
}
job.SetEndpoints([]string{corruptedEndpoint})

err := Download(context.Background(), job)
require.Error(t, err)

e, ok := err.(*net.OpError)
Expand All @@ -279,11 +288,11 @@ func testAlreadyDownloadedFail(t *testing.T, h *testhelper.Helper) {
job := &library.Job{
Lib: h.Lib,
Type: library.JobDownload,
Endpoints: []string{endPoint(gitProtocol, testRepo)},
TempFS: h.TempFS,
AuthToken: func(string) string { return "" },
Logger: log.New(nil),
}
job.SetEndpoints([]string{endPoint(gitProtocol, testRepo)})

ctx := context.Background()
require.NoError(t, Download(ctx, job))
Expand Down Expand Up @@ -401,11 +410,11 @@ func concurrentDownloads(h *testhelper.Helper, p protocol) chan error {
job := &library.Job{
Lib: h.Lib,
Type: library.JobDownload,
Endpoints: []string{endPoint(p, id)},
TempFS: h.TempFS,
AuthToken: func(string) string { return "" },
Logger: log.New(nil),
}
job.SetEndpoints([]string{endPoint(p, id)})

jobs = append(jobs, job)
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/src-d/envconfig v1.0.0 // indirect
github.com/src-d/go-borges v0.0.0-20190704083038-44867e8f2a2a
github.com/stretchr/testify v1.3.0
github.com/stretchr/testify v1.4.0
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect
Expand All @@ -41,5 +41,6 @@ require (
gopkg.in/src-d/go-errors.v1 v1.0.0
gopkg.in/src-d/go-git.v4 v4.12.0
gopkg.in/src-d/go-log.v1 v1.0.2
gopkg.in/yaml.v2 v2.2.4 // indirect
gotest.tools v2.2.0+incompatible // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xanzy/ssh-agent v0.2.0 h1:Adglfbi5p9Z0BmK2oKU9nTG+zKfniSfnaMYB+ULd+Ro=
Expand Down Expand Up @@ -214,5 +216,7 @@ gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRN
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
19 changes: 18 additions & 1 deletion library/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package library

import (
"context"
"sync"

"github.com/src-d/gitcollector"
"github.com/src-d/go-borges"
Expand Down Expand Up @@ -31,10 +32,11 @@ const (

// Job represents a gitcollector.Job to perform a task on a borges.Library.
type Job struct {
mu sync.Mutex
endpoints []string
ID string
Type JobType
Lib borges.Library
Endpoints []string
TempFS billy.Filesystem
LocationID borges.LocationID
AllowUpdate bool
Expand All @@ -48,6 +50,21 @@ var _ gitcollector.Job = (*Job)(nil)
// JobFn represents the task to be performed by a Job.
type JobFn func(context.Context, *Job) error

// TODO: we should probably secure other fiels
func (j *Job) SetEndpoints(endpoints []string) {
j.mu.Lock()
defer j.mu.Unlock()

j.endpoints = endpoints
}

func (j *Job) Endpoints() []string {
j.mu.Lock()
defer j.mu.Unlock()

return j.endpoints
}

// Process implements the Job interface.
func (j *Job) Process(ctx context.Context) error {
if j.ProcessFn == nil {
Expand Down
8 changes: 4 additions & 4 deletions library/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestJobScheduleFn(t *testing.T) {
mu.Lock()
defer mu.Unlock()

got = append(got, j.Endpoints[0])
got = append(got, j.Endpoints()[0])
return nil
}
)
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestDownloadJobScheduleFn(t *testing.T) {
mu.Lock()
defer mu.Unlock()

got = append(got, j.Endpoints[0])
got = append(got, j.Endpoints()[0])
return nil
}
)
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestUpdateJobScheduleFn(t *testing.T) {
mu.Lock()
defer mu.Unlock()

got = append(got, j.Endpoints[0])
got = append(got, j.Endpoints()[0])
return nil
}
)
Expand Down Expand Up @@ -127,7 +127,7 @@ func testScheduleFn(

queue <- &Job{
Type: t,
Endpoints: []string{e},
endpoints: []string{e},
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ func (c *Collector) modifyMetrics(job *library.Job, kind int) error {
break
}

for range job.Endpoints {
for range job.Endpoints() {
c.successUpdateCount++
}
case failKind:
for range job.Endpoints {
for range job.Endpoints() {
c.failCount++
}
case discoverKind:
Expand Down Expand Up @@ -351,16 +351,16 @@ func (c *CollectorByOrg) Discover(job gitcollector.Job) {
func triageJob(job gitcollector.Job) map[string]*library.Job {
organizations := map[string]*library.Job{}
lj, _ := job.(*library.Job)
for _, ep := range lj.Endpoints {
for _, ep := range lj.Endpoints() {
org := library.GetOrgFromEndpoint(ep)
j, ok := organizations[org]
if !ok {
j = &(*lj)
j.Endpoints = []string{}
j.SetEndpoints([]string{})
organizations[org] = j
}

j.Endpoints = append(j.Endpoints, ep)
j.SetEndpoints(append(j.Endpoints(), ep))
}

return organizations
Expand Down
Loading