#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2014 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
The core calculations required for transforming the Input-datamodel to the Output one.
Uses *pandalon*'s automatic dependency extraction from calculation functions.
"""
import logging
import numpy as np
import pandas as pd
import lmfit
from . import pdcalc
from . import datamodel
from collections import OrderedDict
from operator import setitem
log = logging.getLogger(__name__)
[docs]def run(mdl, opts=None):
"""
:param mdl: the datamodel to process, all params and data
:param map opts: flags controlling non-functional aspects of the process (ie error-handling and logging, gui, etc)
"""
datamodel.ensure_modelpath_Series(mdl, '/engine')
#datamodel.ensure_modelpath_Series(mdl, '/params')
datamodel.ensure_modelpath_DataFrame(mdl, '/measured_eng_points')
params = mdl['params']
engine = mdl['engine']
measured_eng_points = mdl['measured_eng_points']
## Identify quantities necessary for the FITTING,
# and calculate them.
outcomes = ('eng_points.cm', 'eng_points.bmep', 'eng_points.pmf', 'engine.fuel_lhv')
pdcalc.execute_funcs_factory(eng_points_2_std_map, outcomes, params, engine, measured_eng_points)
## FIT
#
coeffs = datamodel.resolve_jsonpointer(mdl, '/params/fitting/coeffs')
coeffs = [lmfit.parameter.Parameter(name, **kws) for (name, kws) in coeffs.items()]
is_robust = datamodel.resolve_jsonpointer(mdl, '/params/fitting/is_robust', False)
fitted_coeffs = fit_engine_map(measured_eng_points, is_robust, coeffs)
engine['fc_map_coeffs'] = fitted_coeffs
fitted_eng_points = reconstruct_eng_points_fitted(engine, fitted_coeffs, measured_eng_points)
std_to_norm_map(engine, fitted_eng_points)
if datamodel.resolve_jsonpointer(mdl, '/params/plot_maps'):
mesh_eng_points = generate_mesh_eng_points_fitted(measured_eng_points, fitted_coeffs, measured_eng_points)
columns = ['pmf', 'cm', 'bmep']
plot_map(measured_eng_points, mesh_eng_points, columns)
## Flatten 2D-vectors to make a DataFrame
#
mesh_eng_points = {col: vec.flatten() for (col, vec) in mesh_eng_points.items()}
mesh_eng_points = pd.DataFrame(mesh_eng_points)
## Fill calced columns.
#
std_to_norm_map(engine, mesh_eng_points)
mdl['mesh_eng_points'] = pd.DataFrame(mesh_eng_points)
mdl['measured_eng_points'] = measured_eng_points
mdl['fitted_eng_points'] = pd.DataFrame(fitted_eng_points)
#datamodel.validate_model(mdl, additional_properties=False) TODO: Make OUT-MODEL pass validation.
return mdl
[docs]def eng_points_2_std_map(params, engine, eng_points):
"""
A factory of the calculation functions for reaching to the data necessary for the Fitting.
The order of the functions below not important, and the actual order of execution
is calculated from their dependencies, based on the data-frame's column access.
"""
from math import pi
funcs = [
lambda: setitem(engine, 'fuel_lhv', params['fuel'][engine.fuel]['lhv']),
lambda: setitem(eng_points, 'n', eng_points.n_norm * (engine.n_rated - engine.n_idle) + engine.n_idle),
lambda: setitem(eng_points, 'p', eng_points.p_norm * engine.p_max),
lambda: setitem(eng_points, 'fc', eng_points.fc_norm * engine.p_max),
lambda: setitem(eng_points, 'rps', eng_points.n / 60),
#lambda: setitem(eng_points, 'rps', eng_points.cm * 1000 / (2 * engine.stroke)),
lambda: setitem(eng_points, 'torque', (eng_points.p * 1000) / (eng_points.rps * 2 * pi)),
lambda: setitem(eng_points, 'bmep', (eng_points.torque * 10e-5 * 4 * pi) / (engine.capacity * 10e-6)),
lambda: setitem(eng_points, 'pmf', ((4 * pi * engine.fuel_lhv) / (engine.capacity * 10e-6)) * (eng_points.fc / (3600 * eng_points.rps * 2 * pi)) * 10e-5),
lambda: setitem(eng_points, 'cm', eng_points.rps * 2 * engine.stroke / 1000),
]
return funcs
def std_to_norm_map(engine, eng_points):
from math import pi
eng_points['rps'] = eng_points.cm * 1000 / (2 * engine.stroke)
eng_points['n'] = eng_points.rps * 60
eng_points['n_norm'] = eng_points.n / (engine.n_rated - engine.n_idle) + engine.n_idle
eng_points['torque'] = eng_points.bmep * (engine.capacity * 10e-3) / (4 * pi * 10e-5)
eng_points['p'] = eng_points.torque * (eng_points.rps * 2 * pi) / 1000
eng_points['fc'] = (eng_points.pmf * (engine.capacity * 10e-2) * (3600 * eng_points.rps * 2 * pi)) / (4 * pi * engine.fuel_lhv * 10e-5)
eng_points['fc_norm'] = eng_points.fc / engine.p_max
eng_points['p_norm'] = eng_points.p / engine.p_max
[docs]def engine_map_modelfunc(coeff_values, X):
"""
The function that models the engine-map.
"""
a = coeff_values['a']
b = coeff_values['b']
c = coeff_values['c']
a2 = coeff_values['a2']
b2 = coeff_values['b2']
loss0 = coeff_values['loss0']
loss2 = coeff_values['loss2']
pmf = X['pmf']
cm = X['cm']
bmep = (a + b*cm + c*cm**2)*pmf + (a2 + b2*cm)*pmf**2 + loss0 + loss2*cm**2
return bmep
def fit_engine_map(df, is_robust, coeffs):
assert len({'cm', 'bmep', 'pmf'} - set(df.columns)) == 0, \
"Missing fit-columns: %s" % {'cm', 'bmep', 'pmf'} - set(df.columns)
assert not np.any(np.isnan(df['pmf'])), \
"Cannot fit with NaNs in `pmf` data! \n%s" % np.any(np.isnan(df['pmf']), axis=1)
assert not np.any(np.isnan(df['cm'])), \
"Cannot fit with NaNs in `cm` data! \n%s" % np.any(np.isnan(df['cm']), axis=1)
residualfunc_args = (engine_map_modelfunc, df, df['bmep'])
residualfunc_kws = dict(is_robust=is_robust)
minimizer = lmfit.minimize(_robust_residualfunc, coeffs,
args=residualfunc_args,
kws=residualfunc_kws)
res_df = pd.Series(minimizer.params.valuesdict())
return res_df
def reconstruct_eng_points_fitted(engine, fitted_coeffs, eng_points):
bmep = engine_map_modelfunc(fitted_coeffs, eng_points)
fitted_eng_points = pd.DataFrame.from_items(zip(['pmf', 'cm', 'bmep'], (eng_points.pmf, eng_points.cm, bmep)))
return fitted_eng_points
def generate_mesh_eng_points_fitted(engine, fitted_coeffs, eng_points):
## Construct X.
#
dmin = eng_points.loc[:, ['pmf', 'cm']].min(axis=0)
dmax = eng_points.loc[:, ['pmf', 'cm']].max(axis=0)
drng = (dmax - dmin)
dmin -= 0.05 * drng
dmax += 0.10 * drng
dstp = (dmax - dmin) / 40
X1, X2 = np.mgrid[dmin[0]:dmax[0]:dstp[0], dmin[1]:dmax[1]:dstp[1]]
X = {'pmf': X1, 'cm': X2}
Y = engine_map_modelfunc(fitted_coeffs, X)
mesh_eng_points = OrderedDict(zip(['pmf', 'cm', 'bmep'], (X1, X2, Y)))
return mesh_eng_points
def plot_map(dfin, fitted_eng_points, columns):
(X1, X2, Y) = [fitted_eng_points[col] for col in columns]
x1min = X1.min(); x1max = X1.max();
x2min = X2.min(); x2max = X2.max();
extent=(x1min, x1max, x2min, x2max)
levels = np.arange(Y.min(), Y.max(), (Y.max() - Y.min()) / 10.0)
# import matplotlib
# matplotlib.use('WebAgg')
from matplotlib import pyplot as plt
plt.plot(dfin[columns[0]], dfin[columns[1]], '.c')
cntr = plt.contourf(X1, X2, Y, cmap=plt.cm.get_cmap(plt.cm.copper, len(levels)-1), extent=extent) # @UndefinedVariable
colorbar = plt.colorbar(cntr)
colorbar.set_label(columns[2], color='blue')
ax = plt.gca()
ax.set_title('Fitted normalized engine_map')
ax.set_aspect('auto'); ax.set_adjustable('box-forced')
ax.set_xlabel(columns[0], color='red'); ax.set_ylabel(columns[1], color='green')
plt.show()
def proc_vehicle(dfin, datamodel):
## Filter values
#
nrows = len(dfin)
dfin = dfin[(dfin.bmep > -1.0) & (dfin.bmep < 25.0)]
log.warning('Filtered %s out of %s rows for BAD bmep.', nrows-len(dfin), nrows)
return dfin
[docs]def _robust_residualfunc(coeffs, modelfunc, X, YData, is_robust=False, robust_prcntile=None):
"""
A non-linear iteratively-reweighted least-squares (IRLS) residual function (objective-function)
that robustly fits ``YData = modelfunc(X)``.
This method applies weights on each iteration so as to downscale any outliers and high-leverage data-points
based on the 'bisquare' standardized adjusted residuals:[#]_
.. math::
\frac{r}{K \times \hat{\sigma} \times \sqrt{1 - h}}
where:
:math:`r` (vector)
the residuals :math:`\hat{y} - y`
:math:`K` (scalar)
the *robust percentile* tuning constant used on each iteration to filter-out
adjusted-standardized-weights above 1, expressed as the Bisquare M-estimator efficiency
under Gaussian model.
:math:`\hat{\sigma}` (scalar)
the robust estimate of the *standard deviation* of the residuals
based on MAD[#]_ like this: :math:`\hat{\sigma}=1.4826\times\operatorname{MAD}`
:math:`h` : (vector)
the *hat vector*, the diagonal of the *hat matrix*,[#]_
which is used to reduce the weight of high-leverage data points
that are having a large effect on the least-squares fit.
:param modelfunc: The modeling function that accepts the dict of coeffs
:param nparray X:
:param nparray YData: measured-data points
:param boolean is_robust: Whether to deleverage outlier YData.
:param float robust_prcntile: The `K` percentile of the MAD,
[default: 4.68, filters-out approximately 5% of the residuals as outliers]
.. Seealso::
curve_fit, leastsq
.. [#] http://www.mathworks.com/help/stats/robustfit.html
.. [#] https://en.wikipedia.org/wiki/Median_absolute_deviation
.. [#] https://en.wikipedia.org/wiki/Hat_matrix
"""
YFitted = engine_map_modelfunc(coeffs.valuesdict(), X)
Residual = YFitted - YData
## Robust:
## Deleverage and standardize absolute-residuals based on a robust-MAD.
##
if is_robust:
R_abs = Residual.abs()
if not robust_prcntile:
robust_prcntile = 4.685 ## Bisquare M-estimator with 95% efficiency under the Gaussian model.
R_deleved = R_abs / (robust_prcntile * 1.4826 * np.median(R_abs)) ## * sqrt(1 - hat_vector))
## Calc the robust bisquared-residuals excluding outliers.
R_weights = (R_deleved < 1) * (1 - R_deleved**2)**2
Residual = R_weights * Residual
return Residual