Source code for ilustrado.ilustrado

""" This file implements the GA algorithm and acts as main(). """
# standard library
import multiprocessing as mp
import subprocess as sp
import logging
import glob
import shutil
import os
import time
import sys
from traceback import print_exc
from json import dumps, dump
from copy import deepcopy, copy

# external libraries
import numpy as np
from pkg_resources import require

# matador modules
import matador.compute
import matador.compute.slurm
from matador.scrapers.castep_scrapers import (
    res2dict,
    castep2dict,
    cell2dict,
    param2dict,
)
from matador.export import doc2res
from matador.export.utils import generate_hash
from matador.fingerprints.similarity import get_uniq_cursor
from matador.fingerprints.pdf import PDFFactory
from matador.utils.chem_utils import get_formula_from_stoich, get_root_source
from matador.hull import QueryConvexHull

# ilustrado modules
from .adapt import adapt
from .generation import Generation
from .fitness import FitnessCalculator
from .util import strip_useless, LOG, NewbornProcess

__version__ = require("ilustrado")[0].version


# As this class has many settings that are hacked directly into __dict__, disable these warnings.
# pylint: disable=access-member-before-definition
# pylint: disable=attribute-defined-outside-init
# pylint: disable bad-continuation
[docs]class ArtificialSelector: """ ArtificialSelector takes an initial gene pool and applies a genetic algorithm to optimise some fitness function. Keyword Arguments: gene_pool (list(dict)) : initial cursor to use as "Generation 0", seed (str) : seed name of cell and param files for CASTEP, seed_prefix (str) : if not specifying a seed, this name will prefix all runs fitness_metric (str) : currently either 'hull' or 'test', hull (QueryConvexHull) : matador QueryConvexHull object to calculate distances, res_path (str) : path to folder of res files to create hull, if no hull object passed mutation_rate (float) : rate at which to perform single-parent mutations (DEFAULT: 0.5) crossover_rate (float) : rate at which to perform crossovers (DEFAULT: 0.5) num_generations (int) : number of generations to breed before quitting (DEFAULT: 5) num_survivors (int) : number of structures to survive to next generation for breeding (DEFAULT: 10) population (int) : number of structures to breed in any given generation (DEFAULT: 25) failure_ratio (int) : maximum number of attempts per success (DEFAULT: 5) elitism (float) : fraction of next generation to be comprised of elite structures from previous generation (DEFAULT: 0.2) best_from_stoich (bool) : whether to always include the best structure from a stoichiomtery in the next generation, mutations (list(str)) : list of mutation names to use, structure_filter (fn(doc)) : any function that takes a matador doc and returns True or False, check_dupes (bool) : if True, filter relaxed structures for uniqueness on-the-fly (DEFAULT: True) check_dupes_hull (bool) : compare pdf with all hull structures (DEFAULT: True) sandbagging (bool) : whether or not to disfavour nearby compositions (DEFAULT: False) minsep_dict (dict) : dictionary containing element-specific minimum separations, e.g. {('K', 'K'): 2.5, ('K', 'P'): 2.0}. These should only be set such that atoms do not overlap; let the DFT deal with bond lengths. No effort is made to push apart atoms that are too close, the trial will simply be discarded. (DEFAULT: None) max_num_mutations (int) : maximum number of mutations to perform on a single structure, max_num_atoms (int) : most atoms allowed in a structure post-mutation/crossover, nodes (list(str)) : list of node names to run on, ncores (int or list(int)) : specifies the number of cores used by listed `nodes` per thread, nprocs (int) : total number of processes, recover_from (str) : recover from previous run_hash, by default ilustrado will recover if it finds only one run hash in the folder load_only (bool) : only load structures, do not continue breeding (DEFAULT: False) executable (str) : path to DFT binary (DEFAULT: castep) compute_mode (str) : either `direct`, `slurm`, `manual` (DEFAULT: direct) max_num_nodes (int) : amount of array jobs to run per generation in `slurm` mode, walltime_hrs (int) : maximum walltime for a SLURM array job, slurm_template (str) : path to template slurm script that includes module loads etc, entrypoint (str) : path to script that initialised this object, such that it can be called by SLURM debug (bool) : maximum printing level testing (bool) : run test code only if true verbosity (int) : extra printing level, loglevel (str) : follows std library logging levels. """ def __init__(self, **kwargs): """ This is the main entrypoint. Initialises parameters, gene pool and begins the GA. """ prop_defaults = { # important, required parameters "gene_pool": None, "seed": None, "seed_prefix": None, "fitness_metric": "hull", "hull": None, "res_path": None, # recovery and loading parameters "recover_from": None, "load_only": False, # GA numerical parameters "mutation_rate": 1.0, "crossover_rate": 0.0, "num_generations": 5, "num_survivors": 10, "population": 25, "elitism": 0.2, "max_num_mutations": 3, "max_num_atoms": 30, # other GA options "best_from_stoich": True, "mutations": None, "structure_filter": None, "check_dupes": True, "check_dupes_hull": True, "failure_ratio": 5, "sandbagging": False, "minsep_dict": None, # logistical and compute parameters "compute_mode": "direct", "ase_calculator": None, "nodes": None, "ncores": None, "nprocs": 1, "relaxer_params": None, "executable": "castep", "max_num_nodes": None, "walltime_hrs": None, "slurm_template": None, "entrypoint": None, # debug and logging parameters "debug": False, "testing": False, "emt": False, "verbosity": 0, "loglevel": "info", } # cache current params to reload again later self.current_params = deepcopy(prop_defaults) self.current_params.update(kwargs) self.__dict__.update(prop_defaults) self.__dict__.update(kwargs) splash_screen = ( r" _ _ _ _" + "\n" r" (_)| | | | | |" + "\n" r" _ | | _ _ ___ | |_ _ __ __ _ __| | ___" + "\n" r" | || || | | |/ __|| __|| '__| / _` | / _` | / _ \ " + "\n" r" | || || |_| |\__ \| |_ | | | (_| || (_| || (_) |" + "\n" r" |_||_| \__,_||___/ \__||_| \__,_| \__,_| \___/" + "\n\n" "****************************************************\n" ) print("\033[92m\033[1m") print("\n" + splash_screen) print("\033[0m") print("Loading harsh realities of life...", end="") # post-load checks if self.relaxer_params is None: self.relaxer_params = dict() self.next_gen = None if isinstance(self.ncores, list): if len(self.ncores) != len(self.nodes): raise RuntimeError( "Length mismatch between ncores and nodes list: {} vs {}".format( self.ncores, self.nodes ) ) # set up computing resource if self.compute_mode not in ("slurm", "direct", "manual"): raise RuntimeError("`compute_mode` must be one of `slurm`, `direct`, `manual`.") if self.compute_mode == "slurm": errors = [] if not isinstance(self.walltime_hrs, int): errors.append( "`walltime_hrs` specified incorrectly {}".format(self.walltime_hrs) ) elif not self.walltime_hrs > 0: errors.append( "`walltime_hrs` specified incorrectly {}".format(self.walltime_hrs) ) if not isinstance(self.max_num_nodes, int): errors.append( "`max_num_nodes` specified incorrectly {}".format( self.max_num_nodes ) ) elif not self.max_num_nodes > 0: errors.append( "`max_num_nodes` specified incorrectly {}".format( self.max_num_nodes ) ) if not isinstance(self.slurm_template, str): errors.append( "`slurm_template` must be a valid path, not {}".format( self.slurm_template ) ) elif not os.path.isfile(self.slurm_template): errors.append( "`slurm_template` file {} does not exist".format( self.slurm_template ) ) if errors: raise RuntimeError( "Invalid specification for `compute_mode='slurm'`, errors: \n{}".format( "\n".join(errors) ) ) self.slurm_dict = matador.compute.slurm.get_slurm_env() if self.compute_mode == "direct": if self.nodes is not None: if self.nprocs != len(self.nodes): logging.warning( "Specified procs {} being replaced by number of nodes {}".format( self.nprocs, len(self.nodes) ) ) self.nprocs = len(self.nodes) # set up GA logistics self.run_hash = generate_hash() self.generations = [] # list to store all generations self.num_elite = int(self.elitism * self.num_survivors) self.num_accepted = self.num_survivors - self.num_elite self.max_attempts = self.failure_ratio * self.population if self.num_survivors > self.population + self.num_elite: raise RuntimeError( "More survivors than total population: {} vs {}".format( self.num_survivors, self.population + self.num_elite ) ) if self.num_accepted > self.population: raise RuntimeError( "More accepted than total population: {} vs {}".format( self.num_accepted, self.population + self.num_elite ) ) if self.mutations is not None and isinstance(self.mutations, str): self.mutations = [self.mutations] else: self.mutations = ["permute_atoms", "random_strain", "nudge_positions", "vacancy", "transmute_atoms"] try: from VoronoiNetwork import Vornetclass self.mutations.append("voronoi") except ImportError: LOG.warning("Disabling Voronoi mutation.") pass if not isinstance(self.max_num_mutations, int) and self.max_num_mutations < 0: raise RuntimeError( "`max_num_mutations` must be >= 0, not {}".format( self.max_num_mutations ) ) if not isinstance(self.max_num_atoms, int) and self.max_num_atoms < 1: raise RuntimeError( "`max_num_atoms` must be >= 1, not {}".format(self.max_num_atoms) ) # recover from specified run if self.recover_from is not None: if isinstance(self.recover_from, str): self.run_hash = self.recover_from.split("/")[-1] # try to look for gen0 files, if multiple are found, safely exit else: gen0_files = glob.glob("*gen0.json") if len(gen0_files) > 1: raise SystemExit( "Several incomplete runs found in this folder, please tidy up before re-running." ) if len(gen0_files) == 1: self.run_hash = gen0_files[0].split("/")[-1].replace("-gen0.json", "") self.recover_from = self.run_hash else: print("No recovery possible, starting fresh run.") # set up logging numeric_loglevel = getattr(logging, self.loglevel.upper(), None) if not isinstance(numeric_loglevel, int): raise SystemExit( self.loglevel, "is an invalid log level, please use either `info`, `debug` or `warning`.", ) file_handler = logging.FileHandler(self.run_hash + ".log", mode="a") file_handler.setLevel(numeric_loglevel) file_handler.setFormatter( logging.Formatter("%(asctime)s - %(name)s | %(levelname)8s: %(message)s") ) LOG.addHandler(file_handler) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setLevel(numeric_loglevel) stream_handler.setFormatter( logging.Formatter("%(asctime)s - %(name)s | %(levelname)8s: %(message)s") ) LOG.addHandler(stream_handler) LOG.info("Starting up ilustrado {}".format(__version__)) # initialise fitness calculator if self.fitness_metric == "hull" and self.hull is None: if self.res_path is not None and os.path.isfile(self.res_path): res_files = glob.glob("{}/*.res".format(self.res_path)) if not res_files: raise SystemExit("No structures found in {}".format(self.res_path)) self.cursor = [] for res in res_files: self.cursor.append(res2dict(res)) self.hull = QueryConvexHull(cursor=self.cursor) raise SystemExit( "Need to pass a QueryConvexHull object to use hull distance metric." ) if self.fitness_metric in ["dummy", "hull_test"]: self.testing = True if self.testing and self.compute_mode == "slurm": raise SystemExit("Please use `compute_mode=direct` for testing.") print("Done!") self.fitness_calculator = FitnessCalculator( fitness_metric=self.fitness_metric, hull=self.hull, sandbagging=self.sandbagging, debug=self.debug, ) LOG.debug("Successfully initialised fitness calculator.") # if we're checking hull pdfs too, make this list now if self.check_dupes_hull: print("Computing extra PDFs from hull...") PDFFactory(self.hull.cursor) self.extra_pdfs = deepcopy(self.hull.cursor) # remove pdf object from cursor so generation can be serialized for ind, _ in enumerate(self.hull.cursor): del self.hull.cursor[ind]["pdf"] else: self.extra_pdfs = None LOG.info("Successfully initialised similarity lists.") if self.recover_from is not None: print("Attempting to recover from run {}".format(self.run_hash)) if isinstance(self.recover_from, str): LOG.info( "Attempting to recover from previous run {}".format(self.run_hash) ) self.recover() if not self.load_only: self.start()
[docs] def start(self): """ Start running GA. """ print("Initialising quantum mechanics...", end=" ") # read parameters for relaxation from seed files if self.seed is not None: seed = self.seed errors = [] self.cell_dict, success_cell = cell2dict(seed, db=False) self.param_dict, success_param = param2dict(seed, db=False) if not success_cell: errors.append("Failed to read cell file: {}".format(self.cell_dict)) if not success_param: errors.append("Failed to read param file: {}".format(self.param_dict)) if errors: raise RuntimeError("{}".format(errors.join("\n"))) else: self.seed = "ilustrado" if self.seed_prefix is not None: self.seed = self.seed_prefix self.cell_dict = {} self.param_dict = {} print("Done!\n") LOG.debug("Successfully initialised cell and param files.") if self.recover_from is None: self.seed_generation_0(self.gene_pool) if self.debug: print(self.nodes) if self.nodes is not None: LOG.info("Running on nodes: {}".format(" ".join(self.nodes))) elif self.compute_mode == "slurm": LOG.info("Running through SLURM queue") else: LOG.info("Running on localhost only") if self.debug: print( "Current number of generations: {}. Target number: {}".format( len(self.generations), self.num_generations ) ) # run GA self.num_generations while len(self.generations) < self.num_generations: self.breed_generation() LOG.info("Successfully bred generation {}".format(len(self.generations))) assert len(self.generations) == self.num_generations self.finalise_files_for_export() print("Reached target number of generations!") print("Completed GA!") LOG.info("Reached target number of generations!") LOG.info("Completed GA!")
[docs] def breed_generation(self): """ Build next generation from mutations/crossover of current and perform relaxations if necessary. """ # initialise next_gen if self.next_gen is None: self.next_gen = Generation( self.run_hash, len(self.generations), self.num_survivors, self.num_accepted, fitness_calculator=self.fitness_calculator, ) # newborns is a list of structures, initially raw then relaxed if self.compute_mode == "direct": self.continuous_birth() elif self.compute_mode in ("slurm", "manual"): self.batch_birth() if len(self.next_gen) < self.population: LOG.warning("Next gen is smaller than desired population.") # assert len(self.next_gen) >= self.population self.next_gen.rank() LOG.info("Ranked structures in generation {}".format(len(self.generations))) if not self.testing: cleaned = self.next_gen.clean() LOG.info( "Cleaned structures in generation {}, removed {}".format( len(self.generations), cleaned ) ) self.enforce_elitism() self.reset_and_dump() print(self.generations[-1])
[docs] def write_unrelaxed_generation(self): """ Perform mutations and write res files for the resulting structures. Additionally, dump an unrelaxed json file. """ while len(self.next_gen) < self.max_attempts: newborn = self.birth_new_structure() self.next_gen.birth(newborn) for newborn in self.next_gen: newborn = strip_useless(newborn) doc2res(newborn, newborn["source"][0], info=False) self.next_gen.dump("unrelaxed")
[docs] def batch_birth(self): """ Assess whether a generation has been relaxed already. This is done by checking for the existence of a file called <run_hash>-genunrelaxed.json. If so, match the relaxations up with the cached unrelaxed structures and rank them ready for the next generation. If not, create a new generation of structures, dump the unrelaxed structures to file, create the jobscripts to relax them, submit them and the job to check up on the relaxations, then exit. """ print("Beginning birthing of generation {}...".format(len(self.generations))) fname = "{}-genunrelaxed.json".format(self.run_hash) if os.path.isfile(fname): LOG.info("Found existing generation to be relaxed...") # load the unrelaxed structures into a dummy generation assert os.path.isfile(fname) unrelaxed_gen = Generation( self.run_hash, len(self.generations), self.num_survivors, self.num_accepted, dumpfile=fname, fitness_calculator=None, ) # check to see which unrelaxed structures completed successfully LOG.info("Scanning for completed relaxations...") for _, newborn in enumerate(unrelaxed_gen): completed_castep_filename = "completed/{}.castep".format(newborn["source"][0]) completed_res_filename = "completed/{}.res".format(newborn["source"][0]) doc = None s = None if os.path.isfile(completed_castep_filename): doc, s = castep2dict(completed_res_filename, db=True) elif os.path.isfile(completed_res_filename): doc, s = res2dict(completed_res_filename, db=True) # if we find a res file in a completed folder, assumed it was relaxed doc["optimised"] = True # if all was a success, then "birth" the structure, after checking for uniqueness if s and isinstance(doc, dict): newborn = strip_useless(newborn) doc = strip_useless(doc) newborn.update(doc) assert newborn.get("parents") is not None LOG.info("Scraping result for {}".format(newborn["source"][0])) self.scrape_result(newborn) else: LOG.warning( "Failed to add {}, data found: {}".format(newborn["source"][0], doc) ) # if there are not enough unrelaxed structures after that run, clean up then resubmit LOG.info( "Found {} structures out of target {}".format( len(self.next_gen), self.population ) ) if len(self.next_gen) < self.population: LOG.info("Initialising new relaxation jobs...") num_remaining = matador.compute.reset_job_folder() # check if we can even finish this generation if num_remaining < self.population - len(self.next_gen): LOG.warning( "There were too many failures, not enough remaining calculations to reach target." ) LOG.warning( "Consider restarting with a larger allowed failure_ratio." ) raise SystemExit( "Failed to return enough successful structures to continue, exiting..." ) if self.compute_mode == "slurm": # adjust number of nodes so we don't get stuck in the queue if self.max_num_nodes > num_remaining: LOG.info("Adjusted max num nodes to {}".format(self.max_num_nodes)) self.max_num_nodes = self.population - len(self.next_gen) self.slurm_submit_relaxations_and_monitor() LOG.info("Exiting monitor...") exit(0) # otherwise, remove unfinished structures from job file and release control of this generation else: LOG.info("Found enough structures to continue!".format()) count = 0 for doc in unrelaxed_gen: structure = doc["source"][0] + ".res" if os.path.isfile(structure): os.remove(structure) count += 1 LOG.info("Removed {} structures from job folder.".format(count)) return # otherwise, generate a new unrelaxed generation and submit else: LOG.info("Initialising new generation...") self.write_unrelaxed_generation() if self.compute_mode == "slurm": self.slurm_submit_relaxations_and_monitor() LOG.info("Exiting monitor...") exit(0)
[docs] def slurm_submit_relaxations_and_monitor(self): """ Prepare and submit the appropriate slurm files. """ LOG.info("Preparing to submit slurm scripts...") relax_fname = "{}_relax.job".format(self.run_hash) # override jobname with this run's hash to allow for selective job killing self.slurm_dict["SLURM_JOB_NAME"] = self.run_hash compute_string = "run3 {}".format(self.seed) matador.compute.slurm.write_slurm_submission_script( relax_fname, self.slurm_dict, compute_string, self.walltime_hrs, template=self.slurm_template, ) if self.max_num_nodes > self.max_attempts: self.max_num_nodes = self.max_attempts LOG.info("Adjusted max num nodes to {}".format(self.max_num_nodes)) # prepare script to read in results monitor_fname = "{}_monitor.job".format(self.run_hash) compute_string = "python {} >> ilustrado.out 2>> ilustrado.err".format( self.entrypoint ) matador.compute.slurm.write_slurm_submission_script( monitor_fname, self.slurm_dict, compute_string, 1, template=self.slurm_template, ) # submit jobs, if any exceptions, cancel all jobs try: array_job_id = matador.compute.slurm.submit_slurm_script( relax_fname, num_array_tasks=self.max_num_nodes ) LOG.info("Submitted job array: {}".format(array_job_id)) monitor_job_id = matador.compute.slurm.submit_slurm_script( monitor_fname, depend_on_job=array_job_id ) LOG.info("Submitted monitor job: {}".format(monitor_job_id)) except Exception as exc: LOG.error("Something went wrong, trying to cancel all jobs: {}".format(exc)) output = matador.compute.slurm.scancel_all_matching_jobs(name=self.run_hash) LOG.error("scancel output: {}".format(output)) raise SystemExit("Something went wrong, please check the log file.")
[docs] def continuous_birth(self): """ Create new generation and relax "as they come", filling the compute resources allocated. """ newborns = [] procs = [] # queues is a list of mp.Queues where return values will end up queues = [] if self.nodes is None: free_nodes = self.nprocs * [None] if isinstance(self.ncores, list): free_cores = self.nprocs * [None] else: free_cores = self.nprocs * [self.ncores] else: free_nodes = deepcopy(self.nodes) if isinstance(self.ncores, list): free_cores = deepcopy(self.ncores) else: free_cores = len(self.nodes) * [self.ncores] attempts = 0 print("Computing generation {}:".format(len(self.generations))) print(89 * "─") print( "{:^25} {:^10} {:^10} {:^10} {:^30}".format( "ID", "Formula", "# atoms", "Status", "Mutations" ) ) print(89 * "─") # print any recovered structures that already exist if self.next_gen: for _, structure in enumerate(self.next_gen): print( "{:^25} {:^10} {:^10} {:^10} {:^30}".format( structure["source"][0], get_formula_from_stoich(structure["stoichiometry"]), structure["num_atoms"], "Recovered", ", ".join(structure["mutations"]), ) ) self.used_sources = [doc["source"][0] for doc in self.next_gen] else: self.used_sources = [] try: finished = False while attempts < self.max_attempts and not finished: # if we've reached the target popn, try to kill remaining processes nicely if len(self.next_gen) >= self.population: finished = True # while there are still processes running, try to kill them with kill files # that should end the job at the completion of the next CASTEP run self._kill_all_gently(procs, newborns, queues) # are we using all nodes? if not, start some processes elif len(procs) < self.nprocs and len(self.next_gen) < self.population: # generate structure newborn = self.birth_new_structure() newborn_id = len(newborns) newborns.append(newborn) # clear up and assess CPU resources node = free_nodes.pop() ncores = free_cores.pop() # actually relax structure (or not, if testing is turned on) # TODO: refactor to be more general if self.ase_calculator: from ilustrado.util import AseRelaxation queues.append(mp.Queue()) relaxer = AseRelaxation(newborns[-1], queues[-1], calculator=self.ase_calculator) else: if self.testing: from ilustrado.util import FakeComputeTask as ComputeTask else: from matador.compute import ComputeTask queues.append(mp.Queue()) relaxer = ComputeTask( ncores=ncores, nnodes=None, node=node, res=newborns[-1], param_dict=self.param_dict, cell_dict=self.cell_dict, verbosity=1, killcheck=True, reopt=False, executable=self.executable, output_queue=queues[-1], start=False, **self.relaxer_params ) # store proc object with structure ID, node name, output queue and number of cores procs.append( NewbornProcess( newborn_id, node, mp.Process(target=relaxer.relax), ncores=ncores, ) ) procs[-1].process.start() LOG.info( "Initialised relaxation for newborn {} on node {} with {} cores.".format( ", ".join(newborns[-1]["source"]), node, ncores ) ) # are we using all nodes? if so, are they all still running? elif ( all([proc.process.is_alive() for proc in procs]) and len(procs) == self.nprocs ): # poll processes every second time.sleep(1) # so we were using all nodes, but some have died... else: LOG.debug("Suspected at least one dead node") # then find the dead ones, collect their results and # delete them so we're no longer using all nodes found_node = False for ind, proc in enumerate(procs): if not proc.process.is_alive(): LOG.debug("Found dead node {}".format(proc.node)) try: result = queues[ind].get(timeout=60) except Exception: result = False LOG.warning( "Node {} failed to write to queue for newborn {}".format( proc.node, ", ".join(newborns[proc.newborn_id]["source"]), ) ) if isinstance(result, dict): self.scrape_result(result, proc=proc, newborns=newborns) try: procs[ind].process.join(timeout=10) LOG.debug( "Process {proc.newborn_id} on node {proc.node} died gracefully.".format( proc=proc ) ) except Exception: LOG.warning( "Process {proc.newborn_id} on node {proc.node} has not died gracefully.".format( proc=proc ) ) procs[ind].process.terminate() LOG.warning( "Process {proc.newborn_id} on node {proc.node} terminated forcefully.".format( proc=proc ) ) if result is not False: free_nodes.append(proc.node) free_cores.append(proc.ncores) del procs[ind] del queues[ind] attempts += 1 found_node = True break # new_free_nodes, new_free_cores, found_node, extra_attempts = self._collect_from_nodes( # procs, newborns, queues # ) # attempts += extra_attempts # if new_free_nodes: # free_nodes.append(new_free_nodes) # free_cores.append(new_free_cores) if not found_node: time.sleep(10) break except Exception as exc: LOG.warning("Something has gone terribly wrong...") LOG.error("Exception caught:", exc_info=True) print_exc() # clean up on error/interrupt if len(procs) > 1: self.kill_all(procs) raise exc LOG.info("No longer breeding structures in this generation.") # clean up at end either way if len(procs) > 1: LOG.info( "Trying to kill {} on {} processes.".format(self.executable, len(procs)) ) self.kill_all(procs) if attempts >= self.max_attempts: LOG.warning("Failed to return enough successful structures to continue...") print( "Failed to return enough successful structures to continue, exiting..." ) exit()
[docs] def enforce_elitism(self): """ Add elite structures from previous generations to bourgeoisie of current generation, through the merit of their ancestors alone. """ # add random elite structures from previous gen if self.num_elite <= len(self.generations[-1].bourgeoisie): probabilities = ( np.asarray([doc["fitness"] for doc in self.generations[-1].bourgeoisie]) + 0.0001 ) probabilities /= np.sum(probabilities) elites = deepcopy( np.random.choice( self.generations[-1].bourgeoisie, self.num_elite, replace=False, p=probabilities, ) ) else: elites = deepcopy(self.generations[-1].bourgeoisie) if self.debug: for doc in elites: print( "Adding doc {} at {} eV/atom".format( " ".join(doc["text_id"]), doc["hull_distance"] ) ) self.next_gen.set_bourgeoisie( elites=elites, best_from_stoich=self.best_from_stoich ) LOG.info("Added elite structures from previous generation to next gen.") LOG.info("New length of next gen: {}.".format(len(self.next_gen))) LOG.info( "New length of bourgeoisie: {}.".format(len(self.next_gen.bourgeoisie)) )
[docs] def reset_and_dump(self): """ Add now complete generation to generation list, reset the next_gen variable and write dump files. """ # copy next generation to list of generations self.generations.append(copy(self.next_gen)) # reset next_gen ready for, well, the next gen self.next_gen = None assert self.generations[-1] is not None LOG.info( "Added current generation {} to generation list.".format( len(self.generations) - 1 ) ) # remove interim dump file and create new ones for populace and bourgeoisie self.generations[-1].dump(len(self.generations) - 1) self.generations[-1].dump_bourgeoisie(len(self.generations) - 1) if os.path.isfile("{}-gencurrent.json".format(self.run_hash)): os.remove("{}-gencurrent.json".format(self.run_hash)) if os.path.isfile("{}-genunrelaxed.json".format(self.run_hash)): os.remove("{}-genunrelaxed.json".format(self.run_hash)) LOG.info( "Dumped generation file for generation {}".format(len(self.generations) - 1) )
[docs] def birth_new_structure(self): """ Generate a new structure from current settings. Returns: dict: newborn structure to be optimised """ possible_parents = ( self.generations[-1].populace if len(self.generations) == 1 else self.generations[-1].bourgeoisie ) newborn = adapt( possible_parents, self.mutation_rate, self.crossover_rate, mutations=self.mutations, max_num_mutations=self.max_num_mutations, max_num_atoms=self.max_num_atoms, structure_filter=self.structure_filter, minsep_dict=self.minsep_dict, debug=self.debug, ) newborn_source_id = len(self.next_gen) if self.compute_mode == "direct": while ( "{}-GA-{}-{}x{}".format( self.seed, self.run_hash, len(self.generations), newborn_source_id ) in self.used_sources ): newborn_source_id += 1 self.used_sources.append( "{}-GA-{}-{}x{}".format( self.seed, self.run_hash, len(self.generations), newborn_source_id ) ) newborn["source"] = [ "{}-GA-{}-{}x{}".format( self.seed, self.run_hash, len(self.generations), newborn_source_id ) ] LOG.info( "Initialised newborn {} with mutations ({})".format( ", ".join(newborn["source"]), ", ".join(newborn["mutations"]) ) ) return newborn
[docs] def scrape_result(self, result, proc=None, newborns=None): """ Check process for result and scrape into self.next_gen if successful, with duplicate detection if desired. If the optional arguments are provided, extra logging info will be found when running in `direct` mode. Parameters: result (dict): containing output from process Keyword Arguments: proc (tuple) : standard process tuple from above, newborns (list): of new structures to append result to. """ if self.debug: if proc is not None: print(proc) print(dumps(result, sort_keys=True)) if result.get("optimised"): status = "Relaxed" if proc is not None: LOG.debug( "Newborn {} successfully optimised".format( ", ".join(newborns[proc.newborn_id]["source"]) ) ) if result.get("parents") is None: LOG.warning( "Failed to get parents for newborn {}.".format( ", ".join(newborns[proc.newborn_id]["source"]) ) ) result["parents"] = newborns[proc.newborn_id]["parents"] result["mutations"] = newborns[proc.newborn_id]["mutations"] result = strip_useless(result) dupe = False if self.check_dupes: dupe = self.is_newborn_dupe(result, extra_pdfs=self.extra_pdfs) if dupe: status = "Duplicate" if proc is not None: LOG.debug( "Newborn {} is a duplicate and will not be included.".format( ", ".join(newborns[proc.newborn_id]["source"]) ) ) else: LOG.debug( "Newborn {} is a duplicate and will not be included.".format( result["source"][0] ) ) with open(self.run_hash + "-dupe.json", "a") as f: dump(result, f, sort_keys=False, indent=2) if not dupe: self.next_gen.birth(result) if proc is not None: LOG.info( "Newborn {} added to next generation.".format( ", ".join(newborns[proc.newborn_id]["source"]) ) ) else: LOG.info( "Newborn {} added to next generation.".format( result["source"][0] ) ) LOG.info("Current generation size: {}".format(len(self.next_gen))) self.next_gen.dump("current") LOG.debug("Dumping json file for interim generation...") else: status = "Failed" result = strip_useless(result) with open(self.run_hash + "-failed.json", "a") as f: dump(result, f, sort_keys=False, indent=2) print( "{:^25} {:^10} {:^10} {:^10} {:^30}".format( result["source"][0], get_formula_from_stoich(result["stoichiometry"]), result["num_atoms"], status, ", ".join(result["mutations"]), ) )
[docs] def kill_all(self, procs): """ Loop over processes and kill them all. Parameters: procs (list): list of :obj:`NewbornProcess` in form documented above. """ for proc in procs: if self.nodes is not None: sp.run( ["ssh", proc.node, "pkill {}".format(self.executable)], timeout=15, stdout=sp.DEVNULL, shell=False, ) proc.process.terminate()
[docs] def recover(self): """ Attempt to recover previous generations from files in cwd named '<run_hash>_gen{}.json'.format(gen_idx). """ if not os.path.isfile(("{}-gen0.json").format(self.run_hash)): exit("Failed to load run, files missing for {}".format(self.run_hash)) if ( os.path.isfile(("{}-gencurrent.json").format(self.run_hash)) and self.compute_mode != "slurm" ): incomplete = True LOG.info("Found incomplete generation for {}".format(self.run_hash)) else: incomplete = False try: i = 0 while os.path.isfile("{}-gen{}.json".format(self.run_hash, i)): LOG.info( "Trying to load generation {} from run {}.".format(i, self.run_hash) ) fname = "{}-gen{}.json".format(self.run_hash, i) self.generations.append( Generation( self.run_hash, i, self.num_survivors, self.num_accepted, dumpfile=fname, fitness_calculator=None, ) ) LOG.info( "Successfully loaded {} structures into generation {} from run {}.".format( len(self.generations[-1]), i, self.run_hash ) ) i += 1 print("Recovered from run {}".format(self.run_hash)) LOG.info("Successfully loaded run {}.".format(self.run_hash)) except Exception: print_exc() LOG.error( "Something went wrong when reloading run {}".format(self.run_hash) ) exit("Something went wrong when reloading run {}".format(self.run_hash)) if not self.generations: raise SystemExit("No generations found!") for i, _ in enumerate(self.generations): if not self.testing: if i != 0: removed = self.generations[i].clean() LOG.info( "Removed {} structures from generation {}".format(removed, i) ) if i == len(self.generations) - 1 and len(self.generations) > 1: if self.num_elite <= len(self.generations[-2].bourgeoisie): # generate elites with probability proportional to their fitness, but ensure every p is non-zero probabilities = ( np.asarray( [doc["fitness"] for doc in self.generations[-2].bourgeoisie] ) + 0.0001 ) probabilities /= np.sum(probabilities) elites = deepcopy( np.random.choice( self.generations[-2].bourgeoisie, self.num_elite, replace=False, p=probabilities, ) ) else: elites = deepcopy(self.generations[-2].bourgeoisie) self.generations[i].set_bourgeoisie( best_from_stoich=self.best_from_stoich, elites=elites ) else: bourge_fname = "{}-gen{}-bourgeoisie.json".format(self.run_hash, i) if os.path.isfile(bourge_fname): self.generations[i].load_bourgeoisie(bourge_fname) else: self.generations[i].set_bourgeoisie( best_from_stoich=self.best_from_stoich ) LOG.info( "Bourgeoisie contains {} structures: generation {}".format( len(self.generations[i].bourgeoisie), i ) ) assert len(self.generations[i]) >= 1 assert len(self.generations[i].bourgeoisie) >= 1 if incomplete: LOG.info( "Trying to load incomplete generation from run {}.".format( self.run_hash ) ) fname = "{}-gen{}.json".format(self.run_hash, "current") self.next_gen = Generation( self.run_hash, len(self.generations), self.num_survivors, self.num_accepted, dumpfile=fname, fitness_calculator=self.fitness_calculator, ) LOG.info( "Successfully loaded {} structures into current generation ({}) from run {}.".format( len(self.next_gen), len(self.generations), self.run_hash ) ) assert len(self.next_gen) >= 1
[docs] def seed_generation_0(self, gene_pool): """ Set up first generation from gene pool. Parameters: gene_pool (list(dict)): list of structure with which to seed generation. """ self.gene_pool = gene_pool for ind, parent in enumerate(self.gene_pool): if "_id" in parent: del self.gene_pool[ind]["_id"] # check gene pool is sensible errors = [] if not isinstance(self.gene_pool, list): errors.append("Initial gene pool not a list: {}".format(self.gene_pool)) if not len(self.gene_pool) >= 1: errors.append( "Initial gene pool not long enough: {}".format(self.gene_pool) ) if errors: raise SystemExit("Initial genee pool is not sensible: \n".join(errors)) generation = Generation( self.run_hash, 0, len(gene_pool), len(gene_pool), fitness_calculator=self.fitness_calculator, populace=self.gene_pool, ) generation.rank() generation.set_bourgeoisie(best_from_stoich=False) LOG.info( "Successfully initialised generation 0 with {} members".format( len(generation) ) ) generation.dump(0) generation.dump_bourgeoisie(0) print(generation) self.generations.append(generation)
[docs] def is_newborn_dupe(self, newborn, extra_pdfs=None): """ Check each generation for a duplicate structure to the current newborn, using PDF calculator from matador. Parameters: newborn (dict): new structure to screen against the existing, Keyword Arguments: extra_pdfs (list(dict)): any extra PDFs to compare to, e.g. other hull structures not used to seed any generation Returns: bool: True if duplicate, else False. """ for ind, gen in enumerate(self.generations): if ind == 0: if gen.is_dupe(newborn, extra_pdfs=extra_pdfs): return True else: if gen.is_dupe(newborn): return True return False
[docs] def finalise_files_for_export(self): """ Move unique structures from gen1 onwards to folder "<run_hash>-results". """ path = "{}-results".format(self.run_hash) os.makedirs(path.format(self.run_hash), exist_ok=True) LOG.info("Moving unique files to {}-results/...".format(self.run_hash)) cursor = [struc for gen in self.generations[1:] for struc in gen] uniq_inds, _, _, _, = get_uniq_cursor(cursor, projected=True) cursor = [cursor[ind] for ind in uniq_inds] for doc in cursor: source = get_root_source(doc) if not source: LOG.warning("Issue writing {}".format(doc["source"])) continue else: doc2res( doc, "{}/{}".format(path, source), overwrite=False, hash_dupe=False ) if os.path.isfile("completed/{}".format(source.replace(".res", ".castep"))): shutil.copy( "completed/{}".format(source.replace(".res", ".castep")), "{}/{}".format(path, source.replace(".res", ".castep")), )
def _kill_all_gently(self, procs, newborns, queues): """ Kill all running processes. Parameters: procs (list): list of `:obj:NewbornProcess` objects. newborns (list): list of corresponding structures. queues (list): list of queues that were collecting results. """ kill_attempts = 0 while procs and kill_attempts < 5: for ind, proc in enumerate(procs): # create kill file so that matador will stop next finished CASTEP filename = "{}.kill".format(newborns[proc.newborn_id]["source"][0]) with open(filename, "w"): pass # wait 1 minute for CASTEP run if proc.process.join(timeout=60) is not None: result = queues[ind].get(timeout=60) if isinstance(result, dict): self.scrape_result(result, proc=proc, newborns=newborns) del procs[ind] kill_attempts += 1 if kill_attempts >= 5: for ind, proc in enumerate(procs): proc.process.terminate() del procs[ind]