# dbx

PostgreSQL cluster library for Go built on pgx/v5. Master/replica routing, automatic retries with exponential backoff, round-robin load balancing, background health checking, panic-safe transactions, and context-based Querier injection.

```
go get git.codelab.vc/pkg/dbx
```

## Quick start

```go
cluster, err := dbx.NewCluster(ctx, dbx.Config{
	Master: dbx.NodeConfig{
		Name: "master",
		DSN:  "postgres://user:pass@master:5432/mydb",
		Pool: dbx.PoolConfig{MaxConns: 20, MinConns: 5},
	},
	Replicas: []dbx.NodeConfig{
		{Name: "replica-1", DSN: "postgres://user:pass@replica1:5432/mydb"},
		{Name: "replica-2", DSN: "postgres://user:pass@replica2:5432/mydb"},
	},
})
if err != nil {
	log.Fatal(err)
}
defer cluster.Close()

// Write → master
if _, err := cluster.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "alice"); err != nil {
	log.Fatal(err)
}

// Read → replica with automatic fallback to master
rows, err := cluster.ReadQuery(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
	log.Fatal(err)
}
defer rows.Close()

// Transaction → master, panic-safe; returning an error rolls back
err = cluster.RunTx(ctx, func(ctx context.Context, tx pgx.Tx) error {
	if _, err := tx.Exec(ctx, "UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, fromID); err != nil {
		return err
	}
	_, err := tx.Exec(ctx, "UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100, toID)
	return err
})
```

## Components

| Component | What it does |
|-----------|-------------|
| `Cluster` | Entry point. Connects to master + replicas, routes queries, manages lifecycle. |
| `Node` | Wraps `pgxpool.Pool` with health state and a human-readable name. |
| `Balancer` | Interface for replica selection. Built-in: `RoundRobinBalancer`. |
| `retrier` | Exponential backoff with jitter, node fallback, custom error classifiers. |
| `healthChecker` | Background goroutine that pings all nodes on an interval. |
| `Querier` injection | `InjectQuerier` / `ExtractQuerier`: context-based Querier for service layers. |
| `MetricsHook` | Optional callbacks: query start/end, retry, node up/down, replica fallback. |
| `SlogLogger` | Adapts `*slog.Logger` to the `dbx.Logger` interface. |
| `Collect`/`CollectOne` | Generic scan helpers that read rows directly into structs via `pgx.RowToStructByName`. |
| `PoolStats` | Aggregate pool statistics across all nodes via `cluster.Stats()`. |

## Routing

The library uses explicit method-based routing (no SQL parsing):

```
                    ┌──────────────┐
                    │   Cluster    │
                    └──────┬───────┘
                           │
           ┌───────────────┴───────────────┐
           │                               │
       Write ops                       Read ops
 Exec, Query, QueryRow          ReadQuery, ReadQueryRow
 Begin, BeginTx, RunTx
  CopyFrom, SendBatch
           │                               │
           ▼                               ▼
     ┌──────────┐             ┌────────────────────────┐
     │  Master  │             │  Balancer → Replicas   │
     └──────────┘             │  fallback → Master     │
                              └────────────────────────┘
```

Direct node access: `cluster.Master()` and `cluster.Replica()` return `DB`.

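
The read path in the diagram (balancer picks a replica, master is the fallback) can be illustrated with a small standalone sketch. It is not the library's code: the `node`, `roundRobin`, and `pickReadNode` names are hypothetical, and the real `Balancer` interface and `RoundRobinBalancer` may differ in both shape and behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// node is a hypothetical stand-in for a cluster node; it is not dbx.Node.
type node struct {
	name    string
	healthy bool
}

// errNoHealthyNode mirrors the idea of dbx.ErrNoHealthyNode for this sketch only.
var errNoHealthyNode = errors.New("no healthy node")

// roundRobin hands out replicas in rotation, skipping unhealthy ones.
type roundRobin struct {
	counter atomic.Uint64
}

// next returns the next healthy replica, or nil if none is healthy.
func (b *roundRobin) next(replicas []*node) *node {
	n := uint64(len(replicas))
	if n == 0 {
		return nil
	}
	start := b.counter.Add(1) - 1
	for i := uint64(0); i < n; i++ {
		if r := replicas[(start+i)%n]; r.healthy {
			return r
		}
	}
	return nil
}

// pickReadNode prefers a healthy replica and falls back to the master.
func pickReadNode(b *roundRobin, master *node, replicas []*node) (*node, error) {
	if r := b.next(replicas); r != nil {
		return r, nil
	}
	if master != nil && master.healthy {
		return master, nil
	}
	return nil, errNoHealthyNode
}

func main() {
	master := &node{name: "master", healthy: true}
	replicas := []*node{
		{name: "replica-1", healthy: true},
		{name: "replica-2", healthy: false},
		{name: "replica-3", healthy: true},
	}
	b := &roundRobin{}
	for i := 0; i < 4; i++ {
		n, err := pickReadNode(b, master, replicas)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(n.name)
	}
}
```

The atomic counter keeps selection lock-free under concurrent reads; whether dbx does the same is not stated in this README.
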
## Multi-replica setup

```go
cluster, err := dbx.NewCluster(ctx, dbx.Config{
	Master: dbx.NodeConfig{
		Name: "master",
		DSN:  "postgres://master:5432/mydb",
		Pool: dbx.PoolConfig{MaxConns: 20, MinConns: 5},
	},
	Replicas: []dbx.NodeConfig{
		{Name: "replica-1", DSN: "postgres://replica1:5432/mydb"},
		{Name: "replica-2", DSN: "postgres://replica2:5432/mydb"},
		{Name: "replica-3", DSN: "postgres://replica3:5432/mydb"},
	},
	Retry: dbx.RetryConfig{
		MaxAttempts: 5,
		BaseDelay:   100 * time.Millisecond,
		MaxDelay:    2 * time.Second,
	},
	HealthCheck: dbx.HealthCheckConfig{
		Interval: 3 * time.Second,
		Timeout:  1 * time.Second,
	},
})
if err != nil {
	log.Fatal(err) // cluster is nil here, so do not defer Close before this check
}
defer cluster.Close()
```

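
To get a feel for what `BaseDelay` and `MaxDelay` mean in practice, here is a minimal sketch of exponential backoff with jitter using the values from the example above. The doubling formula and the "full jitter" strategy are assumptions made for illustration; the README does not specify how dbx's `retrier` computes its delays, and `backoffCap` and `jitter` are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Values taken from the RetryConfig example above.
const (
	baseDelay   = 100 * time.Millisecond
	maxDelay    = 2 * time.Second
	maxAttempts = 5
)

// backoffCap returns the upper bound for the wait before retry number
// attempt (0-based): min(limit, base * 2^attempt).
func backoffCap(attempt int, base, limit time.Duration) time.Duration {
	if base <= 0 || limit <= 0 {
		return 0
	}
	d := base
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		d = limit
	}
	return d
}

// jitter picks a uniformly random duration in [0, d] ("full jitter"),
// so that many clients retrying at once do not wake up in lockstep.
func jitter(d time.Duration) time.Duration {
	if d <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(d) + 1))
}

func main() {
	// The first attempt runs immediately; waits happen between attempts.
	for retry := 0; retry < maxAttempts-1; retry++ {
		limit := backoffCap(retry, baseDelay, maxDelay)
		fmt.Printf("retry %d: wait up to %v (picked %v)\n", retry+1, limit, jitter(limit))
	}
}
```

Under these assumptions the caps grow as 100ms, 200ms, 400ms, 800ms, and would be clamped at 2s from the sixth retry onward.
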
## Transactions

`RunTx` is panic-safe: if the callback panics, the transaction is rolled back and the panic is re-raised.

```go
err := cluster.RunTx(ctx, func(ctx context.Context, tx pgx.Tx) error {
	_, err := tx.Exec(ctx, "UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, fromID)
	if err != nil {
		return err
	}
	_, err = tx.Exec(ctx, "UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, toID)
	return err
})
```

For custom isolation levels use `RunTxOptions`:

```go
cluster.RunTxOptions(ctx, pgx.TxOptions{
	IsoLevel: pgx.Serializable,
}, fn)
```

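
The panic-safety contract described above (roll back, then re-raise) is commonly built from `defer` and `recover`. The following standalone sketch shows that pattern against a fake transaction; the `tx` interface, `fakeTx`, and `runTx` are hypothetical and are not taken from dbx's source.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is a hypothetical minimal transaction interface (not pgx.Tx).
type tx interface {
	Commit() error
	Rollback() error
}

// fakeTx records which of Commit/Rollback was called.
type fakeTx struct{ committed, rolledBack bool }

func (t *fakeTx) Commit() error   { t.committed = true; return nil }
func (t *fakeTx) Rollback() error { t.rolledBack = true; return nil }

var errBoom = errors.New("boom")

// runTx commits when fn returns nil, rolls back when fn returns an error,
// and rolls back and then re-panics when fn panics.
func runTx(t tx, fn func(tx) error) error {
	defer func() {
		if p := recover(); p != nil {
			_ = t.Rollback() // best effort; the original panic takes priority
			panic(p)
		}
	}()
	if err := fn(t); err != nil {
		if rbErr := t.Rollback(); rbErr != nil {
			return errors.Join(err, rbErr)
		}
		return err
	}
	return t.Commit()
}

func main() {
	commit := &fakeTx{}
	fmt.Println(runTx(commit, func(tx) error { return nil }), commit.committed)

	fail := &fakeTx{}
	fmt.Println(runTx(fail, func(tx) error { return errBoom }), fail.rolledBack)

	crash := &fakeTx{}
	func() {
		defer func() { fmt.Println("recovered:", recover(), crash.rolledBack) }()
		_ = runTx(crash, func(tx) error { panic("oops") })
	}()
}
```

Re-raising the panic instead of converting it to an error keeps the caller's stack unwinding unchanged while still releasing the connection.
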
## Context-based Querier injection

Pass the Querier through context so service layers work both inside and outside transactions:

```go
// Repository
func CreateUser(ctx context.Context, db dbx.Querier, name string) error {
	q := dbx.ExtractQuerier(ctx, db)
	_, err := q.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", name)
	return err
}

// Outside transaction: uses cluster directly
CreateUser(ctx, cluster, "alice")

// Inside transaction: uses tx
cluster.RunTx(ctx, func(ctx context.Context, tx pgx.Tx) error {
	ctx = dbx.InjectQuerier(ctx, tx)
	return CreateUser(ctx, cluster, "alice") // will use tx from context
})
```

## Error classification

```go
dbx.IsRetryable(err)           // connection errors, serialization failures, deadlocks, too_many_connections
dbx.IsConnectionError(err)     // PG class 08 + common connection error strings
dbx.IsConstraintViolation(err) // PG class 23 (unique, FK, check violations)
dbx.PgErrorCode(err)           // extract raw PG error code
```

Sentinel errors: `ErrNoHealthyNode`, `ErrClusterClosed`, `ErrRetryExhausted`.

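
The classes mentioned in the comments map onto PostgreSQL's five-character SQLSTATE codes, where the first two characters identify the class. The sketch below classifies raw code strings along the same lines. It is an approximation for illustration only: the function names are hypothetical, it operates on strings instead of `error` values, and it ignores the "common connection error strings" that `dbx.IsConnectionError` is said to match.

```go
package main

import (
	"fmt"
	"strings"
)

// inClass reports whether a five-character SQLSTATE code belongs to a class.
func inClass(code, class string) bool {
	return len(code) == 5 && strings.HasPrefix(code, class)
}

// isConnectionCode matches class 08 (connection exception).
func isConnectionCode(code string) bool { return inClass(code, "08") }

// isConstraintCode matches class 23 (integrity constraint violation).
func isConstraintCode(code string) bool { return inClass(code, "23") }

// isRetryableCode matches connection errors plus a few transient conditions.
func isRetryableCode(code string) bool {
	switch code {
	case "40001", // serialization_failure
		"40P01", // deadlock_detected
		"53300": // too_many_connections
		return true
	}
	return isConnectionCode(code)
}

func main() {
	for _, code := range []string{"08006", "23505", "40001", "40P01", "53300", "42601"} {
		fmt.Printf("%s retryable=%t connection=%t constraint=%t\n",
			code, isRetryableCode(code), isConnectionCode(code), isConstraintCode(code))
	}
}
```
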
## slog integration

```go
cluster, err := dbx.NewCluster(ctx, dbx.Config{
	Master: dbx.NodeConfig{DSN: "postgres://..."},
	Logger: dbx.NewSlogLogger(slog.Default()),
})
if err != nil {
	log.Fatal(err)
}
```

## Scan helpers

Generic functions that eliminate row scanning boilerplate:

```go
type User struct {
	ID   int    `db:"id"`
	Name string `db:"name"`
}

users, err := dbx.Collect[User](ctx, cluster, "SELECT id, name FROM users WHERE active = $1", true)

user, err := dbx.CollectOne[User](ctx, cluster, "SELECT id, name FROM users WHERE id = $1", 42)
// returns pgx.ErrNoRows if not found
```

## Slow query logging

```go
cluster, err := dbx.NewCluster(ctx, dbx.Config{
	Master:             dbx.NodeConfig{DSN: "postgres://..."},
	Logger:             dbx.NewSlogLogger(slog.Default()),
	SlowQueryThreshold: 100 * time.Millisecond,
})
if err != nil {
	log.Fatal(err)
}
// queries exceeding threshold are logged at Warn level
```

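
As an illustration of the threshold check, the following sketch logs a Warn record through `log/slog` when a measured duration exceeds the threshold. It is a guess at the general shape, not dbx's implementation: `logIfSlow` and `newBufferLogger` are hypothetical names, the attribute keys are made up, and treating a zero threshold as "disabled" is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
	"time"
)

// threshold matches SlowQueryThreshold in the example above.
const threshold = 100 * time.Millisecond

// newBufferLogger returns a text logger writing to an in-memory buffer,
// which makes the output easy to inspect in this sketch.
func newBufferLogger() (*slog.Logger, *bytes.Buffer) {
	buf := &bytes.Buffer{}
	return slog.New(slog.NewTextHandler(buf, nil)), buf
}

// logIfSlow emits a Warn record when elapsed exceeds limit and reports
// whether it logged. A non-positive limit disables the check (assumption).
func logIfSlow(l *slog.Logger, sql string, elapsed, limit time.Duration) bool {
	if limit <= 0 || elapsed <= limit {
		return false
	}
	l.Warn("slow query", "sql", sql, "duration", elapsed, "threshold", limit)
	return true
}

func main() {
	logger, buf := newBufferLogger()

	logIfSlow(logger, "SELECT 1", 3*time.Millisecond, threshold)    // below threshold: silent
	logIfSlow(logger, "SELECT pg_sleep(1)", time.Second, threshold) // above threshold: logged

	fmt.Print(buf.String())
}
```
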
## Pool stats

```go
stats := cluster.Stats()
fmt.Println(stats.TotalConns, stats.IdleConns, stats.AcquireCount)
// per-node stats: stats.Nodes["master"], stats.Nodes["replica-1"]
```

## OpenTelemetry / pgx tracer

Pass any `pgx.QueryTracer` (e.g., `otelpgx.NewTracer()`) to instrument all queries:

```go
dbx.ApplyOptions(&cfg, dbx.WithTracer(otelpgx.NewTracer()))
```

Or set per-node via `NodeConfig.Tracer`.

## dbxtest helpers

The `dbxtest` package provides test helpers:

```go
func TestMyRepo(t *testing.T) {
	cluster := dbxtest.NewTestCluster(t, dbx.WithLogger(&dbxtest.TestLogger{T: t}))
	// cluster is auto-closed when test finishes
	// skips test if DB is not reachable
}
```

### Transaction isolation for tests

```go
func TestCreateUser(t *testing.T) {
	c := dbxtest.NewTestCluster(t)
	dbxtest.RunInTx(t, c, func(ctx context.Context, q dbx.Querier) {
		// all changes are rolled back after fn returns
		if _, err := q.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "test"); err != nil {
			t.Fatal(err)
		}
	})
}
```

Set the `DBX_TEST_DSN` environment variable to override the default DSN (`postgres://postgres:postgres@localhost:5432/dbx_test?sslmode=disable`).

## Requirements

Go 1.24+, [pgx/v5](https://github.com/jackc/pgx).