From e133bb549e791a4b126971eb5b1d2a4c59a5f032 Mon Sep 17 00:00:00 2001
From: Sandeep Kunkunuru
Date: Wed, 2 Oct 2024 21:21:45 +0530
Subject: [PATCH] Adding reference implementation and saving convergence scores

---
 .gitignore                            |   2 +
 README.md                             |  30 +++-
 rao_algorithms/__init__.py            |  16 +-
 rao_algorithms/algorithms.py          |  18 ++-
 rao_algorithms/objective_functions.py |   2 +-
 rao_algorithms/optimization.py        |  38 +++++
 reference/unconstrained/reference.py  | 211 ++++++++++++++++++++++++++
 setup.py                              |   2 +-
 tests/test_algorithms.py              |  24 ++-
 9 files changed, 309 insertions(+), 34 deletions(-)
 create mode 100644 rao_algorithms/optimization.py
 create mode 100644 reference/unconstrained/reference.py

diff --git a/.gitignore b/.gitignore
index 3d02ad8..c3150db 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,8 +1,10 @@
 .oaenv/
 .pytest_cache/
 tests/__pycache__/
+rao_algorithms/__pycache__/
 build/
 dist/
+results/
 *.egg-info/
 .pypirc
 example.py
\ No newline at end of file
diff --git a/README.md b/README.md
index 4f0a5df..6f267f8 100644
--- a/README.md
+++ b/README.md
@@ -32,12 +32,32 @@ pip install .
 
 ## How to Use
 
+### Example: Constrained BMR Algorithm
+
+```python
+import numpy as np
+from rao_algorithms import run_optimization, BMR_algorithm, objective_function, constraint_1, constraint_2
+
+# Constrained BMR
+# ---------------
+bounds = np.array([[-100, 100]] * 2)
+num_iterations = 100
+population_size = 50
+constraints = [constraint_1, constraint_2]
+
+best_solution, best_scores = run_optimization(BMR_algorithm, bounds, num_iterations, population_size, 2, objective_function, constraints)
+print(f"Constrained BMR Best solution: {best_solution}")
+
+```
+
 ### Example: Unconstrained BMR Algorithm
 
 ```python
 import numpy as np
 from rao_algorithms import BMR_algorithm, objective_function
 
+# Unconstrained BMR
+# ---------------
 # Define the bounds for a 2D problem
 bounds = np.array([[-100, 100]] * 2)
 
@@ -47,8 +67,8 @@ population_size = 50
 num_variables = 2
 
 # Run the BMR algorithm
-best_fitness = BMR_algorithm(bounds, num_iterations, population_size, num_variables, objective_function)
-print(f"Best fitness found: {best_fitness}")
+best_solution, best_scores = BMR_algorithm(bounds, num_iterations, population_size, num_variables, objective_function)
+print(f"Unconstrained BMR Best solution found: {best_solution}")
 ```
 
 ### Example: Constrained BWR Algorithm
@@ -57,6 +77,8 @@ print(f"Best fitness found: {best_fitness}")
 import numpy as np
 from rao_algorithms import BWR_algorithm, objective_function, constraint_1, constraint_2
 
+# Constrained BWR
+# ---------------
 # Define the bounds for a 2D problem
 bounds = np.array([[-100, 100]] * 2)
 
@@ -67,8 +89,8 @@ num_variables = 2
 constraints = [constraint_1, constraint_2]
 
 # Run the BWR algorithm with constraints
-best_fitness = BWR_algorithm(bounds, num_iterations, population_size, num_variables, objective_function, constraints)
-print(f"Best fitness found: {best_fitness}")
+best_solution, best_scores = BWR_algorithm(bounds, num_iterations, population_size, num_variables, objective_function, constraints)
+print(f"Constrained BWR Best solution found: {best_solution}")
 ```
 
 ### Unit Testing
diff --git a/rao_algorithms/__init__.py b/rao_algorithms/__init__.py
index 5ad60a6..901a439 100644
--- a/rao_algorithms/__init__.py
+++ b/rao_algorithms/__init__.py
@@ -1,13 +1,13 @@
 from .algorithms import BMR_algorithm, BWR_algorithm
-from .penalty import penalty_function, constrained_objective_function
+from .optimization import run_optimization, save_convergence_curve
 from .objective_functions import objective_function, constraint_1, constraint_2
 
 __all__ = [
-    "BMR_algorithm",
-    "BWR_algorithm",
-    "penalty_function",
-    "constrained_objective_function",
-    "objective_function",
-    "constraint_1",
-    "constraint_2"
+    'BMR_algorithm',
+    'BWR_algorithm',
+    'run_optimization',
+    'save_convergence_curve',
+    'objective_function',
+    'constraint_1',
+    'constraint_2',
 ]
diff --git a/rao_algorithms/algorithms.py b/rao_algorithms/algorithms.py
index ffd9671..1980791 100644
--- a/rao_algorithms/algorithms.py
+++ b/rao_algorithms/algorithms.py
@@ -4,6 +4,8 @@
 def BMR_algorithm(bounds, num_iterations, population_size, num_variables, objective_func, constraints=None):
     population = np.random.uniform(low=bounds[:, 0], high=bounds[:, 1], size=(population_size, num_variables))
 
+    best_scores = []
+
     for iteration in range(num_iterations):
         if constraints:
             fitness = [constrained_objective_function(ind, objective_func, constraints) for ind in population]
@@ -12,6 +14,8 @@ def BMR_algorithm(bounds, num_iterations, population_size, num_variables, object
         best_solution = population[np.argmin(fitness)]
         mean_solution = np.mean(population, axis=0)
+        best_score = np.min(fitness)
+        best_scores.append(best_score)
 
         for i in range(population_size):
             r1, r2, r3, r4 = np.random.rand(4)
 
@@ -23,15 +27,16 @@
             else:
                 population[i] = bounds[:, 1] - (bounds[:, 1] - bounds[:, 0]) * r3
 
-        print(f"Iteration {iteration+1}, Best Fitness: {np.min(fitness)}")
+        population = np.clip(population, bounds[:, 0], bounds[:, 1])
 
-    best_fitness = np.min(fitness)
-    return best_fitness
+    return best_solution, best_scores
 
 
 def BWR_algorithm(bounds, num_iterations, population_size, num_variables, objective_func, constraints=None):
     population = np.random.uniform(low=bounds[:, 0], high=bounds[:, 1], size=(population_size, num_variables))
 
+    best_scores = []
+
     for iteration in range(num_iterations):
         if constraints:
             fitness = [constrained_objective_function(ind, objective_func, constraints) for ind in population]
@@ -40,6 +45,8 @@ def BWR_algorithm(bounds, num_iterations, population_size, num_variables, object
         best_solution = population[np.argmin(fitness)]
         worst_solution = population[np.argmax(fitness)]
+        best_score = np.min(fitness)
+        best_scores.append(best_score)
 
         for i in range(population_size):
             r1, r2, r3, r4 = np.random.rand(4)
 
@@ -51,7 +58,6 @@
             else:
                 population[i] = bounds[:, 1] - (bounds[:, 1] - bounds[:, 0]) * r3
 
-        print(f"Iteration {iteration+1}, Best Fitness: {np.min(fitness)}")
+        population = np.clip(population, bounds[:, 0], bounds[:, 1])
 
-    best_fitness = np.min(fitness)
-    return best_fitness
+    return best_solution, best_scores
diff --git a/rao_algorithms/objective_functions.py b/rao_algorithms/objective_functions.py
index f2247d1..cdc0623 100644
--- a/rao_algorithms/objective_functions.py
+++ b/rao_algorithms/objective_functions.py
@@ -1,6 +1,6 @@
 import numpy as np
 
-# Example of an objective function (Sphere function)
+# Sphere function (default objective)
 def objective_function(x):
     return np.sum(x**2)
 
diff --git a/rao_algorithms/optimization.py b/rao_algorithms/optimization.py
new file mode 100644
index 0000000..171c11a
--- /dev/null
+++ b/rao_algorithms/optimization.py
@@ -0,0 +1,38 @@
+import numpy as np
+import os
+import csv
+
+def initialize_population(bounds, population_size, num_variables):
+    """Initialize population with random values within bounds."""
+    return np.random.uniform(low=bounds[:, 0], high=bounds[:, 1], size=(population_size, num_variables))
+
+def clip_position(position, bounds):
+    """Clip the position to make sure it stays within bounds."""
+    return np.clip(position, bounds[:, 0], bounds[:, 1])
+
+def run_optimization(algorithm, bounds, num_iterations, population_size, num_variables, objective_function, constraints=None):
+    """Run the selected algorithm and handle logging, saving results, etc."""
+
+    # Initialize population and variables
+    population = initialize_population(bounds, population_size, num_variables)
+    best_scores = []
+
+    # Prepare directory for saving results
+    if not os.path.exists('results'):
+        os.makedirs('results')
+
+    # Run the algorithm
+    best_solution, best_scores = algorithm(bounds, num_iterations, population_size, num_variables, objective_function, constraints)
+
+    # Save results
+    save_convergence_curve(best_scores)
+
+    return best_solution, best_scores
+
+def save_convergence_curve(best_scores):
+    """Save the convergence curve as a CSV."""
+    with open(f'results/convergence_curve.csv', 'w', newline='') as file:
+        writer = csv.writer(file)
+        writer.writerow(['Iteration', 'Best Score'])
+        for i, score in enumerate(best_scores):
+            writer.writerow([i, score])
diff --git a/reference/unconstrained/reference.py b/reference/unconstrained/reference.py
new file mode 100644
index 0000000..b0f9f60
--- /dev/null
+++ b/reference/unconstrained/reference.py
@@ -0,0 +1,211 @@
+import os
+import numpy as np
+import pandas as pd
+import matplotlib.pyplot as plt
+
+# Optimization functions
+def sphere_function(x):
+    return np.sum(x**2)
+
+# Initialization
+def initialize_population(pop_size, dim, bounds):
+    population = np.random.uniform(bounds[0], bounds[1], (pop_size, dim))
+    return population
+
+# Ensure position is within bounds
+def clip_position(position, bounds):
+    return np.clip(position, bounds[0], bounds[1])
+
+# Optimization Step
+def optimization_step(population, global_best, global_worst, bounds, dim, objective_function):
+    new_population = np.copy(population)
+    u = bounds[1]
+    l = bounds[0]
+    x_mean = np.mean(population, axis=0)
+
+    for i in range(len(population)):
+        x_old = population[i]
+        x_rand = population[np.random.randint(len(population))]
+        r1 = np.random.uniform(0, 1)
+        r2 = np.random.uniform(0, 1)
+        r3 = np.random.uniform(0, 1)
+        r4 = np.random.uniform(0, 1)
+
+        # Generate random perturbation
+        R = u - ((u - l) * r3)
+        T = np.random.choice([1, 2])
+
+        if r4 > 0.5:
+            new_position = x_old + r1 * (global_best - T * x_rand) - r2 * (global_worst - x_rand)
+        else:
+            new_position = R
+
+        new_position = clip_position(new_position, bounds)
+        if objective_function(new_position) < objective_function(x_old):
+            new_population[i] = new_position
+
+    return new_population
+
+# Main optimization loop
+def optimize(objective_function, dim, bounds, pop_size, max_iter, func_name, optimum_value):
+    population = initialize_population(pop_size, dim, bounds)
+    iteration_history = []
+    population_history = []
+    best_solution_history = []
+    best_scores = []
+    best_scores_per_iteration = []
+
+    for t in range(max_iter):
+        global_best = min(population, key=objective_function)
+        global_worst = max(population, key=objective_function)
+        population = optimization_step(population, global_best, global_worst, bounds, dim, objective_function)
+        best_score = objective_function(global_best)
+        best_scores.append(best_score)
+
+        # Save iteration and population history
+        iteration_info = {
+            'iteration': t,
+            'best_score': best_score,
+        }
+        iteration_history.append(iteration_info)
+        population_history.append(population.copy())
+        best_solution_history.append(global_best.copy())
+        best_scores_per_iteration.append(best_score)
+
+        # Check if optimum value is reached
+        if best_score <= optimum_value:
+            break
+
+    global_best = min(population, key=objective_function)
+    final_score = objective_function(global_best)
+    optimization_history = {
+        'function_name': func_name,
+        'iteration_history': iteration_history,
+        'population_history': population_history,
+        'best_solution_history': best_solution_history,
+        'final_best_solution': global_best,
+        'final_best_score': final_score,
+        'final_iteration': t+1,  # Final iteration number
+        'best_scores_per_iteration': best_scores_per_iteration
+    }
+
+    return optimization_history
+
+# List of functions with their dimensions, bounds, and optimum values
+functions = [
+    ("Sphere", sphere_function, 30, [-100, 100], 0),
+]
+
+# Execute optimization for each function with varying population sizes
+for name, func, dim, bounds, opt_value in functions:
+    print("---" * 50)
+    all_best_scores = []
+    all_best_populations = []
+    all_mfe = []
+    convergence_data = []
+
+    for r in range(30):
+        for i in range(10):
+            pop_size = 10 * (i + 1)
+            max_iter = (500000 // pop_size)
+            bounds = [np.full(dim, bounds[0]), np.full(dim, bounds[1])] if isinstance(bounds[0], (int, float)) else bounds
+            optimization_history = optimize(func, dim, bounds, pop_size=pop_size, max_iter=max_iter, func_name=name, optimum_value=opt_value)
+
+            best_position = optimization_history['final_best_solution']
+            best_score = optimization_history['final_best_score']
+            final_iteration = optimization_history['final_iteration']
+
+            mfe = pop_size * final_iteration
+
+            all_best_scores.append(best_score)
+            all_best_populations.append((pop_size, best_position))
+            all_mfe.append(mfe)
+
+            convergence_data.append(optimization_history['best_scores_per_iteration'])
+
+            print(f"Run {r + 1}, Population Size {pop_size}, Best Score: {best_score}, FE: {mfe}")
+
+            if best_score <= opt_value:
+                break
+
+    # Select best 30 scores and corresponding populations
+    best_30_scores_indices = np.argsort(all_best_scores)[:30]
+    best_30_scores = np.array(all_best_scores)[best_30_scores_indices]
+    best_30_populations = [all_best_populations[i] for i in best_30_scores_indices]
+    best_30_mfe = np.array(all_mfe)[best_30_scores_indices]
+
+    # Perform statistics
+    best_score = np.min(best_30_scores)
+    worst_score = np.max(best_30_scores)
+    mean_score = np.mean(best_30_scores)
+    std_score = np.std(best_30_scores)
+    mean_mfe = np.mean(best_30_mfe)
+
+    # Save results
+    os.makedirs(name, exist_ok=True)
+
+    # Save best population history
+    population_data = {
+        'population_size': [],
+        'best_population': []
+    }
+    for pop_size, pop in best_30_populations:
+        population_data['population_size'].append(pop_size)
+        population_data['best_population'].append(pop.tolist())
+
+    population_df = pd.DataFrame(population_data)
+    population_file_name = os.path.join(name, 'best_population_history.csv')
+    population_df.to_csv(population_file_name, index=False)
+
+    # Save best solution history
+    best_solution_data = {
+        'best_score': best_30_scores,
+        'mfe': best_30_mfe
+    }
+    best_solution_df = pd.DataFrame(best_solution_data)
+    best_solution_file_name = os.path.join(name, 'best_solution_history.csv')
+    best_solution_df.to_csv(best_solution_file_name, index=False)
+
+    # Save statistics
+    stats_data = {
+        'Function Name': [name],
+        'Best Score': [best_score],
+        'Worst Score': [worst_score],
+        'Mean Score': [mean_score],
+        'Standard Deviation': [std_score],
+        'Mean MFE': [mean_mfe]
+    }
+    stats_df = pd.DataFrame(stats_data)
+    stats_file_name = os.path.join(name, 'statistics.csv')
+    stats_df.to_csv(stats_file_name, index=False)
+
+    # Save best scores
+    best_scores_df = pd.DataFrame({'Best Scores': best_30_scores})
+    best_scores_file_name = os.path.join(name, 'best_scores.csv')
+    best_scores_df.to_csv(best_scores_file_name, index=False)
+
+    # Compute mean best scores per iteration
+    max_iterations = max([len(run) for run in convergence_data])
+    mean_best_scores_per_iteration = np.zeros(max_iterations)
+    for i in range(max_iterations):
+        iteration_scores = [run[i] for run in convergence_data if len(run) > i]
+        mean_best_scores_per_iteration[i] = np.mean(iteration_scores)
+
+    # Save convergence data
+    convergence_df = pd.DataFrame({
+        'Iteration': np.arange(max_iterations),
+        'Mean Best Score': mean_best_scores_per_iteration
+    })
+    convergence_file_name = os.path.join(name, 'convergence_curve.csv')
+    convergence_df.to_csv(convergence_file_name, index=False)
+
+    # Plot convergence curve
+    plt.figure()
+    plt.plot(np.arange(max_iterations), mean_best_scores_per_iteration, label='Mean Best Score')
+    plt.xlabel('Iteration')
+    plt.ylabel('Mean Best Score')
+    plt.title(f'Convergence Curve for {name}')
+    plt.legend()
+    plt.grid(True)
+    plt.savefig(os.path.join(name, 'convergence_curve.png'))
+    plt.close()
diff --git a/setup.py b/setup.py
index 9004c4b..8dbefa1 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@ def read_file(filename):
 
 setup(
     name='rao_algorithms',
-    version='0.1.4',
+    version='0.2.0',
    author='Samdeep Kunkunuru',
    author_email='sandeep.kunkunuru@gmail.com',
    description='BMR and BWR optimization algorithms with constraint handling',
diff --git a/tests/test_algorithms.py b/tests/test_algorithms.py
index 7e7afd6..44563c3 100644
--- a/tests/test_algorithms.py
+++ b/tests/test_algorithms.py
@@ -5,32 +5,28 @@ class TestOptimizationAlgorithms(unittest.TestCase):
 
     def setUp(self):
-        self.bounds = np.array([[-100, 100]] * 2)  # For a 2D problem
+        self.bounds = np.array([[-100, 100]] * 2)
         self.num_iterations = 100
         self.population_size = 50
         self.num_variables = 2
 
     def test_bmr_unconstrained(self):
-        best_fitness = BMR_algorithm(self.bounds, self.num_iterations, self.population_size, self.num_variables, objective_function)
-        self.assertIsInstance(best_fitness, float)
-        self.assertGreaterEqual(best_fitness, 0, "Fitness should be non-negative for the Sphere function")
+        best_solution, _ = BMR_algorithm(self.bounds, self.num_iterations, self.population_size, self.num_variables, objective_function)
+        self.assertIsInstance(best_solution, np.ndarray)
 
     def test_bwr_unconstrained(self):
-        best_fitness = BWR_algorithm(self.bounds, self.num_iterations, self.population_size, self.num_variables, objective_function)
-        self.assertIsInstance(best_fitness, float)
-        self.assertGreaterEqual(best_fitness, 0, "Fitness should be non-negative for the Sphere function")
+        best_solution, _ = BWR_algorithm(self.bounds, self.num_iterations, self.population_size, self.num_variables, objective_function)
+        self.assertIsInstance(best_solution, np.ndarray)
 
     def test_bmr_constrained(self):
         constraints = [constraint_1, constraint_2]
-        best_fitness = BMR_algorithm(self.bounds, self.num_iterations, self.population_size, self.num_variables, objective_function, constraints)
-        self.assertIsInstance(best_fitness, float)
-        self.assertGreaterEqual(best_fitness, 0, "Fitness should be non-negative even with constraints")
+        best_solution, _ = BMR_algorithm(self.bounds, self.num_iterations, self.population_size, self.num_variables, objective_function, constraints)
+        self.assertIsInstance(best_solution, np.ndarray)
 
     def test_bwr_constrained(self):
         constraints = [constraint_1, constraint_2]
-        best_fitness = BWR_algorithm(self.bounds, self.num_iterations, self.population_size, self.num_variables, objective_function, constraints)
-        self.assertIsInstance(best_fitness, float)
-        self.assertGreaterEqual(best_fitness, 0, "Fitness should be non-negative even with constraints")
+        best_solution, _ = BWR_algorithm(self.bounds, self.num_iterations, self.population_size, self.num_variables, objective_function, constraints)
+        self.assertIsInstance(best_solution, np.ndarray)
 
 
 if __name__ == '__main__':
-    unittest.main()
+    unittest.main()
\ No newline at end of file