import timeit
import numpy as np
import teqp
import scipy
import pandas
import scipy.interpolate
[docs]
def calc_errrho(*, model, df, step=1, iterate=False):
"""
Deviation function from PVT data
Returns percentage signed relative deviation in density
It is not actually the relative difference in density, because that
would require iterative calculations. Instead, the Maclaurin series
expansion around the experimental density is used to obtain a
non-iterative estimate of this error. This error metric breaks down
whn dp/drho|T is very close to zero.
The iterate keyword argument can be used to turn on the iterative
calculations, but they are much slower
"""
def o(row):
T = row['T / K']; rho = row['rho / mol/m^3']
z = np.array([row['z_1 / mole frac.'], row['z_2 / mole frac.']])
Ar0n = model.get_Ar02n(T, rho, z)
R = model.get_R(z)
dpdrho = R*T*(1 + 2*Ar0n[1] + Ar0n[2])
p = rho*R*T*(1 + Ar0n[1])
err_noniterative = -(p-row['p / Pa'])/rho/dpdrho*100
if iterate:
rho = row['rho / mol/m^3']*1.1
for i in range(10):
Ar0n = model.get_Ar02n(T, rho, z)
Ar01 = Ar0n[1]; Ar02 = Ar0n[2]
R = model.get_R(z)
pEOS = rho*R*T*(1+Ar01)
dpdrho = R*T*(1 + 2*Ar0n[1] + Ar0n[2])
res = (pEOS-row['p / Pa'])/(row['p / Pa'])
dresdrho = dpdrho/(row['p / Pa'])
change = -res/dresdrho
if abs(change/rho-1) < 1e-10 or abs(res) < 1e-12:
break
rho += change
# print(res)
return (1-rho/row['rho / mol/m^3'])*100
else:
return err_noniterative
return df.iloc[0:len(df):step].apply(o, axis=1)
[docs]
def calc_errrho_devp(*, model, df, step=1):
"""
Deviation function from PVT data where the deviation is in pressure
Recommended for critical region where density deviations don't make sense
"""
def o(row):
T = row['T / K']; rho = row['rho / mol/m^3']
z = np.array([row['z_1 / mole frac.'], row['z_2 / mole frac.']])
Ar01 = model.get_Ar01(T, rho, z)
R = model.get_R(z)
p = rho*R*T*(1 + Ar01)
return 100*(1-p/row['p / Pa'])
return df.iloc[0:len(df):step].apply(o, axis=1)
[docs]
def calc_errSOS(model, df, *, step=1, max_iter=10):
"""
Deviation function from discrete speed of sound data at given temperature
and pressure
"""
def o(row):
z_1 = row['z_1 / mole frac.']
z = np.array([z_1, 1-z_1])
T = row['T / K']; M = row['M / kg/mol']; p_Pa = row['p / Pa']
# This is the starting density for iteration
rho = row['rho(EOS) / mol/m^3']
Ao20 = row['Ao20'] # This does not depend on the mixture model so long as the pure fluids are the same
# A few steps of Newton polishing on density
R = model.get_R(z)
for i in range(max_iter):
Ar0n = model.get_Ar02n(T, rho, z)
Ar01 = Ar0n[1]
pEOS = rho*R*T*(1+Ar01)
dpdrho = R*T*(1 + 2*Ar01 + Ar0n[2])
res = (pEOS-p_Pa)/p_Pa
if abs(res) < 1e-8:
break
dresdrho = dpdrho/p_Pa
rho += -res/dresdrho
Ar0n = model.get_Ar02n(T, rho, z)
Ar01 = Ar0n[1]; Ar02 = Ar0n[2]
Ar11 = model.get_Ar11(T, rho, z)
Ar20 = model.get_Ar20(T, rho, z)
# M*w^2/(R*T) where w is the speed of sound
# from the definition w = sqrt(dp/drho|s)
Mw2RT = 1 + 2*Ar01 + Ar02 - (1 + Ar01 - Ar11)**2/(Ao20 + Ar20)
if Mw2RT < 0:
return 1e6
w = (Mw2RT*R*T/M)**0.5
if not np.isfinite(w):
return 1e20
return (1-w/row['w / m/s'])*100
return df.iloc[0:len(df):step].apply(o, axis=1)
[docs]
def calc_errrhosat(model, df, *, step=1, Q):
"""
Deviation function for saturated liquid and/or vapor densities
"""
def o(row):
rho_meas = row['rho / mol/m^3']
T = row['T / K']
rhovecL = np.array([row['rhoL_1 / mol/m^3'], row['rhoL_2 / mol/m^3']])
rhovecV = np.array([row['rhoV_1 / mol/m^3'], row['rhoV_2 / mol/m^3']])
if np.isfinite(row['x_1 / mole frac.']):
x_0 = row['x_1 / mole frac.']
z = np.array([x_0, 1-x_0])
swap = False
elif np.isfinite(row['y_1 / mole frac.']):
y_0 = row['y_1 / mole frac.']
z = np.array([y_0, 1-y_0])
swap = True
else:
raise ValueError("Neither x_1 nor y_2 is provided")
try:
if not swap:
code, rhovecLnew, rhovecVnew = model.mix_VLE_Tx(T, rhovecL, rhovecV, z, 1e-8, 1e-8, 1e-8, 1e-8, 20)
else:
code, rhovecVnew, rhovecLnew = model.mix_VLE_Tx(T, rhovecV, rhovecL, z, 1e-8, 1e-8, 1e-8, 1e-8, 20)
if sum(~np.isfinite(rhovecLnew)) > 0:
return 1e20
if sum(~np.isfinite(rhovecVnew)) > 0:
return 1e20
# Check for trivial solutions and penalize them
if np.max(np.abs(rhovecLnew - rhovecVnew)) < 1e-6*np.sum(rhovecLnew):
return 1e20
rho_model = sum(rhovecLnew) if Q == 0 else sum(rhovecVnew)
rho_err = abs(1-rho_model/rho_meas)*100
if not np.isfinite(rho_err):
return 1e20
else:
return rho_err
except BaseException as BE:
print(BE)
return 1e20
tic = timeit.default_timer()
res = df.iloc[0:len(df):step].apply(o, axis=1)
toc = timeit.default_timer()
# print(toc-tic, 's for rhosat error')
return res
[docs]
def calc_errVLE(model, df, *, step=1):
"""
Deviation function from VLE data
"""
def o(row):
T = row['T / K']
rhovecL = np.array([row['rhoL_1 / mol/m^3'], row['rhoL_2 / mol/m^3']])
rhovecV = np.array([row['rhoV_1 / mol/m^3'], row['rhoV_2 / mol/m^3']])
if np.isfinite(row['x_1 / mole frac.']):
x_0 = row['x_1 / mole frac.']
z = np.array([x_0, 1-x_0])
swap = False
elif np.isfinite(row['y_1 / mole frac.']):
y_0 = row['y_1 / mole frac.']
z = np.array([y_0, 1-y_0])
swap = True
else:
raise ValueError("Neither x_1 nor y_2 is provided")
p_meas = row['p / Pa']
try:
if not swap:
code, rhovecLnew, rhovecVnew = model.mix_VLE_Tx(T, rhovecL, rhovecV, z, 1e-8, 1e-8, 1e-8, 1e-8, 20)
else:
code, rhovecVnew, rhovecLnew = model.mix_VLE_Tx(T, rhovecV, rhovecL, z, 1e-8, 1e-8, 1e-8, 1e-8, 20)
# print(rhovecL, rhovecV, 'LV]]')
# print('----->', rhovecLnew, rhovecVnew, 'LV after]')
if sum(~np.isfinite(rhovecLnew)) > 0:
return 1e20
if sum(~np.isfinite(rhovecVnew)) > 0:
return 1e20
# Check for trivial solutions and penalize them
if np.max(np.abs(rhovecLnew - rhovecVnew)) < 1e-6*np.sum(rhovecLnew):
print('trivial solution')
return 1e20
# Check for invalid pressures
p = rhovecLnew.sum()*model.get_R(z)*T*(1+model.get_Ar01(T, rhovecLnew.sum(), z))
if not np.isfinite(p):
print('not finite p')
return 1e20
# y = rhovecVnew/rhovecVnew.sum()
# pV = rhovecVnew.sum()*model.get_R(y)*T + model.get_pr(T, rhovecVnew)
p_err = abs(1-p/p_meas)*100
# print(p_err)
# if abs(pV/p-1) > 0.01:
# print('bad solver!', code, pV/p-1)
if not np.isfinite(p_err):
return 1e20
else:
return p_err
except BaseException as BE:
print(BE)
return 1e20
tic = timeit.default_timer()
res = df.iloc[0:len(df):step].apply(o, axis=1)
toc = timeit.default_timer()
# print(toc-tic, 's for vle error')
return res
[docs]
def calc_errVLE_x(model, df, *, step=1):
"""
Deviation function from VLE data in the x direction
"""
def o(row):
T = row['T / K']
p_meas = row['p / Pa']
rhovecL = np.array([row['rhoL_1 / mol/m^3'], row['rhoL_2 / mol/m^3']])
rhovecV = np.array([row['rhoV_1 / mol/m^3'], row['rhoV_2 / mol/m^3']])
try:
opt = teqp.MixVLETpFlags()
r = model.mix_VLE_Tp(T, p_meas, rhovecL, rhovecV, opt)
rhovecLnew = r.rhovecL; rhovecVnew = r.rhovecV
x = rhovecLnew/rhovecLnew.sum()
# Check for trivial solutions and penalize them
if np.max(np.abs(rhovecLnew - rhovecVnew)) < 1e-6*np.sum(rhovecLnew):
return 1e20
# Penalize failed iterations where the residuals are not all close to zero
if np.max(np.abs(r.r)) > 1e-2:
# print(r.num_iter, r.r, r.return_code)
# return 1e6
pass
x_err = 100*(x[0] - row['x_1 / mole frac.'])
# pL = rhovecL.sum()*model.get_R(x)*T*(1+model.get_Ar01(T, rhovecL.sum(), x))
# pV = rhovecV.sum()*model.get_R(y)*T*(1+model.get_Ar01(T, rhovecV.sum(), y))
# # Penalize failed iterations where the pressures are not equal
# if abs(pL/pV-1) > 0.01:
# return 1e20
if not np.isfinite(x_err) or x[0] > 1 or x[0] < 0:
return 1e20
else:
# print(x_err, row['x_1 / mole frac.'], x)
if np.abs(x_err) > 100:
print(x_err)
return x_err
except BaseException as BE:
print(BE)
return 1e20
tic = timeit.default_timer()
res = df.iloc[0:len(df):step].apply(o, axis=1)
toc = timeit.default_timer()
# print(toc-tic, 's for vle error')
return res
[docs]
def calc_errB12(*, model, df, step=1, z0):
"""
B12 should not have dependence on composition, but alas, it usually does
"""
z = np.array([z0, 1-z0])
def o(row):
B12calc = model.get_B12vir(row['T / K'], z)
return (B12calc-row['B12 / m^3/mol'])
return df.iloc[0:len(df):step].apply(o, axis=1)
[docs]
def calc_errtracecrit(model, df, *, T0, rhovec0, errscheme, step=1):
"""
Deviation function from tracing critical curve
"""
tic = timeit.default_timer()
try:
opt = teqp.TCABOptions()
# print(dir(opt))
opt.init_dt = 100 # step in the arclength parameter
opt.integration_order = 5
opt.max_step_count = 300
# opt.rel_err = 1e-10
# opt.abs_err = 1e-13
opt.max_dt = 1000
opt.polish = True
opt.polish_reltol_T = 100 # very generous polishing bounds
opt.polish_reltol_rho = 100 # vey generous polishing bounds
curveJSON = model.trace_critical_arclength_binary(T0, rhovec0, '', opt)
crit = pandas.DataFrame(curveJSON)
if errscheme == 'log(p)xdistance':
rhotot = crit['rho0 / mol/m^3']+crit['rho1 / mol/m^3']
crit['z0 / mole frac.'] = crit['rho0 / mol/m^3']/rhotot
# We are tracing to the critical point, so both phases should have zero distance
XA = np.c_[crit['z0 / mole frac.'], crit['p / Pa']/1e6] # From the trace
XB = np.c_[df['z_1 / mole frac.'], df['p / Pa']/1e6] # From the measurements
e1 = float(scipy.spatial.distance.cdist(XA, XB).min(axis=0))
toc3 = timeit.default_timer()
return e1
elif errscheme in ['T','T*']:
rhotot = crit['rho0 / mol/m^3']+crit['rho1 / mol/m^3']
crit['z0 / mole frac.'] = crit['rho0 / mol/m^3']/rhotot
Tinterp = scipy.interpolate.interp1d(crit['z0 / mole frac.'], crit['T / K'], bounds_error=False, fill_value=1e20)(df['z_1 / mole frac.'])
if errscheme == 'T*':
print(Tinterp)
return np.mean(np.abs(Tinterp-df['T / K']))
elif errscheme == 'TdevP':
# Interpolate for given value of T to find p along critical curve, compare
# with the measured critical pressure
def get_err(row):
try:
p_crit_curve = scipy.interpolate.interp1d(crit['T / K'], crit['p / Pa'])(row['T / K'])
return 100*(1-p_crit_curve/row['p / Pa'])
except BaseException as be:
# print(np.min(crit['T / K']), np.max(crit['T / K']))
# print(be)
return 100
return df.iloc[0:len(df):step].apply(get_err, axis=1)
else:
raise KeyError("Bad errscheme")
except BaseException as be:
print(be)
return 1000.0
[docs]
def calc_errcritPVT(model, df, *, step=1):
"""
Deviation function from critical points for which temperature
and density are specified
"""
def o(row):
z_1 = row['z_1 / mole frac.']
z = np.array([z_1, 1-z_1])
rho = row['rho / mol/m^3']
if not pandas.isnull(z_1) and not pandas.isnull(rho):
conds = model.get_criticality_conditions(row['T / K'], z*rho)
return np.sum(np.abs(conds)*1e4)
else:
return 0
return df.iloc[0:len(df):step].apply(o, axis=1)
[docs]
def calc_err_critisoT(model, df, *, step=1):
"""
Deviation function for critical points, tracing from a pure
fluid endpoint. Tracing the isotherms ensures that
the isotherms are well-behaved in the critical region. While
the isotherm tracing to the critical point is relatively slow,
it is really important to ensure well-shaped isotherms in the
critical region
Two parts come into the deviation term:
1) The pressure is supposed to be close to the measured critical pressure
2) The isotherm needs to close, so that the liquid and vapor traces have
converged to the same composition given by the critical point.
The slope dp/dx at the critical point should be zero, but this is not checked
"""
def o(row):
opt = teqp.TVLEOptions(); opt.polish=True; opt.integration_order=5;
opt.calc_criticality=True; opt.max_steps=200; opt.terminate_unstable=True
rhovecL = np.array([row['rhoL_pure_1 / mol/m^3'], row['rhoL_pure_2 / mol/m^3']])
rhovecV = np.array([row['rhoV_pure_1 / mol/m^3'], row['rhoV_pure_2 / mol/m^3']])
o = model.trace_VLE_isotherm_binary(row['T / K'], rhovecL, rhovecV, opt)
d = pandas.DataFrame(o)
ipmax = np.argmax(np.array(d['pL / Pa']))
pmax = d['pL / Pa'].iloc[ipmax]
deltaxL = np.abs(d['xV_0 / mole frac.'].iloc[ipmax] - row['z_1 / mole frac.'])
deltaxV = np.abs(d['xL_0 / mole frac.'].iloc[ipmax] - row['z_1 / mole frac.'])
if not np.isfinite(deltaxL):
deltaxL = 1e4
if not np.isfinite(deltaxV):
deltaxV = 1e4
if not np.isfinite(pmax):
pmax = 1e4
return np.abs(100*(1-pmax/row['p / Pa'])) + np.abs(deltaxL)*100 + np.abs(deltaxV)*100
return df.iloc[0:len(df):step].apply(o, axis=1)
[docs]
def calc_errcritPT(model, df, *, step=1):
"""
Deviation function from critical points for which temperature
and pressure are specified, and density guess is provided
The rho(T,p) solver is executed to solve for the matching
density and then the criticality conditions are checked
"""
def o(row):
z_1 = row['z_1 / mole frac.']
rho = row['rho / mol/m^3']
T = row['T / K']
if not pandas.isnull(z_1) and not pandas.isnull(rho):
z = np.array([z_1, 1-z_1])
# Solve for density
for i in range(10):
Ar0n = model.get_Ar02n(T, rho, z)
Ar01 = Ar0n[1]; Ar02 = Ar0n[2]
R = model.get_R(z)
pEOS = rho*R*T*(1+Ar01)
dpdrho = R*T*(1 + 2*Ar01 + Ar02)
res = (pEOS-row['p / Pa'])/(row['p / Pa'])
dresdrho = dpdrho/(row['p / Pa'])
change = -res/dresdrho
if abs(change/rho-1) < 1e-10 or abs(res) < 1e-12:
break
rho += change
# print(res)
conds = model.get_criticality_conditions(T, z*rho)
return np.sum(np.abs(conds)*1e4)
else:
return 0
return df.iloc[0:len(df):step].apply(o, axis=1)