Source code for aiida_phonopy.parsers.phonopy

# -*- coding: utf-8 -*-
"""Parsers of `PhonopyCalculation` output files."""
from __future__ import annotations

import os
import traceback

from aiida import orm
import h5py
import numpy as np
import yaml

from aiida_phonopy.calculations.phonopy import PhonopyCalculation
from aiida_phonopy.utils.mapping import _uppercase_dict, get_logging_container

from .base import Parser


[docs]def file_opener(folder_path: str, filename: str): """Return a function to open different expected output format.""" filename_path = os.path.join(folder_path, filename) if filename.endswith('hdf5'): return h5py.File(filename_path, 'r') return open(filename_path, mode='r', encoding='utf-8')
[docs]class PhonopyParser(Parser): """Parser the files produced by a phonopy post processing calculation."""
[docs] def parse(self, **kwargs): """Parse retrieved files from remote folder.""" retrieved = self.retrieved retrieve_temporary_list = self.node.base.attributes.get('retrieve_temporary_list', None) # If temporary files were specified, check that we have them if retrieve_temporary_list: try: retrieved_temporary_folder = kwargs['retrieved_temporary_folder'] except KeyError: return self.exit(self.exit_codes.ERROR_NO_RETRIEVED_TEMPORARY_FOLDER) # !FIXME! # We should implement `filenames` depending whether they were # put in the retrieved or in the temporary folder. filenames = os.listdir(retrieved_temporary_folder) # Parse the stdout parsed_stdout, logs_stdout, exit_code_stdout = self.parse_stdout() self.emit_logs(logs_stdout) if exit_code_stdout: return self.exit(exit_code_stdout) # Parse phonopy.yaml try: filename = PhonopyCalculation._DEFAULT_PHONOPY_FILE filenames.remove(filename) with file_opener(retrieved_temporary_folder, filename) as file: try: parsed_phonopy_yaml = yaml.safe_load(file) except yaml.YAMLError: return self.exit(self.exit_codes.ERROR_OUTPUT_YAML_LOAD) except ValueError: try: with retrieved.base.repository.open(filename) as file: try: parsed_phonopy_yaml = yaml.safe_load(file) except yaml.YAMLError: return self.exit(self.exit_codes.ERROR_OUTPUT_YAML_LOAD) except IOError: # This should in principle never happen! return self.exit(self.exit_codes.ERROR_OUTPUT_PHONOPY_MISSING) # Output parameters phonopy_yaml_keys = ['phonopy', 'physical_unit', 'space_group'] parsed_parameters = {key: value for key, value in parsed_phonopy_yaml.items() if key in phonopy_yaml_keys} parsed_parameters.update(parsed_stdout) self.out('output_parameters', orm.Dict(parsed_parameters)) # We use the input `parameters` to check the expected retrieved files in folder. expected_filenames_keys = self.get_expected_filenames_keys() parsers = { 'fc': self.parse_force_constants, 'dos': self.parse_total_dos, 'pdos': self.parse_projected_dos, 'band': self.parse_band_structure, 'mesh': self.parse_qpoints, # to do a proper one 'qpoints': self.parse_qpoints, # to do a proper one 'irreps': self.parse_yaml, # to do a proper one 'tprop': self.parse_thermal_properties, 'tdisp': self.parse_yaml, # to do a proper one 'tdispmat': self.parse_yaml, # to do a proper one } missing_files = [] # Loop over every expected output file, parse it and expose its outputs. # If one or more are missing, we put it in the `missing_files` and raise error afterwards. for expected_key in expected_filenames_keys: try: filename, filout = PhonopyCalculation._OUTPUTS[expected_key] filenames.remove(filename) if expected_key == 'fc': fc_output = self.parse_force_constants(os.path.join(retrieved_temporary_folder, filename)) self.out(filout, fc_output) else: with file_opener(retrieved_temporary_folder, filename) as file: self.out(filout, parsers[expected_key](file)) except (FileNotFoundError, ValueError): missing_files.append(filename) if missing_files: return self.exit(self.exit_codes.ERROR_OUTPUT_FILES_MISSING) # We don't parse animation files. for filename in filenames: if filename.endswith(('jmol', 'xyz', 'xyz_jmol', 'arc', 'ascii')) or filename.startswith('APOSCAR-*'): filenames.remove(filename)
# !FIXME! # Still to work out better. Good chances are that is not necessary... # # If something is still present and it has not been parsed, we raise error. # if filenames: # self.exit_codes.ERROR_NOT_IMPLEMENTED_PARSER
[docs] def parse_stdout(self) -> tuple: """Parse the stdout output file. :param parameters: the input parameters dictionary :param parsed_phonopy: the collected parsed data from the yaml phonopy output :return: raw parsed data """ from .raw_parsers.phonopy import parse_stdout as parse_stdout_ parsed_data = {} exit_code_stdout = None filename_stdout = self.node.get_option('output_filename') logs = get_logging_container() # datastructure for logs if filename_stdout not in self.retrieved.base.repository.list_object_names(): exit_code_stdout = self.exit_codes.ERROR_OUTPUT_STDOUT_MISSING return parsed_data, logs, exit_code_stdout try: stdout = self.retrieved.base.repository.get_object_content(filename_stdout) except (IOError, OSError): exit_code_stdout = self.exit_codes.ERROR_OUTPUT_STDOUT_READ return parsed_data, logs, exit_code_stdout try: # Check for job completion, indicating that phonopy exited without interruption, even if there was an error. parsed_data, logs = parse_stdout_(stdout) except ValueError: logs.critical.append(traceback.format_exc()) exit_code_stdout = self.exit_codes.ERROR_UNEXPECTED_PARSER_EXCEPTION # If the stdout was incomplete, most likely the job was interrupted before it could cleanly finish, so the # output files are most likely corrupt and cannot be restarted if 'ERROR_OUTPUT_STDOUT_INCOMPLETE' in logs['error']: exit_code_stdout = self.exit_codes.ERROR_OUTPUT_STDOUT_INCOMPLETE if 'ERROR_BAD_INPUTS' in logs['error']: parameters = _uppercase_dict(self.node.inputs.parameters.get_dict(), dict_name='parameters') if 'FORCE_CONSTANTS' in parameters: if str.upper(parameters['FORCE_CONSTANTS']) != 'WRITE': exit_code_stdout = self.exit_codes.ERROR_BAD_INPUTS return parsed_data, logs, exit_code_stdout
[docs] def get_expected_filenames_keys(self) -> set: """Return the retrieve file keys (that map to the filenames outputs) depending on the tags in `parameters`.""" retrieved_set = set() parameters = _uppercase_dict(self.node.inputs.parameters.get_dict(), dict_name='parameters') maps = { 'band': {'BAND', 'BAND_PATHS', 'BAND_POINTS', 'BAND_LABELS', 'BAND_CONNECTION', 'BAND_INDICES'}, 'mesh': {'MESH', 'MP', 'MESH_NUMBERS', 'MP_SHIFT', 'GAMMA_CENTER', 'WRITE_MESH'}, 'dos': {'DOS'}, 'pdos': {'PDOS'}, 'tprop': {'TPROP'}, 'tdisp': {'TDISP'}, 'tdispmat': {'TDISPMAT'}, 'qpoints': {'QPOINTS', 'WRITEDM'}, 'fc': {'WRITE_FORCE_CONSTANTS', 'FORCE_CONSTANTS'}, 'mod': {'MODULATION'}, 'irreps': {'IRREPS', 'SHOW_IRREPS', 'LITTLE_COGROUP'} } for tag, value in parameters.items(): for key_map, tag_map in maps.items(): if tag in tag_map: # Usually these tags are set only if one needs them, i.e. to TRUE, # thus this further control is actually pointless. Nevertheless, # we want to avoid to raise error when in inputs one of these # tags is set to FALSE, meaning there will be no output to parse. if tag.startswith(('WRITE', 'DOS', 'TPROP', 'TDISP', 'TDISPMAT')): if value: retrieved_set.add(key_map) else: if tag != 'FORCE_CONSTANTS': retrieved_set.add(key_map) elif str.upper(value) == 'WRITE': retrieved_set.add(key_map) # Double check on mesh. The writing tag must have priority. if 'WRITE_MESH' in parameters.keys(): if not parameters['WRITE_MESH']: retrieved_set.remove('mesh') return retrieved_set
[docs] def parse_force_constants(self, filepath: str) -> orm.ArrayData: """Parse the `force_constants.hdf5` output file.""" from phonopy.file_IO import read_force_constants_hdf5 p2s_map = self._get_p2s_map() force_constants = read_force_constants_hdf5(filename=filepath, p2s_map=p2s_map) fc_array = orm.ArrayData() fc_array.set_array('force_constants', force_constants) fc_array.label = 'force_constants' return fc_array
[docs] def load_with_numpy(self, file: str) -> np.ndarray: """Load a txt file using numpy.""" try: data = np.loadtxt(file) except ValueError: self.exit(self.exit_codes.ERROR_OUTPUT_NUMPY_LOAD) return data
[docs] def load_with_yaml(self, file: str) -> dict: """Load a yaml file using.""" try: data = yaml.safe_load(file) except yaml.YAMLError: self.exit(self.exit_codes.ERROR_OUTPUT_YAML_LOAD) return data
[docs] def parse_yaml(self, file: str) -> orm.Dict: """Parse a `.yaml` file and return it as a Dict.""" data = self.load_with_yaml(file=file) return orm.Dict(data)
[docs] def parse_total_dos(self, file: str) -> orm.XyData: """Parse `total_dos.dat` output file.""" data = self.load_with_numpy(file=file) total_dos = {'frequency_points': data[:, 0], 'total_dos': data[:, 1]} dos = orm.XyData() dos.set_x(total_dos['frequency_points'], 'Frequency', 'THz') dos.set_y(total_dos['total_dos'], 'DOS', '1/THz') dos.label = 'Total DOS' return dos
[docs] def parse_projected_dos(self, file: str) -> orm.XyData: """Parse `projected_dos.dat` output file.""" data = self.load_with_numpy(file=file) projected_dos = {'frequency_points': data[:, 0], 'projected_dos': data[:, 1:].T} pdos = orm.XyData() pdos_list = list(projected_dos['projected_dos']) pdos.set_x(projected_dos['frequency_points'], 'Frequency', 'THz') pdos.set_y( pdos_list, [ 'Projected DOS', ] * len(pdos_list), [ '1/THz', ] * len(pdos_list), ) pdos.label = 'Projected DOS' return pdos
[docs] def parse_thermal_properties(self, file: str) -> orm.XyData: """Parse the `thermal_properties.yaml`` output file.""" data = self.load_with_yaml(file=file) thermal_properties = { 'temperatures': [], 'free_energy': [], 'entropy': [], 'heat_capacity': [], } for tp in data['thermal_properties']: thermal_properties['temperatures'].append(tp['temperature']) thermal_properties['entropy'].append(tp['entropy']) thermal_properties['free_energy'].append(tp['free_energy']) thermal_properties['heat_capacity'].append(tp['heat_capacity']) old_thermal_property = thermal_properties.copy() for key, item in old_thermal_property.items(): thermal_properties[key] = np.array(item) tprops = orm.XyData() tprops.set_x(thermal_properties['temperatures'], 'Temperature', 'K') tprops.set_y( [ thermal_properties['free_energy'], thermal_properties['entropy'], thermal_properties['heat_capacity'], ], ['Helmholtz free energy', 'Entropy', 'Cv'], ['kJ/mol', 'J/K/mol', 'J/K/mol'], ) tprops.label = 'Thermal properties' return tprops
[docs] def parse_band_structure(self, file: str, freqs_units: str = 'THz') -> orm.BandsData: """Parse the `band.hdf5`` output file. Expected keys are: * **nqpoint**: array with total number of q-points in the band structure, array(1,) * **frequency**: array of frequencies at each q-point; array(npath, segment_nqpoint, nband) * **label**: array of labels, two per each path; array(npath, 2) * **path**: number of q-points per path; array(npath, segment_nqpoint, qpoint/qposition(3,)) * **distance**: distance between consecutive q-points in a segment of path; array(npath, segment_nqpoint) * **segment_nqpoint**: number of q-points per segment of the entire path; array(npath,) * **eigenvector**: (optional) eigenvector of each phonon mode, i.e. each frequency of each q-point; array(npath, segment_nqpoint, nband, eigenvector(6,)) * **group_velocity**: (optional) group velocity of each phonon mode (as for eigenvector); array(npath, segment_nqpoint, nband, group_velocity(3,)) """ import re # Loading the data data = {key: file[key][:] for key in file.keys()} # Initializing the bands object and setting the reciprocal lattice from the unitcell. if 'phonopy_data' in self.node.inputs: structure = self.node.inputs['phonopy_data'].get_unitcell() else: structure = self.node.inputs['force_constants'].get_unitcell() band_structure = orm.BandsData() band_structure.set_cell_from_structure(structuredata=structure) # Extracting the frequencies and qpoints. frequencies = [] qpoints = [] for freqs_per_path, qpoint_per_path in zip(data['frequency'], data['path']): for freqs_per_qpoint in freqs_per_path: frequencies.append(freqs_per_qpoint) for qpoint in qpoint_per_path: qpoints.append(qpoint) # Extracting the labels along with the index of the qpoint. labels = [] nqpoint = [0, -1] for label_pair, segment in zip( data['label'], data['segment_nqpoint'] ): # they are saved in pairs, two per segment nqpoint[1] += segment for i, label in enumerate(label_pair): try: # the labels are saved in dtype |S16 --> e.g. b'$\\mathrm{X}$' symbol = re.search(r'\{(\w+)\}', str(label)).group(1) except AttributeError: # it means we hit Gamma if label == b'$\\Gamma$': symbol = r'$\Gamma$' else: symbol = '?' labels.append([nqpoint[i], symbol]) nqpoint[0] += segment # Here I remove the symilar symbols if they are consecutive. # The sorting could be smarter, but it works. final_labels = [] for nq, pair in enumerate(labels): if not nq == 0: if not pair[1] == labels[nq - 1][1]: final_labels.append(pair) else: final_labels.append(pair) # Setting the final bands structure data. band_structure.set_kpoints(np.array(qpoints)) band_structure.set_bands(np.array(frequencies), units=freqs_units) band_structure.labels = final_labels return band_structure
[docs] def parse_qpoints(self, file: str, freqs_units: str = 'THz') -> orm.BandsData: """Parse the `mesh.hdf5`` and `qpoints.hdf5`` output files. Expected keys are: * **frequency**: array of frequencies at each q-point; array(nqpoint, nband) * **mesh**: qpoint mesh; array(3,) * **qpoint**: qpoints; array(nqpoint, 3) * **weight**: weight of qpoints; array(nqpoint,) * **eigenvector**: (optional) eigenvector of each phonon mode, i.e. each frequency of each q-point; array(npath, segment_nqpoint, nband, eigenvector(6,)) * **group_velocity**: (optional) group velocity of each phonon mode (as for eigenvector); array(npath, segment_nqpoint, nband, group_velocity(3,)) """ # Loading the data data = {key: file[key][:] for key in file.keys()} # Initializing the bands object and setting the reciprocal lattice from the unitcell. if 'phonopy_data' in self.node.inputs: structure = self.node.inputs['phonopy_data'].get_unitcell() else: structure = self.node.inputs['force_constants'].get_unitcell() band_structure = orm.BandsData() band_structure.set_cell_from_structure(structuredata=structure) qpoints = np.array(data['qpoint']) frequencies = np.array(data['frequency']) if file.filename.endswith('qpoints.hdf5'): weights = None else: weights = np.array(data['weight'].astype(float)) # Setting the final bands structure data. band_structure.set_kpoints(qpoints, weights=weights) band_structure.set_bands(frequencies, units=freqs_units) return band_structure
[docs] def _get_p2s_map(self): """Get the primitive to supercell map.""" if 'force_constants' in self.node.inputs: return self.node.inputs.force_constants.get_cells_mappings()['primitive']['p2s_map'] if 'phonopy_data' in self.node.inputs: return self.node.inputs.phonopy_data.get_cells_mappings()['primitive']['p2s_map'] return None