# Standard libraries
import math
from itertools import zip_longest
# Third-party modules
import cantera as ct
import numpy as np
import tables
# Local imports
from .printer import divider
from . import utils
from .profiles import (VolumeProfile,
TemperatureProfile,
ICEngineProfile)
[docs]class SimulationCase(object):
"""
Class that sets up and runs a simulation case.
"""
def __init__(self, filenames):
"""Initialize the simulation case.
Read the SENKIN-format input file is read into the ``keywords``
dictionary.
:param filenames:
Dictionary containing the relevant file names for this
case.
"""
self.input_filename = filenames['input_filename']
self.mech_filename = filenames['mech_filename']
self.save_filename = filenames['save_filename']
self.thermo_filename = filenames['thermo_filename']
self.keywords = utils.read_input_file(self.input_filename)
[docs] def setup_case(self):
"""
Sets up the case to be run. Initializes the :py:class:`~cantera.ThermoPhase`,
:py:class:`~cantera.Reactor`, and :py:class:`~cantera.ReactorNet` according
to the values from the input file.
"""
self.gas = ct.Solution(self.mech_filename)
initial_temp = self.keywords['temperature']
# The initial pressure in Cantera is expected in Pa; in SENKIN
# it is expected in atm, so convert
initial_pres = self.keywords['pressure']*ct.one_atm
# If the equivalence ratio has been specified, send the
# keywords for conversion.
if 'eqRatio' in self.keywords:
reactants = utils.equivalence_ratio(
self.gas,
self.keywords['eqRatio'],
self.keywords['fuel'],
self.keywords['oxidizer'],
self.keywords['completeProducts'],
self.keywords['additionalSpecies'],
)
else:
# The reactants are stored in the ``keywords`` dictionary
# as a list of strings, so they need to be joined.
reactants = ','.join(self.keywords['reactants'])
self.gas.TPX = initial_temp, initial_pres, reactants
# Create a non-interacting ``Reservoir`` to be on the other
# side of the ``Wall``.
env = ct.Reservoir(ct.Solution('air.xml'))
# Set the ``temp_func`` to ``None`` as default; it will be set
# later if needed.
self.temp_func = None
# All of the reactors are ``IdealGas`` Reactors. Set a ``Wall``
# for every case so that later code can be more generic. If the
# velocity is set to zero, the ``Wall`` won't affect anything.
# We have to set the ``n_vars`` here because until the first
# time step, ``ReactorNet.n_vars`` is zero, but we need the
# ``n_vars`` before the first time step.
if self.keywords['problemType'] == 1:
self.reac = ct.IdealGasReactor(self.gas)
# Number of solution variables is number of species + mass,
# volume, temperature
self.n_vars = self.reac.kinetics.n_species + 3
self.wall = ct.Wall(self.reac, env, A=1.0, velocity=0)
elif self.keywords['problemType'] == 2:
self.reac = ct.IdealGasConstPressureReactor(self.gas)
# Number of solution variables is number of species + mass,
# temperature
self.n_vars = self.reac.kinetics.n_species + 2
self.wall = ct.Wall(self.reac, env, A=1.0, velocity=0)
elif self.keywords['problemType'] == 3:
self.reac = ct.IdealGasReactor(self.gas)
# Number of solution variables is number of species + mass,
# volume, temperature
self.n_vars = self.reac.kinetics.n_species + 3
self.wall = ct.Wall(self.reac, env, A=1.0,
velocity=VolumeProfile(self.keywords))
elif self.keywords['problemType'] == 4:
self.reac = ct.IdealGasConstPressureReactor(self.gas, energy='off')
# Number of solution variables is number of species + mass,
# temperature
self.n_vars = self.reac.kinetics.n_species + 2
self.wall = ct.Wall(self.reac, env, A=1.0, velocity=0)
elif self.keywords['problemType'] == 5:
self.reac = ct.IdealGasReactor(self.gas, energy='off')
# Number of solution variables is number of species + mass,
# volume, temperature
self.n_vars = self.reac.kinetics.n_species + 3
self.wall = ct.Wall(self.reac, env, A=1.0, velocity=0)
elif self.keywords['problemType'] == 6:
from user_routines import VolumeFunctionTime
self.reac = ct.IdealGasReactor(self.gas)
# Number of solution variables is number of species + mass,
# volume, temperature
self.n_vars = self.reac.kinetics.n_species + 3
self.wall = ct.Wall(self.reac, env, A=1.0,
velocity=VolumeFunctionTime())
elif self.keywords['problemType'] == 7:
from user_routines import TemperatureFunctionTime
self.reac = ct.IdealGasConstPressureReactor(self.gas, energy='off')
# Number of solution variables is number of species + mass,
# temperature
self.n_vars = self.reac.kinetics.n_species + 2
self.wall = ct.Wall(self.reac, env, A=1.0, velocity=0)
self.temp_func = ct.Func1(TemperatureFunctionTime())
elif self.keywords['problemType'] == 8:
self.reac = ct.IdealGasConstPressureReactor(self.gas, energy='off')
# Number of solution variables is number of species + mass,
# temperature
self.n_vars = self.reac.kinetics.n_species + 2
self.wall = ct.Wall(self.reac, env, A=1.0, velocity=0)
self.temp_func = ct.Func1(TemperatureProfile(self.keywords))
elif self.keywords['problemType'] == 9:
self.reac = ct.IdealGasReactor(self.gas)
# Number of solution variables is number of species + mass,
# volume, temperature
self.n_vars = self.reac.kinetics.n_species + 3
self.wall = ct.Wall(env, self.reac, A=1.0,
velocity=ICEngineProfile(self.keywords))
if 'reactorVolume' in self.keywords:
self.reac.volume = self.keywords['reactorVolume']
# Create the Reactor Network.
self.netw = ct.ReactorNet([self.reac])
if 'sensitivity' in self.keywords:
self.sensitivity = True
# There is no automatic way to calculate the sensitivity of
# all of the reactions, so do it manually.
for i in range(self.reac.kinetics.n_reactions):
self.reac.add_sensitivity_reaction(i)
# If no tolerances for the sensitivity are specified, set
# to the SENKIN defaults.
if 'sensAbsTol' in self.keywords:
self.netw.atol_sensitivity = self.keywords['sensAbsTol']
else:
self.netw.atol_sensitivity = 1.0E-06
if 'sensRelTol' in self.keywords:
self.netw.rtol_sensitivity = self.keywords['sensRelTol']
else:
self.netw.rtol_sensitivity = 1.0E-04
else:
self.sensitivity = False
# If no solution tolerances are specified, set to the default
# SENKIN values.
if 'abstol' in self.keywords:
self.netw.atol = self.keywords['abstol']
else:
self.netw.atol = 1.0E-20
if 'reltol' in self.keywords:
self.netw.rtol = self.keywords['reltol']
else:
self.netw.rtol = 1.0E-08
if 'tempLimit' in self.keywords:
self.temp_limit = self.keywords['tempLimit']
else:
# tempThresh is set in the parser even if it is not present
# in the input file
self.temp_limit = (self.keywords['tempThresh'] +
self.keywords['temperature'])
self.tend = self.keywords['endTime']
# Set the maximum time step the solver can take. If a value is
# not specified in the input file, default to 0.001*self.tend.
# Set the time steps for saving to the binary file and writing
# to the screen. If the time step for printing to the screen is
# not set, default to printing 100 times.
print_time_int = self.keywords.get('prntTimeInt')
save_time_int = self.keywords.get('saveTimeInt')
max_time_int = self.keywords.get('maxTimeStep')
time_ints = [value for value in
[print_time_int, save_time_int, max_time_int]
if value is not None
]
if time_ints:
self.netw.set_max_time_step(min(time_ints))
else:
self.netw.set_max_time_step(self.tend/100)
if print_time_int is not None:
self.print_time_step = print_time_int
else:
self.print_time_step = self.tend/100
self.print_time = self.print_time_step
self.save_time_step = save_time_int
if self.save_time_step is not None:
self.save_time = self.save_time_step
# Store the species names in a slightly shorter variable name
self.species_names = self.reac.thermo.species_names
# Initialize the ignition time, in case the end time is reached
# before ignition occurs
self.ignition_time = None
[docs] def run_case(self):
"""
Actually run the case set up by `setup_case`. Sets binary
output file format, then runs the simulation by using
:py:`~cantera.ReactorNet.step`.
"""
# Use the table format of hdf instead of the array format. This
# way, each variable can be saved in its own column and
# referenced individually when read. Solution to the
# interpolation problem was made by saving the most recent time
# steps into numpy arrays. The arrays are not vertically
# appended so we should eliminate the hassle associated with
# that.
table_def = {'time': tables.Float64Col(pos=0),
'temperature': tables.Float64Col(pos=1),
'pressure': tables.Float64Col(pos=2),
'volume': tables.Float64Col(pos=3),
'massfractions': tables.Float64Col(
shape=(self.reac.thermo.n_species), pos=4
),
}
if self.sensitivity:
table_def['sensitivity'] = tables.Float64Col(
shape=(self.n_vars, self.netw.n_sensitivity_params), pos=5
)
with tables.open_file(self.save_filename, mode='w',
title='CanSen Save File') as save_file:
table = save_file.create_table(save_file.root, 'reactor',
table_def, 'Reactor State'
)
# Create a row instance to save information to.
timestep = table.row
# Save information before the first time step.
timestep['time'] = self.netw.time
(timestep['temperature'], timestep['pressure'],
timestep['massfractions']) = self.reac.thermo.TPY
timestep['volume'] = self.reac.volume
if self.sensitivity:
timestep['sensitivity'] = np.zeros((
self.n_vars,
self.netw.n_sensitivity_params
))
# Add the ``timestep`` to the ``table`` and write it to
# disk.
timestep.append()
table.flush()
# Set an array with values from before the first time step
# in case we have to interpolate after the first time step
prev_time = np.hstack((self.netw.time, self.reac.thermo.T,
self.reac.thermo.P, self.reac.volume,
self.wall.vdot(self.netw.time),
self.reac.thermo.X
))
# Print the initial information to the screen
print(divider)
print('Kinetic Mechanism Details:\n')
print(('Total Gas Phase Species = {0}\n'
'Total Gas Phase Reactions = {1}'
).format(self.reac.kinetics.n_species,
self.reac.kinetics.n_reactions))
if self.sensitivity:
print(('Total Sensitivity Reactions = {}'
).format(self.netw.n_sensitivity_params))
print(divider, '\n')
self.reactor_state_printer(prev_time)
ignition_found = False
# Main loop to run the calculation. As long as the time in
# the ``ReactorNet`` is less than the end time, keep going.
while self.netw.time < self.tend:
# If we are using a function to set the temperature as
# a function of time, use it here.
if self.temp_func is not None:
self.gas.TP = self.temp_func(self.netw.time), None
# Take the step towards the end time.
self.netw.step()
# Set an array with the information from the current
# time step for printing.
cur_time = np.hstack((self.netw.time, self.reac.thermo.T,
self.reac.thermo.P, self.reac.volume,
self.wall.vdot(self.netw.time),
self.reac.thermo.X
))
# If we have passed the end time, interpolate backwards
# to get the solution at the end time. Because linear
# interpolation is used, this will not work well if the
# end time is during or before the ignition event and
# we have stepped past it. This is unlikely though, as
# the solver should be taking relatively small time
# steps near ignition.
if self.netw.time > self.tend:
interp_state = utils.reactor_interpolate(self.tend,
prev_time,
cur_time)
self.reactor_state_printer(interp_state, end=True)
timestep['time'] = self.tend
timestep['temperature'] = interp_state[1]
timestep['pressure'] = interp_state[2]
# Mass fractions are saved, so convert the mole
# fractions in ``interp_state`` to mass fractions.
timestep['massfractions'] = \
(interp_state[5:] *
self.reac.thermo.molecular_weights /
self.reac.thermo.mean_molecular_weight)
timestep['volume'] = interp_state[3]
if self.sensitivity:
# Add sensitivity interpolation here by reading
# from file on disk. Only have to do it once,
# so it shouldn't be too expensive.
prev_sens = table.cols.sensitivity[-1]
cur_sens = self.netw.sensitivities()
prev_time = table.cols.time[-1]
cur_time = self.netw.time
interp_sens = prev_sens + ((self.tend - prev_time) *
(cur_sens - prev_sens) /
(cur_time - prev_time))
timestep['sensitivity'] = interp_sens
# We don't need any of the rest of this step, so
# break
break
# If the ``save_time_step`` is set, save at the nearest
# step to the given interval. To avoid any errors, the
# values written to the binary save file will not be
# interpolated, but saved at the solver time step
# instead. If ``save_time_step`` is not set, save every
# time step to the binary file.
if self.save_time_step is not None:
# Add what to do here if the save_time_step is set.
if self.netw.time > self.save_time:
timestep['time'] = self.netw.time
(timestep['temperature'], timestep['pressure'],
timestep['massfractions']) = self.reac.thermo.TPY
timestep['volume'] = self.reac.volume
if self.sensitivity:
timestep['sensitivity'] = self.netw.sensitivities()
timestep.append()
table.flush()
self.save_time += self.save_time_step
else:
timestep['time'] = self.netw.time
(timestep['temperature'], timestep['pressure'],
timestep['massfractions']) = self.reac.thermo.TPY
timestep['volume'] = self.reac.volume
if self.sensitivity:
timestep['sensitivity'] = self.netw.sensitivities()
timestep.append()
table.flush()
# Print Reactor state information to the screen for
# monitoring.
if self.netw.time > self.print_time:
interp_state = utils.reactor_interpolate(self.print_time,
prev_time,
cur_time)
self.reactor_state_printer(interp_state)
self.print_time += self.print_time_step
elif self.netw.time == self.print_time:
self.reactor_state_printer(cur_time)
self.print_time += self.print_time_step
# If the temperature limit has been exceeded, we have
# ignition! Save the time this occurs at. In the
# future, the ignition time may be interpolated.
if self.reac.T >= self.temp_limit and ignition_found is False:
self.ignition_time = self.netw.time
ignition_found = True
if self.keywords.get('break_on_ignition', False):
self.reactor_state_printer(cur_time, end=False)
break
# Set the ``prev_time`` array equal to the ``cur_time``
# array so we can go to the next time step.
prev_time = cur_time
[docs] def run_simulation(self):
"""
Helper function that sequentially sets up the simulation case
and runs it. Useful for cases where nothing needs to be changed
between the setup and run. See `setup_case` and `run_case`.
"""
self.setup_case()
self.run_case()
[docs] def reactor_state_printer(self, state, end=False):
"""Produce pretty-printed output from the input reactor state.
In this function, we have to explicitly pass in the reactor
state instead of using ``self.reac`` because we might have
interpolated to get to the proper time.
:param state:
Vector of reactor state information.
:param end:
Boolean to tell the printer this is the final print operation.
"""
time = state[0]
temperature = state[1]
pressure = state[2]
volume = state[3]
vdot = state[4]
molefracs = state[5:]
# Begin printing
print(divider)
if not end:
print('Solution time (s) = {:E}'.format(time))
else:
print('End time reached (s) = {:E}'.format(time))
if self.ignition_time is not None:
print('Ignition time (s) = {:E}'.format(self.ignition_time))
elif end:
print('Ignition was not found.')
else:
pass
print(("Reactor Temperature (K) = {0:>13.4f}\n"
"Reactor Pressure (Pa) = {1:>13.4E}\n"
"Reactor Volume (m**3) = {2:>13.4E}\n"
"Reactor Vdot (m**3/s) = {3:>13.4E}"
).format(temperature, pressure, volume, vdot))
print('Gas Phase Mole Fractions:')
# Here we calculate the number of columns of species mole fractions
# that will best fill the available number of columns in the
# terminal.
#
# Add one to the max_species_length to ensure that there is at
# least one space between species.
max_species_length = len(max(self.species_names, key=len)) + 1
# Set the precision of the printed mole fractions. This is the
# number of columns that the number itself will take up, including
# the decimal separator. It is not the field width.
mole_frac_precision = 8
# Calculate how much space each species print will take. It is the
# max_species length + len(' = ') + the mole_frac_precision +
# len('E+00').
part_length = max_species_length + 3 + mole_frac_precision + 4
# Set the default number of columns in the terminal. Choose 80
# because it is the preferred width of Python source code, and
# putting a bigger number may make the output text file harder
# to read.
cols = 80
# Calculate the optimum number of columns as the floor of the
# quotient of the print columns and the part_length
num_print_cols = int(math.floor(cols/part_length))
# Create a list to store the values to be printed.
outlist = []
for species_name, mole_frac in zip(self.species_names, molefracs):
outlist.append('{0:>{1}s} = {2:{3}E}'.format(
species_name,
max_species_length,
mole_frac,
mole_frac_precision)
)
grouped = zip_longest(*[iter(outlist)]*num_print_cols, fillvalue='')
for items in grouped:
for item in items:
print(item, end='')
print('\n', end='')
print(divider, '\n')
[docs]class MultiSimulationCase(SimulationCase):
"""Class that sets up and runs a simulation case, for multiple.
When multiple cases are run, no output is printed and no simulation
data is saved; upon completion, the calculated ignition delay times
are written to the output file.
"""
def __init__(self, filenames):
"""Initialize the simulation case.
Read the SENKIN-format input file is read into the ``keywords``
dictionary.
:param filenames:
Dictionary containing the relevant file names for this
case.
"""
self.input_filename = filenames['input_filename']
self.mech_filename = filenames['mech_filename']
self.save_filename = filenames['save_filename']
self.thermo_filename = filenames['thermo_filename']
self.keywords = utils.read_input_file(self.input_filename)
[docs] def run_case(self):
"""
Actually run the case set up by ``setup_case``. Runs the
simulation by using ``ReactorNet.step(self.tend)``.
"""
ignition_found = False
# Main loop to run the calculation. As long as the time in
# the ``ReactorNet`` is less than the end time, keep going.
while self.netw.time < self.tend:
# If we are using a function to set the temperature as
# a function of time, use it here.
if self.temp_func is not None:
self.gas.TP = self.temp_func(self.netw.time), None
# Take the step towards the end time.
self.netw.step(self.tend)
# If the temperature limit has been exceeded, we have
# ignition! Save the time this occurs at. In the
# future, the ignition time may be interpolated.
if self.reac.T >= self.temp_limit and ignition_found is False:
self.ignition_time = self.netw.time
ignition_found = True
break