Source code for slugpy.read_cluster_spec

"""
Function to read a SLUG2 cluster_spec file.
"""

import numpy as np
from collections import namedtuple
import struct
import re
from .slug_open import slug_open

def read_cluster_spec(model_name, output_dir=None, fmt=None,
                      verbose=False, read_info=None):
    """
    Function to read a SLUG2 cluster_spec file.

    Parameters
    ----------
    model_name : string
       The name of the model to be read
    output_dir : string
       The directory where the SLUG2 output is located; if set to None,
       the current directory is searched, followed by the SLUG_DIR
       directory if that environment variable is set
    fmt : 'txt' | 'ascii' | 'bin' | 'binary' | 'fits' | 'fits2'
       Format for the file to be read. If one of these is set, the
       function will only attempt to open ASCII ('txt' or 'ascii'),
       binary ('bin' or 'binary'), or FITS ('fits' or 'fits2')
       formatted output, ending in .txt, .bin, or .fits, respectively.
       If set to None, the code will try to open ASCII files first,
       then if that fails try binary files, and if that fails again
       try FITS files.
    verbose : bool
       If True, verbose output is printed as the code runs
    read_info : dict
       On return, this dict will contain the keys 'fname' and
       'format', giving the name of the file read and the format it
       was in; 'format' will be one of 'ascii', 'binary', or 'fits'

    Returns
    -------
    A namedtuple containing the following fields:

    id : array, dtype uint
       unique ID of cluster
    trial : array, dtype uint
       which trial was this cluster part of
    time : array
       times at which cluster spectra are output, in yr
    wl : array
       wavelength, in Angstrom
    spec : array, shape (N_cluster, N_wavelength)
       specific luminosity of each cluster at each wavelength, in
       erg/s/A
    wl_neb : array
       wavelength for the nebular spectrum, in Angstrom (present only
       if SLUG was run with nebular emission enabled)
    spec_neb : array, shape (N_cluster, N_wavelength)
       specific luminosity at each wavelength and each time for each
       trial, including emission and absorption by the HII region, in
       erg/s/A (present only if SLUG was run with nebular emission
       enabled)
    wl_ex : array
       wavelength for the extincted spectrum, in Angstrom (present
       only if SLUG was run with extinction enabled)
    spec_ex : array, shape (N_cluster, N_wavelength)
       specific luminosity at each wavelength in wl_ex and each time
       for each trial after extinction has been applied, in erg/s/A
       (present only if SLUG was run with extinction enabled)
    wl_neb_ex : array
       wavelength for the extincted spectrum with nebular emission,
       in Angstrom (present only if SLUG was run with both nebular
       emission and extinction enabled)
    spec_neb_ex : array, shape (N_cluster, N_wavelength)
       specific luminosity at each wavelength in wl_neb_ex and each
       time for each trial, including emission and absorption by the
       HII region, after extinction has been applied, in erg/s/A
       (present only if SLUG was run with nebular emission and
       extinction both enabled)

    Raises
    ------
    IOError, if no spectrum file can be opened
    """

    # Open file
    fp, fname = slug_open(model_name+"_cluster_spec",
                          output_dir=output_dir,
                          fmt=fmt)

    # See if this file is a checkpoint file
    if len(re.findall(r'_chk\d\d\d\d', model_name)) != 0:
        checkpoint = True
    else:
        checkpoint = False

    # Print status
    if verbose:
        print("Reading cluster spectra for model "+model_name)
    if read_info is not None:
        read_info['fname'] = fname

    # Prepare storage
    cluster_id = []
    time = []
    trial = []
    wavelength = []
    L_lambda = []
    rectified = False   # Default is no rectified spectrum

    # Read ASCII or binary
    if fname.endswith('.txt'):

        # ASCII mode
        if read_info is not None:
            read_info['format'] = 'ascii'

        # If this is a checkpoint file, skip the line stating how many
        # trials it contains; this line is not guaranteed to be
        # accurate, and is intended for the C++ code, not for us
        if checkpoint:
            fp.readline()
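
        # Layout assumed by the ASCII parser below (inferred from the
        # parsing code itself, not from a separate format spec): one
        # header line of column names, two further header lines that
        # are skipped, then one row per (cluster, wavelength) pair
        # with columns UniqueID, Time, Wavelength, L_lambda,
        # optionally followed by L_l_neb, L_lambda_ex, and
        # L_l_neb_ex; trials are separated by lines starting '---'.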

        # Read the first header line
        hdr = fp.readline()

        # See if we have extinction
        hdrsplit = hdr.split()
        if 'L_lambda_ex' in hdrsplit:
            extinct = True
            wl_ex = []
            L_lambda_ex = []
        else:
            extinct = False

        # See if we have nebular emission
        if 'L_l_neb' in hdrsplit:
            nebular = True
            L_lambda_neb = []
            nebcol = hdrsplit.index('L_l_neb')
            if extinct:
                L_lambda_neb_ex = []
                nebexcol = hdrsplit.index('L_l_neb_ex')
        else:
            nebular = False

        # Burn the next two header lines
        fp.readline()
        fp.readline()

        # Read first line and store cluster data
        trialptr = 0
        entry = fp.readline()
        data = entry.split()
        cluster_id.append(int(data[0]))
        time.append(float(data[1]))
        wavelength.append(float(data[2]))
        L_lambda.append(float(data[3]))
        if nebular:
            L_lambda_neb.append(float(data[4]))
        if extinct and len(data) > 4 + nebular:
            L_lambda_ex.append(float(data[4+nebular]))
            if nebular:
                L_lambda_neb_ex.append(float(data[6]))
        if len(data) > 4+nebular:
            wl_ex.append(float(data[2]))
        trial.append(trialptr)

        # Read the rest of the data for the first cluster
        while True:
            entry = fp.readline()

            # Check for EOF and separator lines
            if entry == '':
                break
            if entry[:3] == '---':
                trialptr = trialptr+1
                break

            # Split up data
            data = entry.split()
            L_lambda.append(float(data[3]))
            id_tmp = int(data[0])
            time_tmp = float(data[1])
            if nebular:
                L_lambda_neb.append(float(data[4]))
            if extinct and len(data) > 4 + nebular:
                L_lambda_ex.append(float(data[4+nebular]))
                if nebular:
                    L_lambda_neb_ex.append(float(data[6]))

            # Stop when we find a different cluster or a different time
            if id_tmp != cluster_id[0] or time_tmp != time[0]:
                break

            # Still the same cluster, so append to wavelength list
            wavelength.append(float(data[2]))
            if extinct and len(data) > 4+nebular:
                wl_ex.append(float(data[2]))

        # We have now read one full chunk, so we know how many
        # wavelength entries per cluster there are
        nl = len(wavelength)

        # Start of next chunk
        ptr = 1

        # Now read through rest of file
        while True:

            # Read a line
            entry = fp.readline()
            if entry == '':
                break
            if entry[:3] == '---':
                trialptr = trialptr+1
                continue
            data = entry.split()
            L_lambda.append(float(data[3]))
            if nebular:
                L_lambda_neb.append(float(data[4]))
            if extinct and len(data) > 4+nebular:
                L_lambda_ex.append(float(data[4+nebular]))
                if nebular:
                    L_lambda_neb_ex.append(float(data[6]))
            ptr = ptr+1

            # When we get to the end of a chunk, push cluster ID,
            # time, and trial number onto their lists, then reset
            # the pointer
            if ptr == nl:
                cluster_id.append(int(data[0]))
                time.append(float(data[1]))
                trial.append(trialptr)
                ptr = 0

        # Convert lists to arrays
        wavelength = np.array(wavelength)
        if extinct:
            wl_ex = np.array(wl_ex)

        # If we have nebular emission, for ASCII output the nebular
        # wavelength list is identical to the stellar one
        if nebular:
            wl_neb = wavelength
            if extinct:
                wl_neb_ex = wl_ex

    elif fname.endswith('.bin'):

        # Binary mode
        if read_info is not None:
            read_info['format'] = 'binary'

        # If this is a checkpoint, skip the bytes specifying how many
        # trials we have; this is intended for the C++ code, not for
        # us, since we will determine that on our own
        if checkpoint:
            data = fp.read(struct.calcsize('i'))

        # Read two bytes to see if nebular emission and/or extinction
        # is included in this file or not
        data = fp.read(struct.calcsize('b'))
        nebular = struct.unpack('b', data)[0] != 0
        data = fp.read(struct.calcsize('b'))
        extinct = struct.unpack('b', data)[0] != 0
        if nebular:
            L_lambda_neb = []
        if extinct:
            L_lambda_ex = []
            if nebular:
                L_lambda_neb_ex = []
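
        # Binary layout assumed by the reads below (inferred from the
        # struct format strings rather than a separate spec): after
        # the two flag bytes come one wavelength table per enabled
        # spectrum type, each stored as an unsigned long count
        # followed by that many doubles; the remainder of the file is
        # a sequence of blocks, each with an ('L', 'd', 'L') header
        # giving trial number, time, and cluster count, followed by
        # one record per cluster holding its unsigned long unique ID
        # and its spectra as doubles on each wavelength grid present.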

        # Read number of wavelengths and wavelength table
        data = fp.read(struct.calcsize('L'))
        nl, = struct.unpack('L', data)
        data = fp.read(struct.calcsize('d')*nl)
        wavelength = np.array(struct.unpack('d'*nl, data))
        if nebular:
            data = fp.read(struct.calcsize('L'))
            nl_neb, = struct.unpack('L', data)
            data = fp.read(struct.calcsize('d')*nl_neb)
            wl_neb = np.array(struct.unpack('d'*nl_neb, data))
        else:
            nl_neb = 0
        if extinct:
            data = fp.read(struct.calcsize('L'))
            nl_ex, = struct.unpack('L', data)
            data = fp.read(struct.calcsize('d')*nl_ex)
            wl_ex = np.array(struct.unpack('d'*nl_ex, data))
        else:
            nl_ex = 0
        if extinct and nebular:
            data = fp.read(struct.calcsize('L'))
            nl_neb_ex, = struct.unpack('L', data)
            data = fp.read(struct.calcsize('d')*nl_neb_ex)
            wl_neb_ex = np.array(struct.unpack('d'*nl_neb_ex, data))
        else:
            nl_neb_ex = 0

        # Go through the rest of the file
        while True:

            # Read number of clusters and time in the next block,
            # checking if we've hit EOF
            data = fp.read(struct.calcsize('LdL'))
            if len(data) < struct.calcsize('LdL'):
                break
            trialptr, t, ncluster = struct.unpack('LdL', data)

            # Skip if no clusters
            if ncluster == 0:
                continue

            # Add to time and trial arrays
            time.extend([t]*ncluster)
            trial.extend([trialptr]*ncluster)

            # Read the next block of clusters
            data = fp.read(struct.calcsize('L')*ncluster +
                           struct.calcsize('d')*ncluster *
                           (nl + nl_neb + nl_ex + nl_neb_ex))
            data_list = struct.unpack(
                ('L'+'d'*(nl+nl_neb+nl_ex+nl_neb_ex)) * ncluster, data)

            # Pack clusters into the data lists
            cluster_id.extend(data_list[::nl+nl_neb+nl_ex+nl_neb_ex+1])
            L_lambda.extend(
                [data_list[(nl+nl_neb+nl_ex+nl_neb_ex+1)*i+1:
                           (nl+nl_neb+nl_ex+nl_neb_ex+1)*i+1+nl]
                 for i in range(ncluster)])
            if nebular:
                L_lambda_neb.extend(
                    [data_list[(nl+nl_neb+nl_ex+nl_neb_ex+1)*i+1+nl:
                               (nl+nl_neb+nl_ex+nl_neb_ex+1)*i+1+nl+nl_neb]
                     for i in range(ncluster)])
            if extinct:
                L_lambda_ex.extend(
                    [data_list[(nl+nl_neb+nl_ex+nl_neb_ex+1)*i+1 +
                               nl+nl_neb:
                               (nl+nl_neb+nl_ex+nl_neb_ex+1)*i+1 +
                               nl+nl_neb+nl_ex]
                     for i in range(ncluster)])
                if nebular:
                    L_lambda_neb_ex.extend(
                        [data_list[(nl+nl_neb+nl_ex+nl_neb_ex+1)*i+1 +
                                   nl+nl_neb+nl_ex:
                                   (nl+nl_neb+nl_ex+nl_neb_ex+1)*i+1 +
                                   nl+nl_neb+nl_ex+nl_neb_ex]
                         for i in range(ncluster)])

    elif fname.endswith('.fits'):

        # FITS mode
        if read_info is not None:
            read_info['format'] = 'fits'
        wavelength = fp[1].data.field('Wavelength')
        wavelength = wavelength.flatten()
        cluster_id = fp[2].data.field('UniqueID')
        trial = fp[2].data.field('Trial')
        time = fp[2].data.field('Time')
        L_lambda = fp[2].data.field('L_lambda')

        # Read nebular data if available
        if 'L_lambda_neb' in fp[2].data.columns.names:
            nebular = True
            wl_neb = fp[1].data.field('Wavelength_neb')
            wl_neb = wl_neb.flatten()
            L_lambda_neb = fp[2].data.field('L_lambda_neb')
        else:
            nebular = False

        # If we have extinction data, handle that too
        if 'Wavelength_ex' in fp[1].data.columns.names:
            extinct = True
            wl_ex = fp[1].data.field('Wavelength_ex')
            wl_ex = wl_ex.flatten()
            L_lambda_ex = fp[2].data.field('L_lambda_ex')
            if nebular:
                L_lambda_neb_ex = fp[2].data.field('L_lambda_neb_ex')
        else:
            extinct = False

        # Handle rectified spectra
        if 'Wavelength_Rectified' in fp[1].data.columns.names:
            rectified = True
            wl_r = fp[1].data.field('Wavelength_Rectified')
            wl_r = wl_r.flatten()
            L_rectified = fp[2].data.field('Rectified_Spec')
        else:
            rectified = False

        # Handle extincted nebular data
        if nebular and extinct:
            wl_neb_ex = fp[1].data.field('Wavelength_neb_ex')
            wl_neb_ex = wl_neb_ex.flatten()
            L_lambda_neb_ex = fp[2].data.field('L_lambda_neb_ex')

    # Close file
    fp.close()

    # Convert to arrays
    wavelength = np.array(wavelength)
    cluster_id = np.array(cluster_id, dtype='uint')
    time = np.array(time)
    trial = np.array(trial, dtype='uint')
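
    # Note: time, trial, and cluster_id have one entry per cluster at
    # this point, so reshaping each flattened spectrum list to
    # (len(time), n_wavelengths) produces the (N_cluster, N_wavelength)
    # arrays described in the docstring.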
    L_lambda = np.array(L_lambda)
    L_lambda = np.reshape(L_lambda, (len(time), len(wavelength)))
    if nebular:
        L_lambda_neb = np.array(L_lambda_neb)
        L_lambda_neb = np.reshape(L_lambda_neb,
                                  (len(time), len(wl_neb)))
    if extinct:
        wl_ex = np.array(wl_ex)
        L_lambda_ex = np.array(L_lambda_ex)
        L_lambda_ex = np.reshape(L_lambda_ex,
                                 (len(time), len(wl_ex)))
        if nebular:
            L_lambda_neb_ex = np.array(L_lambda_neb_ex)
            L_lambda_neb_ex = np.reshape(L_lambda_neb_ex,
                                         (len(time), len(wl_neb_ex)))
    if rectified:
        wl_r = np.array(wl_r)
        L_rectified = np.array(L_rectified)
        L_rectified = np.reshape(L_rectified,
                                 (len(time), len(wl_r)))

    # Build namedtuple to hold output
    fieldnames = ['id', 'trial', 'time', 'wl', 'spec']
    fields = [cluster_id, trial, time, wavelength, L_lambda]
    if nebular:
        fieldnames = fieldnames + ['wl_neb', 'spec_neb']
        fields = fields + [wl_neb, L_lambda_neb]
    if extinct:
        fieldnames = fieldnames + ['wl_ex', 'spec_ex']
        fields = fields + [wl_ex, L_lambda_ex]
        if nebular:
            fieldnames = fieldnames + ['wl_neb_ex', 'spec_neb_ex']
            fields = fields + [wl_neb_ex, L_lambda_neb_ex]
    if rectified:
        fieldnames = fieldnames + ['wl_r', 'spec_rec']
        fields = fields + [wl_r, L_rectified]
    out_type = namedtuple('cluster_spec', fieldnames)
    out = out_type(*fields)

    # Return
    return out
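
The reader is normally invoked on the base name of a SLUG2 run. The sketch below is illustrative only: the model name and output directory are placeholders, and the nebular field is present only when SLUG was run with nebular emission enabled.

    from slugpy.read_cluster_spec import read_cluster_spec

    # Hypothetical model name and output directory
    info = {}
    data = read_cluster_spec('MY_SLUG_MODEL', output_dir='output',
                             verbose=True, read_info=info)

    print(info['format'])           # 'ascii', 'binary', or 'fits'
    print(data.wl.shape)            # (N_wavelength,)
    print(data.spec.shape)          # (N_cluster, N_wavelength)
    if hasattr(data, 'spec_neb'):   # only if nebular emission was enabled
        print(data.spec_neb.shape)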