wrds-download

TUI/CLI tool for browsing and downloading WRDS data
Log | Files | Refs | README

db.py (3606B)


      1 """PostgreSQL connection, DSN construction, and metadata queries."""
      2 
      3 from __future__ import annotations
      4 
      5 import os
      6 from dataclasses import dataclass, field
      7 
      8 import psycopg
      9 
     10 
     11 def _getenv(key: str, fallback: str = "") -> str:
     12     return os.environ.get(key) or fallback
     13 
     14 
     15 def dsn_from_env() -> str:
     16     """Build a PostgreSQL connection string from standard PG* environment variables."""
     17     host = _getenv("PGHOST", "wrds-pgdata.wharton.upenn.edu")
     18     port = _getenv("PGPORT", "9737")
     19     user = _getenv("PGUSER")
     20     password = _getenv("PGPASSWORD")
     21     database = _getenv("PGDATABASE", "wrds")
     22 
     23     if not user:
     24         raise RuntimeError("PGUSER not set")
     25 
     26     dsn = f"host={host} port={port} user={user} sslmode=require"
     27     if password:
     28         dsn += f" password={password}"
     29     if database:
     30         dsn += f" dbname={database}"
     31     return dsn
     32 
     33 
     34 def connect() -> psycopg.Connection:
     35     """Return a psycopg connection using DSN from environment variables."""
     36     return psycopg.connect(dsn_from_env())
     37 
     38 
     39 def quote_ident(s: str) -> str:
     40     """Quote a PostgreSQL identifier to prevent SQL injection."""
     41     return '"' + s.replace('"', '""') + '"'
     42 
     43 
     44 def build_query(
     45     schema: str,
     46     table: str,
     47     columns: str = "*",
     48     where: str = "",
     49     limit: int = 0,
     50 ) -> str:
     51     """Build a SELECT query with quoted identifiers."""
     52     if columns and columns != "*":
     53         parts = [quote_ident(c.strip()) for c in columns.split(",")]
     54         sel = ", ".join(parts)
     55     else:
     56         sel = "*"
     57 
     58     q = f"SELECT {sel} FROM {quote_ident(schema)}.{quote_ident(table)}"
     59     if where:
     60         q += f" WHERE {where}"
     61     if limit > 0:
     62         q += f" LIMIT {limit}"
     63     return q
     64 
     65 
     66 @dataclass
     67 class ColumnMeta:
     68     name: str
     69     data_type: str
     70     nullable: bool
     71     description: str
     72 
     73 
     74 @dataclass
     75 class TableMeta:
     76     schema: str
     77     table: str
     78     comment: str = ""
     79     row_count: int = 0
     80     size: str = ""
     81     columns: list[ColumnMeta] = field(default_factory=list)
     82 
     83 
     84 def table_meta(conn: psycopg.Connection, schema: str, table: str) -> TableMeta:
     85     """Fetch catalog metadata for a table (no data scan)."""
     86     meta = TableMeta(schema=schema, table=table)
     87 
     88     # Table-level stats (best effort).
     89     with conn.cursor() as cur:
     90         cur.execute(
     91             """
     92             SELECT c.reltuples::bigint,
     93                    COALESCE(pg_size_pretty(pg_total_relation_size(c.oid)), ''),
     94                    COALESCE(obj_description(c.oid), '')
     95             FROM pg_class c
     96             JOIN pg_namespace n ON n.oid = c.relnamespace
     97             WHERE n.nspname = %s AND c.relname = %s
     98             """,
     99             (schema, table),
    100         )
    101         row = cur.fetchone()
    102         if row:
    103             meta.row_count, meta.size, meta.comment = row
    104 
    105     # Column metadata with descriptions from pg_description.
    106     with conn.cursor() as cur:
    107         cur.execute(
    108             """
    109             SELECT a.attname,
    110                    pg_catalog.format_type(a.atttypid, a.atttypmod),
    111                    NOT a.attnotnull,
    112                    COALESCE(d.description, '')
    113             FROM pg_attribute a
    114             JOIN pg_class c ON a.attrelid = c.oid
    115             JOIN pg_namespace n ON c.relnamespace = n.oid
    116             LEFT JOIN pg_description d ON d.objoid = c.oid AND d.objsubid = a.attnum
    117             WHERE n.nspname = %s AND c.relname = %s
    118               AND a.attnum > 0 AND NOT a.attisdropped
    119             ORDER BY a.attnum
    120             """,
    121             (schema, table),
    122         )
    123         for name, dtype, nullable, desc in cur:
    124             meta.columns.append(ColumnMeta(name, dtype, nullable, desc))
    125 
    126     return meta