Source code for slugpy.cloudy.write_integrated_cloudyspec

"""
This function writes out integrated spectra computed by cloudy from a slug
run.
"""

import numpy as np
try:
    import astropy.io.fits as fits
except ImportError:
    fits = None
    import warnings
    warnings.warn("Unable to import astropy. FITS funtionality" +
                  " will not be available.")

[docs]def write_integrated_cloudyspec(data, model_name, fmt): """ Write out data computed by cloudy on a slug spectrum Parameters data : namedtuple Integrated cloudy spectral data to be written; a namedtuple containing the field time, cloudy_wl, cloudy_inc, cloudy_trans, cloudy_emit, and cloudy_trans_emit model_name : string Base file name to give the model to be written. Can include a directory specification if desired. fmt : string Format for the output file. Allowed values are 'ascii', 'bin' or 'binary, and 'fits'. Returns Nothing """ # Make sure fmt is valid if fmt != 'ascii' and fmt != 'bin' and fmt != 'binary' and \ fmt != 'fits': raise ValueError("fmt must be ascii, bin, binary, or fits") # Make sure we're not trying to do fits if we don't have astropy if fmt == 'fits' and fits is None: raise ValueError("Couldn't import astropy, so fits format "+ "is unavailable.") if fmt == 'ascii': # ASCII mode fp = open(model_name+'_integrated_cloudyspec.txt', 'w') # Write header lines fp.write(("{:<14s}"*6). format('Time', 'Wavelength', 'Incident', 'Transmitted', 'Emitted', 'Trans+Emit') + "\n") fp.write(("{:<14s}"*6). format('(yr)', '(Angstrom)', '(erg/s/A)', '(erg/s/A)', '(erg/s/A)', '(erg/s/A)') + "\n") fp.write(("{:<14s}"*6). format('-----------', '-----------', '-----------', '-----------', '-----------', '-----------') + "\n") # Write data ntime = data.cloudy_inc.shape[-2] ntrial = data.cloudy_inc.shape[-1] if len(data.time) > ntime: random_time = True else: random_time = False nl = len(data.cloudy_wl) for i in range(ntrial): if i != 0: fp.write("-"*(6*14-3)+"\n") for j in range(ntime): if random_time: t_out = data.time[i] else: t_out = data.time[j] for k in range(nl): fp.write(("{:11.5e} {:11.5e} {:11.5e} " + "{:11.5e} {:11.5e} {:11.5e}\n") .format(t_out, data.cloudy_wl[k], data.cloudy_inc[k,j,i], data.cloudy_trans[k,j,i], data.cloudy_emit[k,j,i], data.cloudy_trans_emit[k,j,i])) # Close fp.close() elif fmt == 'bin' or fmt == 'binary': # Binary mode fp = open(model_name+'_integrated_cloudyspec.bin', 'wb') # Write out wavelength data fp.write(np.int64(len(data.cloudy_wl))) fp.write(data.cloudy_wl) # Write out times and spectra ntime = data.cloudy_inc.shape[-2] ntrial = data.cloudy_inc.shape[-1] if len(data.time) > ntime: random_time = True else: random_time = False for i in range(ntrial): for j in range(ntime): fp.write(np.uint(i)) if random_time: fp.write(data.time[i]) else: fp.write(data.time[j]) # Note: need to put data in contiguous block before # writing tmp = np.copy(data.cloudy_inc[:,j,i]) fp.write(tmp) tmp = np.copy(data.cloudy_trans[:,j,i]) fp.write(tmp) tmp = np.copy(data.cloudy_emit[:,j,i]) fp.write(tmp) tmp = np.copy(data.cloudy_trans_emit[:,j,i]) fp.write(tmp) # Close file fp.close() elif fmt == 'fits': # FITS mode # Convert wavelength data to FITS columns and make an HDU # from it; complication: astropy expects the dimensions of # the array to be (n_entries, n_wavelengths) nl = data.cloudy_wl.shape[0] fmtstring = str(nl)+"D" wlcols = [fits.Column(name="Wavelength", format=fmtstring, unit="Angstrom", array=data.cloudy_wl.reshape(1,nl))] wlfits = fits.ColDefs(wlcols) wlhdu = fits.BinTableHDU.from_columns(wlcols) # Figure out number of trials, and tile arrays ntrial = data.cloudy_inc.shape[-1] ntimes = data.cloudy_inc.shape[-2] trial = np.transpose(np.tile( np.arange(ntrial, dtype='int64'), (ntimes,1))).\ flatten() if len(data.time) > ntimes: times = data.time else: times = np.tile(data.time, ntrial) # Convert spectra to FITS columns, and make an HDU from them speccols = [] speccols.append(fits.Column(name="Trial", format="1K", unit="", array=trial)) speccols.append(fits.Column(name="Time", format="1D", unit="yr", array=times)) speccols.append(fits.Column(name="Incident_spectrum", format=fmtstring, unit="erg/s/A", array=np.transpose(data.cloudy_inc). reshape(ntimes*ntrial, nl))) speccols.append(fits.Column(name="Transmitted_spectrum", format=fmtstring, unit="erg/s/A", array=np.transpose(data.cloudy_trans). reshape(ntimes*ntrial, nl))) speccols.append(fits.Column(name="Emitted_spectrum", format=fmtstring, unit="erg/s/A", array=np.transpose(data.cloudy_emit). reshape(ntimes*ntrial, nl))) speccols.append(fits.Column(name="Transmitted_plus_emitted_spectrum", format=fmtstring, unit="erg/s/A", array=np.transpose(data.cloudy_trans_emit). reshape(ntimes*ntrial, nl))) specfits = fits.ColDefs(speccols) spechdu = fits.BinTableHDU.from_columns(specfits) # Create dummy primary HDU prihdu = fits.PrimaryHDU() # Create HDU list and write to file hdulist = fits.HDUList([prihdu, wlhdu, spechdu]) hdulist.writeto(model_name+'_integrated_cloudyspec.fits', overwrite=True)