| 1 |
package postgres |
| 2 |
|
| 3 |
import ( |
| 4 |
"encoding/json" |
| 5 |
"fmt" |
| 6 |
"os" |
| 7 |
"time" |
| 8 |
|
| 9 |
"github.com/flynn/flynn/Godeps/_workspace/src/github.com/jackc/pgx" |
| 10 |
"github.com/flynn/flynn/discoverd/client" |
| 11 |
"github.com/flynn/flynn/pkg/attempt" |
| 12 |
"github.com/flynn/flynn/pkg/dialer" |
| 13 |
"github.com/flynn/flynn/pkg/shutdown" |
| 14 |
"github.com/flynn/flynn/pkg/sirenia/state" |
| 15 |
) |
| 16 |
|
| 17 |
// PostgreSQL SQLSTATE error codes that callers compare against the Code
// field of a pgx.PgError (see IsPostgresCode and IsUniquenessError).
const (
	// InvalidTextRepresentation is SQLSTATE 22P02.
	InvalidTextRepresentation = "22P02"
	// ForeignKeyViolation is SQLSTATE 23503.
	ForeignKeyViolation = "23503"
	// UniqueViolation is SQLSTATE 23505.
	UniqueViolation = "23505"
	// CheckViolation is SQLSTATE 23514.
	CheckViolation = "23514"
	// RaiseException is SQLSTATE P0001.
	RaiseException = "P0001"
)
| 24 |
|
| 25 |
// Conf describes how to reach a Postgres cluster registered in discoverd.
type Conf struct {
	// Service is the discoverd service name. It is watched by Wait and
	// used by Open to build the "leader.<Service>.discoverd" host name.
	Service string

	// User, Password and Database are passed to pgx unchanged as the
	// connection credentials and target database.
	User, Password, Database string
}
| 31 |
|
| 32 |
var connectAttempts = attempt.Strategy{ |
| 33 |
Min: 5, |
| 34 |
Total: 5 * time.Minute, |
| 35 |
Delay: 200 * time.Millisecond, |
| 36 |
} |
| 37 |
|
| 38 |
func New(connPool *pgx.ConnPool, conf *Conf) *DB { |
| 39 |
return &DB{connPool, conf} |
| 40 |
} |
| 41 |
|
| 42 |
func Wait(conf *Conf, afterConn func(*pgx.Conn) error) *DB { |
| 43 |
if conf == nil { |
| 44 |
conf = &Conf{ |
| 45 |
Service: os.Getenv("FLYNN_POSTGRES"), |
| 46 |
User: os.Getenv("PGUSER"), |
| 47 |
Password: os.Getenv("PGPASSWORD"), |
| 48 |
Database: os.Getenv("PGDATABASE"), |
| 49 |
} |
| 50 |
} |
| 51 |
events := make(chan *discoverd.Event) |
| 52 |
stream, err := discoverd.NewService(conf.Service).Watch(events) |
| 53 |
if err != nil { |
| 54 |
shutdown.Fatal(err) |
| 55 |
} |
| 56 |
// wait for service meta that has sync or singleton primary |
| 57 |
for e := range events { |
| 58 |
if e.Kind&discoverd.EventKindServiceMeta == 0 || e.ServiceMeta == nil || len(e.ServiceMeta.Data) == 0 { |
| 59 |
continue |
| 60 |
} |
| 61 |
state := &state.State{} |
| 62 |
json.Unmarshal(e.ServiceMeta.Data, state) |
| 63 |
if state.Singleton || state.Sync != nil { |
| 64 |
break |
| 65 |
} |
| 66 |
} |
| 67 |
stream.Close() |
| 68 |
// TODO(titanous): handle discoverd disconnection |
| 69 |
|
| 70 |
// retry here as authentication may fail if DB is still |
| 71 |
// starting up. |
| 72 |
// TODO(jpg): switch this to use pgmanager to check if user |
| 73 |
// exists, we can also check for r/w with pgmanager |
| 74 |
var db *DB |
| 75 |
err = connectAttempts.Run(func() error { |
| 76 |
db, err = Open(conf, afterConn) |
| 77 |
return err |
| 78 |
}) |
| 79 |
if err != nil { |
| 80 |
panic(err) |
| 81 |
} |
| 82 |
for { |
| 83 |
var readonly string |
| 84 |
// wait until read-write transactions are allowed |
| 85 |
if err := db.QueryRow("SHOW default_transaction_read_only").Scan(&readonly); err != nil || readonly == "on" { |
| 86 |
time.Sleep(100 * time.Millisecond) |
| 87 |
// TODO(titanous): add max wait here |
| 88 |
continue |
| 89 |
} |
| 90 |
return db |
| 91 |
} |
| 92 |
} |
| 93 |
|
| 94 |
func Open(conf *Conf, afterConn func(*pgx.Conn) error) (*DB, error) { |
| 95 |
connConfig := pgx.ConnConfig{ |
| 96 |
Host: fmt.Sprintf("leader.%s.discoverd", conf.Service), |
| 97 |
User: conf.User, |
| 98 |
Database: conf.Database, |
| 99 |
Password: conf.Password, |
| 100 |
Dial: dialer.Retry.Dial, |
| 101 |
} |
| 102 |
connPool, err := pgx.NewConnPool(pgx.ConnPoolConfig{ |
| 103 |
ConnConfig: connConfig, |
| 104 |
AfterConnect: afterConn, |
| 105 |
MaxConnections: 20, |
| 106 |
}) |
| 107 |
db := &DB{connPool, conf} |
| 108 |
return db, err |
| 109 |
} |
| 110 |
|
| 111 |
type DB struct { |
| 112 |
*pgx.ConnPool |
| 113 |
conf *Conf |
| 114 |
} |
| 115 |
|
| 116 |
func (db *DB) Exec(query string, args ...interface{}) error { |
| 117 |
_, err := db.ConnPool.Exec(query, args...) |
| 118 |
return err |
| 119 |
} |
| 120 |
|
| 121 |
func (db *DB) ExecRetry(query string, args ...interface{}) error { |
| 122 |
retries := 0 |
| 123 |
max := 30 |
| 124 |
for { |
| 125 |
_, err := db.ConnPool.Exec(query, args...) |
| 126 |
if err == pgx.ErrDeadConn && retries < max { |
| 127 |
retries++ |
| 128 |
time.Sleep(1 * time.Second) |
| 129 |
continue |
| 130 |
} |
| 131 |
return err |
| 132 |
} |
| 133 |
} |
| 134 |
|
| 135 |
// Scanner is the single-row result returned by QueryRow: Scan copies the
// row's columns into dest and reports any error.
type Scanner interface {
	Scan(dest ...interface{}) error
}
| 138 |
|
| 139 |
func (db *DB) QueryRow(query string, args ...interface{}) Scanner { |
| 140 |
return rowErrFixer{db.ConnPool.QueryRow(query, args...)} |
| 141 |
} |
| 142 |
|
| 143 |
func (db *DB) Begin() (*DBTx, error) { |
| 144 |
tx, err := db.ConnPool.Begin() |
| 145 |
return &DBTx{tx}, err |
| 146 |
} |
| 147 |
|
| 148 |
type DBTx struct{ *pgx.Tx } |
| 149 |
|
| 150 |
func (tx *DBTx) Exec(query string, args ...interface{}) error { |
| 151 |
_, err := tx.Tx.Exec(query, args...) |
| 152 |
return err |
| 153 |
} |
| 154 |
|
| 155 |
func (tx *DBTx) QueryRow(query string, args ...interface{}) Scanner { |
| 156 |
return rowErrFixer{tx.Tx.QueryRow(query, args...)} |
| 157 |
} |
| 158 |
|
| 159 |
type rowErrFixer struct { |
| 160 |
s Scanner |
| 161 |
} |
| 162 |
|
| 163 |
func (f rowErrFixer) Scan(args ...interface{}) error { |
| 164 |
err := f.s.Scan(args...) |
| 165 |
if e, ok := err.(pgx.PgError); ok && e.Code == InvalidTextRepresentation && e.File == "uuid.c" && e.Routine == "string_to_uuid" { |
| 166 |
// invalid input syntax for uuid |
| 167 |
err = pgx.ErrNoRows |
| 168 |
} |
| 169 |
return err |
| 170 |
} |
| 171 |
|
| 172 |
func IsUniquenessError(err error, constraint string) bool { |
| 173 |
if e, ok := err.(pgx.PgError); ok && e.Code == UniqueViolation { |
| 174 |
return constraint == "" || constraint == e.ConstraintName |
| 175 |
} |
| 176 |
return false |
| 177 |
} |
| 178 |
|
| 179 |
func IsPostgresCode(err error, code string) bool { |
| 180 |
if e, ok := err.(pgx.PgError); ok && e.Code == code { |
| 181 |
return true |
| 182 |
} |
| 183 |
return false |
| 184 |
} |