Jay Taylor's notes

back to listing index

flynn/flynn

[web search]
Original source (github.com)
Tags: orm postgres postgresql examples golang go flynn pgx github.com
Clipped on: 2016-04-27

Skip to content
flynn / pkg / postgres / postgres.go
Open this file in GitHub Desktop
185 lines (162 sloc) 4.27 KB
1 package postgres
2
3 import (
4 "encoding/json"
5 "fmt"
6 "os"
7 "time"
8
9 "github.com/flynn/flynn/Godeps/_workspace/src/github.com/jackc/pgx"
10 "github.com/flynn/flynn/discoverd/client"
11 "github.com/flynn/flynn/pkg/attempt"
12 "github.com/flynn/flynn/pkg/dialer"
13 "github.com/flynn/flynn/pkg/shutdown"
14 "github.com/flynn/flynn/pkg/sirenia/state"
15 )
16
17 const (
18 InvalidTextRepresentation = "22P02"
19 CheckViolation = "23514"
20 UniqueViolation = "23505"
21 RaiseException = "P0001"
22 ForeignKeyViolation = "23503"
23 )
24
25 type Conf struct {
26 Service string
27 User string
28 Password string
29 Database string
30 }
31
32 var connectAttempts = attempt.Strategy{
33 Min: 5,
34 Total: 5 * time.Minute,
35 Delay: 200 * time.Millisecond,
36 }
37
38 func New(connPool *pgx.ConnPool, conf *Conf) *DB {
39 return &DB{connPool, conf}
40 }
41
42 func Wait(conf *Conf, afterConn func(*pgx.Conn) error) *DB {
43 if conf == nil {
44 conf = &Conf{
45 Service: os.Getenv("FLYNN_POSTGRES"),
46 User: os.Getenv("PGUSER"),
47 Password: os.Getenv("PGPASSWORD"),
48 Database: os.Getenv("PGDATABASE"),
49 }
50 }
51 events := make(chan *discoverd.Event)
52 stream, err := discoverd.NewService(conf.Service).Watch(events)
53 if err != nil {
54 shutdown.Fatal(err)
55 }
56 // wait for service meta that has sync or singleton primary
57 for e := range events {
58 if e.Kind&discoverd.EventKindServiceMeta == 0 || e.ServiceMeta == nil || len(e.ServiceMeta.Data) == 0 {
59 continue
60 }
61 state := &state.State{}
62 json.Unmarshal(e.ServiceMeta.Data, state)
63 if state.Singleton || state.Sync != nil {
64 break
65 }
66 }
67 stream.Close()
68 // TODO(titanous): handle discoverd disconnection
69
70 // retry here as authentication may fail if DB is still
71 // starting up.
72 // TODO(jpg): switch this to use pgmanager to check if user
73 // exists, we can also check for r/w with pgmanager
74 var db *DB
75 err = connectAttempts.Run(func() error {
76 db, err = Open(conf, afterConn)
77 return err
78 })
79 if err != nil {
80 panic(err)
81 }
82 for {
83 var readonly string
84 // wait until read-write transactions are allowed
85 if err := db.QueryRow("SHOW default_transaction_read_only").Scan(&readonly); err != nil || readonly == "on" {
86 time.Sleep(100 * time.Millisecond)
87 // TODO(titanous): add max wait here
88 continue
89 }
90 return db
91 }
92 }
93
94 func Open(conf *Conf, afterConn func(*pgx.Conn) error) (*DB, error) {
95 connConfig := pgx.ConnConfig{
96 Host: fmt.Sprintf("leader.%s.discoverd", conf.Service),
97 User: conf.User,
98 Database: conf.Database,
99 Password: conf.Password,
100 Dial: dialer.Retry.Dial,
101 }
102 connPool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
103 ConnConfig: connConfig,
104 AfterConnect: afterConn,
105 MaxConnections: 20,
106 })
107 db := &DB{connPool, conf}
108 return db, err
109 }
110
111 type DB struct {
112 *pgx.ConnPool
113 conf *Conf
114 }
115
116 func (db *DB) Exec(query string, args ...interface{}) error {
117 _, err := db.ConnPool.Exec(query, args...)
118 return err
119 }
120
121 func (db *DB) ExecRetry(query string, args ...interface{}) error {
122 retries := 0
123 max := 30
124 for {
125 _, err := db.ConnPool.Exec(query, args...)
126 if err == pgx.ErrDeadConn && retries < max {
127 retries++
128 time.Sleep(1 * time.Second)
129 continue
130 }
131 return err
132 }
133 }
134
135 type Scanner interface {
136 Scan(...interface{}) error
137 }
138
139 func (db *DB) QueryRow(query string, args ...interface{}) Scanner {
140 return rowErrFixer{db.ConnPool.QueryRow(query, args...)}
141 }
142
143 func (db *DB) Begin() (*DBTx, error) {
144 tx, err := db.ConnPool.Begin()
145 return &DBTx{tx}, err
146 }
147
148 type DBTx struct{ *pgx.Tx }
149
150 func (tx *DBTx) Exec(query string, args ...interface{}) error {
151 _, err := tx.Tx.Exec(query, args...)
152 return err
153 }
154
155 func (tx *DBTx) QueryRow(query string, args ...interface{}) Scanner {
156 return rowErrFixer{tx.Tx.QueryRow(query, args...)}
157 }
158
159 type rowErrFixer struct {
160 s Scanner
161 }
162
163 func (f rowErrFixer) Scan(args ...interface{}) error {
164 err := f.s.Scan(args...)
165 if e, ok := err.(pgx.PgError); ok && e.Code == InvalidTextRepresentation && e.File == "uuid.c" && e.Routine == "string_to_uuid" {
166 // invalid input syntax for uuid
167 err = pgx.ErrNoRows
168 }
169 return err
170 }
171
172 func IsUniquenessError(err error, constraint string) bool {
173 if e, ok := err.(pgx.PgError); ok && e.Code == UniqueViolation {
174 return constraint == "" || constraint == e.ConstraintName
175 }
176 return false
177 }
178
179 func IsPostgresCode(err error, code string) bool {
180 if e, ok := err.(pgx.PgError); ok && e.Code == code {
181 return true
182 }
183 return false
184 }