Source code for ddpc.io.dos

"""Read data from output files."""

import sys
from json import load
from pathlib import Path

import h5py
import numpy as np
import polars as pl
from loguru import logger

from ddpc.io.utils import (
    _get_ao_spin,
    _inplace_update_data,
    _split_atomindex_orbital,
    absf,
    get_h5_str,
)


[docs] @logger.catch def read_dos( p: str | Path, mode: int = 5, ) -> tuple[pl.DataFrame, float, bool]: """Read and process electronic density of states data from HDF5 or JSON files. This function provides a unified interface for reading density of states (DOS) data from DFT calculations, supporting both total and projected DOS with automatic format detection and data formatting. Parameters ---------- p : str or pathlib.Path Path to the DOS data file. Supported formats are HDF5 (.h5) and JSON (.json) files from DFT calculations. mode : int, default 5 Projection mode for projected density of states data. Only relevant when the file contains orbital-projected information. Different modes correspond to different orbital groupings: 1: s/p/d/f orbitals 2: detailed orbitals (px, py, pz, etc.) 3: by element 4: atom + s/p/d/f orbitals 5: atom + detailed orbitals 6: atom + t2g/eg orbitals 7: atom-projected (sum all orbitals per atom) Returns ------- tuple of (polars.DataFrame, float, bool) - DataFrame containing DOS data with energy points and DOS values - Fermi energy in eV - Boolean indicating whether the data contains orbital projections Raises ------ TypeError If the input file is neither HDF5 nor JSON format. Notes ----- The function automatically detects file format based on extension and processes the data accordingly. For projected DOS, the mode parameter determines which orbital contributions are included in the output. The DataFrame includes energy points and DOS values, with spin-polarized calculations having separate up/down columns. """ absfile = str(absf(p)) if absfile.endswith(".h5"): df, efermi, isproj = read_dos_h5(absfile, mode) elif absfile.endswith(".json"): df, efermi, isproj = read_dos_json(absfile, mode) else: raise TypeError(f"{absfile} must be h5 or json file!") return df, efermi, isproj
[docs] @logger.catch def read_dos_h5(absfile: str, mode: int) -> tuple[pl.DataFrame, float, bool]: """Read density of states data from HDF5 file format. Parameters ---------- absfile : str Path to the HDF5 DOS file (typically with .h5 extension). mode : int Projection mode for orbital-projected DOS data. Mode 0 forces total DOS regardless of projection availability. Returns ------- tuple of (polars.DataFrame, float, bool) - DataFrame containing processed DOS data - Fermi energy in eV extracted from the file - Boolean indicating presence of orbital projection data """ with h5py.File(absfile, "r") as dos: dosinfo = dos["DosInfo"] if isinstance(dosinfo, h5py.Group): efermi_list = dosinfo["EFermi"] if isinstance(efermi_list, h5py.Dataset): efermi = efermi_list[0] else: logger.error("cannot read /BandInfo/EFermi") sys.exit(1) proj = dosinfo["Project"] if isinstance(proj, h5py.Dataset): iproj = proj[0] else: logger.error("cannot read /DosInfo/Project") sys.exit(1) if mode == 0: df = read_tdos(dos) elif iproj: df = read_pdos_h5(dos, mode) else: df = read_tdos(dos) else: raise TypeError("h5 file must contain 'DosInfo' group!") return df, efermi, bool(iproj)
[docs] @logger.catch def read_dos_json(absfile: str, mode: int) -> tuple[pl.DataFrame, float, bool]: """Read density of states data from JSON file format. Parameters ---------- absfile : str Path to the JSON DOS file (typically with .json extension). mode : int Projection mode for orbital-projected DOS data. Mode 0 forces total DOS regardless of projection availability. Returns ------- tuple of (polars.DataFrame, float, bool) - DataFrame containing processed DOS data - Fermi energy in eV extracted from the file - Boolean indicating presence of orbital projection data """ with open(absfile, encoding="utf-8") as fin: dos = load(fin) efermi = dos["DosInfo"]["EFermi"] iproj = dos["DosInfo"]["Project"] if mode == 0: df = read_tdos(dos, h5=False) elif iproj: df = read_pdos_json(dos, mode) else: df = read_tdos(dos, h5=False) return df, efermi, bool(iproj)
[docs] @logger.catch def read_tdos(dos: h5py.File | dict, h5: bool = True) -> pl.DataFrame: """Read total (non-projected) density of states data. Parameters ---------- dos : h5py.File or dict DOS data container, either an opened HDF5 file object or a dictionary loaded from JSON. h5 : bool, default True Flag indicating the data source format. True for HDF5, False for JSON. Returns ------- polars.DataFrame DataFrame containing total DOS with energy points and DOS values. """ energies = np.asarray(dos["DosInfo"]["DosEnergy"]) if h5: spin_type = dos["DosInfo"]["SpinType"][0] else: spin_type = dos["DosInfo"]["SpinType"] if spin_type == "collinear": densities = { "energy": energies, "up": np.asarray(dos["DosInfo"]["Spin1"]["Dos"]), "down": np.asarray(dos["DosInfo"]["Spin2"]["Dos"]), } else: densities = { "energy": energies, "dos": np.asarray(dos["DosInfo"]["Spin1"]["Dos"]), } return pl.DataFrame(data=densities)
[docs] @logger.catch def read_pdos_h5(dos: h5py.File, mode: int) -> pl.DataFrame: """Read orbital-projected density of states data from HDF5 file. Parameters ---------- dos : h5py.File Opened HDF5 file object containing projected DOS data. mode : int Projection mode determining which orbital contributions to include. Returns ------- polars.DataFrame DataFrame containing projected DOS with orbital contributions. """ energies: list[float] = dos["/DosInfo/DosEnergy"] data = {} orbitals: list[str] = get_h5_str(dos, "/DosInfo/Orbit") atom_index: int = dos["/DosInfo/Spin1/ProjectDos/AtomIndexs"][0] # 2 orb_index: int = dos["/DosInfo/Spin1/ProjectDos/OrbitIndexs"][0] # 9 if dos["/DosInfo/SpinType"] == "collinear": data.update( { "tdos-up": np.asarray(dos["/DosInfo/Spin1/Dos"]), "tdos-down": np.asarray(dos["/DosInfo/Spin2/Dos"]), }, ) for ai in range(atom_index): for oi in range(orb_index): data.update( { f"{ai + 1}{orbitals[oi]}-up": dos[ f"/DosInfo/Spin1/ProjectDos{ai + 1}/{oi + 1}" ] } ) data.update( { f"{ai + 1}{orbitals[oi]}-down": dos[ f"/DosInfo/Spin2/ProjectDos{ai + 1}/{oi + 1}" ] } ) else: data.update( {"tdos": np.asarray(dos["/DosInfo/Spin1/Dos"])}, ) for ai in range(atom_index): for oi in range(orb_index): data.update( {f"{ai + 1}{orbitals[oi]}": dos[f"/DosInfo/Spin1/ProjectDos{ai + 1}/{oi + 1}"]} ) if mode == 3: elements: list[str] = get_h5_str(dos, "/AtomInfo/Elements") else: elements = [] _data = _refactor_dos(energies, data, mode, elements) return pl.DataFrame(_data)
[docs] @logger.catch def read_pdos_json(dos: dict, mode: int) -> pl.DataFrame: """Read orbital-projected density of states data from JSON file. Parameters ---------- dos : dict Dictionary containing projected DOS data loaded from JSON. mode : int Projection mode determining which orbital contributions to include. Returns ------- polars.DataFrame DataFrame containing projected DOS with orbital contributions. """ energies: list[float] = dos["DosInfo"]["DosEnergy"] data = {} orbitals: list[str] = dos["DosInfo"]["Orbit"] if dos["DosInfo"]["SpinType"] == "collinear": data.update( { "tdos-up": dos["DosInfo"]["Spin1"]["Dos"], "tdos-down": dos["DosInfo"]["Spin2"]["Dos"], }, ) project = dos["DosInfo"]["Spin1"]["ProjectDos"] for p in project: atom_index = p["AtomIndex"] orb_index = p["OrbitIndex"] - 1 contrib = p["Contribution"] data.update({f"{atom_index}{orbitals[orb_index]}-up": contrib}) project = dos["DosInfo"]["Spin2"]["ProjectDos"] for p in project: atom_index = p["AtomIndex"] orb_index = p["OrbitIndex"] - 1 contrib = p["Contribution"] data.update({f"{atom_index}{orbitals[orb_index]}-down": contrib}) else: data.update( {"tdos": dos["DosInfo"]["Spin1"]["Dos"]}, ) project = dos["DosInfo"]["Spin1"]["ProjectDos"] for p in project: atom_index = p["AtomIndex"] orb_index = p["OrbitIndex"] - 1 contrib = p["Contribution"] data.update({f"{atom_index}{orbitals[orb_index]}": contrib}) if mode == 3: elements: list[str] = [atom["Element"] for atom in dos["AtomInfo"]["Atoms"]] else: elements = [] _data = _refactor_dos(energies, data, mode, elements) return pl.DataFrame(_data)
@logger.catch def _dos_spdf(data: dict, energies: np.ndarray) -> dict: _data = {"energy": energies} for k, v in data.items(): if k.startswith("tdos"): # do not process total dos _data[k] = v continue ao, updown = _get_ao_spin(k) _, o = _split_atomindex_orbital(ao) if updown: key = f"{o[0]}-{updown}" else: key = f"{o[0]}" _inplace_update_data(_data, key, v) return _data @logger.catch def _dos_spxpy(data: dict, energies: np.ndarray) -> dict: _data = {"energy": energies} for k, v in data.items(): if k.startswith("tdos"): # do not process total dos _data[k] = v continue ao, updown = _get_ao_spin(k) _, o = _split_atomindex_orbital(ao) if updown: key = f"{o}-{updown}" else: key = f"{o}" _inplace_update_data(_data, key, v) return _data @logger.catch def _dos_element(data: dict, elements: list[str] | None, energies: np.ndarray) -> dict: if not elements: raise ValueError(f"{elements=}") _data = {"energy": energies} for k, v in data.items(): if k.startswith("tdos"): # do not process total dos _data[k] = v continue ao, updown = _get_ao_spin(k) a, _ = _split_atomindex_orbital(ao) if updown: key = f"{elements[a - 1]}-{updown}" else: key = f"{elements[a - 1]}" _inplace_update_data(_data, key, v) return _data @logger.catch def _dos_atomspdf(data: dict, energies: np.ndarray) -> dict: _data = {"energy": energies} for k, v in data.items(): if k.startswith("tdos"): # do not process total dos _data[k] = v continue ao, updown = _get_ao_spin(k) a, o = _split_atomindex_orbital(ao) if updown: key = f"{a}{o[0]}-{updown}" else: key = f"{a}{o[0]}" _inplace_update_data(_data, key, v) return _data @logger.catch def _dos_atomt2geg(data: dict, energies: np.ndarray) -> dict: _data = {"energy": energies} for k, v in data.items(): if k.startswith("tdos"): # do not process total dos _data[k] = v continue ao, updown = _get_ao_spin(k) a, o = _split_atomindex_orbital(ao) if o in ["dxy", "dxz", "dyz"]: if updown: key = f"{a}t2g-{updown}" else: key = f"{a}t2g" _inplace_update_data(_data, key, v) elif o in ["dz2", "dx2y2"]: if updown: key = f"{a}eg-{updown}" else: key = f"{a}eg" _inplace_update_data(_data, key, v) return _data @logger.catch def _dos_atom(data: dict, energies: np.ndarray) -> dict: """Sum all orbitals for each atom (atom-projected DOS).""" _data = {"energy": energies} for k, v in data.items(): if k.startswith("tdos"): # do not process total dos _data[k] = v continue ao, updown = _get_ao_spin(k) a, _ = _split_atomindex_orbital(ao) if updown: key = f"{a}-{updown}" else: key = f"{a}" _inplace_update_data(_data, key, v) return _data @logger.catch def _refactor_dos( energies: list | np.ndarray, data: dict, mode: int, elements: list[str] | None = None ) -> dict: energies = np.asarray(energies) if mode == 1: # spdf _data = _dos_spdf(data, energies) elif mode == 2: # spxpy... _data = _dos_spxpy(data, energies) elif mode == 3: # element _data = _dos_element(data, elements, energies) elif mode == 4: # atom+spdf _data = _dos_atomspdf(data, energies) elif mode == 5: # atom+spxpy... _data = {"energy": energies} | {key: dataset[:] for key, dataset in data.items()} elif mode == 6: # atom+t2g/eg _data = _dos_atomt2geg(data, energies) elif mode == 7: # atom-projected (sum all orbitals per atom) _data = _dos_atom(data, energies) else: print(f"{mode=} not supported yet") raise RuntimeError(f"Unsupported mode: {mode}") return _data