Add production features: slog adapter, scan helpers, slow query logging, pool stats, tracer passthrough, test tx isolation
Some checks failed
CI / test (push) Failing after 13s
Some checks failed
CI / test (push) Failing after 13s
- slog.go: SlogLogger adapts *slog.Logger to dbx.Logger interface - scan.go: Collect[T] and CollectOne[T] generic helpers using pgx.RowToStructByName - cluster.go: slow query logging via Config.SlowQueryThreshold (Warn level in queryEnd) - stats.go: PoolStats with Cluster.Stats() aggregating pool stats across all nodes - config.go/node.go: NodeConfig.Tracer passthrough for pgx.QueryTracer (OpenTelemetry) - options.go: WithSlowQueryThreshold and WithTracer functional options - dbxtest/tx.go: RunInTx runs callback in always-rolled-back transaction for test isolation Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -23,8 +23,11 @@ You are working on `git.codelab.vc/pkg/dbx`, a Go PostgreSQL cluster library bui
|
|||||||
- `tx.go` — RunTx, RunTxOptions, InjectQuerier, ExtractQuerier
|
- `tx.go` — RunTx, RunTxOptions, InjectQuerier, ExtractQuerier
|
||||||
- `errors.go` — IsRetryable, IsConnectionError, IsConstraintViolation, PgErrorCode
|
- `errors.go` — IsRetryable, IsConnectionError, IsConstraintViolation, PgErrorCode
|
||||||
- `config.go` — Config, NodeConfig, PoolConfig, RetryConfig, HealthCheckConfig
|
- `config.go` — Config, NodeConfig, PoolConfig, RetryConfig, HealthCheckConfig
|
||||||
- `options.go` — functional options (WithLogger, WithMetrics, WithRetry, WithHealthCheck)
|
- `options.go` — functional options (WithLogger, WithMetrics, WithRetry, WithHealthCheck, WithSlowQueryThreshold, WithTracer)
|
||||||
- `dbxtest/` — test helpers: NewTestCluster, TestLogger
|
- `slog.go` — SlogLogger adapting *slog.Logger to dbx.Logger
|
||||||
|
- `scan.go` — Collect[T], CollectOne[T] generic row scan helpers
|
||||||
|
- `stats.go` — PoolStats aggregate pool statistics via Cluster.Stats()
|
||||||
|
- `dbxtest/` — test helpers: NewTestCluster, TestLogger, RunInTx
|
||||||
|
|
||||||
## Code conventions
|
## Code conventions
|
||||||
|
|
||||||
|
|||||||
12
AGENTS.md
12
AGENTS.md
@@ -11,17 +11,21 @@ Universal guide for AI coding agents working with this codebase.
|
|||||||
```
|
```
|
||||||
dbx/ Root — Cluster, Node, Balancer, retry, health, errors, tx, config
|
dbx/ Root — Cluster, Node, Balancer, retry, health, errors, tx, config
|
||||||
├── dbx.go Interfaces: Querier, DB, Logger, MetricsHook
|
├── dbx.go Interfaces: Querier, DB, Logger, MetricsHook
|
||||||
├── cluster.go Cluster — routing, write/read operations
|
├── cluster.go Cluster — routing, write/read operations, slow query logging
|
||||||
├── node.go Node — pgxpool.Pool wrapper with health state
|
├── node.go Node — pgxpool.Pool wrapper with health state, tracer passthrough
|
||||||
├── balancer.go Balancer interface + RoundRobinBalancer
|
├── balancer.go Balancer interface + RoundRobinBalancer
|
||||||
├── retry.go retrier — exponential backoff with jitter and node fallback
|
├── retry.go retrier — exponential backoff with jitter and node fallback
|
||||||
├── health.go healthChecker — background goroutine pinging nodes
|
├── health.go healthChecker — background goroutine pinging nodes
|
||||||
├── tx.go RunTx, RunTxOptions, InjectQuerier, ExtractQuerier
|
├── tx.go RunTx, RunTxOptions, InjectQuerier, ExtractQuerier
|
||||||
├── errors.go Error classification (IsRetryable, IsConnectionError, etc.)
|
├── errors.go Error classification (IsRetryable, IsConnectionError, etc.)
|
||||||
├── config.go Config, NodeConfig, PoolConfig, RetryConfig, HealthCheckConfig
|
├── config.go Config, NodeConfig, PoolConfig, RetryConfig, HealthCheckConfig
|
||||||
├── options.go Functional options (WithLogger, WithMetrics, WithRetry, etc.)
|
├── options.go Functional options (WithLogger, WithMetrics, WithRetry, WithTracer, etc.)
|
||||||
|
├── slog.go SlogLogger — adapts *slog.Logger to dbx.Logger
|
||||||
|
├── scan.go Collect[T], CollectOne[T] — generic row scan helpers
|
||||||
|
├── stats.go PoolStats — aggregate pool statistics via Cluster.Stats()
|
||||||
└── dbxtest/
|
└── dbxtest/
|
||||||
└── dbxtest.go Test helpers: NewTestCluster, TestLogger
|
├── dbxtest.go Test helpers: NewTestCluster, TestLogger
|
||||||
|
└── tx.go RunInTx — test transaction isolation (always rolled back)
|
||||||
```
|
```
|
||||||
|
|
||||||
## Routing architecture
|
## Routing architecture
|
||||||
|
|||||||
@@ -24,6 +24,12 @@ go vet ./... # static analysis
|
|||||||
- **Health checker** — background goroutine pings all nodes on an interval, flips `Node.healthy` atomic bool
|
- **Health checker** — background goroutine pings all nodes on an interval, flips `Node.healthy` atomic bool
|
||||||
- **RunTx** — panic-safe transaction wrapper: recovers panics, rolls back, re-panics
|
- **RunTx** — panic-safe transaction wrapper: recovers panics, rolls back, re-panics
|
||||||
- **Querier injection** — `InjectQuerier`/`ExtractQuerier` pass `Querier` via context for service layers
|
- **Querier injection** — `InjectQuerier`/`ExtractQuerier` pass `Querier` via context for service layers
|
||||||
|
- **SlogLogger** — adapts `*slog.Logger` to the `dbx.Logger` interface (`slog.go`)
|
||||||
|
- **Collect/CollectOne** — generic scan helpers using `pgx.RowToStructByName` (`scan.go`)
|
||||||
|
- **Slow query logging** — `Config.SlowQueryThreshold` triggers Warn-level logging in `queryEnd`
|
||||||
|
- **PoolStats** — `Cluster.Stats()` aggregates pool statistics across all nodes (`stats.go`)
|
||||||
|
- **Tracer passthrough** — `NodeConfig.Tracer` / `WithTracer` sets `pgx.QueryTracer` for OpenTelemetry
|
||||||
|
- **RunInTx** — test helper that runs a callback in an always-rolled-back transaction (`dbxtest/tx.go`)
|
||||||
|
|
||||||
### Error classification
|
### Error classification
|
||||||
|
|
||||||
@@ -40,6 +46,7 @@ go vet ./... # static analysis
|
|||||||
- `atomic.Bool` for thread safety (`Node.healthy`, `Cluster.closed`)
|
- `atomic.Bool` for thread safety (`Node.healthy`, `Cluster.closed`)
|
||||||
- `dbxtest.NewTestCluster` skips tests when DB unreachable, auto-closes via `t.Cleanup`
|
- `dbxtest.NewTestCluster` skips tests when DB unreachable, auto-closes via `t.Cleanup`
|
||||||
- `dbxtest.TestLogger` writes to `testing.T` for test log output
|
- `dbxtest.TestLogger` writes to `testing.T` for test log output
|
||||||
|
- `dbxtest.RunInTx` runs a callback in a transaction that is always rolled back
|
||||||
|
|
||||||
## See also
|
## See also
|
||||||
|
|
||||||
|
|||||||
69
README.md
69
README.md
@@ -50,6 +50,9 @@ cluster.RunTx(ctx, func(ctx context.Context, tx pgx.Tx) error {
|
|||||||
| `healthChecker` | Background goroutine that pings all nodes on an interval. |
|
| `healthChecker` | Background goroutine that pings all nodes on an interval. |
|
||||||
| `Querier` injection | `InjectQuerier` / `ExtractQuerier` — context-based Querier for service layers. |
|
| `Querier` injection | `InjectQuerier` / `ExtractQuerier` — context-based Querier for service layers. |
|
||||||
| `MetricsHook` | Optional callbacks: query start/end, retry, node up/down, replica fallback. |
|
| `MetricsHook` | Optional callbacks: query start/end, retry, node up/down, replica fallback. |
|
||||||
|
| `SlogLogger` | Adapts `*slog.Logger` to the `dbx.Logger` interface. |
|
||||||
|
| `Collect`/`CollectOne` | Generic scan helpers — read rows directly into structs via `pgx.RowToStructByName`. |
|
||||||
|
| `PoolStats` | Aggregate pool statistics across all nodes via `cluster.Stats()`. |
|
||||||
|
|
||||||
## Routing
|
## Routing
|
||||||
|
|
||||||
@@ -159,6 +162,60 @@ dbx.PgErrorCode(err) // extract raw PG error code
|
|||||||
|
|
||||||
Sentinel errors: `ErrNoHealthyNode`, `ErrClusterClosed`, `ErrRetryExhausted`.
|
Sentinel errors: `ErrNoHealthyNode`, `ErrClusterClosed`, `ErrRetryExhausted`.
|
||||||
|
|
||||||
|
## slog integration
|
||||||
|
|
||||||
|
```go
|
||||||
|
cluster, _ := dbx.NewCluster(ctx, dbx.Config{
|
||||||
|
Master: dbx.NodeConfig{DSN: "postgres://..."},
|
||||||
|
Logger: dbx.NewSlogLogger(slog.Default()),
|
||||||
|
})
|
||||||
|
```
|
||||||
|
|
||||||
|
## Scan helpers
|
||||||
|
|
||||||
|
Generic functions that eliminate row scanning boilerplate:
|
||||||
|
|
||||||
|
```go
|
||||||
|
type User struct {
|
||||||
|
ID int `db:"id"`
|
||||||
|
Name string `db:"name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
users, err := dbx.Collect[User](ctx, cluster, "SELECT id, name FROM users WHERE active = $1", true)
|
||||||
|
|
||||||
|
user, err := dbx.CollectOne[User](ctx, cluster, "SELECT id, name FROM users WHERE id = $1", 42)
|
||||||
|
// returns pgx.ErrNoRows if not found
|
||||||
|
```
|
||||||
|
|
||||||
|
## Slow query logging
|
||||||
|
|
||||||
|
```go
|
||||||
|
cluster, _ := dbx.NewCluster(ctx, dbx.Config{
|
||||||
|
Master: dbx.NodeConfig{DSN: "postgres://..."},
|
||||||
|
Logger: dbx.NewSlogLogger(slog.Default()),
|
||||||
|
SlowQueryThreshold: 100 * time.Millisecond,
|
||||||
|
})
|
||||||
|
// queries exceeding threshold are logged at Warn level
|
||||||
|
```
|
||||||
|
|
||||||
|
## Pool stats
|
||||||
|
|
||||||
|
```go
|
||||||
|
stats := cluster.Stats()
|
||||||
|
fmt.Println(stats.TotalConns, stats.IdleConns, stats.AcquireCount)
|
||||||
|
// per-node stats: stats.Nodes["master"], stats.Nodes["replica-1"]
|
||||||
|
```
|
||||||
|
|
||||||
|
## OpenTelemetry / pgx tracer
|
||||||
|
|
||||||
|
Pass any `pgx.QueryTracer` (e.g., `otelpgx.NewTracer()`) to instrument all queries:
|
||||||
|
|
||||||
|
```go
|
||||||
|
dbx.ApplyOptions(&cfg, dbx.WithTracer(otelpgx.NewTracer()))
|
||||||
|
```
|
||||||
|
|
||||||
|
Or set per-node via `NodeConfig.Tracer`.
|
||||||
|
|
||||||
## dbxtest helpers
|
## dbxtest helpers
|
||||||
|
|
||||||
The `dbxtest` package provides test helpers:
|
The `dbxtest` package provides test helpers:
|
||||||
@@ -171,6 +228,18 @@ func TestMyRepo(t *testing.T) {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Transaction isolation for tests
|
||||||
|
|
||||||
|
```go
|
||||||
|
func TestCreateUser(t *testing.T) {
|
||||||
|
c := dbxtest.NewTestCluster(t)
|
||||||
|
dbxtest.RunInTx(t, c, func(ctx context.Context, q dbx.Querier) {
|
||||||
|
// all changes are rolled back after fn returns
|
||||||
|
_, _ = q.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "test")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
Set `DBX_TEST_DSN` env var to override the default DSN (`postgres://postgres:postgres@localhost:5432/dbx_test?sslmode=disable`).
|
Set `DBX_TEST_DSN` env var to override the default DSN (`postgres://postgres:postgres@localhost:5432/dbx_test?sslmode=disable`).
|
||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|||||||
35
cluster.go
35
cluster.go
@@ -17,12 +17,13 @@ type Cluster struct {
|
|||||||
replicas []*Node
|
replicas []*Node
|
||||||
all []*Node // master + replicas for health checker
|
all []*Node // master + replicas for health checker
|
||||||
|
|
||||||
balancer Balancer
|
balancer Balancer
|
||||||
retrier *retrier
|
retrier *retrier
|
||||||
health *healthChecker
|
health *healthChecker
|
||||||
logger Logger
|
logger Logger
|
||||||
metrics *MetricsHook
|
metrics *MetricsHook
|
||||||
closed atomic.Bool
|
slowQueryThreshold time.Duration
|
||||||
|
closed atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCluster creates a Cluster, connecting to all configured nodes.
|
// NewCluster creates a Cluster, connecting to all configured nodes.
|
||||||
@@ -53,13 +54,14 @@ func NewCluster(ctx context.Context, cfg Config) (*Cluster, error) {
|
|||||||
all = append(all, replicas...)
|
all = append(all, replicas...)
|
||||||
|
|
||||||
c := &Cluster{
|
c := &Cluster{
|
||||||
master: master,
|
master: master,
|
||||||
replicas: replicas,
|
replicas: replicas,
|
||||||
all: all,
|
all: all,
|
||||||
balancer: NewRoundRobinBalancer(),
|
balancer: NewRoundRobinBalancer(),
|
||||||
retrier: newRetrier(cfg.Retry, cfg.Logger, cfg.Metrics),
|
retrier: newRetrier(cfg.Retry, cfg.Logger, cfg.Metrics),
|
||||||
logger: cfg.Logger,
|
logger: cfg.Logger,
|
||||||
metrics: cfg.Metrics,
|
metrics: cfg.Metrics,
|
||||||
|
slowQueryThreshold: cfg.SlowQueryThreshold,
|
||||||
}
|
}
|
||||||
|
|
||||||
c.health = newHealthChecker(all, cfg.HealthCheck, cfg.Logger, cfg.Metrics)
|
c.health = newHealthChecker(all, cfg.HealthCheck, cfg.Logger, cfg.Metrics)
|
||||||
@@ -232,6 +234,13 @@ func (c *Cluster) queryEnd(ctx context.Context, node, sql string, err error, d t
|
|||||||
if c.metrics != nil && c.metrics.OnQueryEnd != nil {
|
if c.metrics != nil && c.metrics.OnQueryEnd != nil {
|
||||||
c.metrics.OnQueryEnd(ctx, node, sql, err, d)
|
c.metrics.OnQueryEnd(ctx, node, sql, err, d)
|
||||||
}
|
}
|
||||||
|
if c.slowQueryThreshold > 0 && d >= c.slowQueryThreshold {
|
||||||
|
c.logger.Warn(ctx, "dbx: slow query",
|
||||||
|
"node", node,
|
||||||
|
"duration", d,
|
||||||
|
"sql", sql,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// errRow implements pgx.Row for error cases.
|
// errRow implements pgx.Row for error cases.
|
||||||
|
|||||||
26
config.go
26
config.go
@@ -1,22 +1,28 @@
|
|||||||
package dbx
|
package dbx
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
)
|
||||||
|
|
||||||
// Config is the top-level configuration for a Cluster.
|
// Config is the top-level configuration for a Cluster.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Master NodeConfig
|
Master NodeConfig
|
||||||
Replicas []NodeConfig
|
Replicas []NodeConfig
|
||||||
Retry RetryConfig
|
Retry RetryConfig
|
||||||
Logger Logger
|
Logger Logger
|
||||||
Metrics *MetricsHook
|
Metrics *MetricsHook
|
||||||
HealthCheck HealthCheckConfig
|
HealthCheck HealthCheckConfig
|
||||||
|
SlowQueryThreshold time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeConfig describes a single database node.
|
// NodeConfig describes a single database node.
|
||||||
type NodeConfig struct {
|
type NodeConfig struct {
|
||||||
Name string // human-readable name for logs/metrics, e.g. "master", "replica-1"
|
Name string // human-readable name for logs/metrics, e.g. "master", "replica-1"
|
||||||
DSN string
|
DSN string
|
||||||
Pool PoolConfig
|
Pool PoolConfig
|
||||||
|
Tracer pgx.QueryTracer
|
||||||
}
|
}
|
||||||
|
|
||||||
// PoolConfig controls pgxpool.Pool parameters.
|
// PoolConfig controls pgxpool.Pool parameters.
|
||||||
|
|||||||
27
dbxtest/tx.go
Normal file
27
dbxtest/tx.go
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
package dbxtest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.codelab.vc/pkg/dbx"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RunInTx executes fn inside a transaction that is always rolled back.
|
||||||
|
// This is useful for tests that modify data but should not leave side effects.
|
||||||
|
// The callback receives a dbx.Querier (not pgx.Tx) so it is compatible with
|
||||||
|
// InjectQuerier/ExtractQuerier patterns.
|
||||||
|
func RunInTx(t testing.TB, c *dbx.Cluster, fn func(ctx context.Context, q dbx.Querier)) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
tx, err := c.Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("dbxtest.RunInTx: begin: %v", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = tx.Rollback(ctx)
|
||||||
|
}()
|
||||||
|
|
||||||
|
fn(ctx, tx)
|
||||||
|
}
|
||||||
50
dbxtest/tx_test.go
Normal file
50
dbxtest/tx_test.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package dbxtest_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.codelab.vc/pkg/dbx"
|
||||||
|
"git.codelab.vc/pkg/dbx/dbxtest"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRunInTx(t *testing.T) {
|
||||||
|
c := dbxtest.NewTestCluster(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Create a table that persists across the test.
|
||||||
|
_, err := c.Exec(ctx, `CREATE TABLE IF NOT EXISTS test_run_in_tx (id int)`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() {
|
||||||
|
_, _ = c.Exec(context.Background(), `DROP TABLE IF EXISTS test_run_in_tx`)
|
||||||
|
})
|
||||||
|
|
||||||
|
dbxtest.RunInTx(t, c, func(ctx context.Context, q dbx.Querier) {
|
||||||
|
_, err := q.Exec(ctx, `INSERT INTO test_run_in_tx (id) VALUES (1)`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Row should be visible within the transaction.
|
||||||
|
var count int
|
||||||
|
err = q.QueryRow(ctx, `SELECT count(*) FROM test_run_in_tx`).Scan(&count)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if count != 1 {
|
||||||
|
t.Fatalf("expected 1 row in tx, got %d", count)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// After RunInTx returns, the transaction was rolled back; row should not exist.
|
||||||
|
var count int
|
||||||
|
err = c.QueryRow(ctx, `SELECT count(*) FROM test_run_in_tx`).Scan(&count)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if count != 0 {
|
||||||
|
t.Errorf("expected 0 rows after rollback, got %d", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
3
node.go
3
node.go
@@ -33,6 +33,9 @@ func connectNode(ctx context.Context, cfg NodeConfig) (*Node, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
applyPoolConfig(poolCfg, cfg.Pool)
|
applyPoolConfig(poolCfg, cfg.Pool)
|
||||||
|
if cfg.Tracer != nil {
|
||||||
|
poolCfg.ConnConfig.Tracer = cfg.Tracer
|
||||||
|
}
|
||||||
|
|
||||||
pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
|
pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
25
options.go
25
options.go
@@ -1,5 +1,11 @@
|
|||||||
package dbx
|
package dbx
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
)
|
||||||
|
|
||||||
// Option is a functional option for NewCluster.
|
// Option is a functional option for NewCluster.
|
||||||
type Option func(*Config)
|
type Option func(*Config)
|
||||||
|
|
||||||
@@ -31,6 +37,25 @@ func WithHealthCheck(h HealthCheckConfig) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithSlowQueryThreshold sets the threshold for slow query warnings.
|
||||||
|
// Queries taking longer than d will be logged at Warn level.
|
||||||
|
func WithSlowQueryThreshold(d time.Duration) Option {
|
||||||
|
return func(c *Config) {
|
||||||
|
c.SlowQueryThreshold = d
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithTracer sets the pgx.QueryTracer on master and all replica configs.
|
||||||
|
// This enables OpenTelemetry integration via libraries like otelpgx.
|
||||||
|
func WithTracer(t pgx.QueryTracer) Option {
|
||||||
|
return func(c *Config) {
|
||||||
|
c.Master.Tracer = t
|
||||||
|
for i := range c.Replicas {
|
||||||
|
c.Replicas[i].Tracer = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ApplyOptions applies functional options to a Config.
|
// ApplyOptions applies functional options to a Config.
|
||||||
func ApplyOptions(cfg *Config, opts ...Option) {
|
func ApplyOptions(cfg *Config, opts ...Option) {
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
|
|||||||
28
scan.go
Normal file
28
scan.go
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
package dbx
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Collect executes a read query and collects all rows into a slice of T
|
||||||
|
// using pgx.RowToStructByName. T must be a struct with db tags matching column names.
|
||||||
|
func Collect[T any](ctx context.Context, c *Cluster, sql string, args ...any) ([]T, error) {
|
||||||
|
rows, err := c.ReadQuery(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return pgx.CollectRows(rows, pgx.RowToStructByName[T])
|
||||||
|
}
|
||||||
|
|
||||||
|
// CollectOne executes a read query and collects exactly one row into T.
|
||||||
|
// Returns pgx.ErrNoRows if no rows are returned.
|
||||||
|
func CollectOne[T any](ctx context.Context, c *Cluster, sql string, args ...any) (T, error) {
|
||||||
|
rows, err := c.ReadQuery(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
var zero T
|
||||||
|
return zero, err
|
||||||
|
}
|
||||||
|
return pgx.CollectExactlyOneRow(rows, pgx.RowToStructByName[T])
|
||||||
|
}
|
||||||
81
scan_test.go
Normal file
81
scan_test.go
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
package dbx_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.codelab.vc/pkg/dbx"
|
||||||
|
"git.codelab.vc/pkg/dbx/dbxtest"
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
)
|
||||||
|
|
||||||
|
type scanRow struct {
|
||||||
|
ID int `db:"id"`
|
||||||
|
Name string `db:"name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCollect(t *testing.T) {
|
||||||
|
c := dbxtest.NewTestCluster(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
_, err := c.Exec(ctx, `CREATE TEMPORARY TABLE test_collect (id int, name text)`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = c.Exec(ctx, `INSERT INTO test_collect (id, name) VALUES (1, 'alice'), (2, 'bob')`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := dbx.Collect[scanRow](ctx, c, `SELECT id, name FROM test_collect ORDER BY id`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(rows) != 2 {
|
||||||
|
t.Fatalf("expected 2 rows, got %d", len(rows))
|
||||||
|
}
|
||||||
|
if rows[0].ID != 1 || rows[0].Name != "alice" {
|
||||||
|
t.Errorf("row 0: got %+v", rows[0])
|
||||||
|
}
|
||||||
|
if rows[1].ID != 2 || rows[1].Name != "bob" {
|
||||||
|
t.Errorf("row 1: got %+v", rows[1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCollectOne(t *testing.T) {
|
||||||
|
c := dbxtest.NewTestCluster(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
_, err := c.Exec(ctx, `CREATE TEMPORARY TABLE test_collect_one (id int, name text)`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = c.Exec(ctx, `INSERT INTO test_collect_one (id, name) VALUES (1, 'alice')`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
row, err := dbx.CollectOne[scanRow](ctx, c, `SELECT id, name FROM test_collect_one WHERE id = 1`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if row.ID != 1 || row.Name != "alice" {
|
||||||
|
t.Errorf("got %+v", row)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCollectOneNoRows(t *testing.T) {
|
||||||
|
c := dbxtest.NewTestCluster(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
_, err := c.Exec(ctx, `CREATE TEMPORARY TABLE test_collect_norows (id int, name text)`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = dbx.CollectOne[scanRow](ctx, c, `SELECT id, name FROM test_collect_norows`)
|
||||||
|
if !errors.Is(err, pgx.ErrNoRows) {
|
||||||
|
t.Errorf("expected pgx.ErrNoRows, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
38
slog.go
Normal file
38
slog.go
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
package dbx
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SlogLogger adapts *slog.Logger to the dbx.Logger interface.
|
||||||
|
type SlogLogger struct {
|
||||||
|
Logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSlogLogger creates a SlogLogger. If l is nil, slog.Default() is used.
|
||||||
|
func NewSlogLogger(l *slog.Logger) *SlogLogger {
|
||||||
|
if l == nil {
|
||||||
|
l = slog.Default()
|
||||||
|
}
|
||||||
|
return &SlogLogger{Logger: l}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SlogLogger) Debug(ctx context.Context, msg string, fields ...any) {
|
||||||
|
s.Logger.DebugContext(ctx, msg, fields...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SlogLogger) Info(ctx context.Context, msg string, fields ...any) {
|
||||||
|
s.Logger.InfoContext(ctx, msg, fields...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SlogLogger) Warn(ctx context.Context, msg string, fields ...any) {
|
||||||
|
s.Logger.WarnContext(ctx, msg, fields...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SlogLogger) Error(ctx context.Context, msg string, fields ...any) {
|
||||||
|
s.Logger.ErrorContext(ctx, msg, fields...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compile-time check.
|
||||||
|
var _ Logger = (*SlogLogger)(nil)
|
||||||
52
slog_test.go
Normal file
52
slog_test.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package dbx
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSlogLogger(t *testing.T) {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
h := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
|
||||||
|
l := NewSlogLogger(slog.New(h))
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
l.Debug(ctx, "debug msg", "key", "val1")
|
||||||
|
l.Info(ctx, "info msg", "key", "val2")
|
||||||
|
l.Warn(ctx, "warn msg", "key", "val3")
|
||||||
|
l.Error(ctx, "error msg", "key", "val4")
|
||||||
|
|
||||||
|
out := buf.String()
|
||||||
|
|
||||||
|
for _, want := range []string{
|
||||||
|
"level=DEBUG",
|
||||||
|
"debug msg",
|
||||||
|
"key=val1",
|
||||||
|
"level=INFO",
|
||||||
|
"info msg",
|
||||||
|
"key=val2",
|
||||||
|
"level=WARN",
|
||||||
|
"warn msg",
|
||||||
|
"key=val3",
|
||||||
|
"level=ERROR",
|
||||||
|
"error msg",
|
||||||
|
"key=val4",
|
||||||
|
} {
|
||||||
|
if !strings.Contains(out, want) {
|
||||||
|
t.Errorf("output missing %q\ngot: %s", want, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewSlogLoggerNil(t *testing.T) {
|
||||||
|
l := NewSlogLogger(nil)
|
||||||
|
if l.Logger == nil {
|
||||||
|
t.Fatal("expected non-nil logger when passing nil")
|
||||||
|
}
|
||||||
|
// should not panic
|
||||||
|
l.Info(context.Background(), "test")
|
||||||
|
}
|
||||||
42
stats.go
Normal file
42
stats.go
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
package dbx
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PoolStats is an aggregate of pool statistics across all nodes.
|
||||||
|
type PoolStats struct {
|
||||||
|
AcquireCount int64
|
||||||
|
AcquireDuration time.Duration
|
||||||
|
AcquiredConns int32
|
||||||
|
CanceledAcquireCount int64
|
||||||
|
ConstructingConns int32
|
||||||
|
EmptyAcquireCount int64
|
||||||
|
IdleConns int32
|
||||||
|
MaxConns int32
|
||||||
|
TotalConns int32
|
||||||
|
Nodes map[string]*pgxpool.Stat
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stats returns aggregate pool statistics for all nodes in the cluster.
|
||||||
|
func (c *Cluster) Stats() PoolStats {
|
||||||
|
ps := PoolStats{
|
||||||
|
Nodes: make(map[string]*pgxpool.Stat, len(c.all)),
|
||||||
|
}
|
||||||
|
for _, n := range c.all {
|
||||||
|
s := n.pool.Stat()
|
||||||
|
ps.Nodes[n.name] = s
|
||||||
|
ps.AcquireCount += s.AcquireCount()
|
||||||
|
ps.AcquireDuration += s.AcquireDuration()
|
||||||
|
ps.AcquiredConns += s.AcquiredConns()
|
||||||
|
ps.CanceledAcquireCount += s.CanceledAcquireCount()
|
||||||
|
ps.ConstructingConns += s.ConstructingConns()
|
||||||
|
ps.EmptyAcquireCount += s.EmptyAcquireCount()
|
||||||
|
ps.IdleConns += s.IdleConns()
|
||||||
|
ps.MaxConns += s.MaxConns()
|
||||||
|
ps.TotalConns += s.TotalConns()
|
||||||
|
}
|
||||||
|
return ps
|
||||||
|
}
|
||||||
22
stats_test.go
Normal file
22
stats_test.go
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
package dbx_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.codelab.vc/pkg/dbx/dbxtest"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStats(t *testing.T) {
|
||||||
|
c := dbxtest.NewTestCluster(t)
|
||||||
|
|
||||||
|
ps := c.Stats()
|
||||||
|
if ps.Nodes == nil {
|
||||||
|
t.Fatal("Nodes map is nil")
|
||||||
|
}
|
||||||
|
if _, ok := ps.Nodes["test-master"]; !ok {
|
||||||
|
t.Error("expected test-master in Nodes")
|
||||||
|
}
|
||||||
|
if ps.MaxConns <= 0 {
|
||||||
|
t.Errorf("expected MaxConns > 0, got %d", ps.MaxConns)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user