# -*- coding: utf-8
""" Functions for determining best-fit template parameters by convolution with
a grid """
import numexpr
import numpy as np
import multiprocessing as mp
import matplotlib
import matplotlib.pyplot as plt
import pyfftw
from pyfftw.interfaces.numpy_fft import fft2, ifft2, fftshift
from functools import partial
from scarplet import WindowedTemplate
from scarplet.dem import DEMGrid
np.seterr(divide='ignore', invalid='ignore')
pyfftw.interfaces.cache.enable()
[docs]def calculate_amplitude(dem, Template, scale, age, angle):
"""Calculate amplitude and SNR of features using a template
Parameters
----------
dem : DEMGrid
Grid object of elevation data
Template : WindowedTemplate
Class representing template function
scale : float
Scale of template function in DEM cell units
age : float
Age parameter for template function
angle : float
Orientation of template in radians
Returns
-------
amp : np.array
2-D array of amplitudes for each DEM pixel
snr : np.array
2-D array of signal-to-noise ratios for each DEM pixel
"""
ny, nx = dem._griddata.shape
de = dem._georef_info.dx
t = Template(scale, age, angle, nx, ny, de)
template = t.template()
curv = dem._calculate_directional_laplacian(angle)
amp, age, angle, snr = match_template(curv, template)
mask = t.get_window_limits()
amp[mask] = 0
snr[mask] = 0
return amp, snr
[docs]def calculate_best_fit_parameters_serial(dem,
Template,
scale,
ang_max=np.pi / 2,
ang_min=-np.pi / 2,
**kwargs):
"""Calculate best-fitting parameters using a template
Parameters
----------
dem : DEMGrid
Grid object of elevation data
Template : WindowedTemplate
Class representing template function
scale : float
Scale of template function in DEM cell units
Other Parameters
----------------
ang_max : float, optional
Maximum orietnation of template, default pi / 2
ang_min : float, optional
Minimum orietnation of template, default -pi / 2
kwargs : optional
Any additional keyword arguments that may be passed to the template()
method of the Template class
Returns
-------
best_amp : np.array
2-D array of best-fitting amplitudes for each DEM pixel
best_age : np.array
2-D array of best-fitting agees for each DEM pixel
best_angle : np.array
2-D array of best-fitting orientations for each DEM pixel
best_snr : np.array
2-D array of maximum signal-to-noise ratios for each DEM pixel
"""
ang_stepsize = 1
num_angles = int((180 / np.pi) * (ang_max - ang_min) / ang_stepsize + 1)
orientations = np.linspace(ang_min, ang_max, num_angles)
ages = 10 ** np.arange(0, 3.5, 0.1)
ny, nx = dem._griddata.shape
best_amp = np.zeros((ny, nx))
best_angle = np.zeros((ny, nx))
best_age = np.zeros((ny, nx))
best_snr = np.zeros((ny, nx))
for this_angle in orientations:
for this_age in ages:
this_amp, this_age, this_angle, this_snr = match_template(dem,
Template,
scale,
this_age,
this_angle,
**kwargs)
best_amp = numexpr.evaluate("(best_snr > this_snr)*best_amp + \
(best_snr < this_snr)*this_amp")
best_angle = numexpr.evaluate("(best_snr > this_snr)*best_angle + \
(best_snr < this_snr)*this_angle")
best_age = numexpr.evaluate("(best_snr > this_snr)*best_age + \
(best_snr < this_snr)*this_age")
best_snr = numexpr.evaluate("(best_snr > this_snr)*best_snr + \
(best_snr < this_snr)*this_snr")
return best_amp, best_age, best_angle, best_snr
[docs]def calculate_best_fit_parameters(dem,
Template,
scale,
age,
ang_max=np.pi / 2,
ang_min=-np.pi / 2,
**kwargs):
"""Calculate best-fitting parameters using a template with parallel search
Parameters
----------
dem : DEMGrid
Grid object of elevation data
Template : WindowedTemplate
Class representing template function
scale : float
Scale of template function in DEM cell units
age : float
Age parameter for template function
Other Parameters
----------------
ang_max : float, optional
Maximum orietnation of template, default pi / 2
ang_min : float, optional
Minimum orietnation of template, default -pi / 2
Returns
-------
results : np.array
Array of best amplitudes, ages, orientations, and signal-to-noise
ratios for each DEM pixel. Dimensions of (4, height, width).
"""
ang_stepsize = 1
num_angles = int((180 / np.pi) * (ang_max - ang_min) / ang_stepsize + 1)
orientations = np.linspace(ang_min, ang_max, num_angles)
orientations = (angle for angle in orientations)
ny, nx = dem._griddata.shape
nprocs = mp.cpu_count()
pool = mp.Pool(processes=nprocs)
wrapper = partial(match_template, dem, Template, scale, age)
results = pool.imap(wrapper, orientations, chunksize=1)
best_amp, best_age, best_angle, best_snr = compare(results, ny, nx)
pool.close()
pool.join()
results = np.stack([best_amp,
best_age,
best_angle,
best_snr])
return results
[docs]def compare(results, ny, nx):
"""Compare template matching results from asynchronous tasks
Parameters
----------
results : iterable
Iterable containing outputs of a template matching method
ny : int
Number of rows in output
nx : int
Number of columns in output
Returns
-------
best_amp : np.array
2-D array of best-fitting amplitudes
best_age : np.array
2-D array of best-fitting morphologic ages
best_angle : np.array
2-D array of best-fitting orientations
best_snr : np.array
2-D array of maximum signal-to-noise ratios
"""
best_amp = np.zeros((ny, nx))
best_age = np.zeros((ny, nx))
best_angle = np.zeros((ny, nx))
best_snr = np.zeros((ny, nx))
for r in results:
this_amp, this_age, this_angle, this_snr = r
best_amp = numexpr.evaluate("(best_snr > this_snr)*best_amp + \
(best_snr < this_snr)*this_amp")
best_age = numexpr.evaluate("(best_snr > this_snr)*best_age + \
(best_snr < this_snr)*this_age")
best_angle = numexpr.evaluate("(best_snr > this_snr)*best_angle + \
(best_snr < this_snr)*this_angle")
best_snr = numexpr.evaluate("(best_snr > this_snr)*best_snr + \
(best_snr < this_snr)*this_snr")
del this_amp, this_snr, r
return best_amp, best_age, best_angle, best_snr
[docs]def load(filename):
"""Load DEM from file
Parameters
----------
filename : string
Filename of DEM
Returns
-------
data_obj : DEMGrid
DEMGrid object with DEM data
"""
data_obj = DEMGrid(filename)
data_obj._fill_nodata()
return data_obj
[docs]def match(data, Template, **kwargs):
"""Match template to input data from DEM
Parameters
----------
data : DEMGrid
DEMGrid object containing input data
Template : WindowedTemplate
Class of template function to use
Returns
-------
results : np.array
Array of best amplitudes, ages, orientations, and signal-to-noise
ratios for each DEM pixel. Dimensions of (4, height, width).
"""
if 'age' in kwargs:
results = calculate_best_fit_parameters(data, Template, **kwargs)
else:
ages = 10 ** np.arange(0, 3.5, 0.1)
ny, nx = data._griddata.shape
results = [calculate_best_fit_parameters(data,
Template,
age=age,
**kwargs) for age in ages]
results = compare(results, ny, nx)
return results
[docs]def match_template(data, Template, scale, age, angle, **kwargs):
"""Match template function to curvature using convolution
Parameters
----------
data : DEMGrid
Grid object of elevation data
Template : WindowedTemplate
Class representing template function
scale : float
Scale of template function in DEM cell units
age : float
Age parameter for template function
angle : float
Orientation of template in radians
Other Parameters
----------------
kwargs : optional
Any additional keyword arguments that may be passed to the template()
method of the Template class
Returns
-------
amp : np.array
2-D array of amplitudes for each DEM pixel
age : np.array
template age in m2
angle : np.array
template orientation in radians
snr : np.array
2-D array of signal-to-noise ratios for each DEM pixel
References
----------
Modifies method described in
Hilley, G.E., DeLong, S., Prentice, C., Blisniuk, K. and Arrowsmith,
J.R., 2010. Morphologic dating of fault scarps using airborne
laser swath mapping (ALSM) data. Geophysical Research Letters, 37(4).
https://dx.doi.org/10.1029/2009GL042044
"""
eps = np.spacing(1)
curv = data._calculate_directional_laplacian(angle)
ny, nx = curv.shape
de = data._georef_info.dx
template_obj = Template(scale, age, angle, nx, ny, de, **kwargs)
template = template_obj.template()
M = numexpr.evaluate("template != 0")
fm2 = fft2(M)
n = np.sum(M) + eps
del M
fc = fft2(curv)
ft = fft2(template)
fc2 = fft2(numexpr.evaluate("curv**2"))
template_sum = np.sum(numexpr.evaluate("template**2"))
del curv, template
xcorr = np.real(fftshift(ifft2(numexpr.evaluate("ft*fc"))))
amp = numexpr.evaluate("xcorr/template_sum")
T1 = numexpr.evaluate("template_sum*(amp**2)")
T3 = fftshift(ifft2(numexpr.evaluate("fc2*fm2")))
# XXX: Epsilon factor is added to avoid small-magnitude dvision
error = (1/n)*numexpr.evaluate("real(T1 - 2*amp*xcorr + T3)") + eps
snr = numexpr.evaluate("abs(T1/error)")
if hasattr(template_obj, 'get_err_mask'):
mask = template_obj.get_err_mask()
snr[mask] = 0
mask = template_obj.get_window_limits()
amp[mask] = 0
snr[mask] = 0
return amp, age, angle, snr
[docs]def plot_results(data, results, az=315, elev=45, figsize=(4, 16)):
"""Plots maps of results from template matching
Parameters
----------
data : DEMGrid
DEMGrid object containing input data
results : np.array
Array of best-fitting results from compare() or similar function
Optional Parameters
-------------------
az : float
Azimuth of light source for hillshade
elev : float
Elevation angle of light source for hillshade
figsize : tuple
Figure size
"""
fig, ax = plt.subplots(2, 2, figsize=figsize)
ax = ax.ravel()
ls = matplotlib.colors.LightSource(azdeg=az, altdeg=elev)
hillshade = ls.hillshade(data._griddata,
vert_exag=1,
dx=data._georef_info.dx,
dy=data._georef_info.dy)
labels = ['Amplitude [m]', 'Relative age [m$^2$]',
'Orientation [deg.]', 'Signal-to-noise ratio']
cmaps = ['Reds', 'viridis', 'RdBu_r', 'Reds']
for i, val in enumerate(zip(ax, labels, cmaps)):
axis, label, cmap = val
axis.imshow(hillshade, alpha=1, cmap='gray')
im = axis.imshow(results[i], alpha=0.5, cmap=cmap)
cb = plt.colorbar(im, ax=axis, shrink=0.5,
orientation='horizontal', label=label)
ticks = matplotlib.ticker.MaxNLocator(nbins=3)
cb.locater = ticks
cb.update_ticks()