Files
dbx/node.go
Aleksey Shakhmatov 2c9af28548
Some checks failed
CI / test (push) Failing after 13s
Add production features: slog adapter, scan helpers, slow query logging, pool stats, tracer passthrough, test tx isolation
- slog.go: SlogLogger adapts *slog.Logger to dbx.Logger interface
- scan.go: Collect[T] and CollectOne[T] generic helpers using pgx.RowToStructByName
- cluster.go: slow query logging via Config.SlowQueryThreshold (Warn level in queryEnd)
- stats.go: PoolStats with Cluster.Stats() aggregating pool stats across all nodes
- config.go/node.go: NodeConfig.Tracer passthrough for pgx.QueryTracer (OpenTelemetry)
- options.go: WithSlowQueryThreshold and WithTracer functional options
- dbxtest/tx.go: RunInTx runs callback in always-rolled-back transaction for test isolation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-23 00:19:26 +03:00

114 lines
2.8 KiB
Go

package dbx
import (
"context"
"sync/atomic"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)
// Node pairs a pgxpool.Pool with a display name and a health flag.
type Node struct {
	// name is the human-readable label returned by Name.
	name string
	// pool is the connection pool every DB method delegates to.
	pool *pgxpool.Pool

	// healthy is the flag read by IsHealthy; newNode initialises it to true.
	healthy atomic.Bool
}
// newNode wraps an already-constructed pool in a Node carrying the given
// name. The returned Node starts out marked healthy.
func newNode(name string, pool *pgxpool.Pool) *Node {
	node := new(Node)
	node.name = name
	node.pool = pool
	node.healthy.Store(true)
	return node
}
// connectNode builds a Node from cfg.
//
// It parses cfg.DSN with pgxpool.ParseConfig, overlays the settings from
// cfg.Pool via applyPoolConfig, installs cfg.Tracer on the connection config
// when one is set, and creates the pool with pgxpool.NewWithConfig. The pool
// is wrapped in a Node named cfg.Name. An error from parsing or from pool
// creation is returned unchanged alongside a nil Node.
func connectNode(ctx context.Context, cfg NodeConfig) (*Node, error) {
	pc, parseErr := pgxpool.ParseConfig(cfg.DSN)
	if parseErr != nil {
		return nil, parseErr
	}

	applyPoolConfig(pc, cfg.Pool)

	// Only override the tracer when the caller supplied one.
	if tracer := cfg.Tracer; tracer != nil {
		pc.ConnConfig.Tracer = tracer
	}

	p, poolErr := pgxpool.NewWithConfig(ctx, pc)
	if poolErr != nil {
		return nil, poolErr
	}

	return newNode(cfg.Name, p), nil
}
// applyPoolConfig copies every strictly positive setting from src onto dst.
// Fields of src that are zero or negative are skipped, leaving the value
// already present in dst untouched.
func applyPoolConfig(dst *pgxpool.Config, src PoolConfig) {
	if v := src.MaxConns; v > 0 {
		dst.MaxConns = v
	}
	if v := src.MinConns; v > 0 {
		dst.MinConns = v
	}
	if v := src.MaxConnLifetime; v > 0 {
		dst.MaxConnLifetime = v
	}
	if v := src.MaxConnIdleTime; v > 0 {
		dst.MaxConnIdleTime = v
	}
	if v := src.HealthCheckPeriod; v > 0 {
		dst.HealthCheckPeriod = v
	}
}
// Name reports the human-readable name the node was created with.
func (n *Node) Name() string {
	return n.name
}
// IsHealthy returns the current value of the node's health flag.
func (n *Node) IsHealthy() bool {
	ok := n.healthy.Load()
	return ok
}
// Pool exposes the pgxpool.Pool that backs this node.
func (n *Node) Pool() *pgxpool.Pool {
	return n.pool
}
// --- DB interface implementation ---

// Exec forwards sql and args to the pool's Exec and returns its command tag
// and error unchanged.
func (n *Node) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) {
	tag, err := n.pool.Exec(ctx, sql, args...)
	return tag, err
}
// Query forwards sql and args to the pool's Query and returns its rows and
// error unchanged.
func (n *Node) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
	rows, err := n.pool.Query(ctx, sql, args...)
	return rows, err
}
// QueryRow forwards sql and args to the pool's QueryRow and returns the
// resulting row.
func (n *Node) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
	row := n.pool.QueryRow(ctx, sql, args...)
	return row
}
// SendBatch hands the batch b to the pool's SendBatch and returns the
// resulting pgx.BatchResults.
func (n *Node) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
	results := n.pool.SendBatch(ctx, b)
	return results
}
// CopyFrom forwards tableName, columnNames and rowSrc to the pool's CopyFrom
// and returns its count and error unchanged.
func (n *Node) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
	copied, err := n.pool.CopyFrom(ctx, tableName, columnNames, rowSrc)
	return copied, err
}
// Begin calls the pool's Begin and returns its transaction and error
// unchanged.
func (n *Node) Begin(ctx context.Context) (pgx.Tx, error) {
	tx, err := n.pool.Begin(ctx)
	return tx, err
}
// BeginTx calls the pool's BeginTx with txOptions and returns its
// transaction and error unchanged.
func (n *Node) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) {
	tx, err := n.pool.BeginTx(ctx, txOptions)
	return tx, err
}
// Ping calls the pool's Ping and returns its error unchanged.
func (n *Node) Ping(ctx context.Context) error {
	err := n.pool.Ping(ctx)
	return err
}
// Close closes the node's underlying pool.
func (n *Node) Close() {
	p := n.pool
	p.Close()
}
// Compile-time assertion that *Node satisfies the DB interface: the build
// fails if *Node is missing any method that DB requires.
var _ DB = (*Node)(nil)