from typing import List, Dict
import tarfile, zipfile, json, glob, os
from typing import cast, Any
import pandas
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import teqp
[docs]
class ResultsParser:
def __init__(self, path):
"""
path: path to a folder or a .tar.xz or .zip archive of a folder
"""
self.path = path
self.jsonresults = self.assess_from_path(path)
self.dfresults = pandas.DataFrame(self.jsonresults)
[docs]
def get_result(self, uid: str):
"""
Get a particular run result, given by its uid
"""
for result in self.jsonresults:
if result['uid'] == uid:
return result
raise ValueError
[docs]
def assess_from_path(self, path):
if path.endswith('.tar.xz'):
with tarfile.open(path, mode='r:xz') as tar:
results = []
for info in tar.getmembers():
filename = info.name
if 'step' not in filename and '.json' in filename and len(filename.split('/')) == 2:
results.append(json.load(tar.extractfile(info)))
print(filename)
return results
elif path.endswith('.zip'):
with zipfile.ZipFile(path) as z:
results = []
for filename in z.namelist():
if 'step' not in filename and '.json' in filename:
with z.open(filename) as myfile:
results.append(json.load(myfile))
return results
else:
if not os.path.isdir(path):
raise ValueError("this is not a valid path: " + path)
paths = [f for f in glob.glob(path+'/*.json') if 'step' not in f]
results = [json.load(open(f)) for f in paths]
return results
[docs]
def to_csv(self, *, prefix):
self.dfresults.to_csv(prefix+'.csv', index=False)
[docs]
def get_all_uid(self, path):
return [f.replace('_step0.json','') for f in glob.glob(path+'/*step0.json')]
[docs]
def get_lowest_cost_uid(self, *, prefilter=None):
"""
prefilter: a function taking the dataframe, returning a mask array (for instance to throw out some solutions)
returns the uid
"""
if prefilter:
df = self.dfresults[prefilter(self.dfresults)]
else:
df = self.dfresults
imin = np.argmin(np.array(df['cost']))
return df.iloc[imin]['uid']
[docs]
def get_stepfiles(self, uid):
"""
Args:
uid: The unique identifier for the run
"""
if self.path.endswith('.zip'):
raise ValueError("zipfiles are not supported (don't compress as well as LZMA)'")
elif self.path.endswith('.tar.xz'):
with tarfile.open(self.path, mode='r:xz') as tar:
stepfiles = []
for info in tar.getmembers():
filename = info.name
if 'step' in filename and '.json' in filename and uid in filename:
try:
stepfiles.append(json.load(tar.extractfile(info)))
except json.decoder.JSONDecodeError:
print(f'Unable to parse {filename}')
stepfiles.sort(key=lambda x: -x['cost'])
else:
def sort_paths(paths):
return list(zip(*sorted([(int(step.split('.')[0].split('step')[1]), step) for step in paths])))[1]
paths = [f for f in glob.glob(self.path+'/*.json') if 'step' in f and uid in f]
paths = sort_paths(paths)
stepfiles = [json.load(open(f)) for f in paths]
return stepfiles
[docs]
def get_fitdata_df(self, key, **kwargs):
""" Return a selected DataFrame from the ``fitdataroot`` folder in the archive
Args:
key: The search string that should be in the filename to be pulled from the ``fitdataroot`` folder in the archive
Usage: provide 'SOS' for key to obtain the DataFrame for SOS.csv file, for instance
Good options for key are: 'VLE','SOS','PVT', etc.
"""
if self.path.endswith('.tar.xz'):
with tarfile.open(self.path, mode='r:xz') as tar:
for info in tar.getmembers():
filename = info.name
if 'fitdataroot' in filename and key in filename:
return pandas.read_csv(tar.extractfile(info), **kwargs)
elif os.path.isdir(self.path):
return pandas.read_csv(self.path + f'/fitdataroot/{key}.csv')
else:
raise ValueError("zipfiles are not supported (don't compress as well as LZMA)'")
[docs]
class PairMinFilter:
def __init__(self, pair, bgindices=None):
self.pair = tuple(pair)
self.bgindices = bgindices
def __call__(self, df):
# Keep only rows that match the pair
matches_pair = df.apply(lambda row: tuple(row['pair']) == self.pair, axis=1)
df = df[matches_pair].copy()
if len(df) == 0:
raise KeyError(self.pair)
if self.bgindices:
# keep only rows with matching bgindices
def matches_btgt(row):
return row['mutant_kwargs']['bgindices'] == self.bgindices
mask = df.apply(matches_btgt, axis=1)
df = df[mask].copy()
# And return the lowest cost result
return pandas.DataFrame([df.sort_values(by='cost').iloc[0]])
[docs]
class UidFilter:
def __init__(self, uid):
self.uid = uid
def __call__(self, df):
# Keep only the row that matches the given uid
matches_uid = df.apply(lambda row: tuple(row['pair']) == self.uid, axis=1)
df = df[matches_uid].copy()
return df
[docs]
def build_mutant(teqp_names : List[str], path : str, spec: dict, *, flags=None):
if flags is None:
flags = {'estimate': 'Lorentz-Berthelot'}
puremodels = [teqp.build_multifluid_model([name], path, path+'/dev/mixtures/mixture_binary_pairs.json', flags) for name in teqp_names]
basemodel = teqp.build_multifluid_model(teqp_names, path, path+'/dev/mixtures/mixture_binary_pairs.json', flags)
mutant = teqp.build_multifluid_mutant(basemodel, spec)
teqp.attach_model_specific_methods(mutant)
return mutant, basemodel, puremodels
[docs]
def calc_critical_curves(*, model, basemodel, ipure, integration_order, polish_reltol_T = 100, polish_reltol_rho=100):
Tcvec = basemodel.get_Tcvec()
vcvec = basemodel.get_vcvec()
opt = {"alternative_pure_index": ipure, "alternative_length": 2}
[T0, rho0] = model.solve_pure_critical(Tcvec[ipure], 1.0/vcvec[ipure], opt)
rhovec0 = np.array([0.0, 0])
rhovec0[ipure] = rho0
opt = teqp.TCABOptions()
opt.polish = True
opt.integration_order = integration_order
# opt.rel_err = 1e-7
opt.init_dt = 100
opt.max_dt = 1000
opt.polish_reltol_T = polish_reltol_T
opt.polish_reltol_rho = polish_reltol_rho
df = pandas.DataFrame(model.trace_critical_arclength_binary(T0, rhovec0, '', opt))
return df
from collections.abc import Sequence
[docs]
def plot_criticality(*, model, Tlim:Sequence[float], rholim:Sequence[float], z_1:float, TN:int=100, rhoN:int=100, ax=None, show=True):
"""
"""
z = np.array([z_1, 1-z_1])
Tvec = np.linspace(*cast(tuple[float,float], Tlim), TN)
rhovec = np.geomspace(*cast(tuple[float,float], rholim), rhoN)
TT, DD = np.meshgrid(Tvec, rhovec)
Nrow, Ncol = TT.shape
C1 = np.zeros_like(TT)
C2 = np.zeros_like(TT)
for i in range(Nrow):
for j in range(Ncol):
C1[i,j], C2[i,j] = model.get_criticality_conditions(TT[i,j], DD[i,j]*z)
if ax is None:
ax = plt.gca()
ax.contour(DD, TT, C1, levels=[0], colors='k')
ax.contour(DD, TT, C2, levels=[0], colors='grey', linestyles=['dashed'])
ax.set(xlabel=r'$\rho$ / mol/m$^3$', ylabel='$T$ / K')
ax.set_title(f'$z_1$: {z_1:0.5f} mole frac.')
if show:
plt.show()
[docs]
def plot_criticality_constT(*, T, model, zlim=(0,1), rholim, zN=100, rhoN=100, ax=None, show=True):
zvec = np.linspace(*zlim, zN)
rhovec = np.geomspace(*rholim, rhoN)
ZZ, DD = np.meshgrid(zvec, rhovec)
Nrow, Ncol = ZZ.shape
C1 = np.zeros_like(ZZ)
C2 = np.zeros_like(ZZ)
for i in range(Nrow):
for j in range(Ncol):
z_1 = ZZ[i,j]
z = np.array([z_1, 1-z_1])
C1[i,j], C2[i,j] = model.get_criticality_conditions(T, DD[i,j]*z)
if ax is None:
ax = plt.gca()
ax.contour(DD, ZZ, C1, levels=[0], colors='k')
ax.contour(DD, ZZ, C2, levels=[0], colors='grey', linestyles=['dashed'])
ax.set(xlabel=r'$\rho$ / mol/m$^3$', ylabel='$z_1$ / mole frac.')
ax.set_title(f'$T$: {T:0.5f} K')
if show:
plt.show()
[docs]
def isotherm(model, T, rhovecL, rhovecV, also_json=False, crit_threshold=5e-8) -> pandas.DataFrame | tuple[pandas.DataFrame, dict]:
opt = teqp.TVLEOptions(); opt.polish=True; opt.integration_order=5; opt.calc_criticality = True
opt.terminate_unstable = True; opt.max_steps=200;
o = model.trace_VLE_isotherm_binary(T, rhovecL, rhovecV, opt)
df = pandas.DataFrame(o)
# df['too_critical'] = df.apply(lambda row: (abs(row['crit. conditions L'][0]) < crit_threshold), axis=1)
# first_too_critical = np.argmax(df['too_critical'])
# df = df.iloc[0:(first_too_critical if first_too_critical else len(df))]
if also_json:
return df, o
else:
return df
[docs]
def plot_px_history(*, root, uid, stepfiles, override=None):
previous_cost = 1e99
fname = ('' if not override else override) + f'histpx{uid[0:6]}.pdf'
with PdfPages(fname) as PDF:
for N, stepfile in enumerate(stepfiles):
N += 1
cost = stepfile['cost']
if previous_cost > 1e98 or cost < previous_cost:
print(cost)
fluids = ('Neon','Hydrogen')
s = stepfile['model']
if override:
s = override
model, base, short = build_mutant(fluids, teqp.get_datapath(), s)
fig, ax = plt.subplots(1, 1)
for T in [24.6, 26.0, 26.3, 27.15, 28.12, 29.0, 31.51, 33.73, 34.6, 37.6, 39.6, 42.49]:
df = isotherm(model, fluids, 0, T)
line, = ax.plot(df['x_1 / mole frac.'], df['pL / Pa']/1e6, lw=1, label=T)
ax.plot(df['y0 / mole frac.'], df['pL / Pa']/1e6, dashes=[2,2], color=line.get_color(), lw=1)
df = pandas.read_csv('VLE.csv')
df = df[np.abs(df['T / K']-T) < 1e-1]
# df.info()
for Authorname, gp in df.groupby('Authorname'):
# print(T, Authorname)
ax.plot(gp['x0 / mole frac.'], gp['p / MPa'], marker='o', color=line.get_color(), lw=0, ms=1)
ax.plot(gp['y0 / mole frac.'], gp['p / MPa'], marker='d', color=line.get_color(), lw=0, ms=1)
plt.gca().set(xlabel=r'$x_{{\rm Ne}}$ / mol/mol', ylabel='$p$ / MPa', ylim=(0,3))
plt.legend(loc='best')
plt.title(rf'{N} | C$\$$: {cost:0.4f}')
PDF.savefig(plt.gcf())
plt.close()
previous_cost = cost
[docs]
def plot_critical_locus_history(basemodel, *, stepfiles, override=None, dfcr=None, ylim=None):
previous_cost = 1e99
fname = ('' if not override else override) + f'histcrit.pdf'
with PdfPages(fname) as PDF:
for N, stepfile in enumerate(stepfiles):
N += 1
cost = stepfile['cost']
if previous_cost > 1e98 or cost < previous_cost:
# print(cost)
mutant = teqp.build_multifluid_mutant(basemodel, stepfile['model'])
cr = calc_critical_curves(model=mutant, basemodel=basemodel, ipure=0, integration_order=5)
# print(len(cr))
fig, ax = plt.subplots(1, 1)
cr['z_0 / mole frac.'] = cr['rho0 / mol/m^3']/(cr['rho0 / mol/m^3']+cr['rho1 / mol/m^3'])
ax.plot(cr['z_0 / mole frac.'], cr['p / Pa']/1e6)
if dfcr is not None:
ax.plot(dfcr['z_1 / mole frac.'], dfcr['p / Pa']/1e6, 'o')
plt.gca().set(xlabel=r'$x_{1}$ / mole frac.', ylabel='$p$ / MPa')
# plt.legend(loc='best')
plt.title(rf'{N} | C$\$$: {cost:0.4f}')
if ylim:
plt.ylim(*ylim)
PDF.savefig(plt.gcf())
plt.close()
previous_cost = cost
[docs]
def plot_all_dilute_neff(z_1, *, models, aliases):
assert(len(models) == len(aliases))
Tvec = np.geomspace(250, 20000)
for model, alias in zip(models, aliases):
z = np.array([z_1, 1-z_1])
neff = [model.get_neff(T, 1e-6, z) for T in Tvec]
plt.plot(Tvec, neff, label=alias)
plt.xscale('log')
plt.legend(loc='best')
plt.ylim(0,20)
plt.gca().set(xlabel='$T$ / K', ylabel=r'$n_{\rm eff}$')
plt.show()
[docs]
def plot_all_reducing_functions(*, models, aliases, yvar = 'rho'):
assert(len(models) == len(aliases))
Tvec = np.geomspace(250, 20000)
fig, (axT, axv) = plt.subplots(2,1,sharex=True)
for model, alias in zip(models, aliases):
z1vec = np.linspace(1e-10,1.0-1e-10,1000)
Tr = [model.get_Tr(np.array([z_1,1-z_1])) for z_1 in z1vec]
rhor = np.array([model.get_rhor(np.array([z_1,1-z_1])) for z_1 in z1vec])
axT.plot(z1vec, Tr, label=alias)
if yvar == 'rho':
axv.plot(z1vec, rhor, label=alias)
else:
axv.plot(z1vec, 1/rhor*1e6, label=alias)
axT.legend(loc='best')
axT.set(ylabel=r'$T_{\rm red}$ / K')
if yvar == 'rho':
axv.set(xlabel=r'$z_1$ / mole frac.', ylabel=r'$\rho_{\rm red}$ / mol/m$^3$')
else:
axv.set(xlabel=r'$z_1$ / mole frac.', ylabel=r'$v_{\rm red}$ / cm$^3$/mol')
plt.savefig('reducing_functions.pdf')
plt.show()
[docs]
def get_rhovecLV_guess(basemodel, T, ipure):
# This assumes a multifluid model
anc = basemodel.build_ancillaries()
rhoLanc, rhoVanc = anc.rhoL(T), anc.rhoV(T)
# VLE polish
rhoL, rhoV = basemodel.pure_VLE_T(T, rhoLanc, rhoVanc, 10)
rhovecL = np.array([0.0, 0]); rhovecL[ipure] = rhoL
rhovecV = np.array([0.0, 0]); rhovecV[ipure] = rhoV
return rhovecL, rhovecV
[docs]
class ModelAssessmentPlotter:
stepfiles: List[Dict]
last_stepfile: Dict
def __init__(self, *, results=None, result_path=None, result_filter, allow_noresults=True, teqp_data_path=teqp.get_datapath()):
"""
Args:
result_path: The path to the results archive, strongly recommended to work from a .tar.xz archive, also .zip or path to folder (unzipped) are allowed
result_filter: A function that filters from the results down to the result that will be used for further post-processing. The argument is the complete DataFrame, a DataFrame is returned
"""
if results and not result_path:
self.results = results
elif result_path and not results:
self.results = ResultsParser(result_path)
else:
raise ValueError("Wrong specification of results")
self.dfresults = result_filter(self.results.dfresults)
if len(self.dfresults) != 1 and not allow_noresults:
raise ValueError("Result DataFrame must be one element in length after filtering; current length is "+str(len(self.dfresults)))
# Get the unique identifier for the run
self.uid = self.dfresults.uid.iloc[0]
self.pair = self.dfresults['pair'].iloc[0]
# Extract the stepfiles and store in the class
self.stepfiles = self.results.get_stepfiles(self.uid)
self.last_stepfile = self.stepfiles[-1]
self.model, self.basemodel, self.basemodels = build_mutant(self.pair, path=teqp_data_path, spec=self.last_stepfile['model'])
[docs]
def plot_cost_history(self, *, ax, stepfiles=None):
"""
Plot the history of the cost function over the course of the optimization
Args:
ax: The axis to plot onto
stepfiles (optional): The stepfiles, provided as a list of JSON instances
"""
iter_history = []; cost_history = []
if stepfiles is None:
stepfiles = self.stepfiles
for N, stepfile in enumerate(stepfiles):
iter_history.append(N+1)
cost_history.append(stepfile['cost'])
ax.plot(iter_history, cost_history)
ax.set_xscale('log')
ax.set_xlabel('Iteration #')
ax.set_ylabel('Cost function')
[docs]
def plot_B12(self, *, ax, z1_comps, Trange: List[float], labels: List[str], model=None):
"""
Plot the second cross virial coefficient B_12
Args:
ax: the axis onto which to plot
z1_comps: the list of compositions of the first component for which B12 curves are desired
Trange: the two-element list of min and max temperature
labels (optional): the label for each trace
model (optional): the teqp.AbstractModel instance, or the default if not provided
"""
# Add method for adding labels
if labels is None:
labels = ['' for i in range(len(z1_comps))]
if model is None:
model = self.model
for comp, label in zip(z1_comps, labels):
z = np.array([comp, 1-comp])
Tvec = np.linspace(*cast(tuple[float,float],Trange))
assert(len(Trange)==2)
B12 = np.array([model.get_B12vir(T_, z) for T_ in Tvec])
ax.plot(Tvec, B12, label=label)
ax.set_xlabel(r'$T$ / K')
ax.set_ylabel(r'$B_{12}$ / cm$^3$/mol')
ax.legend(loc='best')
[docs]
def plot_binary_VLE_isotherms(self, *, ax, Tvec: List[float], cmap, ipure, model=None, basemodel=None, options: Dict|None = None, plot_kwargs: Dict = {}):
"""
Args:
ax: the axis onto which to plot
Tvec: the iterable containing the temperatures for which isotherms are desired
cmap: the callable with method to_rgba(T) that will be used to determine the color of the trace
ipure: the index, in {0,1}, that is the fluid from which the trace starts
model (optional): the teqp.AbstractModel instance, or the default if not provided
basemodel (optional): the teqp.AbstractModel instance for the basemodel, or the default if not provided
plot_kwargs (optional): a dictionary of common arguments to be applied to liquid and vapor traces
"""
if model is None:
model = self.model
if basemodel is None:
basemodel = self.basemodel
for T in Tvec:
opt = teqp.TVLEOptions(); opt.polish=True; opt.integration_order=5; opt.calc_criticality = True
opt.terminate_unstable = True; opt.max_steps=200;
# User can override tracing options if desired
if options:
for k, v in options.items():
setattr(opt, k, v)
# Pure fluid endpoint
rhovecL, rhovecV = get_rhovecLV_guess(basemodel, T, ipure)
# Now trace and plot
o = model.trace_VLE_isotherm_binary(T, rhovecL, rhovecV, opt)
df = pandas.DataFrame(o)
plt.plot(df['xL_0 / mole frac.'], df['pL / Pa']/1e6, c=cmap.to_rgba(T), **plot_kwargs)
plt.plot(df['xV_0 / mole frac.'], df['pL / Pa']/1e6, c=cmap.to_rgba(T), dashes=[2,2], **plot_kwargs)
ax.set_xlabel(r'$x_1$, $y_1$ / mole frac.')
ax.set_ylabel('$p$ / MPa')
ax.set_yscale('log')
[docs]
def plot_binary_critical_locus(self, *, ax, kind, ipure, model=None, basemodel=None, options: Dict|None = None, plot_kwargs: Dict = {}):
"""
Args:
ax: the axis onto which to plot
kind: the variables to be plotted, one of {'XP','TP'}
ipure: the index of the fluid, in {0,1}, from which the trace starts
model (optional): the teqp.AbstractModel instance, or the default if not provided
basemodel (optional): the teqp.AbstractModel instance for the basemodel, or the default if not provided
options (optional): key-value pairs to overwrite sensible defaults in teqp.TCABOptions
plot_kwargs (optional): a dictionary of common arguments to be applied to the trace
"""
if model is None:
model = self.model
if basemodel is None:
basemodel = self.basemodel
Tcvec = basemodel.get_Tcvec()
vcvec = basemodel.get_vcvec()
pureflags:dict[str, Any] = {"alternative_pure_index": ipure, "alternative_length": 2}
[T0, rho0] = model.solve_pure_critical(Tcvec[ipure], 1.0/vcvec[ipure], pureflags)
rhovec0 = np.array([0.0, 0])
rhovec0[ipure] = rho0
opt = teqp.TCABOptions()
opt.polish = True
opt.integration_order = 5
opt.init_dt = 100
opt.max_dt = 1000
if options:
for k, v in options.items():
setattr(opt, k, v)
cr = pandas.DataFrame(model.trace_critical_arclength_binary(T0, rhovec0, '', opt))
# print(len(cr))
cr['z_0 / mole frac.'] = cr['rho0 / mol/m^3']/(cr['rho0 / mol/m^3']+cr['rho1 / mol/m^3'])
if kind == 'XP':
ax.plot(cr['z_0 / mole frac.'], cr['p / Pa']/1e6, **plot_kwargs)
ax.set_xlabel(r'$x_1$, $y_1$ / mole frac.')
ax.set_ylabel('$p$ / MPa')
ax.set_yscale('log')
elif kind == 'TP':
ax.plot(cr['T / K'], cr['p / Pa']/1e6, **plot_kwargs)
ax.set_xlabel(r'$T$ / K')
ax.set_ylabel('$p$ / MPa')
ax.set_yscale('log')
else:
raise ValueError()