Source code for clinguin.server.application.backends.clingo_backend

# pylint: disable=R0801
# pylint: disable=too-many-lines
"""
Module that contains the ClingoBackend.
"""
import functools
import logging
import textwrap
import time
from functools import cached_property
from pathlib import Path
from typing import Any

from clingo import Control, parse_term
from clingo.script import enable_python

from clinguin.server import StandardJsonEncoder, UIState
from clinguin.server.data.domain_state import solve, tag

from ....utils.logger import domctl_log
from ....utils.transformer import UsesSignatureTransformer

enable_python()
# pylint: disable=attribute-defined-outside-init


[docs]class ClingoBackend: """ This backend contains the basic clingo functionality for a backend using clingo. """
[docs] def __init__(self, args): """ Creates the Backend with the given arguments. It will setup all attributes by calling :func:`~_init_ds_constructors()` and :func:`~_restart()`. Generally this method should NOT be overwritten by custom backends. Instead, custom backends should overwrite specialized methods. Arguments: args (ArgumentParser): The arguments from the argument parser that are given for the registered options. """ self._args = args self._logger = logging.getLogger(args.log_args["name"]) # Setup static attributes that might be changed by custom backends and must be preserved after restarts self._init_ds_constructors() # Restart the backend to initialize all attributes self._restart()
# --------------------------------------------- # Class methods # ---------------------------------------------
[docs] @classmethod def register_options(cls, parser): """ Registers options in the command line. It can be extended by custom backends to add custom command-line options. Arguments: parser (ArgumentParser): A group of the argparse argument parser Example: .. code-block:: python @classmethod def register_options(cls, parser): ClingoBackend.register_options(parser) parser.add_argument( "--my-custom-option", help="Help message", nargs="*", ) """ parser.add_argument( "--domain-files", nargs="+", help="Files with the domain specific encodings and the instances", metavar="", ) parser.add_argument( "--ui-files", nargs="+", help="Files with the encodings that generate the UI predicates: elem, attr and when", metavar="", ) parser.add_argument( "-c", "--const", action="append", help="Constant passed to clingo, <id>=<term> replaces term occurrences of <id> with <term>", metavar="", ) parser.add_argument( "--clingo-ctl-arg", action="append", help="""Argument that will be passed to clingo control object for the domain. Should have format <name>=<value>, for example parallel-mode=2 will become --parallel-mode=2.""", metavar="", ) parser.add_argument( "--default-opt-mode", type=str, help="Default optimization mode for computing a model", default="ignore", metavar="", ) parser.add_argument( "--opt-timeout", help=textwrap.dedent( """\ Optional timeout for searching for optimal models. The timeout is not exactly enforced (might take longer) but only checked after each solution is found. """ ), type=int, metavar="", )
# --------------------------------------------- # Properties # --------------------------------------------- @property def _is_browsing(self): """ Property to tell if clinguin is in browsing mode. """ return self._iterator is not None @property def _constants_argument_list(self) -> list: """ Gets the constants as a list of strings in the format "-c <name>=<value>" """ return [f"-c {k}={v}" for k, v in self._constants.items()] @property def _ctl_arguments_list(self) -> list: """ Gets the list of arguments used for creating a control object """ return ( ["0"] + self._constants_argument_list + [f"--{o}" for o in self._clingo_ctl_arg] ) @property def _assumption_list(self) -> list: """ A list of assumptions in the format of form (a, True) or (a, False) """ return self._assumptions # --------------------------------------------- # Initialization # ---------------------------------------------
[docs] def _restart(self): """ Restarts the backend by setting all attributes, initializing controls and grounding. It is automatically called when the server starts. See Also: :func:`~_init_command_line`, :func:`~_init_interactive`, :func:`~_outdate`, :func:`~_init_ctl`, :func:`~_ground` """ self._init_command_line() self._init_interactive() self._outdate() self._init_ctl() self._ground()
[docs] def _init_ds_constructors(self): """ This method initializes the domain state constructors list and the backup cache dictionary. It also adds the default domain state constructors to the list. This method is called only when the server starts. Attributes: _domain_state_constructors (list): A list to store the domain state constructors. _backup_ds_cache (dict): A dictionary to store the backup domain state cache. It can be extended by custom backends to add/edit domain state constructors. Adding a domain state constructor should be done by calling :func:`~_add_domain_state_constructor()`. Example: .. code-block:: python @property def _ds_my_custom_constructor(self): # Creates custom program return "my_custom_program." def _init_ds_constructors(self): super()._init_ds_constructors() self._add_domain_state_constructor("_ds_my_custom_constructor") """ self._domain_state_constructors = [] self._backup_ds_cache = {} self._add_domain_state_constructor("_ds_context") self._add_domain_state_constructor("_ds_constants") self._add_domain_state_constructor("_ds_browsing") self._add_domain_state_constructor("_ds_cautious_optimal") self._add_domain_state_constructor("_ds_brave_optimal") self._add_domain_state_constructor("_ds_cautious") self._add_domain_state_constructor("_ds_brave") self._add_domain_state_constructor("_ds_model") # Keep after brave and cautious self._add_domain_state_constructor("_ds_opt") self._add_domain_state_constructor("_ds_unsat") # Keep after all solve calls self._add_domain_state_constructor("_ds_assume") self._add_domain_state_constructor("_ds_external")
[docs] def _init_command_line(self): """ Initializes the attributes based on the command-line arguments provided. This method is called when the server starts or after a restart. Attributes: _domain_files (list): The list of domain files provided via command line. _ui_files (list): The list of UI files provided via command line. _constants (dict): The dictionary of constants provided via command line. _clingo_ctl_arg (list): The list of clingo control arguments provided via command line. If any command line arguments are added in :meth:`~ClingoBackend.register_options`, they should be initialized here. Example: .. code-block:: python def _init_command_line(self): super()._init_command_line() self._my_custom_attr = self._args.my_custom_option """ self._domain_files = self._args.domain_files or [] if not self._args.ui_files: raise RuntimeError("UI files need to be provided under --ui-files") self._ui_files = self._args.ui_files self._constants = {} if self._args.const is not None: for c in self._args.const: if "=" not in c: raise ValueError("Invalid constant format. Expected name=value.") name, value = c.split("=") self._constants[name] = value self._clingo_ctl_arg = self._args.clingo_ctl_arg or [] self._default_opt_mode = self._args.default_opt_mode self._opt_timeout = self._args.opt_timeout
[docs] def _init_interactive(self): """ Initializes the attributes that will change during the interaction. This method is called when the server starts or after a restart. Attributes: _context (list): A list to store the context set by the general handler of requests. _handler (clingo.SolveHandle): The handler set while browsing in the `next_solution` operation. _iterator (iter): The iterator set while browsing in the `next_solution` operation. _ctl (clingo.Control): The domain control set in `_init_ctl`. _ui_state (:class:`UIState`): A UIState object used to handle the UI construction, set in every call to `_update_ui_state`. _atoms (set[str]): A set to store the atoms set dynamically in operations during the interaction. _assumptions (set[(str,bool)]): A set to store the assumptions set dynamically in operations during the interaction. _externals (dict): A dictionary with true, false and released sets of external atoms _model (list[clingo.Symbol]): The model set in `on_model`. _unsat_core (list[int]): The unsatisfiable core set in `on_model`. _cost (list): A list to store the cost set in `on_model`. _optimal (bool): A boolean indicating if the solution is optimal, set in `on_model`. _optimizing (bool): A boolean indicating if the solver is currently optimizing, set in `on_model`. _messages (list[tuple[str,str,str]]): A list to store the messages (title, content, type) to be shown in the UI, set dynamically in operations during the interaction. """ # Context: Set by the general handler of requests self._context = [] # Domain Control: Set in _init_ctl self._ctl = None # UIState object to handle the UI construction: Set in every time in _update_ui_state self._ui_state = None # Atoms and assumptions: Set dynamically in operations during the interaction self._atoms = set() self._assumptions = set() self._externals = {"true": set(), "false": set(), "released": set()} # Handler and Iterator: Set while browsing in next_solution operation self._handler = None self._iterator = None # Attributes from the model: Set in on_model self._model = None self._unsat_core = None self._cost = [] self._optimal = False self._optimizing = False # Messages to be shown in the UI: Set dynamically in operations during the interaction self._messages = []
[docs] def _init_ctl(self): """ Creates the domain control and loads the domain files. See Also: :func:`~_create_ctl`, :func:`~_load_and_add` """ self._create_ctl() self._load_and_add()
[docs] def _create_ctl(self) -> None: """ Initializes the control object (domain-control). It is used when the server is started or after a restart. See Also: :func:`~_load_file` """ self._logger.debug( domctl_log(f"domain_ctl = Control({self._ctl_arguments_list})") ) self._ctl = Control(self._ctl_arguments_list)
[docs] def _load_and_add(self) -> None: """ Loads domain files and atoms into the control. This method iterates over the domain files and atoms specified in the instance and loads them into the control. It raises an exception if a domain file does not exist or if there is a syntax error in the logic program file. Raises: Exception: If a domain file does not exist or if there is a syntax error in the logic program file. See Also: :func:`~_load_file` """ for f in self._domain_files: path = Path(f) if not path.is_file(): self._logger.critical("File %s does not exist", f) raise Exception(f"File {f} does not exist") try: self._load_file(f) except Exception as e: self._logger.critical( "Failed to load file %s (there is likely a syntax error in this logic program file).", f, ) self._logger.critical(str(e)) raise e for atom in self._atoms: self._logger.debug(domctl_log('domctl.add("base", [], {str(atom)} + ".")')) self._ctl.add("base", [], str(atom) + ".")
[docs] def _load_file(self, f): """ Loads a file into the control. This method can be overwritten if any pre-processing is needed. Arguments: f (str): The file path """ self._logger.debug(domctl_log(f"domctl.load({str(f)})")) self._ctl.load(str(f))
[docs] def _outdate(self): """ Outdates all the dynamic values when a change has been made. Any current interaction in the models wil be terminated by canceling the search and removing the iterator. See Also: :func:`~_clear_cache` """ if self._handler: self._handler.cancel() self._handler = None self._iterator = None self._model = None self._clear_cache()
# --------------------------------------------- # Setters # ---------------------------------------------
[docs] def _add_domain_state_constructor(self, method: str): """ Adds a method name to the domain constructors. The provided method needs to be annotated with ``@property`` or ``@cached_property`` Arguments: method (str): Name of the property method """ self._domain_state_constructors.append(method)
[docs] def _set_context(self, context): """ Sets the context. Used by general endpoint handler after a request. Arguments: context: The context dictionary """ self._context = context
[docs] def _set_constant(self, name: str, value: Any) -> None: """ Sets a constant in the backend and restarts the control. Args: name (str): name of the constant value (Any): value of the constant """ name = name.strip('"') self._constants[name] = value value = str(value).strip('"') self._constants[name] = value self._logger.debug("Constant %s updated successfully to %s", name, value)
[docs] def _add_atom(self, predicate_symbol): """ Adds an atom if it hasn't been already added Arguments: predicate_symbol (clingo.Symbool): The symbol for the atom """ if predicate_symbol not in self._atoms: self._atoms.add(predicate_symbol)
[docs] def _set_external(self, symbol, name): """ Sets the external value of a symbol. Args: symbol (clingo.Symbol): The clingo symbol to be set name (str): Either "true", "false" or "release" """ if name == "release": self._logger.debug(domctl_log(f"ctl.release_external({symbol})")) self._ctl.release_external(symbol) self._externals["released"].add(symbol) if symbol in self._externals["true"]: self._externals["true"].remove(symbol) if symbol in self._externals["false"]: self._externals["false"].remove(symbol) elif name == "true": self._logger.debug(domctl_log(f"ctl.assign_external({symbol}, True)")) self._ctl.assign_external(symbol, True) self._externals["true"].add(symbol) if symbol in self._externals["false"]: self._externals["false"].remove(symbol) elif name == "false": self._logger.debug(domctl_log(f"ctl.assign_external({symbol}, False)")) self._ctl.assign_external(symbol, False) self._externals["false"].add(symbol) if symbol in self._externals["true"]: self._externals["true"].remove(symbol) else: raise ValueError( f"Invalid external value {name}. Must be true, false or relase" )
[docs] def _add_assumption(self, symbol, value="true"): """ Adds an assumption to the list of assumptions. Args: symbol (clingo.Symbol): The clingo symbol to be added as a True assumption value (true): The value of the assumption either "true" or "false" """ bool_val = value == "true" self._assumptions.add((symbol, bool_val))
# --------------------------------------------- # Solving # ---------------------------------------------
[docs] def _ground(self, program="base", arguments=None): """ Grounds the provided program Arguments: program (str): The name of the program to ground (defaults to "base") arguments (list, optional): The list of arguments to ground the program. Defaults to an empty list. """ arguments = arguments or [] arguments = [arguments] if not isinstance(arguments, list) else arguments self._logger.debug(domctl_log(f"domctl.ground([({program}, {arguments})])")) arguments_symbols = [parse_term(a) for a in arguments] self._ctl.ground([(program, arguments_symbols)])
[docs] def _prepare(self): """ Does any preparation before a solve call. """
[docs] def _on_model(self, model): """ This method is called each time a model is obtained by the domain control. It sets the model, and optimization attributes. It can be extended to add custom features of the model. Arguments: model (clingo.Model): The found clingo model """ self._optimizing = len(model.cost) > 0 if len(model.cost) > 0: self._logger.debug(" Cost: %s", model.cost) self._optimal = model.optimality_proven self._cost = model.cost
# --------------------------------------------- # UI state # ---------------------------------------------
[docs] def _update_ui_state(self): """ Updates the UI state by calling all domain state methods and creating a new control object (ui_control) using the UI files provided """ domain_state = self._domain_state self._ui_state = UIState( self._ui_files, domain_state, self._constants_argument_list ) self._ui_state.update_ui_state() self._ui_state.replace_images_with_b64() for m in self._messages: self._ui_state.add_message(m[0], m[1], m[2]) self._messages = []
# --------------------------------------------- # Domain state # ---------------------------------------------
[docs] def _clear_cache(self, methods=None): """ Clears the cache of domain state constructor methods Arguments: methods (list, optional): A list with the methods to remove the cache from. If no value is passed then all cache is removed """ if methods is None: methods = self._domain_state_constructors for m in methods: if m in self.__dict__: self._backup_ds_cache[m] = self.__dict__[m] del self.__dict__[m]
[docs] def _call_solver_with_cache( self, ds_id: str, ds_tag: str, models: int, opt_mode: str, enum_mode: str ): """ Generic function to call the using exiting cache on browsing. Un UNSAT it returns the output saved in the cache Arguments: ds_id: Identifier used in the cache Returns: The program tagged """ if self._is_browsing: self._logger.debug("Returning cache for %s", ds_id) return ( self._backup_ds_cache[ds_id] if ds_id in self._backup_ds_cache else "" ) self._logger.debug(domctl_log(f'domctl.configuration.solve.models = {models}"')) self._logger.debug( domctl_log(f'domctl.configuration.solve.opt_mode = {opt_mode}"') ) self._logger.debug( domctl_log(f'domctl.configuration.solve.enum_mode = {enum_mode}"') ) self._ctl.configuration.solve.models = models self._ctl.configuration.solve.opt_mode = opt_mode self._ctl.configuration.solve.enum_mode = enum_mode self._prepare() self._logger.debug( domctl_log( f"domctl.solve({[(str(a),b) for a,b in self._assumption_list]}, yield_=True)" ) ) symbols, ucore = solve( self._ctl, self._assumption_list, self._on_model, ) self._unsat_core = ucore if symbols is None: self._logger.warning("Got an UNSAT result with the given domain encoding.") return ( self._backup_ds_cache[ds_id] if ds_id in self._backup_ds_cache else "" ) return " ".join([str(s) + "." for s in list(tag(symbols, ds_tag))]) + "\n"
[docs] @functools.lru_cache(maxsize=None) # pylint: disable=[method-cache-max-size-none] def _ui_uses_predicate(self, name: str, arity: int): """ Returns a truth value of weather the ui_files contain the given signature. Args: name (str): Predicate name arity (int): Predicate arity """ transformer = UsesSignatureTransformer(name, arity) self._logger.debug("Transformer parsing UI files to find %s/%s", name, arity) transformer.parse_files(self._ui_files) if not transformer.contained: self._logger.debug( "Predicate NOT contained. Domain constructor will be skipped" ) return transformer.contained
@property def _domain_state(self): """ Gets the domain state by calling all the domain constructor methods Some domain state constructors might skip the computation if the UI does not require them. """ ds = "" for f in self._domain_state_constructors: ds += f"\n%%%%%%%% {f} %%%%%%%\n" ds += getattr(self, f) return ds # -------- Domain state methods @property def _ds_context(self): """ Adds context information from the client. Includes predicate ``_clinguin_context/2`` indicating each key and value in the context. """ prg = "#defined _clinguin_context/2. " for a in self._context: value = str(a.value) try: symbol = parse_term(value) except Exception: symbol = None if symbol is None: value = f'"{value}"' prg += f"_clinguin_context({str(a.key)},{value})." return prg + "\n" @cached_property def _ds_model(self): """ Computes model and adds all atoms as facts. When the model is being iterated by the user, the current model is returned. It will use as optimality the mode set in the command line as `default-opt-mode` (`ignore` by default). It uses a cache that is erased after an operation makes changes in the control. """ if self._model is None: self._logger.debug( domctl_log('domctl.configuration.solve.enum_mode = "auto"') ) self._ctl.configuration.solve.models = 1 self._ctl.configuration.solve.opt_mode = self._default_opt_mode self._ctl.configuration.solve.enum_mode = "auto" self._prepare() self._logger.debug( domctl_log( f"domctl.solve({[(str(a),b) for a,b in self._assumption_list]}, yield_=True)" ) ) symbols, ucore = solve(self._ctl, self._assumption_list, self._on_model) self._unsat_core = ucore if symbols is None: self._logger.warning( "Got an UNSAT result with the given domain encoding." ) return ( self._backup_ds_cache["_ds_model"] + "\n".join([str(a) + "." for a in self._atoms]) if "_ds_model" in self._backup_ds_cache else "" ) self._model = symbols return " ".join([str(s) + "." for s in self._model]) + "\n" @cached_property def _ds_brave(self): """ Computes brave consequences adds them as predicates ``_any/1``. This are atoms that appear in some model. If it is not used in the UI files then the computation is not performed. It uses a cache that is erased after an operation makes changes in the control. """ if not self._ui_uses_predicate("_any", 1): return "% NOT USED\n" return self._call_solver_with_cache("_ds_brave", "_any", 0, "ignore", "brave") @cached_property def _ds_cautious(self): """ Computes cautious consequences adds them as predicates ``_all/1``. This are atoms that appear in all models. If it is not used in the UI files then the computation is not performed. It uses a cache that is erased after an operation makes changes in the control. """ if not self._ui_uses_predicate("_all", 1): return "% NOT USED\n" return self._call_solver_with_cache( "_ds_cautious", "_all", 0, "ignore", "cautious" ) @cached_property def _ds_brave_optimal(self): """ Computes brave consequences for only optimal solutions adds them as predicates ``_any_opt/1``. This are atoms that appear in some optimal model. If it is not used in the UI files then the computation is not performed. It uses a cache that is erased after an operation makes changes in the control. """ if not self._ui_uses_predicate("_any_opt", 1): return "% NOT USED\n" return self._call_solver_with_cache( "_ds_brave_optimal", "_any_opt", 0, "optN", "brave" ) @cached_property def _ds_cautious_optimal(self): """ Computes cautious consequences of optimal models adds them as predicates ``_all_opt/1``. This are atoms that appear in all optimal models. If it is not used in the UI files then the computation is not performed. It uses a cache that is erased after an operation makes changes in the control. """ if not self._ui_uses_predicate("_all_opt", 1): return "% NOT USED\n" return self._call_solver_with_cache( "_ds_cautious_optimal", "_all_opt", 0, "optN", "cautious" ) @property def _ds_unsat(self): """ Adds information about the statisfiablity of the domain control Includes predicate ``_clinguin_unsat/0`` if the domain control is unsat """ prg = "#defined _clinguin_unsat/0. " if self._unsat_core is not None: prg += "_clinguin_unsat." return prg + "\n" @property def _ds_browsing(self): """ Adds information about the browsing state Includes predicate ``_clinguin_browsing/0`` if the user is browsing solutions """ prg = "#defined _clinguin_browsing/0. " if self._is_browsing: prg += "_clinguin_browsing." return prg + "\n" @property def _ds_assume(self): """ Adds information about the assumptions. Includes predicate ``_clinguin_assume/2`` for every atom that was assumed, where the second argument is either true or false. """ prg = "#defined _clinguin_assume/2. " for a, v in self._assumption_list: v_str = "true" if v else "false" prg += f"_clinguin_assume({str(a)},{v_str}). " return prg + "\n" @property def _ds_external(self): """ Adds information about the external atoms Includes predicate ``_clinguin_external/2`` for every external atom that has been set. """ prg = "#defined _clinguin_external/2. " for a in self._externals["true"]: prg += f"_clinguin_external({str(a)},true). " for a in self._externals["false"]: prg += f"_clinguin_external({str(a)},false). " for a in self._externals["released"]: prg += f"_clinguin_external({str(a)},release). " return prg + "\n" @property def _ds_opt(self): """ Adds program to pass with optimality information. Includes predicates: - ``_clinguin_cost/1``: With a single tuple indicating the cost of the current model - ``_clinguin_cost/2``: With the index and cost value, linearizing predicate ``_clinguin_cost/1`` - ``_clinguin_optimal/0``: If the solution is optimal - ``_clinguin_optimizing/0``: If there is an optimization in the program """ prg = "#defined _clinguin_cost/2. #defined _clinguin_cost/1. #defined _clinguin_optimal/1. " for i, c in enumerate(self._cost): prg += f"_clinguin_cost({i},{c}). " if self._optimal: prg += "_clinguin_optimal. " if self._optimizing: prg += "_clinguin_optimizing. " prg += f"_clinguin_cost({tuple(self._cost)}).\n" return prg @property def _ds_constants(self): """ Adds constants values. Includes predicate ``_clinguin_const/2`` for each constant provided in the command line and used in the domain files. """ prg = "#defined _clinguin_const/2. " for k, v in self._constants.items(): prg += f"_clinguin_const({k},{v})." return prg + "\n" ######################################################################################################## # --------------------------------------------- # Public operations # ---------------------------------------------
[docs] def get(self): """ Updates the UI and transforms the facts into a JSON. This method will be automatically called after executing all the operations. """ self._update_ui_state() json_structure = StandardJsonEncoder.encode(self._ui_state) return json_structure
[docs] def restart(self): """ Restarts the backend. It will initialize all attributes, remove atoms, assumptions and externals, restart the control object by initializing all parameters, controls, ending the browsing and grounding. """ self._restart()
[docs] def ground(self, program, arguments=None): """ Grounds the given program. Notice that the base program is grounded when the server starts. This operation can be used for grounding encodings with multiple programs in a multi-shot setting. Arguments: program (str): The name of the program to ground used in the #program directive arguments (tuple, optional): The list of arguments to ground the program. Defaults to an empty list. These are the arguments of your #program directive. For instance, in #program step(t). the argument is t. """ self._ground(program, arguments)
[docs] def update(self): """ Updates the UI by clearing the cache and computing the models again. """ self._clear_cache()
[docs] def download(self, show_prg=None, file_name="clinguin_download.lp"): """ Downloads the current model. It must be selected first via :func:`~select` . Arguments: show_prg (_type_, optional): Program to filter output using show statements. Defaults to None. file_name (str, optional): The name of the file for the download. Defaults to "clinguin_download.lp". """ if self._model is None: raise RuntimeError("Cant download when there is no model") show_prg = show_prg or "" prg = "\n".join([f"{s}." for s in self._model]) ctl = Control() ctl.add("base", [], prg) try: ctl.add("base", [], show_prg.replace('"', "")) except RuntimeError as exc: raise Exception( "Show program can't be parsed. Make sure it is a valid clingo program." ) from exc ctl.ground([("base", [])]) with ctl.solve(yield_=True) as hnd: for m in hnd: atoms = [f"{str(s)}." for s in m.symbols(shown=True)] final_prg = "\n".join(atoms) file_name = file_name.strip('"') with open(file_name, "w", encoding="UTF-8") as file: file.write(final_prg) self._messages.append( ( "Download successful", f"Information saved in file {file_name}.", "success", ) )
[docs] def clear_atoms(self): """ Removes all atoms and resets the backend. and finally updates the model and returns the updated gui as a Json structure. """ self._outdate() self._atoms = set() self._init_ctl() self._ground()
[docs] def add_atom(self, predicate): """ Adds an atom, restarts the control and grounds Arguments: predicate (str): The clingo symbol to be added """ predicate_symbol = parse_term(predicate) if predicate_symbol not in self._atoms: self._add_atom(predicate_symbol) self._init_ctl() self._ground() self._outdate()
[docs] def remove_atom(self, predicate): """ Removes an atom (if present), restarts the control and grounds again Arguments: predicate (str): The clingo symbol to be added """ predicate_symbol = parse_term(predicate) if predicate_symbol in self._atoms: self._atoms.remove(predicate_symbol) self._init_ctl() self._ground() self._outdate()
[docs] def next_solution(self, opt_mode="ignore"): """ Obtains the next solution. If a no browsing has been started yet, then it calls solve, otherwise it iterates the models in the last call. To keep the atoms shown in the solution, use :func:`~select`. Arguments: opt_mode: The clingo optimization mode, bu default is 'ignore', to browse only optimal models use 'optN' """ if self._ctl.configuration.solve.opt_mode != opt_mode: self._logger.debug("Ended browsing since opt mode changed") self._outdate() optimizing = opt_mode in ["optN", "opt"] if not self._iterator: self._logger.debug( domctl_log(f"domctl.configuration.solve.opt_mode = {opt_mode}") ) self._ctl.configuration.solve.enum_mode = "auto" self._ctl.configuration.solve.opt_mode = opt_mode self._ctl.configuration.solve.models = 0 self._prepare() self._logger.debug( domctl_log( f"domctl.solve({[(str(a),b) for a,b in self._assumption_list]}, yield_=True)" ) ) self._handler = self._ctl.solve(self._assumption_list, yield_=True) self._iterator = iter(self._handler) try: start = time.time() model = next(self._iterator) self._clear_cache(["_ds_model"]) self._on_model(model) self._model = model.symbols(shown=True, atoms=True, theory=True) while optimizing and not model.optimality_proven: if len(model.cost) == 0: self._messages.append( ( "Browsing Warning", "No optimization provided", "warning", ) ) self._logger.warning( "No optimization statement provided in encoding but optimization condition provided\ in 'next_solution' operation. Exiting browsing." ) break if ( self._opt_timeout is not None and time.time() - start > self._opt_timeout ): self._logger.warning( "Timeout for finding optimal model was reached. Returning model without proving optimality." ) break self._logger.debug("Skipping non-optimal model!") model = next(self._iterator) self._clear_cache(["_ds_model"]) self._on_model(model) self._model = model.symbols(shown=True, atoms=True, theory=True) except StopIteration: self._logger.info("No more solutions") self._outdate() self._messages.append(("Browsing Information", "No more solutions", "info"))
[docs] def clear_assumptions(self): """ Removes all assumptions. """ # pylint: disable=attribute-defined-outside-init self._outdate() self._assumptions = set()
[docs] def add_assumption(self, atom, value="true"): """ Adds an atom `a` as an assumption. If the value is "true", the atom is assumed to be true. This assumption can be considered as an integrity constraint: ``:- not a.`` forcing the program to entail the given atom. If the value is "false", the atom is assumed to be false: This assumption can be considered as an integrity constraint: ``:- a.`` forcing the program to never entail the given atom. Notice that the assumption must be generated by your domain encoding. Arguments: atom (str): The clingo symbol to be added as a true assumption value (str): The value of the assumption either "true" or "false" """ atom_symbol = parse_term(atom) if atom_symbol not in [a[0] for a in self._assumptions]: self._add_assumption(atom_symbol, value) self._outdate()
[docs] def remove_assumption(self, atom): """ Removes an atom from the assumptions list regardless of its value. This will allow the atom to take any value in the solution. Arguments: atom (str): The clingo symbol to be removed """ atom_symbol = parse_term(atom) for a, v in self._assumptions: if a == atom_symbol: self._assumptions.remove((a, v)) self._outdate() return
[docs] def remove_assumption_signature(self, atom): """ Removes from the list of assumptions those matching the given atom. Unlike function remove_assumption, this one allows for partial matches using the placeholder constant `any`. This will allow the atom to take any value in the solution. Arguments: atom (str): The atom description as a symbol, where the reserver word `any` is used to state that anything can take part of that position. For instance, `person(anna,any)`, will remove all assumptions of atom person, where the first argument is anna. """ atom_symbol = parse_term(atom) arity = len(atom_symbol.arguments) to_remove = [] for s, v in self._assumptions: if s.match(atom_symbol.name, arity): for i, a in enumerate(atom_symbol.arguments): if str(a) != "any" and s.arguments[i] != a: break else: to_remove.append((s, v)) continue for a in to_remove: self._assumptions.remove(a) if len(to_remove) > 0: self._outdate()
[docs] def set_external(self, atom, value): """ Sets the value of an external. Externals must be defined in the domain files using `#external`. The truth value of external atoms can then be provided by the user via this function. Arguments: atom (str): The clingo symbol to be set value (str): The value (release, true or false) """ symbol = parse_term(atom) name = value self._outdate() self._set_external(symbol, name)
[docs] def select(self, show_prg: str = ""): """ Select the current solution during browsing. All atoms in the solution are added as assumptions in the backend. Arguments: show_program (str): An optional show program to filter atoms """ if self._model is None: self._messages.append( ("No solution", "There is no solution to be selected", "danger") ) self._logger.error( "No solution. No model has been computed that can be selected" ) else: symbols_to_ignore = self._externals["true"] symbols_to_ignore.union(self._externals["false"]) if show_prg == "": model = self._model else: model = [] ctl = Control(["--warn=none"]) try: ctl.add("base", [], show_prg.strip('"')) except RuntimeError as exc: raise Exception( "Show program can't be parsed. Make sure it is a valid clingo program." ) from exc prg = "\n".join([f"{str(s)}." for s in self._model]) ctl.add("base", [], prg) ctl.ground([("base", [])]) def add_shown(m): for s in m.symbols(shown=True): model.append(s) ctl.solve(on_model=add_shown) for s in model: # pylint: disable=E1133 if s not in symbols_to_ignore: self._add_assumption(s, "true") self._outdate()
[docs] def stop_browsing(self): """ Stops the current browsing. """ self._outdate()
[docs] def set_constant(self, name: str, value: Any): """ Sets a constant value. Will reinitialize the control, ground and set arguments """ self._set_constant(name, value) self._init_interactive() self._outdate() self._init_ctl() self._ground()