Skip to content

Commit 22f2495

Browse files
committed
Merge branch 'postgres-lock-fix'
Addresses: #4
2 parents 8f6826c + 1519c59 commit 22f2495

File tree

2 files changed

+61
-14
lines changed

2 files changed

+61
-14
lines changed

database/postgres/postgres.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// +build go1.9
2+
13
package postgres
24

35
import (
@@ -7,9 +9,10 @@ import (
79
"io/ioutil"
810
nurl "net/url"
911

10-
"github.com/lib/pq"
12+
"context"
1113
"github.com/golang-migrate/migrate"
1214
"github.com/golang-migrate/migrate/database"
15+
"github.com/lib/pq"
1316
)
1417

1518
func init() {
@@ -33,7 +36,8 @@ type Config struct {
3336
}
3437

3538
type Postgres struct {
36-
db *sql.DB
39+
// Locking and unlocking need to use the same connection
40+
conn *sql.Conn
3741
isLocked bool
3842

3943
// Open and WithInstance need to garantuee that config is never nil
@@ -65,8 +69,14 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
6569
config.MigrationsTable = DefaultMigrationsTable
6670
}
6771

72+
conn, err := instance.Conn(context.Background())
73+
74+
if err != nil {
75+
return nil, err
76+
}
77+
6878
px := &Postgres{
69-
db: instance,
79+
conn: conn,
7080
config: config,
7181
}
7282

@@ -105,7 +115,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
105115
}
106116

107117
func (p *Postgres) Close() error {
108-
return p.db.Close()
118+
return p.conn.Close()
109119
}
110120

111121
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
@@ -123,7 +133,7 @@ func (p *Postgres) Lock() error {
123133
// or return false if the lock cannot be acquired immediately.
124134
query := `SELECT pg_try_advisory_lock($1)`
125135
var success bool
126-
if err := p.db.QueryRow(query, aid).Scan(&success); err != nil {
136+
if err := p.conn.QueryRowContext(context.Background(), query, aid).Scan(&success); err != nil {
127137
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
128138
}
129139

@@ -146,7 +156,7 @@ func (p *Postgres) Unlock() error {
146156
}
147157

148158
query := `SELECT pg_advisory_unlock($1)`
149-
if _, err := p.db.Exec(query, aid); err != nil {
159+
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
150160
return &database.Error{OrigErr: err, Query: []byte(query)}
151161
}
152162
p.isLocked = false
@@ -161,7 +171,7 @@ func (p *Postgres) Run(migration io.Reader) error {
161171

162172
// run migration
163173
query := string(migr[:])
164-
if _, err := p.db.Exec(query); err != nil {
174+
if _, err := p.conn.ExecContext(context.Background(), query); err != nil {
165175
// TODO: cast to postgress error and get line number
166176
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
167177
}
@@ -170,7 +180,7 @@ func (p *Postgres) Run(migration io.Reader) error {
170180
}
171181

172182
func (p *Postgres) SetVersion(version int, dirty bool) error {
173-
tx, err := p.db.Begin()
183+
tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{})
174184
if err != nil {
175185
return &database.Error{OrigErr: err, Err: "transaction start failed"}
176186
}
@@ -198,7 +208,7 @@ func (p *Postgres) SetVersion(version int, dirty bool) error {
198208

199209
func (p *Postgres) Version() (version int, dirty bool, err error) {
200210
query := `SELECT version, dirty FROM "` + p.config.MigrationsTable + `" LIMIT 1`
201-
err = p.db.QueryRow(query).Scan(&version, &dirty)
211+
err = p.conn.QueryRowContext(context.Background(), query).Scan(&version, &dirty)
202212
switch {
203213
case err == sql.ErrNoRows:
204214
return database.NilVersion, false, nil
@@ -219,7 +229,7 @@ func (p *Postgres) Version() (version int, dirty bool, err error) {
219229
func (p *Postgres) Drop() error {
220230
// select all tables in current schema
221231
query := `SELECT table_name FROM information_schema.tables WHERE table_schema=(SELECT current_schema())`
222-
tables, err := p.db.Query(query)
232+
tables, err := p.conn.QueryContext(context.Background(), query)
223233
if err != nil {
224234
return &database.Error{OrigErr: err, Query: []byte(query)}
225235
}
@@ -241,7 +251,7 @@ func (p *Postgres) Drop() error {
241251
// delete one by one ...
242252
for _, t := range tableNames {
243253
query = `DROP TABLE IF EXISTS ` + t + ` CASCADE`
244-
if _, err := p.db.Exec(query); err != nil {
254+
if _, err := p.conn.ExecContext(context.Background(), query); err != nil {
245255
return &database.Error{OrigErr: err, Query: []byte(query)}
246256
}
247257
}
@@ -257,7 +267,7 @@ func (p *Postgres) ensureVersionTable() error {
257267
// check if migration table exists
258268
var count int
259269
query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1`
260-
if err := p.db.QueryRow(query, p.config.MigrationsTable).Scan(&count); err != nil {
270+
if err := p.conn.QueryRowContext(context.Background(), query, p.config.MigrationsTable).Scan(&count); err != nil {
261271
return &database.Error{OrigErr: err, Query: []byte(query)}
262272
}
263273
if count == 1 {
@@ -266,7 +276,7 @@ func (p *Postgres) ensureVersionTable() error {
266276

267277
// if not, create the empty migration table
268278
query = `CREATE TABLE "` + p.config.MigrationsTable + `" (version bigint not null primary key, dirty boolean not null)`
269-
if _, err := p.db.Exec(query); err != nil {
279+
if _, err := p.conn.ExecContext(context.Background(), query); err != nil {
270280
return &database.Error{OrigErr: err, Query: []byte(query)}
271281
}
272282
return nil

database/postgres/postgres_test.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"io"
1111
"testing"
1212

13+
"context"
1314
dt "github.com/golang-migrate/migrate/database/testing"
1415
mt "github.com/golang-migrate/migrate/testing"
1516
// "github.com/lib/pq"
@@ -72,7 +73,7 @@ func TestMultiStatement(t *testing.T) {
7273

7374
// make sure second table exists
7475
var exists bool
75-
if err := d.(*Postgres).db.QueryRow("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'bar' AND table_schema = (SELECT current_schema()))").Scan(&exists); err != nil {
76+
if err := d.(*Postgres).conn.QueryRowContext(context.Background(), "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'bar' AND table_schema = (SELECT current_schema()))").Scan(&exists); err != nil {
7677
t.Fatal(err)
7778
}
7879
if !exists {
@@ -154,3 +155,39 @@ func TestWithSchema(t *testing.T) {
154155
func TestWithInstance(t *testing.T) {
155156

156157
}
158+
159+
func TestPostgres_Lock(t *testing.T) {
160+
mt.ParallelTest(t, versions, isReady,
161+
func(t *testing.T, i mt.Instance) {
162+
p := &Postgres{}
163+
addr := fmt.Sprintf("postgres://postgres@%v:%v/postgres?sslmode=disable", i.Host(), i.Port())
164+
d, err := p.Open(addr)
165+
if err != nil {
166+
t.Fatalf("%v", err)
167+
}
168+
169+
dt.Test(t, d, []byte("SELECT 1"))
170+
171+
ps := d.(*Postgres)
172+
173+
err = ps.Lock()
174+
if err != nil {
175+
t.Fatal(err)
176+
}
177+
178+
err = ps.Unlock()
179+
if err != nil {
180+
t.Fatal(err)
181+
}
182+
183+
err = ps.Lock()
184+
if err != nil {
185+
t.Fatal(err)
186+
}
187+
188+
err = ps.Unlock()
189+
if err != nil {
190+
t.Fatal(err)
191+
}
192+
})
193+
}

0 commit comments

Comments
 (0)