diff --git a/Chapter-02-Intellegent-Agents.ipynb b/Chapter-02-Intellegent-Agents.ipynb new file mode 100644 index 000000000..9af290de8 --- /dev/null +++ b/Chapter-02-Intellegent-Agents.ipynb @@ -0,0 +1,1389 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "8700da5b", + "metadata": {}, + "source": [ + "# Intelligent Agents #\n", + "\n", + "This notebook serves as supporting material for topics covered in **Chapter 2 - Intelligent Agents** from the book *Artificial Intelligence: A Modern Approach.* This notebook uses implementations from the [agents.py](https://github.com/aimacode/aima-python/blob/master/agents.py) module. Let's start by importing everything from the agents module." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "2a72e87c", + "metadata": {}, + "outputs": [], + "source": [ + "from agents import *\n", + "from notebook import psource" + ] + }, + { + "cell_type": "markdown", + "id": "a08f7f34", + "metadata": {}, + "source": [ + "## CONTENTS\n", + "\n", + "* Overview\n", + "* Agent\n", + "* Environment\n", + "* Simple Agent and Environment\n", + "* Agents in a 2-D Environment\n", + "* Wumpus Environment\n", + "\n", + "## OVERVIEW\n", + "\n", + "An agent, as defined in Section 2.1, is anything that can perceive its environment through sensors, and act upon that environment through actuators based on its agent program. This can be a dog, a robot, or even you. As long as you can perceive the environment and act on it, you are an agent. This notebook will explain how to implement a simple agent, create an environment, and implement a program that helps the agent act on the environment based on its percepts.\n", + "\n", + "## AGENT\n", + "\n", + "Let us now see how we define an agent. Run the next cell to see how `Agent` is defined in the agents module." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "f9baa84d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "
\n", + "class Agent(Thing):\n",
+ " """An Agent is a subclass of Thing with one required instance attribute \n",
+ " (aka slot), .program, which should hold a function that takes one argument,\n",
+ " the percept, and returns an action. (What counts as a percept or action \n",
+ " will depend on the specific environment in which the agent exists.)\n",
+ " Note that 'program' is a slot, not a method. If it were a method, then the\n",
+ " program could 'cheat' and look at aspects of the agent. It's not supposed\n",
+ " to do that: the program can only look at the percepts. An agent program\n",
+ " that needs a model of the world (and of the agent itself) will have to\n",
+ " build and maintain its own model. There is an optional slot, .performance,\n",
+ " which is a number giving the performance measure of the agent in its\n",
+ " environment."""\n",
+ "\n",
+ " def __init__(self, program=None):\n",
+ " self.alive = True\n",
+ " self.bump = False\n",
+ " self.holding = []\n",
+ " self.performance = 0\n",
+ " if program is None or not isinstance(program, collections.abc.Callable):\n",
+ " print("Can't find a valid program for {}, falling back to default.".format(self.__class__.__name__))\n",
+ "\n",
+ " def program(percept):\n",
+ " return eval(input('Percept={}; action? '.format(percept)))\n",
+ "\n",
+ " self.program = program\n",
+ "\n",
+ " def can_grab(self, thing):\n",
+ " """Return True if this agent can grab this thing.\n",
+ " Override for appropriate subclasses of Agent and Thing."""\n",
+ " return False\n",
+ "
class Environment:\n",
+ " """Abstract class representing an Environment. 'Real' Environment classes\n",
+ " inherit from this. Your Environment will typically need to implement:\n",
+ " percept: Define the percept that an agent sees.\n",
+ " execute_action: Define the effects of executing an action.\n",
+ " Also update the agent.performance slot.\n",
+ " The environment keeps a list of .things and .agents (which is a subset\n",
+ " of .things). Each agent has a .performance slot, initialized to 0.\n",
+ " Each thing has a .location slot, even though some environments may not\n",
+ " need this."""\n",
+ "\n",
+ " def __init__(self):\n",
+ " self.things = []\n",
+ " self.agents = []\n",
+ "\n",
+ " def thing_classes(self):\n",
+ " return [] # List of classes that can go into environment\n",
+ "\n",
+ " def percept(self, agent):\n",
+ " """Return the percept that the agent sees at this point. (Implement this.)"""\n",
+ " raise NotImplementedError\n",
+ "\n",
+ " def execute_action(self, agent, action):\n",
+ " """Change the world to reflect this action. (Implement this.)"""\n",
+ " raise NotImplementedError\n",
+ "\n",
+ " def default_location(self, thing):\n",
+ " """Default location to place a new thing with unspecified location."""\n",
+ " return None\n",
+ "\n",
+ " def exogenous_change(self):\n",
+ " """If there is spontaneous change in the world, override this."""\n",
+ " pass\n",
+ "\n",
+ " def is_done(self):\n",
+ " """By default, we're done when we can't find a live agent."""\n",
+ " return not any(agent.is_alive() for agent in self.agents)\n",
+ "\n",
+ " def step(self):\n",
+ " """Run the environment for one time step. If the\n",
+ " actions and exogenous changes are independent, this method will\n",
+ " do. If there are interactions between them, you'll need to\n",
+ " override this method."""\n",
+ " if not self.is_done():\n",
+ " actions = []\n",
+ " for agent in self.agents:\n",
+ " if agent.alive:\n",
+ " actions.append(agent.program(self.percept(agent)))\n",
+ " else:\n",
+ " actions.append("")\n",
+ " for (agent, action) in zip(self.agents, actions):\n",
+ " self.execute_action(agent, action)\n",
+ " self.exogenous_change()\n",
+ "\n",
+ " def run(self, steps=1000):\n",
+ " """Run the Environment for given number of time steps."""\n",
+ " for step in range(steps):\n",
+ " if self.is_done():\n",
+ " return\n",
+ " self.step()\n",
+ "\n",
+ " def list_things_at(self, location, tclass=Thing):\n",
+ " """Return all things exactly at a given location."""\n",
+ " if isinstance(location, numbers.Number):\n",
+ " return [thing for thing in self.things\n",
+ " if thing.location == location and isinstance(thing, tclass)]\n",
+ " return [thing for thing in self.things\n",
+ " if all(x == y for x, y in zip(thing.location, location)) and isinstance(thing, tclass)]\n",
+ "\n",
+ " def some_things_at(self, location, tclass=Thing):\n",
+ " """Return true if at least one of the things at location\n",
+ " is an instance of class tclass (or a subclass)."""\n",
+ " return self.list_things_at(location, tclass) != []\n",
+ "\n",
+ " def add_thing(self, thing, location=None):\n",
+ " """Add a thing to the environment, setting its location. For\n",
+ " convenience, if thing is an agent program we make a new agent\n",
+ " for it. (Shouldn't need to override this.)"""\n",
+ " if not isinstance(thing, Thing):\n",
+ " thing = Agent(thing)\n",
+ " if thing in self.things:\n",
+ " print("Can't add the same thing twice")\n",
+ " else:\n",
+ " thing.location = location if location is not None else self.default_location(thing)\n",
+ " self.things.append(thing)\n",
+ " if isinstance(thing, Agent):\n",
+ " thing.performance = 0\n",
+ " self.agents.append(thing)\n",
+ "\n",
+ " def delete_thing(self, thing):\n",
+ " """Remove a thing from the environment."""\n",
+ " try:\n",
+ " self.things.remove(thing)\n",
+ " except ValueError as e:\n",
+ " print(e)\n",
+ " print(" in Environment delete_thing")\n",
+ " print(" Thing to be removed: {} at {}".format(thing, thing.location))\n",
+ " print(" from list: {}".format([(thing, thing.location) for thing in self.things]))\n",
+ " if thing in self.agents:\n",
+ " self.agents.remove(thing)\n",
+ "
Percept: | \n", + "Feel Food | \n", + "Feel Water | \n", + "Feel Nothing | \n", + "
Action: | \n", + "eat | \n", + "drink | \n", + "move down | \n", + "
class TrivialVacuumEnvironment(Environment):\n",
+ " """This environment has two locations, A and B. Each can be Dirty\n",
+ " or Clean. The agent perceives its location and the location's\n",
+ " status. This serves as an example of how to implement a simple\n",
+ " Environment."""\n",
+ "\n",
+ " def __init__(self):\n",
+ " super().__init__()\n",
+ " self.status = {loc_A: random.choice(['Clean', 'Dirty']),\n",
+ " loc_B: random.choice(['Clean', 'Dirty'])}\n",
+ "\n",
+ " def thing_classes(self):\n",
+ " return [Wall, Dirt, ReflexVacuumAgent, RandomVacuumAgent, TableDrivenVacuumAgent, ModelBasedVacuumAgent]\n",
+ "\n",
+ " def percept(self, agent):\n",
+ " """Returns the agent's location, and the location status (Dirty/Clean)."""\n",
+ " return agent.location, self.status[agent.location]\n",
+ "\n",
+ " def execute_action(self, agent, action):\n",
+ " """Change agent's location and/or location's status; track performance.\n",
+ " Score 10 for each dirt cleaned; -1 for each move."""\n",
+ " if action == 'Right':\n",
+ " agent.location = loc_B\n",
+ " agent.performance -= 1\n",
+ " elif action == 'Left':\n",
+ " agent.location = loc_A\n",
+ " agent.performance -= 1\n",
+ " elif action == 'Suck':\n",
+ " if self.status[agent.location] == 'Dirty':\n",
+ " agent.performance += 10\n",
+ " self.status[agent.location] = 'Clean'\n",
+ "\n",
+ " def default_location(self, thing):\n",
+ " """Agents start in either location at random."""\n",
+ " return random.choice([loc_A, loc_B])\n",
+ "
def hill_climbing(problem):\n",
+ " """\n",
+ " [Figure 4.2]\n",
+ " From the initial node, keep choosing the neighbor with highest value,\n",
+ " stopping when no neighbor is better.\n",
+ " """\n",
+ " current = Node(problem.initial)\n",
+ " while True:\n",
+ " neighbors = current.expand(problem)\n",
+ " if not neighbors:\n",
+ " break\n",
+ " neighbor = argmax_random_tie(neighbors, key=lambda node: problem.value(node.state))\n",
+ " if problem.value(neighbor.state) <= problem.value(current.state):\n",
+ " break\n",
+ " current = neighbor\n",
+ " return current.state\n",
+ "
def simulated_annealing(problem, schedule=exp_schedule()):\n",
+ " """[Figure 4.5] CAUTION: This differs from the pseudocode as it\n",
+ " returns a state instead of a Node."""\n",
+ " current = Node(problem.initial)\n",
+ " for t in range(sys.maxsize):\n",
+ " T = schedule(t)\n",
+ " if T == 0:\n",
+ " return current.state\n",
+ " neighbors = current.expand(problem)\n",
+ " if not neighbors:\n",
+ " return current.state\n",
+ " next_choice = random.choice(neighbors)\n",
+ " delta_e = problem.value(next_choice.state) - problem.value(current.state)\n",
+ " if delta_e > 0 or probability(np.exp(delta_e / T)):\n",
+ " current = next_choice\n",
+ "
def exp_schedule(k=20, lam=0.005, limit=100):\n",
+ " """One possible schedule function for simulated annealing"""\n",
+ " return lambda t: (k * np.exp(-lam * t) if t < limit else 0)\n",
+ "
class PeakFindingProblem(Problem):\n",
+ " """Problem of finding the highest peak in a limited grid"""\n",
+ "\n",
+ " def __init__(self, initial, grid, defined_actions=directions4):\n",
+ " """The grid is a 2 dimensional array/list whose state is specified by tuple of indices"""\n",
+ " super().__init__(initial)\n",
+ " self.grid = grid\n",
+ " self.defined_actions = defined_actions\n",
+ " self.n = len(grid)\n",
+ " assert self.n > 0\n",
+ " self.m = len(grid[0])\n",
+ " assert self.m > 0\n",
+ "\n",
+ " def actions(self, state):\n",
+ " """Returns the list of actions which are allowed to be taken from the given state"""\n",
+ " allowed_actions = []\n",
+ " for action in self.defined_actions:\n",
+ " next_state = vector_add(state, self.defined_actions[action])\n",
+ " if 0 <= next_state[0] <= self.n - 1 and 0 <= next_state[1] <= self.m - 1:\n",
+ " allowed_actions.append(action)\n",
+ "\n",
+ " return allowed_actions\n",
+ "\n",
+ " def result(self, state, action):\n",
+ " """Moves in the direction specified by action"""\n",
+ " return vector_add(state, self.defined_actions[action])\n",
+ "\n",
+ " def value(self, state):\n",
+ " """Value of a state is the value it is the index to"""\n",
+ " x, y = state\n",
+ " assert 0 <= x < self.n\n",
+ " assert 0 <= y < self.m\n",
+ " return self.grid[x][y]\n",
+ "
def genetic_algorithm(population, fitness_fn, gene_pool=[0, 1], f_thres=None, ngen=1000, pmut=0.1):\n",
+ " """[Figure 4.8]"""\n",
+ " for i in range(ngen):\n",
+ " population = [mutate(recombine(*select(2, population, fitness_fn)), gene_pool, pmut)\n",
+ " for i in range(len(population))]\n",
+ "\n",
+ " fittest_individual = fitness_threshold(fitness_fn, f_thres, population)\n",
+ " if fittest_individual:\n",
+ " return fittest_individual\n",
+ "\n",
+ " return max(population, key=fitness_fn)\n",
+ "
def recombine(x, y):\n",
+ " n = len(x)\n",
+ " c = random.randrange(0, n)\n",
+ " return x[:c] + y[c:]\n",
+ "
def mutate(x, gene_pool, pmut):\n",
+ " if random.uniform(0, 1) >= pmut:\n",
+ " return x\n",
+ "\n",
+ " n = len(x)\n",
+ " g = len(gene_pool)\n",
+ " c = random.randrange(0, n)\n",
+ " r = random.randrange(0, g)\n",
+ "\n",
+ " new_gene = gene_pool[r]\n",
+ " return x[:c] + [new_gene] + x[c + 1:]\n",
+ "
def init_population(pop_number, gene_pool, state_length):\n",
+ " """Initializes population for genetic algorithm\n",
+ " pop_number : Number of individuals in population\n",
+ " gene_pool : List of possible values for individuals\n",
+ " state_length: The length of each individual"""\n",
+ " g = len(gene_pool)\n",
+ " population = []\n",
+ " for i in range(pop_number):\n",
+ " new_individual = [gene_pool[random.randrange(0, g)] for j in range(state_length)]\n",
+ " population.append(new_individual)\n",
+ "\n",
+ " return population\n",
+ "
def tt_check_all(kb, alpha, symbols, model):\n",
+ " """Auxiliary routine to implement tt_entails."""\n",
+ " print(f"model:{model}")\n",
+ " if not symbols:\n",
+ " if pl_true(kb, model):\n",
+ " result = pl_true(alpha, model)\n",
+ " print(f"When KB is True, alpha is {result}")\n",
+ " assert result in (True, False)\n",
+ " return result\n",
+ " else:\n",
+ " print("When KB is False, return True")\n",
+ " return True\n",
+ " else:\n",
+ " P, rest = symbols[0], symbols[1:]\n",
+ " return (tt_check_all(kb, alpha, rest, extend(model, P, True)) and\n",
+ " tt_check_all(kb, alpha, rest, extend(model, P, False)))\n",
+ "
def tt_entails(kb, alpha):\n",
+ " """\n",
+ " [Figure 7.10]\n",
+ " Does kb entail the sentence alpha? Use truth tables. For propositional\n",
+ " kb's and sentences. Note that the 'kb' should be an Expr which is a\n",
+ " conjunction of clauses.\n",
+ " >>> tt_entails(expr('P & Q'), expr('Q'))\n",
+ " True\n",
+ " """\n",
+ " assert not variables(alpha)\n",
+ " print(f"KB: {kb}")\n",
+ " print(f"alpha: {alpha}")\n",
+ " symbols = list(prop_symbols(kb & alpha))\n",
+ " print(f"symbols: {symbols}")\n",
+ " return tt_check_all(kb, alpha, symbols, {})\n",
+ "
def to_cnf(s):\n",
+ " """\n",
+ " [Page 253]\n",
+ " Convert a propositional logical sentence to conjunctive normal form.\n",
+ " That is, to the form ((A | ~B | ...) & (B | C | ...) & ...)\n",
+ " >>> to_cnf('~(B | C)')\n",
+ " (~B & ~C)\n",
+ " """\n",
+ " s = expr(s)\n",
+ " if isinstance(s, str):\n",
+ " s = expr(s)\n",
+ " s = eliminate_implications(s) # Steps 1, 2 from p. 253\n",
+ " s = move_not_inwards(s) # Step 3\n",
+ " return distribute_and_over_or(s) # Step 4\n",
+ "
def eliminate_implications(s):\n",
+ " """Change implications into equivalent form with only &, |, and ~ as logical operators."""\n",
+ " s = expr(s)\n",
+ " if not s.args or is_symbol(s.op):\n",
+ " return s # Atoms are unchanged.\n",
+ " args = list(map(eliminate_implications, s.args))\n",
+ " a, b = args[0], args[-1]\n",
+ " if s.op == '==>':\n",
+ " return b | ~a\n",
+ " elif s.op == '<==':\n",
+ " return a | ~b\n",
+ " elif s.op == '<=>':\n",
+ " return (a | ~b) & (b | ~a)\n",
+ " elif s.op == '^':\n",
+ " assert len(args) == 2 # TODO: relax this restriction\n",
+ " return (a & ~b) | (~a & b)\n",
+ " else:\n",
+ " assert s.op in ('&', '|', '~')\n",
+ " return Expr(s.op, *args)\n",
+ "
def move_not_inwards(s):\n",
+ " """Rewrite sentence s by moving negation sign inward.\n",
+ " >>> move_not_inwards(~(A | B))\n",
+ " (~A & ~B)\n",
+ " """\n",
+ " s = expr(s)\n",
+ " if s.op == '~':\n",
+ " def NOT(b):\n",
+ " return move_not_inwards(~b)\n",
+ "\n",
+ " a = s.args[0]\n",
+ " if a.op == '~':\n",
+ " return move_not_inwards(a.args[0]) # ~~A ==> A\n",
+ " if a.op == '&':\n",
+ " return associate('|', list(map(NOT, a.args)))\n",
+ " if a.op == '|':\n",
+ " return associate('&', list(map(NOT, a.args)))\n",
+ " return s\n",
+ " elif is_symbol(s.op) or not s.args:\n",
+ " return s\n",
+ " else:\n",
+ " return Expr(s.op, *list(map(move_not_inwards, s.args)))\n",
+ "
def distribute_and_over_or(s):\n",
+ " """Given a sentence s consisting of conjunctions and disjunctions\n",
+ " of literals, return an equivalent sentence in CNF.\n",
+ " >>> distribute_and_over_or((A & B) | C)\n",
+ " ((A | C) & (B | C))\n",
+ " """\n",
+ " s = expr(s)\n",
+ " if s.op == '|':\n",
+ " s = associate('|', s.args)\n",
+ " if s.op != '|':\n",
+ " return distribute_and_over_or(s)\n",
+ " if len(s.args) == 0:\n",
+ " return False\n",
+ " if len(s.args) == 1:\n",
+ " return distribute_and_over_or(s.args[0])\n",
+ " conj = first(arg for arg in s.args if arg.op == '&')\n",
+ " if not conj:\n",
+ " return s\n",
+ " others = [a for a in s.args if a is not conj]\n",
+ " rest = associate('|', others)\n",
+ " return associate('&', [distribute_and_over_or(c | rest)\n",
+ " for c in conj.args])\n",
+ " elif s.op == '&':\n",
+ " return associate('&', list(map(distribute_and_over_or, s.args)))\n",
+ " else:\n",
+ " return s\n",
+ "
def pl_resolution(kb, alpha):\n",
+ " """\n",
+ " [Figure 7.12]\n",
+ " Propositional-logic resolution: say if alpha follows from KB.\n",
+ " >>> pl_resolution(horn_clauses_KB, A)\n",
+ " True\n",
+ " """\n",
+ " clauses = kb.clauses + conjuncts(to_cnf(~alpha))\n",
+ " new = set()\n",
+ " while True:\n",
+ " n = len(clauses)\n",
+ " pairs = [(clauses[i], clauses[j])\n",
+ " for i in range(n) for j in range(i + 1, n)]\n",
+ " for (ci, cj) in pairs:\n",
+ " resolvents = pl_resolve(ci, cj)\n",
+ " if False in resolvents:\n",
+ " return True\n",
+ " new = new.union(set(resolvents))\n",
+ " if new.issubset(set(clauses)):\n",
+ " return False\n",
+ " for c in new:\n",
+ " if c not in clauses:\n",
+ " clauses.append(c)\n",
+ "
def clauses_with_premise(self, p):\n",
+ " """Return a list of the clauses in KB that have p in their premise.\n",
+ " This could be cached away for O(1) speed, but we'll recompute it."""\n",
+ " return [c for c in self.clauses if c.op == '==>' and p in conjuncts(c.args[0])]\n",
+ "
def pl_fc_entails(kb, q):\n",
+ " """\n",
+ " [Figure 7.15]\n",
+ " Use forward chaining to see if a PropDefiniteKB entails symbol q.\n",
+ " >>> pl_fc_entails(horn_clauses_KB, expr('Q'))\n",
+ " True\n",
+ " """\n",
+ " count = {c: len(conjuncts(c.args[0])) for c in kb.clauses if c.op == '==>'}\n",
+ " inferred = defaultdict(bool)\n",
+ " agenda = [s for s in kb.clauses if is_prop_symbol(s.op)]\n",
+ " while agenda:\n",
+ " print(f"queue:{agenda}")\n",
+ " p = agenda.pop()\n",
+ " print(f"{p} poped from queue {agenda}")\n",
+ " if p == q:\n",
+ " print(f"{p} is the same with query, return True")\n",
+ " return True\n",
+ " if not inferred[p]:\n",
+ " inferred[p] = True\n",
+ " for c in kb.clauses_with_premise(p):\n",
+ " count[c] -= 1\n",
+ " if count[c] == 0:\n",
+ " print(f"premises in clauses: {c} are all true")\n",
+ " print(f"conclusion {c.args[1]} are added to queue")\n",
+ " agenda.append(c.args[1])\n",
+ " return False\n",
+ "
def dpll(clauses, symbols, model, branching_heuristic=no_branching_heuristic):\n",
+ " """See if the clauses are true in a partial model."""\n",
+ " print(f"clauses: {clauses}")\n",
+ " print(f"symbols: {symbols}")\n",
+ " print(f"model: {model}")\n",
+ " unknown_clauses = [] # clauses with an unknown truth value\n",
+ " for c in clauses:\n",
+ " print(f"check pl_true of clause {c} with model {model}")\n",
+ " val = pl_true(c, model)\n",
+ " print(f"Check Results: {val}")\n",
+ " if val is False:\n",
+ " return False\n",
+ " if val is None:\n",
+ " unknown_clauses.append(c)\n",
+ " if not unknown_clauses:\n",
+ " return model\n",
+ " print(f"Finding pure symbol with symbols {symbols} and clauses {unknown_clauses}")\n",
+ " P, value = find_pure_symbol(symbols, unknown_clauses)\n",
+ " print(f"Pure symbol results P: {P} value: {value}")\n",
+ " if P:\n",
+ " return dpll(clauses, remove_all(P, symbols), extend(model, P, value), branching_heuristic)\n",
+ " P, value = find_unit_clause(clauses, model)\n",
+ " if P:\n",
+ " return dpll(clauses, remove_all(P, symbols), extend(model, P, value), branching_heuristic)\n",
+ " P, value = branching_heuristic(symbols, unknown_clauses)\n",
+ " return (dpll(clauses, remove_all(P, symbols), extend(model, P, value), branching_heuristic) or\n",
+ " dpll(clauses, remove_all(P, symbols), extend(model, P, not value), branching_heuristic))\n",
+ "
def dpll_satisfiable(s, branching_heuristic=no_branching_heuristic):\n",
+ " """Check satisfiability of a propositional sentence.\n",
+ " This differs from the book code in two ways: (1) it returns a model\n",
+ " rather than True when it succeeds; this is more useful. (2) The\n",
+ " function find_pure_symbol is passed a list of unknown clauses, rather\n",
+ " than a list of all clauses and the model; this is more efficient.\n",
+ " >>> dpll_satisfiable(A |'<=>'| B) == {A: True, B: True}\n",
+ " True\n",
+ " """\n",
+ " return dpll(conjuncts(to_cnf(s)), prop_symbols(s), {}, branching_heuristic)\n",
+ "
def WalkSAT(clauses, p=0.5, max_flips=10000):\n",
+ " """Checks for satisfiability of all clauses by randomly flipping values of variables\n",
+ " >>> WalkSAT([A & ~A], 0.5, 100) is None\n",
+ " True\n",
+ " """\n",
+ " # Set of all symbols in all clauses\n",
+ " symbols = {sym for clause in clauses for sym in prop_symbols(clause)}\n",
+ " # model is a random assignment of true/false to the symbols in clauses\n",
+ " model = {s: random.choice([True, False]) for s in symbols}\n",
+ " for i in range(max_flips):\n",
+ " satisfied, unsatisfied = [], []\n",
+ " for clause in clauses:\n",
+ " (satisfied if pl_true(clause, model) else unsatisfied).append(clause)\n",
+ " if not unsatisfied: # if model satisfies all the clauses\n",
+ " return model\n",
+ " clause = random.choice(unsatisfied)\n",
+ " if probability(p):\n",
+ " sym = random.choice(list(prop_symbols(clause)))\n",
+ " else:\n",
+ " # Flip the symbol in clause that maximizes number of sat. clauses\n",
+ " def sat_count(sym):\n",
+ " # Return the number of clauses satisfied after flipping the symbol.\n",
+ " model[sym] = not model[sym]\n",
+ " count = len([clause for clause in clauses if pl_true(clause, model)])\n",
+ " model[sym] = not model[sym]\n",
+ " return count\n",
+ "\n",
+ " sym = max(prop_symbols(clause), key=sat_count)\n",
+ " model[sym] = not model[sym]\n",
+ " # If no solution is found within the flip limit, we return failure\n",
+ " return None\n",
+ "
def SAT_plan(init, transition, goal, t_max, SAT_solver=cdcl_satisfiable):\n",
+ " """\n",
+ " [Figure 7.22]\n",
+ " Converts a planning problem to Satisfaction problem by translating it to a cnf sentence.\n",
+ " >>> transition = {'A': {'Left': 'A', 'Right': 'B'}, 'B': {'Left': 'A', 'Right': 'C'}, 'C': {'Left': 'B', 'Right': 'C'}}\n",
+ " >>> SAT_plan('A', transition, 'C', 1) is None\n",
+ " True\n",
+ " """\n",
+ "\n",
+ " # Functions used by SAT_plan\n",
+ " def translate_to_SAT(init, transition, goal, time):\n",
+ " clauses = []\n",
+ " states = [state for state in transition]\n",
+ "\n",
+ " # Symbol claiming state s at time t\n",
+ " state_counter = itertools.count()\n",
+ " for s in states:\n",
+ " for t in range(time + 1):\n",
+ " state_sym[s, t] = Expr('S_{}'.format(next(state_counter)))\n",
+ "\n",
+ " # Add initial state axiom\n",
+ " clauses.append(state_sym[init, 0])\n",
+ "\n",
+ " # Add goal state axiom\n",
+ " clauses.append(state_sym[first(clause[0] for clause in state_sym\n",
+ " if set(conjuncts(clause[0])).issuperset(conjuncts(goal))), time]) \\\n",
+ " if isinstance(goal, Expr) else clauses.append(state_sym[goal, time])\n",
+ "\n",
+ " # All possible transitions\n",
+ " transition_counter = itertools.count()\n",
+ " for s in states:\n",
+ " for action in transition[s]:\n",
+ " s_ = transition[s][action]\n",
+ " for t in range(time):\n",
+ " # Action 'action' taken from state 's' at time 't' to reach 's_'\n",
+ " action_sym[s, action, t] = Expr('T_{}'.format(next(transition_counter)))\n",
+ "\n",
+ " # Change the state from s to s_\n",
+ " clauses.append(action_sym[s, action, t] | '==>' | state_sym[s, t])\n",
+ " clauses.append(action_sym[s, action, t] | '==>' | state_sym[s_, t + 1])\n",
+ "\n",
+ " # Allow only one state at any time\n",
+ " for t in range(time + 1):\n",
+ " # must be a state at any time\n",
+ " clauses.append(associate('|', [state_sym[s, t] for s in states]))\n",
+ "\n",
+ " for s in states:\n",
+ " for s_ in states[states.index(s) + 1:]:\n",
+ " # for each pair of states s, s_ only one is possible at time t\n",
+ " clauses.append((~state_sym[s, t]) | (~state_sym[s_, t]))\n",
+ "\n",
+ " # Restrict to one transition per timestep\n",
+ " for t in range(time):\n",
+ " # list of possible transitions at time t\n",
+ " transitions_t = [tr for tr in action_sym if tr[2] == t]\n",
+ "\n",
+ " # make sure at least one of the transitions happens\n",
+ " clauses.append(associate('|', [action_sym[tr] for tr in transitions_t]))\n",
+ "\n",
+ " for tr in transitions_t:\n",
+ " for tr_ in transitions_t[transitions_t.index(tr) + 1:]:\n",
+ " # there cannot be two transitions tr and tr_ at time t\n",
+ " clauses.append(~action_sym[tr] | ~action_sym[tr_])\n",
+ "\n",
+ " # Combine the clauses to form the cnf\n",
+ " return associate('&', clauses)\n",
+ "\n",
+ " def extract_solution(model):\n",
+ " true_transitions = [t for t in action_sym if model[action_sym[t]]]\n",
+ " # Sort transitions based on time, which is the 3rd element of the tuple\n",
+ " true_transitions.sort(key=lambda x: x[2])\n",
+ " return [action for s, action, time in true_transitions]\n",
+ "\n",
+ " # Body of SAT_plan algorithm\n",
+ " for t in range(t_max + 1):\n",
+ " # dictionaries to help extract the solution from model\n",
+ " state_sym = {}\n",
+ " action_sym = {}\n",
+ "\n",
+ " cnf = translate_to_SAT(init, transition, goal, t)\n",
+ " model = SAT_solver(cnf)\n",
+ " if model is not False:\n",
+ " return extract_solution(model)\n",
+ " return None\n",
+ "
def subst(s, x):\n",
+ " """Substitute the substitution s into the expression x.\n",
+ " >>> subst({x: 42, y:0}, F(x) + y)\n",
+ " (F(42) + 0)\n",
+ " """\n",
+ " if isinstance(x, list):\n",
+ " return [subst(s, xi) for xi in x]\n",
+ " elif isinstance(x, tuple):\n",
+ " return tuple([subst(s, xi) for xi in x])\n",
+ " elif not isinstance(x, Expr):\n",
+ " return x\n",
+ " elif is_var_symbol(x.op):\n",
+ " return s.get(x, x)\n",
+ " else:\n",
+ " return Expr(x.op, *[subst(s, arg) for arg in x.args])\n",
+ "
def fol_fc_ask(kb, alpha):\n",
+ " """\n",
+ " [Figure 9.3]\n",
+ " A simple forward-chaining algorithm.\n",
+ " """\n",
+ " # TODO: improve efficiency\n",
+ " kb_consts = list({c for clause in kb.clauses for c in constant_symbols(clause)})\n",
+ "\n",
+ " def enum_subst(p):\n",
+ " query_vars = list({v for clause in p for v in variables(clause)})\n",
+ " for assignment_list in itertools.product(kb_consts, repeat=len(query_vars)):\n",
+ " theta = {x: y for x, y in zip(query_vars, assignment_list)}\n",
+ " yield theta\n",
+ "\n",
+ " # check if we can answer without new inferences\n",
+ " for q in kb.clauses:\n",
+ " phi = unify_mm(q, alpha)\n",
+ " if phi is not None:\n",
+ " yield phi\n",
+ "\n",
+ " while True:\n",
+ " new = []\n",
+ " for rule in kb.clauses:\n",
+ " p, q = parse_definite_clause(rule)\n",
+ " for theta in enum_subst(p):\n",
+ " if set(subst(theta, p)).issubset(set(kb.clauses)):\n",
+ " q_ = subst(theta, q)\n",
+ " if all([unify_mm(x, q_) is None for x in kb.clauses + new]):\n",
+ " new.append(q_)\n",
+ " phi = unify_mm(q_, alpha)\n",
+ " if phi is not None:\n",
+ " yield phi\n",
+ " if not new:\n",
+ " break\n",
+ " for clause in new:\n",
+ " kb.tell(clause)\n",
+ " return None\n",
+ "
def fol_bc_or(kb, goal, theta):\n",
+ " print(f"Starting Back Or with (KB, Goal: {goal}, theta: {theta})")\n",
+ " for rule in kb.fetch_rules_for_goal(goal):\n",
+ " print(f"Processing Rule: {rule}")\n",
+ " lhs, rhs = parse_definite_clause(standardize_variables(rule))\n",
+ " print(f"lhs: {lhs}, rhs: {rhs}")\n",
+ " for theta1 in fol_bc_and(kb, lhs, unify_mm(rhs, goal, theta)):\n",
+ " yield theta1\n",
+ "
def fol_bc_and(kb, goals, theta):\n",
+ " print(f"Starting Back And with (KB, Goals: {goals}, theta: {theta})")\n",
+ " if theta is None:\n",
+ " print(f"theta is None, pass")\n",
+ " pass\n",
+ " elif not goals:\n",
+ " print(f"Length(goals)=0, yield theta: {theta}")\n",
+ " yield theta\n",
+ " else:\n",
+ " first, rest = goals[0], goals[1:]\n",
+ " print(f"first: {first} for Back Or, rest: {rest} for Back And")\n",
+ " for theta1 in fol_bc_or(kb, subst(theta, first), theta):\n",
+ " for theta2 in fol_bc_and(kb, rest, theta1):\n",
+ " yield theta2\n",
+ "
def refinements(self, library): # refinements may be (multiple) HLA themselves ...\n",
+ " """\n",
+ " State is a Problem, containing the current state kb; library is a\n",
+ " dictionary containing details for every possible refinement. e.g.:\n",
+ " {\n",
+ " 'HLA': [\n",
+ " 'Go(Home, SFO)',\n",
+ " 'Go(Home, SFO)',\n",
+ " 'Drive(Home, SFOLongTermParking)',\n",
+ " 'Shuttle(SFOLongTermParking, SFO)',\n",
+ " 'Taxi(Home, SFO)'\n",
+ " ],\n",
+ " 'steps': [\n",
+ " ['Drive(Home, SFOLongTermParking)', 'Shuttle(SFOLongTermParking, SFO)'],\n",
+ " ['Taxi(Home, SFO)'],\n",
+ " [],\n",
+ " [],\n",
+ " []\n",
+ " ],\n",
+ " # empty refinements indicate a primitive action\n",
+ " 'precond': [\n",
+ " ['At(Home) & Have(Car)'],\n",
+ " ['At(Home)'],\n",
+ " ['At(Home) & Have(Car)'],\n",
+ " ['At(SFOLongTermParking)'],\n",
+ " ['At(Home)']\n",
+ " ],\n",
+ " 'effect': [\n",
+ " ['At(SFO) & ~At(Home)'],\n",
+ " ['At(SFO) & ~At(Home)'],\n",
+ " ['At(SFOLongTermParking) & ~At(Home)'],\n",
+ " ['At(SFO) & ~At(SFOLongTermParking)'],\n",
+ " ['At(SFO) & ~At(Home)']\n",
+ " ]}\n",
+ " """\n",
+ "# print(self.name)\n",
+ " indices = [i for i, x in enumerate(library['HLA']) if expr(x).op == self.name]\n",
+ "# print(f"indices: {indices}")\n",
+ " for i in indices:\n",
+ " actions = []\n",
+ " for j in range(len(library['steps'][i])):\n",
+ " # find the index of the step [j] of the HLA\n",
+ " index_step = [k for k, x in enumerate(library['HLA']) if x == library['steps'][i][j]][0]\n",
+ " precond = library['precond'][index_step][0] # preconditions of step [j]\n",
+ " effect = library['effect'][index_step][0] # effect of step [j]\n",
+ " actions.append(HLA(library['steps'][i][j], precond, effect))\n",
+ " yield actions\n",
+ "
def hierarchical_search(self, hierarchy):\n",
+ " """\n",
+ " [Figure 11.5]\n",
+ " 'Hierarchical Search, a Breadth First Search implementation of Hierarchical\n",
+ " Forward Planning Search'\n",
+ " The problem is a real-world problem defined by the problem class, and the hierarchy is\n",
+ " a dictionary of HLA - refinements (see refinements generator for details)\n",
+ " """\n",
+ " print(f"hiearchy: {hierarchy}")\n",
+ " act = Node(self.initial, None, [self.actions[0]])\n",
+ " frontier = deque()\n",
+ " frontier.append(act)\n",
+ " print(f"frontier: {[x.__dict__ for x in frontier]}")\n",
+ " while True:\n",
+ " if not frontier:\n",
+ " return None\n",
+ " plan = frontier.popleft()\n",
+ " print(f"Plan {plan.action} poped from frontier: {[x.__dict__ for x in frontier]}")\n",
+ " # finds the first non primitive hla in plan actions\n",
+ " (hla, index) = RealWorldPlanningProblem.find_hla(plan, hierarchy)\n",
+ " print(f"Find HLA, get hla: {hla} and index: {index}")\n",
+ " prefix = plan.action[:index]\n",
+ " print(f"prefix: {prefix}")\n",
+ " outcome = RealWorldPlanningProblem(\n",
+ " RealWorldPlanningProblem.result(self.initial, prefix), self.goals, self.actions)\n",
+ " print(f"Outcome: {outcome.initial}")\n",
+ " suffix = plan.action[index + 1:]\n",
+ " print(f"suffix: {suffix}")\n",
+ " if not hla: # hla is None and plan is primitive\n",
+ " if outcome.goal_test():\n",
+ " print(f"Outcome: {outcome.initial} Goal Test Success, return Plan:{ 1}")\n",
+ " return plan.action\n",
+ " else:\n",
+ " for sequence in RealWorldPlanningProblem.refinements(hla, hierarchy): # find refinements\n",
+ " print(f"Refinement(hla: {hla}) got sequence: {sequence}")\n",
+ " frontier.append(Node(outcome.initial, plan, prefix + sequence + suffix))\n",
+ " print(f"frontier Added: {frontier[-1].__dict__}")\n",
+ "
def angelic_search(self, hierarchy, initial_plan):\n",
+ " """\n",
+ " [Figure 11.8]\n",
+ " A hierarchical planning algorithm that uses angelic semantics to identify and\n",
+ " commit to high-level plans that work while avoiding high-level plans that don’t.\n",
+ " The predicate MAKING-PROGRESS checks to make sure that we aren’t stuck in an infinite regression\n",
+ " of refinements.\n",
+ " At top level, call ANGELIC-SEARCH with [Act] as the initialPlan.\n",
+ "\n",
+ " InitialPlan contains a sequence of HLA's with angelic semantics\n",
+ "\n",
+ " The possible effects of an angelic HLA in initialPlan are:\n",
+ " ~ : effect remove\n",
+ " $+: effect possibly add\n",
+ " $-: effect possibly remove\n",
+ " $$: possibly add or remove\n",
+ " """\n",
+ " frontier = deque(initial_plan)\n",
+ " while True:\n",
+ " if not frontier:\n",
+ " return None\n",
+ " plan = frontier.popleft() # sequence of HLA/Angelic HLA's\n",
+ " print('\\n')\n",
+ " print(f"Plan {plan.action} poped from frontier: {[x.__dict__ for x in frontier]}")\n",
+ " opt_reachable_set = RealWorldPlanningProblem.reach_opt(self.initial, plan)\n",
+ " print(f"Opt Reachable Set: {opt_reachable_set}")\n",
+ " pes_reachable_set = RealWorldPlanningProblem.reach_pes(self.initial, plan)\n",
+ " print(f"Pes Reachable Set: {pes_reachable_set}")\n",
+ " if self.intersects_goal(opt_reachable_set):\n",
+ " print(f"Opt Reachable Set intersects with Goal.")\n",
+ " if RealWorldPlanningProblem.is_primitive(plan, hierarchy):\n",
+ " print(f"Plan is premitive, return plan")\n",
+ " return [x for x in plan.action]\n",
+ " guaranteed = self.intersects_goal(pes_reachable_set)\n",
+ " print(f"Pes Reachable Set intersects with Goal, get Guaranteed: {guaranteed}")\n",
+ " if guaranteed and RealWorldPlanningProblem.making_progress(plan, initial_plan):\n",
+ " print(f"guaranteed: {guaranteed} not empty and Making-Progress")\n",
+ " final_state = guaranteed[0] # any element of guaranteed\n",
+ " print(f"final_state: {final_state}")\n",
+ " return RealWorldPlanningProblem.decompose(hierarchy, plan, final_state, pes_reachable_set)\n",
+ " # there should be at least one HLA/AngelicHLA, otherwise plan would be primitive\n",
+ " hla, index = RealWorldPlanningProblem.find_hla(plan, hierarchy)\n",
+ " print(f"Find HLA: {hla} in plan: {plan}")\n",
+ " prefix = plan.action[:index]\n",
+ " print(f"Prefix: {prefix}")\n",
+ " suffix = plan.action[index + 1:]\n",
+ " print(f"suffix: {suffix}")\n",
+ " outcome = RealWorldPlanningProblem(\n",
+ " RealWorldPlanningProblem.result(self.initial, prefix), self.goals, self.actions)\n",
+ " print(f"Outcome: {outcome.initial}")\n",
+ " for sequence in RealWorldPlanningProblem.refinements(hla, hierarchy): # find refinements\n",
+ " print(f"Refinement(hla: {hla}) got sequence: {sequence}")\n",
+ " frontier.append(\n",
+ " AngelicNode(outcome.initial, plan, prefix + sequence + suffix, prefix + sequence + suffix))\n",
+ " print(f"frontier Added: {frontier[-1].__dict__}")\n",
+ "
def decompose(hierarchy, plan, s_f, reachable_set):\n",
+ " solution = []\n",
+ " i = max(reachable_set.keys())\n",
+ " print(f"Running Decompose with hierarchy, plan: {plan}, final_state: {s_f}, reachable_set: {reachable_set}")\n",
+ " while plan.action_pes:\n",
+ " action = plan.action_pes.pop()\n",
+ " print(f"Pop action: {action} from Plan: {plan.action_pes}")\n",
+ " if i == 0:\n",
+ " return solution\n",
+ " s_i = RealWorldPlanningProblem.find_previous_state(\n",
+ " s_f, reachable_set, i, action)\n",
+ " print(f"Find Previous state: {s_i}")\n",
+ " problem = RealWorldPlanningProblem(s_i, s_f, plan.action)\n",
+ " print(f"Define problem with initial s_i: {s_i} and goal s_f: {s_f}")\n",
+ " angelic_call = RealWorldPlanningProblem.angelic_search(\n",
+ " problem, hierarchy, [AngelicNode(s_i, Node(None), [action], [action])])\n",
+ " print(f"Run Angelic Search, get {angelic_call}")\n",
+ " if angelic_call:\n",
+ " for x in angelic_call:\n",
+ " solution.insert(0, x)\n",
+ " else:\n",
+ " return None\n",
+ " s_f = s_i\n",
+ " i -= 1\n",
+ " return solution\n",
+ "
class ProbDist:\n",
+ " """A discrete probability distribution. You name the random variable\n",
+ " in the constructor, then assign and query probability of values.\n",
+ " >>> P = ProbDist('Flip'); P['H'], P['T'] = 0.25, 0.75; P['H']\n",
+ " 0.25\n",
+ " >>> P = ProbDist('X', {'lo': 125, 'med': 375, 'hi': 500})\n",
+ " >>> P['lo'], P['med'], P['hi']\n",
+ " (0.125, 0.375, 0.5)\n",
+ " """\n",
+ "\n",
+ " def __init__(self, var_name='?', freq=None):\n",
+ " """If freq is given, it is a dictionary of values - frequency pairs,\n",
+ " then ProbDist is normalized."""\n",
+ " self.prob = {}\n",
+ " self.var_name = var_name\n",
+ " self.values = []\n",
+ " if freq:\n",
+ " for (v, p) in freq.items():\n",
+ " self[v] = p\n",
+ " self.normalize()\n",
+ "\n",
+ " def __getitem__(self, val):\n",
+ " """Given a value, return P(value)."""\n",
+ " try:\n",
+ " return self.prob[val]\n",
+ " except KeyError:\n",
+ " return 0\n",
+ "\n",
+ " def __setitem__(self, val, p):\n",
+ " """Set P(val) = p."""\n",
+ " if val not in self.values:\n",
+ " self.values.append(val)\n",
+ " self.prob[val] = p\n",
+ "\n",
+ " def normalize(self):\n",
+ " """Make sure the probabilities of all values sum to 1.\n",
+ " Returns the normalized distribution.\n",
+ " Raises a ZeroDivisionError if the sum of the values is 0."""\n",
+ " total = sum(self.prob.values())\n",
+ " if not np.isclose(total, 1.0):\n",
+ " for val in self.prob:\n",
+ " self.prob[val] /= total\n",
+ " return self\n",
+ "\n",
+ " def show_approx(self, numfmt='{:.3g}'):\n",
+ " """Show the probabilities rounded and sorted by key, for the\n",
+ " sake of portable doctests."""\n",
+ " return ', '.join([('{}: ' + numfmt).format(v, p) for (v, p) in sorted(self.prob.items())])\n",
+ "\n",
+ " def __repr__(self):\n",
+ " return "P({})".format(self.var_name)\n",
+ "
class JointProbDist(ProbDist):\n",
+ " """A discrete probability distribute over a set of variables.\n",
+ " >>> P = JointProbDist(['X', 'Y']); P[1, 1] = 0.25\n",
+ " >>> P[1, 1]\n",
+ " 0.25\n",
+ " >>> P[dict(X=0, Y=1)] = 0.5\n",
+ " >>> P[dict(X=0, Y=1)]\n",
+ " 0.5"""\n",
+ "\n",
+ " def __init__(self, variables):\n",
+ " self.prob = {}\n",
+ " self.variables = variables\n",
+ " self.vals = defaultdict(list)\n",
+ "\n",
+ " def __getitem__(self, values):\n",
+ " """Given a tuple or dict of values, return P(values)."""\n",
+ " values = event_values(values, self.variables)\n",
+ " return ProbDist.__getitem__(self, values)\n",
+ "\n",
+ " def __setitem__(self, values, p):\n",
+ " """Set P(values) = p. Values can be a tuple or a dict; it must\n",
+ " have a value for each of the variables in the joint. Also keep track\n",
+ " of the values we have seen so far for each variable."""\n",
+ " values = event_values(values, self.variables)\n",
+ " self.prob[values] = p\n",
+ " for var, val in zip(self.variables, values):\n",
+ " if val not in self.vals[var]:\n",
+ " self.vals[var].append(val)\n",
+ "\n",
+ " def values(self, var):\n",
+ " """Return the set of possible values for a variable."""\n",
+ " return self.vals[var]\n",
+ "\n",
+ " def __repr__(self):\n",
+ " return "P({})".format(self.variables)\n",
+ "
def enumerate_joint(variables, e, P):\n",
+ " """Return the sum of those entries in P consistent with e,\n",
+ " provided variables is P's remaining variables (the ones not in e)."""\n",
+ " if not variables:\n",
+ " return P[e]\n",
+ " Y, rest = variables[0], variables[1:]\n",
+ " return sum([enumerate_joint(rest, extend(e, Y, y), P) for y in P.values(Y)])\n",
+ "
def enumerate_joint_ask(X, e, P):\n",
+ " """\n",
+ " [Section 13.3]\n",
+ " Return a probability distribution over the values of the variable X,\n",
+ " given the {var:val} observations e, in the JointProbDist P.\n",
+ " >>> P = JointProbDist(['X', 'Y'])\n",
+ " >>> P[0,0] = 0.25; P[0,1] = 0.5; P[1,1] = P[2,1] = 0.125\n",
+ " >>> enumerate_joint_ask('X', dict(Y=1), P).show_approx()\n",
+ " '0: 0.667, 1: 0.167, 2: 0.167'\n",
+ " """\n",
+ " assert X not in e, "Query variable must be distinct from evidence"\n",
+ " Q = ProbDist(X) # probability distribution for X, initially empty\n",
+ " Y = [v for v in P.variables if v != X and v not in e] # hidden variables.\n",
+ " for xi in P.values(X):\n",
+ " Q[xi] = enumerate_joint(Y, extend(e, X, xi), P)\n",
+ " return Q.normalize()\n",
+ "
class BayesNode:\n",
+ " """A conditional probability distribution for a boolean variable,\n",
+ " P(X | parents). Part of a BayesNet."""\n",
+ "\n",
+ " def __init__(self, X, parents, cpt):\n",
+ " """X is a variable name, and parents a sequence of variable\n",
+ " names or a space-separated string. cpt, the conditional\n",
+ " probability table, takes one of these forms:\n",
+ "\n",
+ " * A number, the unconditional probability P(X=true). You can\n",
+ " use this form when there are no parents.\n",
+ "\n",
+ " * A dict {v: p, ...}, the conditional probability distribution\n",
+ " P(X=true | parent=v) = p. When there's just one parent.\n",
+ "\n",
+ " * A dict {(v1, v2, ...): p, ...}, the distribution P(X=true |\n",
+ " parent1=v1, parent2=v2, ...) = p. Each key must have as many\n",
+ " values as there are parents. You can use this form always;\n",
+ " the first two are just conveniences.\n",
+ "\n",
+ " In all cases the probability of X being false is left implicit,\n",
+ " since it follows from P(X=true).\n",
+ "\n",
+ " >>> X = BayesNode('X', '', 0.2)\n",
+ " >>> Y = BayesNode('Y', 'P', {T: 0.2, F: 0.7})\n",
+ " >>> Z = BayesNode('Z', 'P Q',\n",
+ " ... {(T, T): 0.2, (T, F): 0.3, (F, T): 0.5, (F, F): 0.7})\n",
+ " """\n",
+ " if isinstance(parents, str):\n",
+ " parents = parents.split()\n",
+ "\n",
+ " # We store the table always in the third form above.\n",
+ " if isinstance(cpt, (float, int)): # no parents, 0-tuple\n",
+ " cpt = {(): cpt}\n",
+ " elif isinstance(cpt, dict):\n",
+ " # one parent, 1-tuple\n",
+ " if cpt and isinstance(list(cpt.keys())[0], bool):\n",
+ " cpt = {(v,): p for v, p in cpt.items()}\n",
+ "\n",
+ " assert isinstance(cpt, dict)\n",
+ " for vs, p in cpt.items():\n",
+ " assert isinstance(vs, tuple) and len(vs) == len(parents)\n",
+ " assert all(isinstance(v, bool) for v in vs)\n",
+ " assert 0 <= p <= 1\n",
+ "\n",
+ " self.variable = X\n",
+ " self.parents = parents\n",
+ " self.cpt = cpt\n",
+ " self.children = []\n",
+ "\n",
+ " def p(self, value, event):\n",
+ " """Return the conditional probability\n",
+ " P(X=value | parents=parent_values), where parent_values\n",
+ " are the values of parents in event. (event must assign each\n",
+ " parent a value.)\n",
+ " >>> bn = BayesNode('X', 'Burglary', {T: 0.2, F: 0.625})\n",
+ " >>> bn.p(False, {'Burglary': False, 'Earthquake': True})\n",
+ " 0.375"""\n",
+ " assert isinstance(value, bool)\n",
+ " ptrue = self.cpt[event_values(event, self.parents)]\n",
+ " return ptrue if value else 1 - ptrue\n",
+ "\n",
+ " def sample(self, event):\n",
+ " """Sample from the distribution for this variable conditioned\n",
+ " on event's values for parent_variables. That is, return True/False\n",
+ " at random according with the conditional probability given the\n",
+ " parents."""\n",
+ " return probability(self.p(True, event))\n",
+ "\n",
+ " def __repr__(self):\n",
+ " return repr((self.variable, ' '.join(self.parents)))\n",
+ "
class BayesNet:\n",
+ " """Bayesian network containing only boolean-variable nodes."""\n",
+ "\n",
+ " def __init__(self, node_specs=None):\n",
+ " """Nodes must be ordered with parents before children."""\n",
+ " self.nodes = []\n",
+ " self.variables = []\n",
+ " node_specs = node_specs or []\n",
+ " for node_spec in node_specs:\n",
+ " self.add(node_spec)\n",
+ "\n",
+ " def add(self, node_spec):\n",
+ " """Add a node to the net. Its parents must already be in the\n",
+ " net, and its variable must not."""\n",
+ " node = BayesNode(*node_spec)\n",
+ " assert node.variable not in self.variables\n",
+ " assert all((parent in self.variables) for parent in node.parents)\n",
+ " self.nodes.append(node)\n",
+ " self.variables.append(node.variable)\n",
+ " for parent in node.parents:\n",
+ " self.variable_node(parent).children.append(node)\n",
+ "\n",
+ " def variable_node(self, var):\n",
+ " """Return the node for the variable named var.\n",
+ " >>> burglary.variable_node('Burglary').variable\n",
+ " 'Burglary'"""\n",
+ " for n in self.nodes:\n",
+ " if n.variable == var:\n",
+ " return n\n",
+ " raise Exception("No such variable: {}".format(var))\n",
+ "\n",
+ " def variable_values(self, var):\n",
+ " """Return the domain of var."""\n",
+ " return [True, False]\n",
+ "\n",
+ " def __repr__(self):\n",
+ " return 'BayesNet({0!r})'.format(self.nodes)\n",
+ "
def enumerate_all(variables, e, bn):\n",
+ " """Return the sum of those entries in P(variables | e{others})\n",
+ " consistent with e, where P is the joint distribution represented\n",
+ " by bn, and e{others} means e restricted to bn's other variables\n",
+ " (the ones other than variables). Parents must precede children in variables."""\n",
+ " if not variables:\n",
+ " return 1.0\n",
+ " Y, rest = variables[0], variables[1:]\n",
+ " Ynode = bn.variable_node(Y)\n",
+ " if Y in e:\n",
+ " return Ynode.p(e[Y], e) * enumerate_all(rest, e, bn)\n",
+ " else:\n",
+ " return sum(Ynode.p(y, e) * enumerate_all(rest, extend(e, Y, y), bn)\n",
+ " for y in bn.variable_values(Y))\n",
+ "
def enumeration_ask(X, e, bn):\n",
+ " """\n",
+ " [Figure 14.9]\n",
+ " Return the conditional probability distribution of variable X\n",
+ " given evidence e, from BayesNet bn.\n",
+ " >>> enumeration_ask('Burglary', dict(JohnCalls=T, MaryCalls=T), burglary\n",
+ " ... ).show_approx()\n",
+ " 'False: 0.716, True: 0.284'"""\n",
+ " assert X not in e, "Query variable must be distinct from evidence"\n",
+ " Q = ProbDist(X)\n",
+ " for xi in bn.variable_values(X):\n",
+ " Q[xi] = enumerate_all(bn.variables, extend(e, X, xi), bn)\n",
+ " return Q.normalize()\n",
+ "
def make_factor(var, e, bn):\n",
+ " """Return the factor for var in bn's joint distribution given e.\n",
+ " That is, bn's full joint distribution, projected to accord with e,\n",
+ " is the pointwise product of these factors for bn's variables."""\n",
+ " node = bn.variable_node(var)\n",
+ " variables = [X for X in [var] + node.parents if X not in e]\n",
+ " cpt = {event_values(e1, variables): node.p(e1[var], e1)\n",
+ " for e1 in all_events(variables, bn, e)}\n",
+ " return Factor(variables, cpt)\n",
+ "
def all_events(variables, bn, e):\n",
+ " """Yield every way of extending e with values for all variables."""\n",
+ " if not variables:\n",
+ " yield e\n",
+ " else:\n",
+ " X, rest = variables[0], variables[1:]\n",
+ " for e1 in all_events(rest, bn, e):\n",
+ " for x in bn.variable_values(X):\n",
+ " yield extend(e1, X, x)\n",
+ "
def pointwise_product(self, other, bn):\n",
+ " """Multiply two factors, combining their variables."""\n",
+ " variables = list(set(self.variables) | set(other.variables))\n",
+ " cpt = {event_values(e, variables): self.p(e) * other.p(e) for e in all_events(variables, bn, {})}\n",
+ " return Factor(variables, cpt)\n",
+ "
def pointwise_product(factors, bn):\n",
+ " return reduce(lambda f, g: f.pointwise_product(g, bn), factors)\n",
+ "
def sum_out(self, var, bn):\n",
+ " """Make a factor eliminating var by summing over its values."""\n",
+ " variables = [X for X in self.variables if X != var]\n",
+ " cpt = {event_values(e, variables): sum(self.p(extend(e, var, val)) for val in bn.variable_values(var))\n",
+ " for e in all_events(variables, bn, {})}\n",
+ " return Factor(variables, cpt)\n",
+ "
def sum_out(var, factors, bn):\n",
+ " """Eliminate var from all factors by summing over its values."""\n",
+ " result, var_factors = [], []\n",
+ " for f in factors:\n",
+ " (var_factors if var in f.variables else result).append(f)\n",
+ " result.append(pointwise_product(var_factors, bn).sum_out(var, bn))\n",
+ " return result\n",
+ "
def elimination_ask(X, e, bn):\n",
+ " """\n",
+ " [Figure 14.11]\n",
+ " Compute bn's P(X|e) by variable elimination.\n",
+ " >>> elimination_ask('Burglary', dict(JohnCalls=T, MaryCalls=T), burglary\n",
+ " ... ).show_approx()\n",
+ " 'False: 0.716, True: 0.284'"""\n",
+ " assert X not in e, "Query variable must be distinct from evidence"\n",
+ " factors = []\n",
+ " for var in reversed(bn.variables):\n",
+ " factors.append(make_factor(var, e, bn))\n",
+ " if is_hidden(var, X, e):\n",
+ " factors = sum_out(var, factors, bn)\n",
+ " return pointwise_product(factors, bn).normalize()\n",
+ "
def sample(self, event):\n",
+ " """Sample from the distribution for this variable conditioned\n",
+ " on event's values for parent_variables. That is, return True/False\n",
+ " at random according with the conditional probability given the\n",
+ " parents."""\n",
+ " return probability(self.p(True, event))\n",
+ "
def prior_sample(bn):\n",
+ " """\n",
+ " [Figure 14.13]\n",
+ " Randomly sample from bn's full joint distribution.\n",
+ " The result is a {variable: value} dict.\n",
+ " """\n",
+ " event = {}\n",
+ " for node in bn.nodes:\n",
+ " event[node.variable] = node.sample(event)\n",
+ " return event\n",
+ "
def rejection_sampling(X, e, bn, N=10000):\n",
+ " """\n",
+ " [Figure 14.14]\n",
+ " Estimate the probability distribution of variable X given\n",
+ " evidence e in BayesNet bn, using N samples.\n",
+ " Raises a ZeroDivisionError if all the N samples are rejected,\n",
+ " i.e., inconsistent with e.\n",
+ " >>> random.seed(47)\n",
+ " >>> rejection_sampling('Burglary', dict(JohnCalls=T, MaryCalls=T),\n",
+ " ... burglary, 10000).show_approx()\n",
+ " 'False: 0.7, True: 0.3'\n",
+ " """\n",
+ " counts = {x: 0 for x in bn.variable_values(X)} # bold N in [Figure 14.14]\n",
+ " for j in range(N):\n",
+ " sample = prior_sample(bn) # boldface x in [Figure 14.14]\n",
+ " if consistent_with(sample, e):\n",
+ " counts[sample[X]] += 1\n",
+ " return ProbDist(X, counts)\n",
+ "
def consistent_with(event, evidence):\n",
+ " """Is event consistent with the given evidence?"""\n",
+ " return all(evidence.get(k, v) == v for k, v in event.items())\n",
+ "
def weighted_sample(bn, e):\n",
+ " """\n",
+ " Sample an event from bn that's consistent with the evidence e;\n",
+ " return the event and its weight, the likelihood that the event\n",
+ " accords to the evidence.\n",
+ " """\n",
+ " w = 1\n",
+ " event = dict(e) # boldface x in [Figure 14.15]\n",
+ " for node in bn.nodes:\n",
+ " Xi = node.variable\n",
+ " if Xi in e:\n",
+ " w *= node.p(e[Xi], event)\n",
+ " else:\n",
+ " event[Xi] = node.sample(event)\n",
+ " return event, w\n",
+ "
def likelihood_weighting(X, e, bn, N=10000):\n",
+ " """\n",
+ " [Figure 14.15]\n",
+ " Estimate the probability distribution of variable X given\n",
+ " evidence e in BayesNet bn.\n",
+ " >>> random.seed(1017)\n",
+ " >>> likelihood_weighting('Burglary', dict(JohnCalls=T, MaryCalls=T),\n",
+ " ... burglary, 10000).show_approx()\n",
+ " 'False: 0.702, True: 0.298'\n",
+ " """\n",
+ " W = {x: 0 for x in bn.variable_values(X)}\n",
+ " for j in range(N):\n",
+ " sample, weight = weighted_sample(bn, e) # boldface x, w in [Figure 14.15]\n",
+ " W[sample[X]] += weight\n",
+ " return ProbDist(X, W)\n",
+ "
def gibbs_ask(X, e, bn, N=1000):\n",
+ " """[Figure 14.16]"""\n",
+ " assert X not in e, "Query variable must be distinct from evidence"\n",
+ " counts = {x: 0 for x in bn.variable_values(X)} # bold N in [Figure 14.16]\n",
+ " Z = [var for var in bn.variables if var not in e]\n",
+ " state = dict(e) # boldface x in [Figure 14.16]\n",
+ " for Zi in Z:\n",
+ " state[Zi] = random.choice(bn.variable_values(Zi))\n",
+ " for j in range(N):\n",
+ " for Zi in Z:\n",
+ " state[Zi] = markov_blanket_sample(Zi, state, bn)\n",
+ " counts[state[X]] += 1\n",
+ " return ProbDist(X, counts)\n",
+ "
class HiddenMarkovModel:\n",
+ " """A Hidden markov model which takes Transition model and Sensor model as inputs"""\n",
+ "\n",
+ " def __init__(self, transition_model, sensor_model, prior=None):\n",
+ " self.transition_model = transition_model\n",
+ " self.sensor_model = sensor_model\n",
+ " self.prior = prior or [0.5, 0.5]\n",
+ "\n",
+ " def sensor_dist(self, ev):\n",
+ " if ev is True:\n",
+ " return self.sensor_model[0]\n",
+ " else:\n",
+ " return self.sensor_model[1]\n",
+ "
def forward(HMM, fv, ev):\n",
+ " prediction = vector_add(scalar_vector_product(fv[0], HMM.transition_model[0]),\n",
+ " scalar_vector_product(fv[1], HMM.transition_model[1]))\n",
+ " sensor_dist = HMM.sensor_dist(ev)\n",
+ "\n",
+ " return normalize(element_wise_product(sensor_dist, prediction))\n",
+ "
def backward(HMM, b, ev):\n",
+ " sensor_dist = HMM.sensor_dist(ev)\n",
+ " prediction = element_wise_product(sensor_dist, b)\n",
+ "\n",
+ " return normalize(vector_add(scalar_vector_product(prediction[0], HMM.transition_model[0]),\n",
+ " scalar_vector_product(prediction[1], HMM.transition_model[1])))\n",
+ "
def particle_filtering(e, N, HMM):\n",
+ " """Particle filtering considering two states variables."""\n",
+ " dist = [0.5, 0.5]\n",
+ " # Weight Initialization\n",
+ " w = [0 for _ in range(N)]\n",
+ " # STEP 1\n",
+ " # Propagate one step using transition model given prior state\n",
+ " dist = vector_add(scalar_vector_product(dist[0], HMM.transition_model[0]),\n",
+ " scalar_vector_product(dist[1], HMM.transition_model[1]))\n",
+ " # Assign state according to probability\n",
+ " s = ['A' if probability(dist[0]) else 'B' for _ in range(N)]\n",
+ " w_tot = 0\n",
+ " # Calculate importance weight given evidence e\n",
+ " for i in range(N):\n",
+ " if s[i] == 'A':\n",
+ " # P(U|A)*P(A)\n",
+ " w_i = HMM.sensor_dist(e)[0] * dist[0]\n",
+ " if s[i] == 'B':\n",
+ " # P(U|B)*P(B)\n",
+ " w_i = HMM.sensor_dist(e)[1] * dist[1]\n",
+ " w[i] = w_i\n",
+ " w_tot += w_i\n",
+ "\n",
+ " # Normalize all the weights\n",
+ " for i in range(N):\n",
+ " w[i] = w[i] / w_tot\n",
+ "\n",
+ " # Limit weights to 4 digits\n",
+ " for i in range(N):\n",
+ " w[i] = float("{0:.4f}".format(w[i]))\n",
+ "\n",
+ " # STEP 2\n",
+ " s = weighted_sample_with_replacement(N, s, w)\n",
+ "\n",
+ " return s\n",
+ "
class MDP:\n",
+ " """A Markov Decision Process, defined by an initial state, transition model,\n",
+ " and reward function. We also keep track of a gamma value, for use by\n",
+ " algorithms. The transition model is represented somewhat differently from\n",
+ " the text. Instead of P(s' | s, a) being a probability number for each\n",
+ " state/state/action triplet, we instead have T(s, a) return a\n",
+ " list of (p, s') pairs. We also keep track of the possible states,\n",
+ " terminal states, and actions for each state. [Page 646]"""\n",
+ "\n",
+ " def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9):\n",
+ " if not (0 < gamma <= 1):\n",
+ " raise ValueError("An MDP must have 0 < gamma <= 1")\n",
+ "\n",
+ " # collect states from transitions table if not passed.\n",
+ " self.states = states or self.get_states_from_transitions(transitions)\n",
+ "\n",
+ " self.init = init\n",
+ "\n",
+ " if isinstance(actlist, list):\n",
+ " # if actlist is a list, all states have the same actions\n",
+ " self.actlist = actlist\n",
+ "\n",
+ " elif isinstance(actlist, dict):\n",
+ " # if actlist is a dict, different actions for each state\n",
+ " self.actlist = actlist\n",
+ "\n",
+ " self.terminals = terminals\n",
+ " self.transitions = transitions or {}\n",
+ " if not self.transitions:\n",
+ " print("Warning: Transition table is empty.")\n",
+ "\n",
+ " self.gamma = gamma\n",
+ "\n",
+ " self.reward = reward or {s: 0 for s in self.states}\n",
+ "\n",
+ " # self.check_consistency()\n",
+ "\n",
+ " def R(self, state):\n",
+ " """Return a numeric reward for this state."""\n",
+ "\n",
+ " return self.reward[state]\n",
+ "\n",
+ " def T(self, state, action):\n",
+ " """Transition model. From a state and an action, return a list\n",
+ " of (probability, result-state) pairs."""\n",
+ "\n",
+ " if not self.transitions:\n",
+ " raise ValueError("Transition model is missing")\n",
+ " else:\n",
+ " return self.transitions[state][action]\n",
+ "\n",
+ " def actions(self, state):\n",
+ " """Return a list of actions that can be performed in this state. By default, a\n",
+ " fixed list of actions, except for terminal states. Override this\n",
+ " method if you need to specialize by state."""\n",
+ "\n",
+ " if state in self.terminals:\n",
+ " return [None]\n",
+ " else:\n",
+ " return self.actlist\n",
+ "\n",
+ " def get_states_from_transitions(self, transitions):\n",
+ " if isinstance(transitions, dict):\n",
+ " s1 = set(transitions.keys())\n",
+ " s2 = set(tr[1] for actions in transitions.values()\n",
+ " for effects in actions.values()\n",
+ " for tr in effects)\n",
+ " return s1.union(s2)\n",
+ " else:\n",
+ " print('Could not retrieve states from transitions')\n",
+ " return None\n",
+ "\n",
+ " def check_consistency(self):\n",
+ "\n",
+ " # check that all states in transitions are valid\n",
+ " assert set(self.states) == self.get_states_from_transitions(self.transitions)\n",
+ "\n",
+ " # check that init is a valid state\n",
+ " assert self.init in self.states\n",
+ "\n",
+ " # check reward for each state\n",
+ " assert set(self.reward.keys()) == set(self.states)\n",
+ "\n",
+ " # check that all terminals are valid states\n",
+ " assert all(t in self.states for t in self.terminals)\n",
+ "\n",
+ " # check that probability distributions for all actions sum to 1\n",
+ " for s1, actions in self.transitions.items():\n",
+ " for a in actions.keys():\n",
+ " s = 0\n",
+ " for o in actions[a]:\n",
+ " s += o[0]\n",
+ " assert abs(s - 1) < 0.001\n",
+ "
class GridMDP(MDP):\n",
+ " """A two-dimensional grid MDP, as in [Figure 17.1]. All you have to do is\n",
+ " specify the grid as a list of lists of rewards; use None for an obstacle\n",
+ " (unreachable state). Also, you should specify the terminal states.\n",
+ " An action is an (x, y) unit vector; e.g. (1, 0) means move east."""\n",
+ "\n",
+ " def __init__(self, grid, terminals, init=(0, 0), gamma=.9):\n",
+ " grid.reverse() # because we want row 0 on bottom, not on top\n",
+ " reward = {}\n",
+ " states = set()\n",
+ " self.rows = len(grid)\n",
+ " self.cols = len(grid[0])\n",
+ " self.grid = grid\n",
+ " for x in range(self.cols):\n",
+ " for y in range(self.rows):\n",
+ " if grid[y][x]:\n",
+ " states.add((x, y))\n",
+ " reward[(x, y)] = grid[y][x]\n",
+ " self.states = states\n",
+ " actlist = orientations\n",
+ " transitions = {}\n",
+ " for s in states:\n",
+ " transitions[s] = {}\n",
+ " for a in actlist:\n",
+ " transitions[s][a] = self.calculate_T(s, a)\n",
+ " MDP.__init__(self, init, actlist=actlist,\n",
+ " terminals=terminals, transitions=transitions,\n",
+ " reward=reward, states=states, gamma=gamma)\n",
+ "\n",
+ " def calculate_T(self, state, action):\n",
+ " if action:\n",
+ " return [(0.8, self.go(state, action)),\n",
+ " (0.1, self.go(state, turn_right(action))),\n",
+ " (0.1, self.go(state, turn_left(action)))]\n",
+ " else:\n",
+ " return [(0.0, state)]\n",
+ "\n",
+ " def T(self, state, action):\n",
+ " return self.transitions[state][action] if action else [(0.0, state)]\n",
+ "\n",
+ " def go(self, state, direction):\n",
+ " """Return the state that results from going in this direction."""\n",
+ "\n",
+ " state1 = vector_add(state, direction)\n",
+ " return state1 if state1 in self.states else state\n",
+ "\n",
+ " def to_grid(self, mapping):\n",
+ " """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""\n",
+ "\n",
+ " return list(reversed([[mapping.get((x, y), None)\n",
+ " for x in range(self.cols)]\n",
+ " for y in range(self.rows)]))\n",
+ "\n",
+ " def to_arrows(self, policy):\n",
+ " chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}\n",
+ " return self.to_grid({s: chars[a] for (s, a) in policy.items()})\n",
+ "
def value_iteration(mdp, epsilon=0.001):\n",
+ " """Solving an MDP by value iteration. [Figure 17.4]"""\n",
+ "\n",
+ " U1 = {s: 0 for s in mdp.states}\n",
+ " R, T, gamma = mdp.R, mdp.T, mdp.gamma\n",
+ " while True:\n",
+ " U = U1.copy()\n",
+ " delta = 0\n",
+ " for s in mdp.states:\n",
+ " U1[s] = R(s) + gamma * max(sum(p * U[s1] for (p, s1) in T(s, a))\n",
+ " for a in mdp.actions(s))\n",
+ " delta = max(delta, abs(U1[s] - U[s]))\n",
+ " if delta <= epsilon * (1 - gamma) / gamma:\n",
+ " return U\n",
+ "
def expected_utility(a, s, U, mdp):\n",
+ " """The expected utility of doing a in state s, according to the MDP and U."""\n",
+ "\n",
+ " return sum(p * U[s1] for (p, s1) in mdp.T(s, a))\n",
+ "
def policy_iteration(mdp):\n",
+ " """Solve an MDP by policy iteration [Figure 17.7]"""\n",
+ "\n",
+ " U = {s: 0 for s in mdp.states}\n",
+ " pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}\n",
+ " while True:\n",
+ " U = policy_evaluation(pi, U, mdp)\n",
+ " unchanged = True\n",
+ " for s in mdp.states:\n",
+ " a = max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))\n",
+ " if a != pi[s]:\n",
+ " pi[s] = a\n",
+ " unchanged = False\n",
+ " if unchanged:\n",
+ " return pi\n",
+ "
def policy_evaluation(pi, U, mdp, k=20):\n",
+ " """Return an updated utility mapping U from each state in the MDP to its\n",
+ " utility, using an approximation (modified policy iteration)."""\n",
+ "\n",
+ " R, T, gamma = mdp.R, mdp.T, mdp.gamma\n",
+ " for i in range(k):\n",
+ " for s in mdp.states:\n",
+ " U[s] = R(s) + gamma * sum(p * U[s1] for (p, s1) in T(s, pi[s]))\n",
+ " return U\n",
+ "
class Agent(Thing):\n",
+ " """An Agent is a subclass of Thing with one required instance attribute \n",
+ " (aka slot), .program, which should hold a function that takes one argument,\n",
+ " the percept, and returns an action. (What counts as a percept or action \n",
+ " will depend on the specific environment in which the agent exists.)\n",
+ " Note that 'program' is a slot, not a method. If it were a method, then the\n",
+ " program could 'cheat' and look at aspects of the agent. It's not supposed\n",
+ " to do that: the program can only look at the percepts. An agent program\n",
+ " that needs a model of the world (and of the agent itself) will have to\n",
+ " build and maintain its own model. There is an optional slot, .performance,\n",
+ " which is a number giving the performance measure of the agent in its\n",
+ " environment."""\n",
+ "\n",
+ " def __init__(self, program=None):\n",
+ " self.alive = True\n",
+ " self.bump = False\n",
+ " self.holding = []\n",
+ " self.performance = 0\n",
+ " if program is None or not isinstance(program, collections.abc.Callable):\n",
+ " print("Can't find a valid program for {}, falling back to default.".format(self.__class__.__name__))\n",
+ "\n",
+ " def program(percept):\n",
+ " return eval(input('Percept={}; action? '.format(percept)))\n",
+ "\n",
+ " self.program = program\n",
+ "\n",
+ " def can_grab(self, thing):\n",
+ " """Return True if this agent can grab this thing.\n",
+ " Override for appropriate subclasses of Agent and Thing."""\n",
+ " return False\n",
+ "
class Environment:\n",
+ " """Abstract class representing an Environment. 'Real' Environment classes\n",
+ " inherit from this. Your Environment will typically need to implement:\n",
+ " percept: Define the percept that an agent sees.\n",
+ " execute_action: Define the effects of executing an action.\n",
+ " Also update the agent.performance slot.\n",
+ " The environment keeps a list of .things and .agents (which is a subset\n",
+ " of .things). Each agent has a .performance slot, initialized to 0.\n",
+ " Each thing has a .location slot, even though some environments may not\n",
+ " need this."""\n",
+ "\n",
+ " def __init__(self):\n",
+ " self.things = []\n",
+ " self.agents = []\n",
+ "\n",
+ " def thing_classes(self):\n",
+ " return [] # List of classes that can go into environment\n",
+ "\n",
+ " def percept(self, agent):\n",
+ " """Return the percept that the agent sees at this point. (Implement this.)"""\n",
+ " raise NotImplementedError\n",
+ "\n",
+ " def execute_action(self, agent, action):\n",
+ " """Change the world to reflect this action. (Implement this.)"""\n",
+ " raise NotImplementedError\n",
+ "\n",
+ " def default_location(self, thing):\n",
+ " """Default location to place a new thing with unspecified location."""\n",
+ " return None\n",
+ "\n",
+ " def exogenous_change(self):\n",
+ " """If there is spontaneous change in the world, override this."""\n",
+ " pass\n",
+ "\n",
+ " def is_done(self):\n",
+ " """By default, we're done when we can't find a live agent."""\n",
+ " return not any(agent.is_alive() for agent in self.agents)\n",
+ "\n",
+ " def step(self):\n",
+ " """Run the environment for one time step. If the\n",
+ " actions and exogenous changes are independent, this method will\n",
+ " do. If there are interactions between them, you'll need to\n",
+ " override this method."""\n",
+ " if not self.is_done():\n",
+ " actions = []\n",
+ " for agent in self.agents:\n",
+ " if agent.alive:\n",
+ " actions.append(agent.program(self.percept(agent)))\n",
+ " else:\n",
+ " actions.append("")\n",
+ " for (agent, action) in zip(self.agents, actions):\n",
+ " self.execute_action(agent, action)\n",
+ " self.exogenous_change()\n",
+ "\n",
+ " def run(self, steps=1000):\n",
+ " """Run the Environment for given number of time steps."""\n",
+ " for step in range(steps):\n",
+ " if self.is_done():\n",
+ " return\n",
+ " self.step()\n",
+ "\n",
+ " def list_things_at(self, location, tclass=Thing):\n",
+ " """Return all things exactly at a given location."""\n",
+ " if isinstance(location, numbers.Number):\n",
+ " return [thing for thing in self.things\n",
+ " if thing.location == location and isinstance(thing, tclass)]\n",
+ " return [thing for thing in self.things\n",
+ " if all(x == y for x, y in zip(thing.location, location)) and isinstance(thing, tclass)]\n",
+ "\n",
+ " def some_things_at(self, location, tclass=Thing):\n",
+ " """Return true if at least one of the things at location\n",
+ " is an instance of class tclass (or a subclass)."""\n",
+ " return self.list_things_at(location, tclass) != []\n",
+ "\n",
+ " def add_thing(self, thing, location=None):\n",
+ " """Add a thing to the environment, setting its location. For\n",
+ " convenience, if thing is an agent program we make a new agent\n",
+ " for it. (Shouldn't need to override this.)"""\n",
+ " if not isinstance(thing, Thing):\n",
+ " thing = Agent(thing)\n",
+ " if thing in self.things:\n",
+ " print("Can't add the same thing twice")\n",
+ " else:\n",
+ " thing.location = location if location is not None else self.default_location(thing)\n",
+ " self.things.append(thing)\n",
+ " if isinstance(thing, Agent):\n",
+ " thing.performance = 0\n",
+ " self.agents.append(thing)\n",
+ "\n",
+ " def delete_thing(self, thing):\n",
+ " """Remove a thing from the environment."""\n",
+ " try:\n",
+ " self.things.remove(thing)\n",
+ " except ValueError as e:\n",
+ " print(e)\n",
+ " print(" in Environment delete_thing")\n",
+ " print(" Thing to be removed: {} at {}".format(thing, thing.location))\n",
+ " print(" from list: {}".format([(thing, thing.location) for thing in self.things]))\n",
+ " if thing in self.agents:\n",
+ " self.agents.remove(thing)\n",
+ "
def policy_evaluation(pi, U, mdp, k=20):\n",
- " """Return an updated utility mapping U from each state in the MDP to its\n",
- " utility, using an approximation (modified policy iteration)."""\n",
- " R, T, gamma = mdp.R, mdp.T, mdp.gamma\n",
- " for i in range(k):\n",
- " for s in mdp.states:\n",
- " U[s] = R(s) + gamma * sum([p * U[s1] for (p, s1) in T(s, pi[s])])\n",
- " return U\n",
- "
def T(self, state, action):\n",
- " if action is None:\n",
- " return [(0.0, state)]\n",
- " else:\n",
- " return self.transitions[state][action]\n",
- "
def to_arrows(self, policy):\n",
- " chars = {\n",
- " (1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}\n",
- " return self.to_grid({s: chars[a] for (s, a) in policy.items()})\n",
- "
def to_grid(self, mapping):\n",
- " """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""\n",
- " return list(reversed([[mapping.get((x, y), None)\n",
- " for x in range(self.cols)]\n",
- " for y in range(self.rows)]))\n",
- "
class POMDP(MDP):\n",
- "\n",
- " """A Partially Observable Markov Decision Process, defined by\n",
- " a transition model P(s'|s,a), actions A(s), a reward function R(s),\n",
- " and a sensor model P(e|s). We also keep track of a gamma value,\n",
- " for use by algorithms. The transition and the sensor models\n",
- " are defined as matrices. We also keep track of the possible states\n",
- " and actions for each state. [page 659]."""\n",
- "\n",
- " def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95):\n",
- " """Initialize variables of the pomdp"""\n",
- "\n",
- " if not (0 < gamma <= 1):\n",
- " raise ValueError('A POMDP must have 0 < gamma <= 1')\n",
- "\n",
- " self.states = states\n",
- " self.actions = actions\n",
- "\n",
- " # transition model cannot be undefined\n",
- " self.t_prob = transitions or {}\n",
- " if not self.t_prob:\n",
- " print('Warning: Transition model is undefined')\n",
- " \n",
- " # sensor model cannot be undefined\n",
- " self.e_prob = evidences or {}\n",
- " if not self.e_prob:\n",
- " print('Warning: Sensor model is undefined')\n",
- " \n",
- " self.gamma = gamma\n",
- " self.rewards = rewards\n",
- "\n",
- " def remove_dominated_plans(self, input_values):\n",
- " """\n",
- " Remove dominated plans.\n",
- " This method finds all the lines contributing to the\n",
- " upper surface and removes those which don't.\n",
- " """\n",
- "\n",
- " values = [val for action in input_values for val in input_values[action]]\n",
- " values.sort(key=lambda x: x[0], reverse=True)\n",
- "\n",
- " best = [values[0]]\n",
- " y1_max = max(val[1] for val in values)\n",
- " tgt = values[0]\n",
- " prev_b = 0\n",
- " prev_ix = 0\n",
- " while tgt[1] != y1_max:\n",
- " min_b = 1\n",
- " min_ix = 0\n",
- " for i in range(prev_ix + 1, len(values)):\n",
- " if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0:\n",
- " trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1])\n",
- " if 0 <= trans_b <= 1 and trans_b > prev_b and trans_b < min_b:\n",
- " min_b = trans_b\n",
- " min_ix = i\n",
- " prev_b = min_b\n",
- " prev_ix = min_ix\n",
- " tgt = values[min_ix]\n",
- " best.append(tgt)\n",
- "\n",
- " return self.generate_mapping(best, input_values)\n",
- "\n",
- " def remove_dominated_plans_fast(self, input_values):\n",
- " """\n",
- " Remove dominated plans using approximations.\n",
- " Resamples the upper boundary at intervals of 100 and\n",
- " finds the maximum values at these points.\n",
- " """\n",
- "\n",
- " values = [val for action in input_values for val in input_values[action]]\n",
- " values.sort(key=lambda x: x[0], reverse=True)\n",
- "\n",
- " best = []\n",
- " sr = 100\n",
- " for i in range(sr + 1):\n",
- " x = i / float(sr)\n",
- " maximum = (values[0][1] - values[0][0]) * x + values[0][0]\n",
- " tgt = values[0]\n",
- " for value in values:\n",
- " val = (value[1] - value[0]) * x + value[0]\n",
- " if val > maximum:\n",
- " maximum = val\n",
- " tgt = value\n",
- "\n",
- " if all(any(tgt != v) for v in best):\n",
- " best.append(tgt)\n",
- "\n",
- " return self.generate_mapping(best, input_values)\n",
- "\n",
- " def generate_mapping(self, best, input_values):\n",
- " """Generate mappings after removing dominated plans"""\n",
- "\n",
- " mapping = defaultdict(list)\n",
- " for value in best:\n",
- " for action in input_values:\n",
- " if any(all(value == v) for v in input_values[action]):\n",
- " mapping[action].append(value)\n",
- "\n",
- " return mapping\n",
- "\n",
- " def max_difference(self, U1, U2):\n",
- " """Find maximum difference between two utility mappings"""\n",
- "\n",
- " for k, v in U1.items():\n",
- " sum1 = 0\n",
- " for element in U1[k]:\n",
- " sum1 += sum(element)\n",
- " sum2 = 0\n",
- " for element in U2[k]:\n",
- " sum2 += sum(element)\n",
- " return abs(sum1 - sum2)\n",
- "
def pomdp_value_iteration(pomdp, epsilon=0.1):\n",
- " """Solving a POMDP by value iteration."""\n",
- "\n",
- " U = {'':[[0]* len(pomdp.states)]}\n",
- " count = 0\n",
- " while True:\n",
- " count += 1\n",
- " prev_U = U\n",
- " values = [val for action in U for val in U[action]]\n",
- " value_matxs = []\n",
- " for i in values:\n",
- " for j in values:\n",
- " value_matxs.append([i, j])\n",
- "\n",
- " U1 = defaultdict(list)\n",
- " for action in pomdp.actions:\n",
- " for u in value_matxs:\n",
- " u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)], Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))), [[1], [1]])\n",
- " u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]])\n",
- " U1[action].append(u1[0])\n",
- "\n",
- " U = pomdp.remove_dominated_plans_fast(U1)\n",
- " # replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations\n",
- " \n",
- " if count > 10:\n",
- " if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma:\n",
- " return U\n",
- "