Spaces:
Runtime error
Runtime error
| from __future__ import absolute_import | |
| from __future__ import division | |
| from __future__ import print_function | |
| """Genetic algorithm for BF tasks. | |
| Inspired by https://github.com/primaryobjects/AI-Programmer. | |
| GA function code borrowed from https://github.com/DEAP/deap. | |
| """ | |
| from collections import namedtuple | |
| import random | |
| from absl import flags | |
| from absl import logging | |
| import numpy as np | |
| from six.moves import xrange | |
| from common import bf # brain coder | |
| from common import utils # brain coder | |
| from single_task import misc # brain coder | |
| FLAGS = flags.FLAGS | |
| # Saving reward of previous programs saves computation if a program appears | |
| # again. | |
| USE_REWARD_CACHE = True # Disable this if GA is using up too much memory. | |
| GENES = bf.CHARS | |
| MAX_PROGRAM_STEPS = 500 | |
| STEP_BONUS = True | |
| ALPHANUM_CHARS = ( | |
| ['_'] + | |
| [chr(ord('a') + i_) for i_ in range(26)] + | |
| [chr(ord('A') + i_) for i_ in range(26)] + | |
| [chr(ord('0') + i_) for i_ in range(10)]) | |
| Result = namedtuple( | |
| 'Result', | |
| ['reward', 'inputs', 'code_outputs', 'target_outputs', 'type_in', | |
| 'type_out', 'base', 'correct']) | |
| class IOType(object): | |
| string = 'string' | |
| integer = 'integer' | |
| class CustomType(object): | |
| def __init__(self, to_str_fn): | |
| self.to_str_fn = to_str_fn | |
| def __call__(self, obj): | |
| return self.to_str_fn(obj) | |
| def tokens_list_repr(tokens, repr_type, base): | |
| """Make human readable representation of program IO.""" | |
| if isinstance(repr_type, CustomType): | |
| return repr_type(tokens) | |
| elif repr_type == IOType.string: | |
| chars = ( | |
| [ALPHANUM_CHARS[t] for t in tokens] if base < len(ALPHANUM_CHARS) | |
| else [chr(t) for t in tokens]) | |
| return ''.join(chars) | |
| elif repr_type == IOType.integer: | |
| return str(tokens) | |
| raise ValueError('No such representation type "%s"', repr_type) | |
| def io_repr(result): | |
| """Make human readable representation of test cases.""" | |
| inputs = ','.join( | |
| tokens_list_repr(tokens, result.type_in, result.base) | |
| for tokens in result.inputs) | |
| code_outputs = ','.join( | |
| tokens_list_repr(tokens, result.type_out, result.base) | |
| for tokens in result.code_outputs) | |
| target_outputs = ','.join( | |
| tokens_list_repr(tokens, result.type_out, result.base) | |
| for tokens in result.target_outputs) | |
| return inputs, target_outputs, code_outputs | |
| def make_task_eval_fn(task_manager): | |
| """Returns a wrapper that converts an RL task into a GA task. | |
| Args: | |
| task_manager: Is a task manager object from code_tasks.py | |
| Returns: | |
| A function that takes as input a single list of a code chars, and outputs | |
| a Result namedtuple instance containing the reward and information about | |
| code execution. | |
| """ | |
| def to_data_list(single_or_tuple): | |
| if isinstance(single_or_tuple, misc.IOTuple): | |
| return list(single_or_tuple) | |
| return [single_or_tuple] | |
| def to_ga_type(rl_type): | |
| if rl_type == misc.IOType.string: | |
| return IOType.string | |
| return IOType.integer | |
| # Wrapper function. | |
| def evalbf(bf_chars): | |
| result = task_manager._score_code(''.join(bf_chars)) | |
| reward = sum(result.episode_rewards) | |
| correct = result.reason == 'correct' | |
| return Result( | |
| reward=reward, | |
| inputs=to_data_list(result.input_case), | |
| code_outputs=to_data_list(result.code_output), | |
| target_outputs=to_data_list(result.correct_output), | |
| type_in=to_ga_type(result.input_type), | |
| type_out=to_ga_type(result.output_type), | |
| correct=correct, | |
| base=task_manager.task.base) | |
| return evalbf | |
| def debug_str(individual, task_eval_fn): | |
| res = task_eval_fn(individual) | |
| input_str, target_output_str, code_output_str = io_repr(res) | |
| return ( | |
| ''.join(individual) + | |
| ' | ' + input_str + | |
| ' | ' + target_output_str + | |
| ' | ' + code_output_str + | |
| ' | ' + str(res.reward) + | |
| ' | ' + str(res.correct)) | |
| def mutate_single(code_tokens, mutation_rate): | |
| """Mutate a single code string. | |
| Args: | |
| code_tokens: A string/list/Individual of BF code chars. Must end with EOS | |
| symbol '_'. | |
| mutation_rate: Float between 0 and 1 which sets the probability of each char | |
| being mutated. | |
| Returns: | |
| An Individual instance containing the mutated code string. | |
| Raises: | |
| ValueError: If `code_tokens` does not end with EOS symbol. | |
| """ | |
| if len(code_tokens) <= 1: | |
| return code_tokens | |
| if code_tokens[-1] == '_': | |
| # Do this check to ensure that the code strings have not been corrupted. | |
| raise ValueError('`code_tokens` must end with EOS symbol.') | |
| else: | |
| cs = Individual(code_tokens) | |
| eos = [] | |
| mutated = False | |
| for pos in range(len(cs)): | |
| if random.random() < mutation_rate: | |
| mutated = True | |
| new_char = GENES[random.randrange(len(GENES))] | |
| x = random.random() | |
| if x < 0.25 and pos != 0 and pos != len(cs) - 1: | |
| # Insertion mutation. | |
| if random.random() < 0.50: | |
| # Shift up. | |
| cs = cs[:pos] + [new_char] + cs[pos:-1] | |
| else: | |
| # Shift down. | |
| cs = cs[1:pos] + [new_char] + cs[pos:] | |
| elif x < 0.50: | |
| # Deletion mutation. | |
| if random.random() < 0.50: | |
| # Shift down. | |
| cs = cs[:pos] + cs[pos + 1:] + [new_char] | |
| else: | |
| # Shift up. | |
| cs = [new_char] + cs[:pos] + cs[pos + 1:] | |
| elif x < 0.75: | |
| # Shift rotate mutation (position invariant). | |
| if random.random() < 0.50: | |
| # Shift down. | |
| cs = cs[1:] + [cs[0]] | |
| else: | |
| # Shift up. | |
| cs = [cs[-1]] + cs[:-1] | |
| else: | |
| # Replacement mutation. | |
| cs = cs[:pos] + [new_char] + cs[pos + 1:] | |
| assert len(cs) + len(eos) == len(code_tokens) | |
| if mutated: | |
| return Individual(cs + eos) | |
| else: | |
| return Individual(code_tokens) | |
| def crossover(parent1, parent2): | |
| """Performs crossover mating between two code strings. | |
| Crossover mating is where a random position is selected, and the chars | |
| after that point are swapped. The resulting new code strings are returned. | |
| Args: | |
| parent1: First code string. | |
| parent2: Second code string. | |
| Returns: | |
| A 2-tuple of children, i.e. the resulting code strings after swapping. | |
| """ | |
| max_parent, min_parent = ( | |
| (parent1, parent2) if len(parent1) > len(parent2) | |
| else (parent2, parent1)) | |
| pos = random.randrange(len(max_parent)) | |
| if pos >= len(min_parent): | |
| child1 = max_parent[:pos] | |
| child2 = min_parent + max_parent[pos:] | |
| else: | |
| child1 = max_parent[:pos] + min_parent[pos:] | |
| child2 = min_parent[:pos] + max_parent[pos:] | |
| return Individual(child1), Individual(child2) | |
| def _make_even(n): | |
| """Return largest even integer less than or equal to `n`.""" | |
| return (n >> 1) << 1 | |
| def mutate_and_crossover(population, mutation_rate, crossover_rate): | |
| """Take a generational step over a population. | |
| Transforms population of parents into population of children (of the same | |
| size) via crossover mating and then mutation on the resulting children. | |
| Args: | |
| population: Parent population. A list of Individual objects. | |
| mutation_rate: Probability of mutation. See `mutate_single`. | |
| crossover_rate: Probability that two parents will mate. | |
| Returns: | |
| Child population. A list of Individual objects. | |
| """ | |
| children = [None] * len(population) | |
| for i in xrange(0, _make_even(len(population)), 2): | |
| p1 = population[i] | |
| p2 = population[i + 1] | |
| if random.random() < crossover_rate: | |
| p1, p2 = crossover(p1, p2) | |
| c1 = mutate_single(p1, mutation_rate) | |
| c2 = mutate_single(p2, mutation_rate) | |
| children[i] = c1 | |
| children[i + 1] = c2 | |
| if children[-1] is None: | |
| children[-1] = population[-1] | |
| return children | |
| def ga_loop(population, cxpb, mutpb, ngen, task_eval_fn, halloffame=None, | |
| checkpoint_writer=None): | |
| """A bare bones genetic algorithm. | |
| Similar to chapter 7 of Back, Fogel and Michalewicz, "Evolutionary | |
| Computation 1 : Basic Algorithms and Operators", 2000. | |
| Args: | |
| population: A list of individuals. | |
| cxpb: The probability of mating two individuals. | |
| mutpb: The probability of mutating a gene. | |
| ngen: The number of generation. Unlimited if zero. | |
| task_eval_fn: A python function which maps an Individual to a Result | |
| namedtuple. | |
| halloffame: (optional) a utils.MaxUniquePriorityQueue object that will be | |
| used to aggregate the best individuals found during search. | |
| checkpoint_writer: (optional) an object that can save and load populations. | |
| Needs to have `write`, `load`, and `has_checkpoint` methods. Used to | |
| periodically save progress. In event of a restart, the population will | |
| be loaded from disk. | |
| Returns: | |
| GaResult namedtuple instance. This contains information about the GA run, | |
| including the resulting population, best reward (fitness) obtained, and | |
| the best code string found. | |
| """ | |
| has_checkpoint = False | |
| if checkpoint_writer and checkpoint_writer.has_checkpoint(): | |
| try: | |
| gen, population, halloffame = checkpoint_writer.load() | |
| except EOFError: # Data was corrupted. Start over. | |
| pass | |
| else: | |
| has_checkpoint = True | |
| logging.info( | |
| 'Loaded population from checkpoint. Starting at generation %d', gen) | |
| # Evaluate the individuals with an invalid fitness | |
| invalid_ind = [ind for ind in population if not ind.fitness.valid] | |
| for ind in invalid_ind: | |
| ind.fitness.values = task_eval_fn(ind).reward, | |
| for _, ind in halloffame.iter_in_order(): | |
| ind.fitness.values = task_eval_fn(ind).reward, | |
| if not has_checkpoint: | |
| # Evaluate the individuals with an invalid fitness | |
| invalid_ind = [ind for ind in population if not ind.fitness.valid] | |
| for ind in invalid_ind: | |
| ind.fitness.values = task_eval_fn(ind).reward, | |
| if halloffame is not None: | |
| for ind in population: | |
| halloffame.push(ind.fitness.values, tuple(ind), ind) | |
| logging.info('Initialized new population.') | |
| gen = 1 | |
| pop_size = len(population) | |
| program_reward_cache = {} if USE_REWARD_CACHE else None | |
| # Begin the generational process | |
| while ngen == 0 or gen <= ngen: | |
| # Select the next generation individuals | |
| offspring = roulette_selection(population, pop_size - len(halloffame)) | |
| # Vary the pool of individuals | |
| # offspring = varAnd(offspring, toolbox, cxpb, mutpb) | |
| offspring = mutate_and_crossover( | |
| offspring, mutation_rate=mutpb, crossover_rate=cxpb) | |
| # Evaluate the individuals with an invalid fitness | |
| invalid_ind = [ind for ind in offspring if not ind.fitness.valid] | |
| for ind in invalid_ind: | |
| str_repr = ''.join(ind) | |
| if program_reward_cache is not None and str_repr in program_reward_cache: | |
| ind.fitness.values = (program_reward_cache[str_repr],) | |
| else: | |
| eval_result = task_eval_fn(ind) | |
| ind.fitness.values = (eval_result.reward,) | |
| if program_reward_cache is not None: | |
| program_reward_cache[str_repr] = eval_result.reward | |
| # Replace the current population by the offspring | |
| population = list(offspring) | |
| # Update the hall of fame with the generated individuals | |
| if halloffame is not None: | |
| for ind in population: | |
| halloffame.push(ind.fitness.values, tuple(ind), ind) | |
| # elitism | |
| population.extend([ind for _, ind in halloffame.iter_in_order()]) | |
| if gen % 100 == 0: | |
| top_code = '\n'.join([debug_str(ind, task_eval_fn) | |
| for ind in topk(population, k=4)]) | |
| logging.info('gen: %d\nNPE: %d\n%s\n\n', gen, gen * pop_size, top_code) | |
| best_code = ''.join(halloffame.get_max()[1]) | |
| res = task_eval_fn(best_code) | |
| # Write population and hall-of-fame to disk. | |
| if checkpoint_writer: | |
| checkpoint_writer.write(gen, population, halloffame) | |
| if res.correct: | |
| logging.info('Solution found:\n%s\nreward = %s\n', | |
| best_code, res.reward) | |
| break | |
| gen += 1 | |
| best_code = ''.join(halloffame.get_max()[1]) | |
| res = task_eval_fn(best_code) | |
| return GaResult( | |
| population=population, best_code=best_code, reward=res.reward, | |
| solution_found=res.correct, generations=gen, | |
| num_programs=gen * len(population), | |
| max_generations=ngen, max_num_programs=ngen * len(population)) | |
| GaResult = namedtuple( | |
| 'GaResult', | |
| ['population', 'best_code', 'reward', 'generations', 'num_programs', | |
| 'solution_found', 'max_generations', 'max_num_programs']) | |
| def reward_conversion(reward): | |
| """Convert real value into positive value.""" | |
| if reward <= 0: | |
| return 0.05 | |
| return reward + 0.05 | |
| def roulette_selection(population, k): | |
| """Select `k` individuals with prob proportional to fitness. | |
| Each of the `k` selections is independent. | |
| Warning: | |
| The roulette selection by definition cannot be used for minimization | |
| or when the fitness can be smaller or equal to 0. | |
| Args: | |
| population: A list of Individual objects to select from. | |
| k: The number of individuals to select. | |
| Returns: | |
| A list of selected individuals. | |
| """ | |
| fitnesses = np.asarray( | |
| [reward_conversion(ind.fitness.values[0]) | |
| for ind in population]) | |
| assert np.all(fitnesses > 0) | |
| sum_fits = fitnesses.sum() | |
| chosen = [None] * k | |
| for i in xrange(k): | |
| u = random.random() * sum_fits | |
| sum_ = 0 | |
| for ind, fitness in zip(population, fitnesses): | |
| sum_ += fitness | |
| if sum_ > u: | |
| chosen[i] = Individual(ind) | |
| break | |
| if not chosen[i]: | |
| chosen[i] = Individual(population[-1]) | |
| return chosen | |
| def make_population(make_individual_fn, n): | |
| return [make_individual_fn() for _ in xrange(n)] | |
| def best(population): | |
| best_ind = None | |
| for ind in population: | |
| if best_ind is None or best_ind.fitness.values < ind.fitness.values: | |
| best_ind = ind | |
| return best_ind | |
| def topk(population, k): | |
| q = utils.MaxUniquePriorityQueue(k) | |
| for ind in population: | |
| q.push(ind.fitness.values, tuple(ind), ind) | |
| return [ind for _, ind in q.iter_in_order()] | |
| class Fitness(object): | |
| def __init__(self): | |
| self.values = () | |
| def valid(self): | |
| """Assess if a fitness is valid or not.""" | |
| return bool(self.values) | |
| class Individual(list): | |
| def __init__(self, *args): | |
| super(Individual, self).__init__(*args) | |
| self.fitness = Fitness() | |
| def random_individual(genome_size): | |
| return lambda: Individual(np.random.choice(GENES, genome_size).tolist()) | |