# Mb create Problem class? Yep :^)
# in meters
minr, maxr = np.array([0.1]), np.array([150])
nobsr = np.array([50])
minz, maxz = np.array([-400, -200, -100, -50]), np.array([-200, -100, -50, 0])
nobsz = np.array([50, 50, 50, 50])
# in cartesian X, Y
reciever_lines = np.array(
[
[[0, 5], [10, 5]],
[[10, 10], [20, 10]],
[[20, 15], [30, 15]],
[[30, 20], [40, 20]],
[[40, 25], [50, 25]],
[[50, 30], [60, 30]],
[[60, 35], [70, 35]],
[[70, 40], [80, 40]],
[[80, 45], [90, 45]],
],
dtype=np.float64,
)
sources = np.array(
[
[[0.1, -10], [0.1, -30]],
[[0.1, -40], [0.1, -60]],
[[0.1, -70], [0.1, -90]],
[[0.1, -100], [0.1, -120]],
[[0.1, -130], [0.1, -150]],
[[0.1, -160], [0.1, -180]],
[[0.1, -190], [0.1, -210]],
[[0.1, -220], [0.1, -240]],
[[0.1, -250], [0.1, -270]],
[[0.1, -280], [0.1, -300]],
],
dtype=np.float64,
)
conductivities = np.array([1.0, 10.0, 0.1, 0.01])
source_powers = np.array([ 0, 0, 10, 0, 0, 0, 0, 0, 0, 0 ], dtype=np.float64)
initial_problem = Problem(reciever_lines, conductivities, source_powers, sources, minr, maxr, nobsr, minz, maxz, nobsz, sratioz=1)
print(initial_problem)
lo(initial_problem.reciever_weights)
show_calc_area(initial_problem, figsize=(15, 6), show_points=False, show_legend=False)
initial_problem.asm().solve()
print(initial_problem)
show_potential_field(initial_problem, figsize=(15, 6), show_legend=False)
synthetic_data = initial_problem.reciever_lines_voltage
synthetic_data
mean_powers = np.zeros(source_powers.shape[0])
powers = mean_powers
problem = Problem(
reciever_lines,
conductivities,
powers,
sources,
minr,
maxr,
nobsr,
minz,
maxz,
nobsz,
sratioz=1.0,
)
_, m_r, m_z = problem._asm_lhs()
A = problem._apply_lhs_boundaries()
problem._asm_rhs_vecs(m_r, m_z)
solve_problem = factorized(A)
problem_jacob = problem.jac()
def lsm(powers: np.ndarray, problem: Problem, synthetic_data: np.ndarray, mean_powers: np.array, alpha: float) -> np.ndarray:
problem.source_powers = powers
problem._asm_rhs()
rhs = problem._apply_rhs_boundaries()
problem.solution = solve_problem(rhs)
reciever_lines_data = problem.reciever_lines_voltage
least_squares = np.sum(((synthetic_data - reciever_lines_data))**2)
regularization = alpha * np.sum((powers - mean_powers)**2) if not np.isclose(0, alpha, atol=1e-15) else None
if regularization is not None:
least_squares += regularization
return least_squares
def jac(powers: np.ndarray, problem: Problem, synthetic_data: np.ndarray, mean_powers: np.array, alpha: float) -> np.ndarray:
problem.source_powers = powers
problem._asm_rhs()
rhs = problem._apply_rhs_boundaries()
problem.solution = solve_problem(rhs)
reciever_lines_data = problem.reciever_lines_voltage
lsm_jac_part = (synthetic_data - reciever_lines_data)
J = -2.0 * np.sum(lsm_jac_part[:, None] * problem_jacob, axis=0)
regularization_jacob = 2 * alpha * np.sum(powers - mean_powers) if not np.isclose(0, alpha, atol=1e-15) else None
if regularization_jacob is not None:
J += regularization_jacob
return J
res = minimize(
lsm,
powers,
tol=1.0e-10,
jac=jac,
args=(problem, synthetic_data, mean_powers, 0.0),
options={"maxiter": 100, "disp": False},
)
res
alphas = 10.0 ** (-np.arange(16))
alphas
rel_errs = []
for alpha in tqdm(alphas, f"Iterating over alphas"):
res = minimize(
lsm,
powers,
tol=1.0e-15,
jac=jac,
args=(problem, synthetic_data, mean_powers, alpha),
options={"maxiter": 100, "disp": False},
)
rel_err = (
np.linalg.norm(source_powers - res.x) * 100 / np.linalg.norm(source_powers)
)
rel_errs.append(rel_err)
regularization_impact_df = pd.DataFrame({"alphas": alphas, "rel_errors": rel_errs})
regularization_impact_df
noise_lvls = np.arange(0, 11)
noise_lvls
noise_impact_stats = {}
abs_errs = []
rel_errs = []
for noise_lvl in tqdm(noise_lvls, 'Iterating over noise levels'):
noise = 1.0 + noise_lvl * 0.01
noised_data = copy.deepcopy(synthetic_data)
noised_data[:2] *= noise
res = minimize(
lsm,
powers,
tol=1.0e-15,
jac=jac,
args=(problem, noised_data, mean_powers, 0.0),
options={"maxiter": 100, "disp": False},
)
abs_err = np.linalg.norm(np.abs(res.x - source_powers))
rel_err = np.linalg.norm(source_powers - res.x) * 100 / np.linalg.norm(source_powers)
abs_errs.append(abs_err)
rel_errs.append(rel_err)
noise_impact_stats['noise_levels'] = noise_lvls
noise_impact_stats['2 recv lines abs errors'] = abs_errs
noise_impact_stats['2 recv lines rel errors'] = rel_errs
abs_errs = []
rel_errs = []
for noise_lvl in tqdm(noise_lvls, 'Iterating over noise levels'):
noise = 1.0 + noise_lvl * 0.01
noised_data = copy.deepcopy(synthetic_data)
noised_data[:4] *= noise
res = minimize(
lsm,
powers,
tol=1.0e-15,
jac=jac,
args=(problem, noised_data, mean_powers, 0.0),
options={"maxiter": 100, "disp": False},
)
abs_err = np.linalg.norm(np.abs(res.x - source_powers))
rel_err = np.linalg.norm(source_powers - res.x) * 100 / np.linalg.norm(source_powers)
abs_errs.append(abs_err)
rel_errs.append(rel_err)
noise_impact_stats['4 recv lines abs errors'] = abs_errs
noise_impact_stats['4 recv lines rel errors'] = rel_errs
abs_errs = []
rel_errs = []
for noise_lvl in tqdm(noise_lvls, 'Iterating over noise levels'):
noise = 1.0 + noise_lvl * 0.01
noised_data = copy.deepcopy(synthetic_data)
noised_data *= noise
res = minimize(
lsm,
powers,
tol=1.0e-15,
jac=jac,
args=(problem, noised_data, mean_powers, 0.0),
options={"maxiter": 100, "disp": False},
)
abs_err = np.linalg.norm(np.abs(res.x - source_powers))
rel_err = np.linalg.norm(source_powers - res.x) * 100 / np.linalg.norm(source_powers)
abs_errs.append(abs_err)
rel_errs.append(rel_err)
synthetic_data
noise_impact_stats['9 recv lines abs errors'] = abs_errs
noise_impact_stats['9 recv lines rel errors'] = rel_errs
noise_impact = pd.DataFrame(noise_impact_stats)
noise_impact