Source code for cansen.utils
# Standard libraries
import sys
import os
from itertools import product
from math import pi
from tempfile import NamedTemporaryFile
from argparse import ArgumentParser
from multiprocessing import cpu_count
# Third-party modules
from cantera import ck2cti
# Local imports
from .printer import divider
from .exceptions import (KeywordError,
MultipleProblemError,
UnsupportedKeyword,
UndefinedKeywordError,
MissingReqdKeywordError,
MissingKeyword,
)
[docs]class MyParser(ArgumentParser):
"""Parser that redefines the error printing"""
[docs] def error(self, message):
sys.stderr.write('\ncansen: error: %s\n\n' % message)
self.print_help()
sys.exit(2)
[docs]def convert_mech(mech_filename, thermo_filename=None):
"""Convert a mechanism and return a string with the filename.
Convert a CHEMKIN format mechanism to the Cantera CTI format using
the Cantera built-in script `ck2cti`.
:param mech_filename:
Filename of the input CHEMKIN format mechanism. The converted
CTI file will have the same name, but with `.cti` extension.
:param thermo_filename:
Filename of the thermodynamic database. Optional if the
thermodynamic database is present in the mechanism input.
"""
arg = ['--input='+mech_filename]
if thermo_filename is not None:
arg.append('--thermo='+thermo_filename)
# Convert the mechanism
ck2cti.main(arg)
mech_filename = mech_filename[:-4]+'.cti'
print('Mechanism conversion successful, written to '
'{}'.format(mech_filename))
return mech_filename
[docs]def process_multi_input(input_filename):
"""Process a formatted input file into multiple cases.
Processes a formatted input file that contains multiple cases into
separate temporary files, for individual reading of keywords.
:param input_filename:
Filename of the SENKIN input file.
:return filenames:
List of temporary filenames.
"""
filenames = []
temp_file = NamedTemporaryFile(delete=False)
with open(input_filename) as input_file:
for line in input_file:
if (line.startswith('!') or line.startswith('.') or
line.startswith('/') or line.strip() == ''):
# skip comment or blank lines
continue
elif line.upper().startswith('END'):
temp_file.write(line)
# store temporary file and create new
temp_file.seek(0)
filenames.append(temp_file.name)
temp_file = NamedTemporaryFile(delete=False)
continue
else:
# just print line
temp_file.write(line)
# check if last file actually written to; if not, close
if os.stat(temp_file.name).st_size == 0:
temp_file.close()
return filenames
[docs]def remove_files(files):
"""Delete files.
:param files:
List of names of files to be removed
"""
for f in files:
os.remove(f)
return None
[docs]def read_input_file(input_filename):
"""Read a formatted input file and return a dictionary of keywords.
:param input_filename:
Filename of the SENKIN input file.
"""
# Initialize the dictionaries and lists that will be filled by
# the parsing.
keywords = {}
reactants = []
oxidizer = {}
fuel = {}
complete_products = []
additional_species = {}
# problem_type is a boolean indicating whether a problem selection
# has been made.
problem_type = False
# unsupported_keys is a list of keywords that are not supported
# yet.
unsupported_keys = [
'ADAP', 'AEXT', 'AFRA', 'AGGA', 'AGGB', 'AGGD', 'AGGE',
'AGGFD', 'AGGMN', 'AINT', 'AREA', 'AREAQ', 'AROP', 'ASEN',
'ASTEPS', 'AVALUE', 'AVAR', 'BETA', 'BLKEQ', 'BULK', 'CLSC',
'CNTN', 'CNTT', 'COLR', 'DIST', 'ENRG', 'EPSG', 'EPSR', 'CLSM',
'EPSS', 'EPST', 'ETCH', 'GFAC', 'GMHTC', 'HTC', 'HTRN', 'IPSR',
'IRET', 'ISTP', 'KLIM', 'MAXIT', 'MCUT', 'MMASS', 'NADAP',
'NEWRUN', 'NMOM', 'NNEG', 'NOCG', 'NSOL', 'PNDE', 'PPRO', 'PRNT',
'PROE', 'PVFE', 'QFUN', 'QLOS', 'QPRO', 'QRGEQ', 'QRSEQ',
'RELAXC', 'ROP', 'RSTR', 'SCLM', 'SCLS', 'SCOR', 'SENG', 'SENT',
'SFAC', 'SIZE', 'SOLUTION_TECHNIQUE', 'SSTT', 'SURF', 'TAMB',
'TGIV', 'TIFP', 'TRAN', 'TRES', 'TRST', 'TSRF', 'TSTR', 'UIGN',
'USET', 'WENG', 'XMLI'
]
with open(input_filename) as input_file:
print(divider)
print('Keyword Input:\n')
for line in input_file:
# Echo the input back to the output file.
print(' '*10, line, end='')
if (line.startswith('!') or line.startswith('.') or
line.startswith('/') or line.strip() == ""):
continue
elif line.upper().startswith('CONV'):
if problem_type:
raise MultipleProblemError(line, keywords['problemType'])
else:
keywords['problemType'] = 1
problem_type = True
elif line.upper().startswith('CONP'):
if problem_type:
raise MultipleProblemError(line, keywords['problemType'])
else:
keywords['problemType'] = 2
problem_type = True
elif line.upper().startswith('VPRO'):
if not problem_type:
keywords['problemType'] = 3
vproTime = [float(line.split()[1])]
vproVol = [float(line.split()[2])]
problem_type = True
elif problem_type and keywords.get('problemType') != 3:
raise MultipleProblemError(line, keywords['problemType'])
elif problem_type and keywords.get('problemType') == 3:
vproTime.append(float(line.split()[1]))
vproVol.append(float(line.split()[2]))
elif line.upper().startswith('CONT'):
if problem_type:
raise MultipleProblemError(line, keywords['problemType'])
else:
keywords['problemType'] = 4
problem_type = True
elif line.upper().startswith('COTV'):
if problem_type:
raise MultipleProblemError(line, keywords['problemType'])
else:
keywords['problemType'] = 5
problem_type = True
elif line.upper().startswith('VTIM'):
if problem_type:
raise MultipleProblemError(line, keywords['problemType'])
else:
keywords['problemType'] = 6
problem_type = True
elif line.upper().startswith('TTIM'):
if problem_type:
raise MultipleProblemError(line, keywords['problemType'])
else:
keywords['problemType'] = 7
problem_type = True
elif line.upper().startswith('TPRO'):
if not problem_type:
keywords['problemType'] = 8
TproTime = [float(line.split()[1])]
TproTemp = [float(line.split()[2])]
problem_type = True
elif problem_type and keywords.get('problemType') != 8:
raise MultipleProblemError(line, keywords['problemType'])
elif problem_type and keywords.get('problemType') == 8:
TproTime.append(float(line.split()[1]))
TproTemp.append(float(line.split()[2]))
elif line.upper().startswith('ICEN'):
if problem_type:
raise MultipleProblemError(line, keywords['problemType'])
else:
keywords['problemType'] = 9
problem_type = True
elif line.upper().startswith('TEMP'):
keywords['temperature'] = float(line.split()[1])
elif line.upper().startswith('REAC'):
species = line.split()[1]
molefrac = line.split()[2]
reactants.append(':'.join([species, molefrac]))
elif line.upper().startswith('PRES'):
keywords['pressure'] = float(line.split()[1])
elif line.upper().startswith('TIME'):
keywords['endTime'] = float(line.split()[1])
elif line.upper().startswith('TLIM'):
keywords['tempLimit'] = float(line.split()[1])
elif line.upper().startswith('DTIGN'):
keywords['tempThresh'] = float(line.split()[1])
elif line.upper().startswith('ATOL'):
keywords['abstol'] = float(line.split()[1])
elif line.upper().startswith('RTOL'):
keywords['reltol'] = float(line.split()[1])
elif line.upper().startswith('DELT'):
keywords['prntTimeInt'] = float(line.split()[1])
elif line.upper().startswith('DTSV'):
keywords['saveTimeInt'] = float(line.split()[1])
elif line.upper().startswith('STPT'):
keywords['maxTimeStep'] = float(line.split()[1])
elif line.upper().startswith('EQUI'):
keywords['eqRatio'] = float(line.split()[1])
elif line.upper().startswith('OXID'):
species = line.split()[1]
molefrac = float(line.split()[2])
oxidizer[species] = molefrac
elif line.upper().startswith('FUEL'):
species = line.split()[1]
molefrac = float(line.split()[2])
fuel[species] = molefrac
elif line.upper().startswith('CPROD'):
species = line.split()[1]
complete_products.append(species)
elif line.upper().startswith('ADD'):
species = line.split()[1]
molefrac = float(line.split()[2])
additional_species[species] = molefrac
elif line.upper().startswith('SENS'):
keywords['sensitivity'] = True
elif line.upper().startswith('VOL '):
# The default units of volume in SENKIN are cm**3, but
# the default in Cantera is m**3 so we have to convert.
keywords['reactorVolume'] = float(line.split()[1])/1.0E6
elif line.upper().startswith('RTLS'):
keywords['sensRelTol'] = float(line.split()[1])
elif line.upper().startswith('ATLS'):
keywords['sensAbsTol'] = float(line.split()[1])
elif line.upper().startswith('IGNBREAK'):
keywords['break_on_ignition'] = True
elif line.upper().startswith('CMPR'):
keywords['comp_ratio'] = float(line.split()[1])
elif line.upper().startswith('DEG0'):
keywords['start_crank_angle'] = float(line.split()[1])
elif line.upper().startswith('VOLD'):
keywords['swept_volume'] = float(line.split()[1])/1.0E6
elif line.upper().startswith('VOLC'):
keywords['clear_volume'] = float(line.split()[1])/1.0E6
elif line.upper().startswith('LOLR'):
keywords['rod_radius_ratio'] = float(line.split()[1])
elif line.upper().startswith('RPM'):
keywords['rev_per_min'] = float(line.split()[1])
elif line.upper().startswith('BORE'):
keywords['cyl_bore'] = float(line.split()[1])/1.0E2
elif line.upper().startswith('STROKE'):
keywords['stroke_length'] = float(line.split()[1])/1.0E2
elif line.upper().startswith('RODL'):
keywords['connect_rod_len'] = float(line.split()[1])/1.0E2
elif line.upper().startswith('CRAD'):
keywords['crank_radius'] = float(line.split()[1])/1.0E2
elif line.upper()[0:3] in unsupported_keys:
raise UnsupportedKeyword(line)
continue
elif line.upper().startswith('END'):
continue
else:
raise UndefinedKeywordError(line)
print('\n', divider, '\n', sep='')
# The endTime, temperature, pressure, and problemType are required
# input. Exit if any of them are not found.
if 'endTime' not in keywords:
raise MissingReqdKeywordError('TIME')
if 'temperature' not in keywords:
raise MissingReqdKeywordError('TEMP')
if 'pressure' not in keywords:
raise MissingReqdKeywordError('PRES')
if 'problemType' not in keywords:
raise MissingReqdKeywordError(
'CONP', 'CONV', 'VPRO',
'CONT', 'ICEN', 'TPRO',
'COTV', 'TTIM', 'VTIM',
)
elif keywords.get('problemType') == 3:
keywords['vproTime'] = vproTime
keywords['vproVol'] = vproVol
elif keywords.get('problemType') == 8:
keywords['TproTime'] = TproTime
keywords['TproTemp'] = TproTemp
elif keywords.get('problemType') == 9:
# Variables needed to calculate piston velocity and other
# quantities for the IC Engine model: Stroke length, rod
# length to crank radius ratio, rpm, starting angle, initial
# volume
if 'rev_per_min' not in keywords:
raise MissingReqdKeywordError('RPM')
# Handle the various ways to calculate the stroke length
if 'stroke_length' in keywords:
print("Info: 'STROKE' was specified, and will be used for the "
"stroke length regardless of other parameters.")
elif all(key in keywords for key in ('swept_volume', 'cyl_bore')):
print("Info: Using swept volume and cylinder bore to "
"calculate stroke length.")
keywords['stroke_length'] = (keywords['swept_volume']*4 /
(pi*keywords['cyl_bore']**2))
elif all(key in keywords for key in ('comp_ratio',
'clear_volume',
'cyl_bore')):
print("Info: Using compression ratio, clearance volume, and "
"cylinder bore to calculate stroke length.")
keywords['swept_volume'] = (keywords['clear_volume'] *
(keywords['comp_ratio'] - 1))
keywords['stroke_length'] = (keywords['swept_volume']*4 /
(pi*keywords['cyl_bore']**2))
elif 'crank_radius' in keywords:
print("Info: Using crank radius to compute the stroke length.")
keywords['stroke_length'] = 2*keywords['crank_radius']
else:
raise MissingReqdKeywordError(
'STROKE', 'VOLD', 'BORE',
'CMPR', 'VOLC', 'CRAD',
)
# Handle the various ways to calculate the initial volume
if 'reactorVolume' in keywords:
print("Info: The inital reactor volume was specified by the VOL "
"keyword and this value will be used regardless of other "
"settings.")
elif all(key in keywords for key in ('swept_volume', 'clear_volume',
'comp_ratio')):
raise KeywordError("Only two of 'VOLD', 'VOLC', and 'CMPR' may be "
"specified.")
elif all(key in keywords for key in ('swept_volume', 'clear_volume')):
print("Info: Computing initial reactor volume from the swept "
"volume and the clearance volume.")
keywords['reactorVolume'] = (keywords['swept_volume'] +
keywords['clear_volume'])
elif all(key in keywords for key in ('comp_ratio', 'clear_volume')):
print("Info: Computing initial reactor volume from the "
"compression ratio and clearance volume.")
keywords['reactorVolume'] = (keywords['comp_ratio'] *
keywords['clear_volume'])
elif all(key in keywords for key in ('comp_ratio', 'swept_volume')):
print("Info: Computing initial reactor volume from the "
"compression ratio and swept volume.")
keywords['reactorVolume'] = (keywords['swept_volume'] *
(1 + 1/(keywords['comp_ratio'] - 1)))
elif all(key in keywords for key in ('clear_volume', 'cyl_bore')):
print("Info: Computing initial reactor volume from the cylinder "
"bore, stroke length, and clearance volume.")
keywords['reactorVolume'] = (pi/4*keywords['cyl_bore']**2 *
keywords['stroke_length'] +
keywords['clear_volume'])
else:
raise MissingReqdKeywordError('VOLD', 'VOLC', 'CMPR', 'BORE',
'VOL')
# Handle the ways to calculate the rod length to radius ratio
if 'rod_radius_ratio' in keywords:
print("Info: The connecting rod length to crank radius ratio was "
"specified by the 'LOLR' keyword and this value will be "
"used regardless of other settings.")
elif all(key in keywords for key in ('connect_rod_len',
'crank_radius')):
print("Info: Using given connecting rod length and crank radius "
"to compute the ratio.")
keywords['rod_radius_ratio'] = (keywords['connect_rod_len'] /
keywords['crank_radius'])
else:
raise MissingReqdKeywordError('LOLR', 'CRAD', 'RODL')
# Set the default reactor volume, if it is not specified
if 'reactorVolume' not in keywords:
keywords['reactorVolume'] = 1.0E-6
raise MissingKeyword('No reactor volume specified, assuming '
'1.0 cm**3.')
# The reactants can be specified by REAC or EQUI + FUEL + OXID +
# CPROD. One or the other of these must be present; if neither or
# both are present, exit.
if (reactants and
(oxidizer or fuel or complete_products or additional_species or
('eqRatio' in keywords))):
raise KeywordError('REAC and EQUI cannot both be specified.')
elif ('eqRatio' in keywords and not
(oxidizer and fuel and complete_products)):
raise KeywordError('If EQUI is specified, all of FUEL, OXID and '
'CPROD must be as well.')
elif reactants:
keywords['reactants'] = reactants
elif 'eqRatio' in keywords:
keywords['oxidizer'] = oxidizer
keywords['fuel'] = fuel
keywords['completeProducts'] = complete_products
keywords['additionalSpecies'] = additional_species
else:
raise MissingReqdKeywordError('REAC', 'EQUI')
# Set the default temperature threshold to determine ignition
# delay.
if 'tempThresh' not in keywords:
keywords['tempThresh'] = 400
return keywords
[docs]def cli_parser(argv):
"""Parse command line interface input.
:param argv:
List of command line options.
"""
parser = MyParser(
prog='cansen',
description='CanSen - the SENKIN-like wrapper for Cantera written in Python.'
)
parser.add_argument('-V', '--version',
action='store_true',
help='Show the version of CanSen and quit')
parser.add_argument('-i', '--input',
type=str,
help='The simulation input file in SENKIN format.')
parser.add_argument('-o', '--output',
type=str,
default='output.out',
help='The text output file.')
parser.add_argument('-x', '--save',
type=str,
default='save.hdf',
help='The binary save output file.')
parser.add_argument('-c', '--chem',
type=str,
default='chem.xml',
help='The chemistry input file, in either CHEMKIN,'
' Cantera CTI or CTML format.')
parser.add_argument('-d', '--thermo',
type=str,
help='The thermodynamic database. Optional if the'
' thermodynamic database is specified in the'
' chemistry input file. Otherwise, required.')
parser.add_argument('--convert',
action='store_true',
help='Convert the input mechanism to CTI format '
'and quit. If ``--convert`` is specified, '
'the SENKIN input file is optional.')
parser.add_argument('-m', '--multi',
type=int,
nargs='?',
const=cpu_count(),
default=False,
help='Run multiple cases from the input file. '
'Optional. If ``-m`` is used, must specify '
'number of processors to be used (e.g., '
'``-m 4``). If ``--multi`` is specified, '
'CanSen uses the available number of '
'processors by default.')
if len(argv) == 0:
parser.print_help()
sys.exit(1)
args = parser.parse_args(argv)
filenames = {}
if args.version:
from ._version import __version__
print('CanSen {version} from {path} ()'.format(
version=__version__,
path=os.path.abspath(os.path.dirname(__file__))))
sys.exit(0)
if args.input:
input_filename = args.input
if not os.path.isfile(input_filename):
print('Error: The specified input file '
'"{}" does not exist'.format(input_filename)
)
sys.exit(1)
filenames['input_filename'] = input_filename
elif not args.input and not args.convert:
print('Error: The input file must be specified')
sys.exit(1)
else:
filenames['input_filename'] = None
filenames['output_filename'] = args.output
filenames['save_filename'] = args.save
if not os.path.isfile(args.chem):
print('Error: The specified chemistry file '
'"{}" does not exist'.format(args.chem)
)
sys.exit(1)
filenames['mech_filename'] = args.chem
if args.thermo:
if not os.path.isfile(args.thermo):
print('Error: The specified thermodynamic database '
'"{}" does not exist'.format(args.thermo))
sys.exit(1)
filenames['thermo_filename'] = args.thermo
convert = args.convert
num_proc = None
multi = False
if args.multi:
multi = True
num_proc = args.multi
return filenames, convert, multi, num_proc
[docs]def reactor_interpolate(interp_time, state1, state2):
"""Linearly interpolate the reactor states to the given input time.
:param interp_time:
Time at which the interpolated values should be calculated
:param state1:
Array of the state information at the previous time step.
:param state2:
Array of the state information at the current time step.
"""
interp_state = state1 + ((state2 - state1)*(interp_time - state1[0]) /
(state2[0] - state1[0]))
return interp_state
[docs]def equivalence_ratio(gas, eq_ratio, fuel, oxidizer, complete_products,
additional_species):
"""Calculate the mixture mole fractions from the equivalence ratio.
Given the equivalence ratio, fuel mixture, oxidizer mixture,
the products of complete combustion, and any additional species for
the mixture, return a string containing the mole fractions of the
species, suitable for setting the state of the input :py:class:`~cantera.ThermoPhase`.
:param gas:
Cantera :py:class:`~cantera.ThermoPhase` object containing the desired species.
:param eq_ratio:
Equivalence ratio
:param fuel:
Dictionary of molecules in the fuel mixture and the fraction of
each molecule in the fuel mixture
:param oxidizer:
Dictionary of molecules in the oxidizer mixture and the
fraction of each molecule in the oxidizer mixture.
:param complete_products:
List of species in the products of complete combustion.
:param additional_species:
Dictionary of molecules that will be added to the mixture.
The mole fractions given in this dictionary are as a fraction
of the total mixture.
"""
reactants = ''
cprod_elems = {}
fuel_elems = {}
oxid_elems = {}
# Check sum of fuel and oxidizer values; normalize if greater than 1
fuel_sum = sum(fuel.values())
if fuel_sum > 1.0:
for sp, x in fuel.items():
fuel[sp] = x/fuel_sum
oxid_sum = sum(oxidizer.values())
if oxid_sum > 1.0:
for sp, x in oxidizer.items():
oxidizer[sp] = x/oxid_sum
# Check oxidation state of complete products
for sp, el in product(complete_products, gas.element_names):
if el.upper() not in cprod_elems:
cprod_elems[el.upper()] = {}
cprod_elems[el.upper()][sp] = int(gas.n_atoms(sp, el))
num_C_cprod = sum(cprod_elems.get('C', {0: 0}).values())
num_H_cprod = sum(cprod_elems.get('H', {0: 0}).values())
num_O_cprod = sum(cprod_elems.get('O', {0: 0}).values())
oxid_state = 4*num_C_cprod + num_H_cprod - 2*num_O_cprod
if oxid_state != 0:
print("Warning: One or more products of incomplete combustion "
"were specified.")
# Find the number of H, C, and O atoms in the fuel molecules.
for sp, el in product(fuel.keys(), gas.element_names):
if el not in fuel_elems:
fuel_elems[el.upper()] = 0
fuel_elems[el.upper()] += gas.n_atoms(sp, el) * fuel[sp]
num_C_fuel = fuel_elems.get('C', 0)
num_H_fuel = fuel_elems.get('H', 0)
num_O_fuel = fuel_elems.get('O', 0)
# Find the number of H, C, and O atoms in the oxidizer molecules.
for sp, el in product(oxidizer.keys(), gas.element_names):
if el not in oxid_elems:
oxid_elems[el.upper()] = 0
oxid_elems[el.upper()] += gas.n_atoms(sp, el) * oxidizer[sp]
num_O_oxid = oxid_elems.get('O', 0)
# Check that all of the elements specified in the fuel and oxidizer
# are present in the complete products and vice versa.
for el in cprod_elems.keys():
if ((sum(cprod_elems[el].values()) > 0 and fuel_elems[el] == 0 and
oxid_elems[el] == 0) or (sum(cprod_elems[el].values()) == 0 and
(fuel_elems[el] > 0 or oxid_elems[el] > 0))):
print('Error: Must specify all elements in the fuel + oxidizer '
'in the complete products and vice-versa')
sys.exit(1)
# Compute the amount of oxidizer required to consume all the
# carbon and hydrogen in the complete products
if num_C_cprod > 0:
spec = cprod_elems['C'].keys()
ox = sum([cprod_elems['O'][sp]
for sp in spec if cprod_elems['C'][sp] > 0])
C_multiplier = ox/num_C_cprod
else:
C_multiplier = 0
if num_H_cprod > 0:
spec = cprod_elems['H'].keys()
ox = sum([cprod_elems['O'][sp]
for sp in spec if cprod_elems['H'][sp] > 0])
H_multiplier = ox/num_H_cprod
else:
H_multiplier = 0
# Compute how many O atoms are required to oxidize everybody
num_O_req = (num_C_fuel * C_multiplier +
num_H_fuel * H_multiplier - num_O_fuel)
O_mult = num_O_req/num_O_oxid
# Find the total number of moles in the fuel + oxidizer mixture
total_oxid_moles = sum([O_mult * amt for amt in oxidizer.values()])
total_fuel_moles = sum([eq_ratio * amt for amt in fuel.values()])
total_reactant_moles = total_oxid_moles + total_fuel_moles
# Handle the case where additional species are specified separately
if additional_species:
total_additional_species = sum(additional_species.values())
if total_additional_species >= 1.0:
print('Error: Additional species must sum to less than 1')
remain = 1.0 - total_additional_species
for species, molefrac in additional_species.items():
add_spec = ':'.join([species, str(molefrac)])
reactants = ','.join([reactants, add_spec])
else:
remain = 1.0
# Compute the mole fractions of the fuel and oxidizer components
# given that a certain portion of the mixture will have been taken
# up by the additional species, if any.
for species, ox_amt in oxidizer.items():
molefrac = ox_amt * O_mult/total_reactant_moles * remain
add_spec = ':'.join([species, str(molefrac)])
reactants = ','.join([reactants, add_spec])
for species, fuel_amt in fuel.items():
molefrac = fuel_amt*eq_ratio/total_reactant_moles*remain
add_spec = ':'.join([species, str(molefrac)])
reactants = ','.join([reactants, add_spec])
# Take off the first character, which is a comma
reactants = reactants[1:]
return reactants