diff --git a/test_generator.py b/test_generator.py index dd1513c..917bf02 100644 --- a/test_generator.py +++ b/test_generator.py @@ -3,12 +3,10 @@ from pathlib import Path from random import randint, random, sample from copy import deepcopy -from operator import itemgetter from typing import Optional, Tuple from drivebuildclient.AIExchangeService import AIExchangeService from termcolor import colored - from utils.xml_creator import build_all_xml, build_xml from utils.plotter import plot_all from utils.validity_checks import * @@ -20,6 +18,34 @@ import scipy.interpolate as si +def _add_ego_car(individual): + """Adds the ego car to the criteria xml file. Movement mode can be assigned manually. Each control point is one + waypoint. + :param individual: Individual of the population. + :return: Void. + """ + control_points = individual.get("control_points") + waypoints = [] + for point in control_points: + waypoint = {"x": point.get("x"), + "y": point.get("y"), + "tolerance": 2, + "movementMode": "_BEAMNG"} + waypoints.append(waypoint) + init_state = {"x": control_points[0].get("x"), + "y": control_points[0].get("y"), + "orientation": 0, + "movementMode": "_BEAMNG", + "speed": 50} + model = "ETK800" + ego = {"id": "ego", + "init_state": init_state, + "waypoints": waypoints, + "model": model} + participants = [ego] + individual["participants"] = participants + + class TestGenerator: """This class generates roads using a genetic algorithm.""" @@ -32,11 +58,11 @@ def __init__(self, difficulty="Easy"): self.files_name = "exampleTest" self.SPLINE_DEGREE = 5 # Sharpness of curves self.MAX_TRIES = 500 # Maximum number of invalid generated points/segments - self.POPULATION_SIZE = 5 # Minimum number of generated roads for each generation - self.NUMBER_ELITES = 3 # Number of best kept roads + self.POPULATION_SIZE = 8 # Minimum number of generated roads for each generation + self.NUMBER_ELITES = 4 # Number of best kept roads self.MIN_SEGMENT_LENGTH = 28 # Minimum length of a road segment self.MAX_SEGMENT_LENGTH = 45 # Maximum length of a road segment - self.WIDTH_OF_STREET = 5 # Width of all segments + self.WIDTH_OF_STREET = 4 # Width of all segments self.MIN_NODES = 8 # Minimum number of control points for each road self.MAX_NODES = 12 # Maximum number of control points for each road self.population_list = [] @@ -44,12 +70,15 @@ def __init__(self, difficulty="Easy"): def _bspline(self, control_points, samples=75): """Calculate {@code samples} samples on a bspline. This is the road representation function. - :param control_points: Array of control points. + :param control_points: List of control points. :param samples: Number of samples to return. :return: Array with samples, representing a bspline of the given function as a numpy array. """ - control_points = np.asarray(control_points) - count = len(control_points) + point_list = [] + for point in control_points: + point_list.append((point.get("x"), point.get("y"))) + point_list = np.asarray(point_list) + count = len(point_list) degree = np.clip(self.SPLINE_DEGREE, 1, count - 1) # Calculate knot vector. @@ -59,7 +88,7 @@ def _bspline(self, control_points, samples=75): u = np.linspace(False, (count - degree), samples) # Calculate result. - return np.array(si.splev(u, (kv, control_points.T, degree))).T + return np.array(si.splev(u, (kv, point_list.T, degree))).T def set_difficulty(self, difficulty): difficulty = difficulty.upper() @@ -68,25 +97,46 @@ def set_difficulty(self, difficulty): self.MIN_SEGMENT_LENGTH = 30 self.MAX_SEGMENT_LENGTH = 50 self.WIDTH_OF_STREET = 4 - self.MIN_NODES = 6 - self.MAX_NODES = 10 + self.MIN_NODES = 8 + self.MAX_NODES = 12 elif difficulty == "MEDIUM": - self.SPLINE_DEGREE = 5 + self.SPLINE_DEGREE = 6 self.MIN_SEGMENT_LENGTH = 25 self.MAX_SEGMENT_LENGTH = 45 - self.WIDTH_OF_STREET = 5 - self.MIN_NODES = 8 - self.MAX_NODES = 12 + self.WIDTH_OF_STREET = 4 + self.MIN_NODES = 12 + self.MAX_NODES = 16 elif difficulty == "HARD": self.SPLINE_DEGREE = 2 self.MIN_SEGMENT_LENGTH = 20 self.MAX_SEGMENT_LENGTH = 40 - self.WIDTH_OF_STREET = 6 - self.MIN_NODES = 12 - self.MAX_NODES = 16 + self.WIDTH_OF_STREET = 5 + self.MIN_NODES = 14 + self.MAX_NODES = 22 else: print(colored("Invalid difficulty level. Choosing default difficulty.", 'blue')) + def _generate_random_point(self, last_point): + """Generates a random point within a given range. + :param last_point: Last point of the control point list as dict type. + :return: A new random point as dict type. + """ + last_point_tmp = (last_point.get("x"), last_point.get("y")) + last_point_tmp = np.asarray(last_point_tmp) + x_min = last_point.get("x") - self.MAX_SEGMENT_LENGTH + x_max = last_point.get("x") + self.MAX_SEGMENT_LENGTH + y_min = last_point.get("y") - self.MAX_SEGMENT_LENGTH + y_max = last_point.get("y") + self.MAX_SEGMENT_LENGTH + tries = 0 + while tries < self.MAX_TRIES / 10: + x_pos = randint(x_min, x_max) + y_pos = randint(y_min, y_max) + point = (x_pos, y_pos) + dist = np.linalg.norm(np.asarray(point) - last_point_tmp) + if self.MAX_SEGMENT_LENGTH >= dist >= self.MIN_SEGMENT_LENGTH: + return {"x": point[0], "y": point[1]} + tries += 1 + def _generate_random_points(self): """Generates random valid points and returns when the list is full or the number of invalid nodes equals the number of maximum tries. @@ -94,8 +144,10 @@ def _generate_random_points(self): """ # Generating the first two points by myself. - p0 = (1, 0) - p1 = (65, 0) + p0 = {"x": 1, + "y": 0} + p1 = {"x": 65, + "y": 0} control_points = [p0, p1] tries = 0 while len(control_points) != self.MAX_NODES and tries <= self.MAX_TRIES: @@ -115,87 +167,69 @@ def _generate_random_points(self): spline_list = self._bspline(control_points, 100) if spline_intersection_check(spline_list): control_points.pop() - if len(control_points) < self.MIN_NODES or intersection_check_all(spline_list): + if len(control_points) < self.MIN_NODES or intersection_check_all_np(spline_list): print(colored("Couldn't create enough valid nodes. Restarting...", "blue")) else: print(colored("Finished list!", "blue")) return control_points - def _generate_random_point(self, last_point): - """Generates a random point within a given range. - :param last_point: Last point of the control point list. - :return: A new random point - """ - last_point_tmp = (last_point[0], last_point[1]) - last_point_tmp = np.asarray(last_point_tmp) - x_min = last_point[0] - self.MAX_SEGMENT_LENGTH - x_max = last_point[0] + self.MAX_SEGMENT_LENGTH - y_min = last_point[1] - self.MAX_SEGMENT_LENGTH - y_max = last_point[1] + self.MAX_SEGMENT_LENGTH - tries = 0 - while tries < self.MAX_TRIES / 10: - x_pos = randint(x_min, x_max) - y_pos = randint(y_min, y_max) - point = (x_pos, y_pos) - dist = np.linalg.norm(np.asarray(point) - last_point_tmp) - if self.MAX_SEGMENT_LENGTH >= dist >= self.MIN_SEGMENT_LENGTH: - return point - tries += 1 - def _create_start_population(self): - """Creates and returns start population.""" + """Creates and returns an initial population.""" startpop = [] iterator = 0 while len(startpop) < self.POPULATION_SIZE: point_list = self._generate_random_points() if point_list is not None: - startpop.append([point_list, 0]) - build_xml(point_list, self.files_name + str(iterator), self.WIDTH_OF_STREET) + individual = {"control_points": point_list, + "file_name": self.files_name, + "fitness": 0} + startpop.append(individual) iterator += 1 return startpop def _mutation(self, individual): """Mutates a road by randomly picking one point and replacing it with - a new, valid one. - :param individual: Individual in the form of [control_points, fitness_value]. + a new, valid one. There is a chance that the individual will be not mutated at all. + :param individual: Individual of the population. :return: Mutated individual. """ probability = 0.25 print(colored("Mutating individual...", "blue")) iterator = 2 - while iterator < len(individual[0]): + while iterator < len(individual.get("control_points")): if random() <= probability: valid = False tries = 0 while not valid and tries < self.MAX_TRIES / 10: - new_point = self._generate_random_point(individual[0][iterator - 1]) - new_point = (new_point[0], new_point[1]) - temp_list = deepcopy(individual[0]) + new_point = self._generate_random_point(individual.get("control_points")[iterator - 1]) + new_point = {"x": new_point.get("x"), + "y": new_point.get("y")} + temp_list = deepcopy(individual.get("control_points")) temp_list[iterator] = new_point spline_list = self._bspline(temp_list, 60) control_points_lines = convert_points_to_lines(spline_list) linestring_list = self._get_width_lines(spline_list) - if not (intersection_check_all(spline_list) + if not (intersection_check_all_np(spline_list) or intersection_check_width(linestring_list, control_points_lines)): valid = True - individual[0][iterator] = new_point + individual.get("control_points")[iterator] = new_point tries += 1 iterator += 1 - individual[1] = 0 + individual["fitness"] = 0 return individual def _crossover(self, parent1, parent2): - """Performs a crossover between two parents. - :param parent1: First parent - :param parent2: Second parent + """Performs a crossover between two parents. There is a chance that no crossover will happen. + :param parent1: First parent. + :param parent2: Second parent. :return: Valid children, which can be equal or different from the parents. """ print(colored("Performing crossover of two individuals...", "blue")) probability = 0.25 - if len(parent1[0]) <= len(parent2[0]): - smaller_index = len(parent1[0]) + if len(parent1.get("control_points")) <= len(parent2.get("control_points")): + smaller_index = len(parent1.get("control_points")) else: - smaller_index = len(parent2[0]) + smaller_index = len(parent2.get("control_points")) iterator = 1 tries = 0 while tries < self.MAX_TRIES / 5: @@ -203,15 +237,15 @@ def _crossover(self, parent1, parent2): child1 = deepcopy(parent1) child2 = deepcopy(parent2) if random() <= probability: - children = self._recombination(child1[0], child2[0], iterator) + children = self._recombination(child1, child2, iterator) child1 = children[0] child2 = children[1] - width_list1 = self._get_width_lines(self._bspline(child1[0])) - width_list2 = self._get_width_lines(self._bspline(child2[0])) - control_lines1 = convert_points_to_lines(self._bspline(child1[0])) - control_lines2 = convert_points_to_lines(self._bspline(child2[0])) - if not (intersection_check_all(child1[0]) - or intersection_check_all(child2[0]) + width_list1 = self._get_width_lines(self._bspline(child1.get("control_points"))) + width_list2 = self._get_width_lines(self._bspline(child2.get("control_points"))) + control_lines1 = convert_points_to_lines(self._bspline(child1.get("control_points"))) + control_lines2 = convert_points_to_lines(self._bspline(child2.get("control_points"))) + if not (intersection_check_all(child1.get("control_points")) + or intersection_check_all(child2.get("control_points")) or intersection_check_width(width_list1, control_lines1) or intersection_check_width(width_list2, control_lines2)): return [child1, child2] @@ -223,26 +257,30 @@ def _crossover(self, parent1, parent2): def _recombination(parent1, parent2, separation_index): """Helper method of the crossover method. Recombinates two individuals on a given point. Can be seen as a single crossover. - :param parent1: First parent - :param parent2: Second parent + :param parent1: First parent. + :param parent2: Second parent. :param separation_index: Point where the crossover should happen. :return: Return the two recombinated children. Can be invalid. """ - child1 = [] - child2 = [] + child1_control_points = [] + child2_control_points = [] iterator = 0 while iterator <= separation_index: - child1.append(parent1[iterator]) - child2.append(parent2[iterator]) + child1_control_points.append(parent1.get("control_points")[iterator]) + child2_control_points.append(parent2.get("control_points")[iterator]) iterator += 1 - while iterator < len(parent2): - child1.append(parent2[iterator]) + while iterator < len(parent2.get("control_points")): + child1_control_points.append(parent2.get("control_points")[iterator]) iterator += 1 iterator = separation_index + 1 - while iterator < len(parent1): - child2.append(parent1[iterator]) + while iterator < len(parent1.get("control_points")): + child2_control_points.append(parent1.get("control_points")[iterator]) iterator += 1 - children = [[child1, 0], [child2, 0]] + child1 = deepcopy(parent1) + child1["control_points"] = child1_control_points + child2 = deepcopy(parent2) + child2["control_points"] = child2_control_points + children = [child1, child2] return children def _calculate_fitness_value(self, distances, ticks): @@ -256,7 +294,7 @@ def _calculate_fitness_value(self, distances, ticks): while iterator < self.POPULATION_SIZE: time = ticks / 60 cumulative_distance = sum(distances) - self.population_list[iterator][1] = cumulative_distance / time + self.population_list[iterator]["fitness"] = cumulative_distance / time # Comment the three above lines and comment out the two following lines to use maximum distance as the # fitness function. @@ -267,10 +305,10 @@ def _calculate_fitness_value(self, distances, ticks): def _choose_elite(self, population): """Chooses the roads with the best fitness values. - :param population: List of control points and its corresponding fitness values. - :return: List of best x control points and its corresponding fitness values. + :param population: List of individuals. + :return: List of best x individuals according to their fitness value. """ - population.sort(key=itemgetter(1)) + population = sorted(population, key=lambda k: k['fitness']) elite = [] iterator = 0 while iterator < self.NUMBER_ELITES: @@ -282,7 +320,7 @@ def _get_resize_factor(self, length): """Returns the resize factor for the width lines so all lines have one specific length. :param length: Length of a LineString. - :return: Resize factor + :return: Resize factor. """ if length == 0: return 0 @@ -291,7 +329,7 @@ def _get_resize_factor(self, length): def _get_width_lines(self, control_points): """Determines the width lines of the road by flipping the LineString between two points by 90 degrees in both directions. - :param control_points: List of control points + :param control_points: List of control points. :return: List of LineStrings which represent the width of the road. """ spline_list = deepcopy(control_points) @@ -335,18 +373,50 @@ def _get_width_lines(self, control_points): iterator += 1 return linestring_list + def _add_width(self, individual): + """Adds the width value for each control point. + :param individual: Individual of the population. + :return: Void. + """ + for point in individual.get("control_points"): + point["width"] = self.WIDTH_OF_STREET + def _spline_population(self, population_list): """Converts the control points list of every individual to a bspline - list and adds the width parameter. - :param population_list: List of individuals + list and adds the width parameter as well as the ego car. + :param population_list: List of individuals. :return: List of individuals with bsplined control points. """ iterator = 0 while iterator < len(population_list): - population_list[iterator][0] = self._bspline(population_list[iterator][0]) + splined_list = self._bspline(population_list[iterator].get("control_points")) + jterator = 0 + control_points = [] + while jterator < len(splined_list): + point = {"x": splined_list[jterator][0], + "y": splined_list[jterator][1]} + control_points.append(point) + jterator += 1 + population_list[iterator]["control_points"] = control_points + _add_ego_car(population_list[iterator]) + self._add_width(population_list[iterator]) iterator += 1 return population_list + def _add_newcomer(self): + """Adds one new individual into the population. + :return: Void. + """ + control_points = None + while control_points is None: + control_points = self._generate_random_points() + individual = {"control_points": control_points, + "file_name": self.files_name, + "fitness": 0} + self._add_width(individual) + _add_ego_car(individual) + self.population_list.append(individual) + def genetic_algorithm(self): """The main algorithm to generate valid roads. Utilizes a genetic algorithm to evolve more critical roads for a AI. @@ -369,17 +439,14 @@ def genetic_algorithm(self): print(colored("Population finished.", "blue")) temp_list = deepcopy(self.population_list) temp_list = self._spline_population(temp_list) - build_all_xml(temp_list, self.WIDTH_OF_STREET, self.files_name) + build_all_xml(temp_list) - # Comment out if you want to see the generated roads (blocks until you close all images) + # Comment out if you want to see the generated roads (blocks until you close all images). plot_all(temp_list) self.population_list = self._choose_elite(self.population_list) # Introduce new individuals in the population. - new_child1 = self._generate_random_points() - self.population_list.append([new_child1, 0]) - # new_child2 = self._generate_random_points() - # self.population_list.append([new_child2, 0]) + self._add_newcomer() def set_files_name(self, new_name): """Sets a new name for the created xml files.""" @@ -394,7 +461,7 @@ def getTest(self) -> Optional[Tuple[Path, Path]]: matches = glob(xml_names) iterator = 0 self.genetic_algorithm() - while iterator < self.POPULATION_SIZE * 2: + while iterator < self.POPULATION_SIZE * 2 - 1: yield Path(matches[iterator + 1]), Path(matches[iterator]) iterator += 2 diff --git a/utils/dbc_xml_builder.py b/utils/dbc_xml_builder.py index f5780fc..579eb32 100644 --- a/utils/dbc_xml_builder.py +++ b/utils/dbc_xml_builder.py @@ -68,67 +68,77 @@ def ai_freq(self, frequency="6"): aifreq = ElementTree.SubElement(self.root, "aiFrequency") aifreq.text = str(frequency) - def add_car(self, init_state, waypoints, participant_id="ego", model="ETK800"): + def add_car(self, participant): """Adds a car to this test case. At least one car (the ego car) should be added. - :param init_state: Array with initial states. Contains: x-coordinate (int), y-coordinate (int), + :param participant: Dict which contains init_state, waypoints, participant_id and model. See the lines below + for more information: + init_state: Array with initial states. Contains: x-coordinate (int), y-coordinate (int), orientation (int), movementMode (MANUAL, _BEAMNG, AUTONOMOUS, TRAINING), speed (int) - :param waypoints: Array with waypoints. One waypoint contains: x-coordinate (int), + waypoints: Array with waypoints. One waypoint contains: x-coordinate (int), y-coordinate (int), tolerance (int), movementMode (see above), speedLimit (int) (optional) - :param participant_id: unique ID of this participant as String. - :param model: BeamNG model car as String. See beamngpy documentation for more models. + participant_id: unique ID of this participant as String. + model: BeamNG model car as String. See beamngpy documentation for more models. :return: Void """ + participant_id = participant.get("id") + init_state = participant.get("init_state") + waypoints = participant.get("waypoints") + model = participant.get("model") participant = ElementTree.SubElement(self.participants, "participant") participant.set("id", participant_id) participant.set("model", model) ElementTree.SubElement(participant, 'initialState x="{}" y="{}"' ' orientation="{}" movementMode="{}"' ' speed="{}"' - .format(str(init_state[0]), str(init_state[1]), str(init_state[2]), - init_state[3], str(init_state[4]))) + .format(str(init_state.get("x")), str(init_state.get("y")), + str(init_state.get("orientation")), init_state.get("movementMode"), + str(init_state.get("speed")))) ai = ElementTree.SubElement(participant, "ai") - # ElementTree.SubElement(ai, 'roadCenterDistance id="{}"'.format("egoLaneDist")) - # ElementTree.SubElement(ai, 'camera width="{}" height="{}" fov="{}" direction="{}" id="{}"' - # .format(str(320), str(160), str(120), "FRONT", "egoFrontCamera")) + ElementTree.SubElement(ai, 'roadCenterDistance id="{}"'.format("egoLaneDist")) + ElementTree.SubElement(ai, 'camera width="{}" height="{}" fov="{}" direction="{}" id="{}"' + .format(str(600), str(400), str(120), "FRONT", "egoFrontCamera")) movement = ElementTree.SubElement(participant, "movement") for waypoint in waypoints: waypoint_tag = ElementTree.SubElement(movement, 'waypoint x="{}" y="{}" tolerance="{}"' ' movementMode="{}"' - .format(str(waypoint[0]), str(waypoint[1]), - str(waypoint[2]), waypoint[3])) - if len(waypoint) == 5: - waypoint_tag.set("speedLimit", str(waypoint[4])) + .format(str(waypoint.get("x")), str(waypoint.get("y")), + str(waypoint.get("tolerance")), + waypoint.get("movementMode"))) + if waypoint.get("speedLimit"): + waypoint_tag.set("speedLimit", str(waypoint.get("speedLimit"))) def add_precond_partic_sc_speed(self, vc_pos, sc_speed): """Adds a precondition for a position, which must be satisfied in order to continue the test. This method requires a lower speed bound, which must be reached. - :param vc_pos: Position of the precondition. Array contains: participant id (string), + :param vc_pos: Position of the precondition. Dict contains: participant id (string), xPos (int), yPos (int), tolerance (int) defines a circle which must be entered. - :param sc_speed: Lower speed bound. Array contains: participant id (string), limit (int). + :param sc_speed: Lower speed bound as integer. :return: Void """ vc_position = ElementTree.SubElement(self.preconditions, 'vcPosition') - vc_position.set("participant", vc_pos[0]) - vc_position.set("x", str(vc_pos[1])) - vc_position.set("y", str(vc_pos[2])) - vc_position.set("tolerance", str(vc_pos[3])) + vc_position.set("participant", vc_pos.get("id")) + vc_position.set("x", str(vc_pos.get("x"))) + vc_position.set("y", str(vc_pos.get("y"))) + vc_position.set("tolerance", str(vc_pos.get("tolerance"))) not_tag = ElementTree.SubElement(vc_position, "not") ElementTree.SubElement(not_tag, 'scSpeed participant="{}" limit="{}"' - .format(vc_pos[0], str(sc_speed[1]))) + .format(vc_pos.get("id"), str(sc_speed))) - def add_success_point(self, sc_pos): - """Defines when a test was successfully finished. - :param sc_pos: Array of success states. Array contains: participant id (string), xPos (int), - yPos (int), tolerance (int) which defines a circle. + def add_success_point(self, participant_id, success_point): + """Point when reached a test was successfully finished. + :param: participant_id: ID of the participant as a string. + :param success_point: Dict of success states. Contains: x (int), y (int), tolerance (int) which defines a + circle. :return: Void """ ElementTree.SubElement(self.success, 'scPosition participant="{}" x="{}" y="{}" tolerance="{}"' - .format(sc_pos[0], str(sc_pos[1]), str(sc_pos[2]), str(sc_pos[3]))) + .format(participant_id, str(success_point.get("x")), str(success_point.get("y")), + str(success_point.get("tolerance")))) def add_failure_damage(self, participant_id): """Adds damage observation as a test failure condition. diff --git a/utils/dbc_xml_examplebuild.py b/utils/dbc_xml_examplebuild.py index f281732..d4849ac 100644 --- a/utils/dbc_xml_examplebuild.py +++ b/utils/dbc_xml_examplebuild.py @@ -12,20 +12,41 @@ dbc.steps_per_second(60) dbc.ai_freq(6) -init_state = [6, 6, 0, "MANUAL", 50] -waypoint1 = [0, 4, 4, "_BEAMNG", 40] -waypoint2 = [61, 4, 5, "AUTONOMOUS"] +init_state = {"x": 0, + "y": 4, + "orientation": 0, + "movementMode": "MANUAL", + "speed": 50} +waypoint1 = {"x": 15, + "y": 4, + "tolerance": 4, + "movementMode": "_BEAMNG", + "speed": 40} +waypoint2 = {"x": 61, + "y": 4, + "tolerance": 5, + "movementMode": "AUTONOMOUS"} waypoints = [waypoint1, waypoint2] participant_id = "ego" model = "ETK800" -dbc.add_car(init_state, waypoints, participant_id, model) - -vc_pos = [participant_id, -4, 4, 4] -sc_speed = [participant_id, 15] +participant = {"init_state": init_state, + "waypoints": waypoints, + "model": model, + "id": participant_id} +dbc.add_car(participant=participant) + +vc_pos = {"id": participant_id, + "x": waypoint1.get("x"), + "y": waypoint1.get("y"), + "tolerance": waypoint1.get("tolerance")} +sc_speed = 15 dbc.add_precond_partic_sc_speed(vc_pos, sc_speed) -success_point = [participant_id, 61, 4, 5] -dbc.add_success_point(success_point) +success_point = {"id": participant_id, + "x": waypoint2.get("x"), + "y": waypoint2.get("y"), + "tolerance": waypoint2.get("tolerance")} +dbc.add_success_point(participant_id=participant_id, success_point=success_point) dbc.add_failure_conditions(participant_id, "offroad") diff --git a/utils/dbe_xml_builder.py b/utils/dbe_xml_builder.py index c69231c..dedfedd 100644 --- a/utils/dbe_xml_builder.py +++ b/utils/dbe_xml_builder.py @@ -47,48 +47,69 @@ def indent(self, elem, level=0): def add_obstacles(self, obstacle_list): """Adds obstacles to the XML files. - :param obstacle_list: Array of obstacles. First value is the shape/name of the obstacles, - the other parameters are defining the obstacle. + :param obstacle_list: List of obstacles. Each obstacle is a dict and must contain x and y position. Check + generator.py in simnode in DriveBuild to see which object needs which properties. :return: Void. """ obstacles = ElementTree.SubElement(self.root, "obstacles") for obstacle in obstacle_list: - if obstacle[0] == "cube": - ElementTree.SubElement(obstacles, 'cube x="{}" y="{}" width="{}" length="{}"' - ' height="{}"' - .format(obstacle[1], obstacle[2], obstacle[3], obstacle[4], - obstacle[5])) - elif obstacle[0] == "cylinder": - ElementTree.SubElement(obstacles, 'cylinder x="{}" y="{}" radius="{}" height="{}"' - .format(obstacle[1], obstacle[2], obstacle[3], obstacle[4])) - elif obstacle[0] == "cone": - ElementTree.SubElement(obstacles, 'cone x="{}" y="{}" height="{}" baseRadius="{}"' - .format(obstacle[1], obstacle[2], obstacle[3], obstacle[4])) - elif obstacle[0] == "bump": - ElementTree.SubElement(obstacles, 'bump x="{}" y="{}" width="{}" length="{}" height="{}"' - ' upperLength="{}" upperWidth="{}"' - .format(obstacle[1], obstacle[2], obstacle[3], obstacle[4], - obstacle[5], obstacle[6], obstacle[7])) - - def add_lane(self, segments, markings=True, left_lanes=0, right_lanes=0): + name = obstacle.get("name") + x = obstacle.get("x") + y = obstacle.get("y") + z = obstacle.get("z") + xRot = obstacle.get("xRot") + yRot = obstacle.get("yRot") + zRot = obstacle.get("zRot") + width = obstacle.get("width") + length = obstacle.get("length") + height = obstacle.get("height") + radius = obstacle.get("radius") + baseRadius = obstacle.get("baseRadius") + upperWidth = obstacle.get("upperWidth") + upperLength = obstacle.get("upperLength") + full_string = '' + name + ' x="' + str(x) + '" y="' + str(y) + '"' + if z: + full_string += ' z="' + str(z) + '"' + if xRot: + full_string += ' xRot="' + str(xRot) + '"' + if yRot: + full_string += ' yRot="' + str(yRot) + '"' + if zRot: + full_string += ' zRot="' + str(zRot) + '"' + if width: + full_string += ' width="' + str(width) + '"' + if length: + full_string += ' length="' + str(length) + '"' + if height: + full_string += ' height="' + str(height) + '"' + if radius: + full_string += ' radius="' + str(radius) + '"' + if baseRadius: + full_string += ' baseRadius="' + str(baseRadius) + '"' + if upperWidth: + full_string += ' upperWidth="' + str(upperWidth) + '"' + if upperLength: + full_string += ' upperLength="' + str(upperLength) + '"' + ElementTree.SubElement(obstacles, full_string) + + def add_lane(self, segments, markings: bool = True, left_lanes: int = 0, right_lanes: int = 0): """Adds a lane and road segments. - :param segments: Array of tuples containing nodes to generate road segments. Segments must have - x-coordinate, y-coordinate and width. + :param segments: List of dicts containing x-coordinate, y-coordinate and width. :param markings: {@code True} Enables road markings, {@code False} makes them invisible. - :param left_lanes: number of left lanes (int) - :param right_lanes: number of right lanes (int) + :param left_lanes: number of left lanes + :param right_lanes: number of right lanes :return: Void """ lane = ElementTree.SubElement(self.lanes, "lane") if markings: lane.set("markings", "true") - if left_lanes != 0: - lane.set("leftLanes", left_lanes) - if right_lanes != 0: - lane.set("rightLanes", right_lanes) + if left_lanes != 0 and left_lanes is not None: + lane.set("leftLanes", str(left_lanes)) + if right_lanes != 0 and right_lanes is not None: + lane.set("rightLanes", str(right_lanes)) for segment in segments: ElementTree.SubElement(lane, 'laneSegment x="{}" y="{}" width="{}"' - .format(segment[0], segment[1], segment[2])) + .format(segment.get("x"), segment.get("y"), segment.get("width"))) def save_xml(self, name): """Creates and saves the XML file, and moves it to the scenario folder. diff --git a/utils/dbe_xml_examplebuild.py b/utils/dbe_xml_examplebuild.py index 31d602b..52b2ea6 100644 --- a/utils/dbe_xml_examplebuild.py +++ b/utils/dbe_xml_examplebuild.py @@ -7,16 +7,32 @@ dbe = DBEBuilder() -segment1 = [0, 0, 8] -segment2 = [50, 0, 8] -segment3 = [80, 20, 8] -segment4 = [100, 20, 8] +segment1 = {"x": 0, + "y": 0, + "width": 12} +segment2 = {"x": 50, + "y": 0, + "width": 12} +segment3 = {"x": 80, + "y": 20, + "width": 12} +segment4 = {"x": 100, + "y": 20, + "width": 12} segments = [segment1, segment2, segment3, segment4] -dbe.add_lane(segments) - -cone = ["cone", 5, 5, 5, 5] -cylinder = ["cylinder", 10, 10, 2, 2] +dbe.add_lane(segments, left_lanes=1, right_lanes=2) + +cone = {"name": "cone", + "x": 5, + "y": 5, + "baseRadius": 5, + "height": 5} +cylinder = {"name": "cylinder", + "x": 10, + "y": 10, + "radius": 2, + "height": 2} obstacles = [cone, cylinder] dbe.add_obstacles(obstacles) diff --git a/utils/plotter.py b/utils/plotter.py index 6286c0c..a351d71 100644 --- a/utils/plotter.py +++ b/utils/plotter.py @@ -6,14 +6,17 @@ def plotter(control_points): """Plots every point and lines between them. Used to visualize a road. - :param control_points: List of points as tuples [(x, y),...] + :param control_points: List of points as dict type. :return: Void. """ - control_points = np.asarray(control_points) - x = control_points[:, 0] - y = control_points[:, 1] - - plt.plot(x, y, '-og', markersize=10, linewidth=7) + point_list = [] + for point in control_points: + point_list.append((point.get("x"), point.get("y"))) + point_list = np.asarray(point_list) + x = point_list[:, 0] + y = point_list[:, 1] + + plt.plot(x, y, '-og', markersize=10, linewidth=control_points[0].get("width")) plt.xlim([min(x) - 0.3, max(x) + 0.3]) plt.ylim([min(y) - 0.3, max(y) + 0.3]) @@ -23,12 +26,12 @@ def plotter(control_points): def plot_all(population): """Plots a whole population. Method starts a new figure for every individual. - :param population: Population in form of [[control_points, fitness_value],...] + :param population: Population with individuals in dict form containing another dict type called control_points. :return: Void """ iterator = 0 while iterator < len(population): - plotter(population[iterator][0]) + plotter(population[iterator].get("control_points")) iterator += 1 diff --git a/utils/utility_functions.py b/utils/utility_functions.py index 1d1a5d7..f0be6e8 100644 --- a/utils/utility_functions.py +++ b/utils/utility_functions.py @@ -3,8 +3,8 @@ def convert_points_to_lines(control_points): """Turns a list of points into a list of LineStrings. - :param control_points: List of points - :return: List of LineStrings + :param control_points: List of dicts containing points. + :return: List of LineStrings. """ control_points_lines = [] iterator = 0 @@ -15,3 +15,15 @@ def convert_points_to_lines(control_points): control_points_lines.append(line) iterator += 1 return control_points_lines + +"""def convert_points_to_lines(control_points): + control_points_lines = [] + iterator = 0 + while iterator < (len(control_points) - 1): + p1 = (control_points[iterator].get("x"), control_points[iterator].get("y")) + p2 = (control_points[iterator + 1].get("x"), control_points[iterator + 1].get("y")) + line = LineString([p1, p2]) + control_points_lines.append(line) + iterator += 1 + return control_points_lines +""" diff --git a/utils/validity_checks.py b/utils/validity_checks.py index f9151ac..3517567 100644 --- a/utils/validity_checks.py +++ b/utils/validity_checks.py @@ -6,17 +6,18 @@ def intersection_check_last(control_points, point): """Checks for intersections between the line of the last two points and every other possible line. - :param control_points: List of control points as a tuple. + :param control_points: List of dicts containing points. :param point: Last inserted point, which should be checked for validity. :return: {@code True} if the last line intersects with another one, {@code False} if not. """ iterator = 0 while iterator <= (len(control_points) - 3): - p1 = (control_points[iterator][0], control_points[iterator][1]) - p2 = (control_points[iterator + 1][0], control_points[iterator + 1][1]) + p1 = (control_points[iterator].get("x"), control_points[iterator].get("y")) + p2 = (control_points[iterator + 1].get("x"), control_points[iterator + 1].get("y")) line1 = LineString([p1, p2]) - p3 = (control_points[-1][0], control_points[-1][1]) - line2 = LineString([p3, point]) + p3 = (control_points[-1].get("x"), control_points[-1].get("y")) + tmp_point = (point.get("x"), point.get("y")) + line2 = LineString([p3, tmp_point]) if line1.intersects(line2): return True iterator += 1 @@ -50,7 +51,7 @@ def intersection_check_width(width_lines, control_points_lines): def spline_intersection_check(control_points): """Checks for intersection of a splined list. New point must be already added to the list. - :param control_points: List of points as a tuple, containing the new added point. + :param control_points: List of dicts containing points. :return: {@code True} if the last line intersects with any other, {@code False} if not. """ iterator = 0 @@ -69,7 +70,28 @@ def spline_intersection_check(control_points): def intersection_check_all(control_points): """Checks for intersection between all lines of two connected control points. - :param control_points: List of points in tuple form. + :param control_points: List of dicts containing points. + :return: {@code True} if two lines intersects, {@code False} if not. + """ + iterator = 0 + while iterator < (len(control_points) - 1): + jterator = iterator + 2 + p1 = (control_points[iterator].get("x"), control_points[iterator].get("y")) + p2 = (control_points[iterator + 1].get("x"), control_points[iterator + 1].get("y")) + line1 = LineString([p1, p2]) + while jterator < (len(control_points) - 1): + p3 = (control_points[jterator].get("x"), control_points[jterator].get("y")) + p4 = (control_points[jterator + 1].get("x"), control_points[jterator + 1].get("y")) + line2 = LineString([p3, p4]) + if line1.intersects(line2): + return True + jterator += 1 + iterator += 1 + return False + +def intersection_check_all_np(control_points): + """Checks for intersection between all lines of two connected control points. + :param control_points: Numpy array containing points. :return: {@code True} if two lines intersects, {@code False} if not. """ iterator = 0 diff --git a/utils/xml_creator.py b/utils/xml_creator.py index eb40719..fee31b6 100644 --- a/utils/xml_creator.py +++ b/utils/xml_creator.py @@ -7,100 +7,92 @@ from utils.dbc_xml_builder import DBCBuilder -def build_environment_xml(control_points, file_name="exampleTest"): +def build_environment_xml(control_points, file_name="exampleTest", left_lanes=0, right_lanes=0, obstacles=[]): """Creates a dbe xml file. - :param control_points: List of control points as tuples. + :param control_points: List of dicts containing control points. :param file_name: Name of this dbe file. + :param left_lanes: Number of left lanes. + :param right_lanes: Number of right lanes. + :param obstacles: List of dicts containing obstacles. """ dbe = DBEBuilder() - dbe.add_lane(control_points) + dbe.add_lane(control_points, left_lanes=left_lanes, right_lanes=right_lanes) + if obstacles is not None and len(obstacles) > 0: + dbe.add_obstacles(obstacles) dbe.save_xml(file_name) -def build_criteria_xml(snd_point, car, file_name="exampleTest", env_name="exampleTest", - name="Example Test", fps="60", frequency="6", car_id="ego"): +def build_criteria_xml(participants: list, ego_car: dict, success_points: list, vc_pos, sc_speed, file_name: str ="exampleTest", + name: str ="Example Test", fps: str ="60", frequency: str ="6"): """Creates a dbc xml file. Failure, success and preconditions are controlled manually for this test generation since the road_generator creates simple lane following tests. - :param snd_point: Second point of the control point list, needed for precondition. - :param car: List of car states. See build_xml method for more details. - :param file_name: Name of this file. - :param env_name: Name of the environment file without extensions. + :param sc_speed: Speed condition that has to be met at vc_pos. + :param vc_pos: Position which must be entered at a specific speed by a specific participant. + :param participants: List of dicts of car states. See the add_car method in dbc_xml_builder.py for more + information. + :param ego_car: The test subject as dict. Contains the same information as any other participant. + :param success_points: List with points of success. Each one is a dict with x, y and tolerance. + :param file_name: Name of this dbc file. Should be the same as the environment file (laziness). :param name: Self defined description name of this file. - :param fps: frames per second + :param fps: Frames per second. :param frequency: Frequency of the AI to compute the next step. - :param car_id: Unique identifier for the participant car. :return: Void. """ dbc = DBCBuilder() dbc.define_name(name) - dbc.environment_name(env_name) + dbc.environment_name(file_name) dbc.steps_per_second(fps) dbc.ai_freq(frequency) - - dbc.add_car(car[0], car[1]) - vc_pos = [car_id, snd_point[0], snd_point[1], 4] - sc_speed = [car_id, 15] - + for participant in participants: + dbc.add_car(participant) + for success_point in success_points: + dbc.add_success_point(ego_car.get("id"), success_point) + dbc.add_failure_conditions(ego_car.get("id"), "offroad") dbc.add_precond_partic_sc_speed(vc_pos, sc_speed) - dbc.add_success_point([car_id, car[1][-1][0], car[1][-1][1], car[1][-1][2]]) - dbc.add_failure_conditions(car_id, "offroad") dbc.save_xml(file_name) -def _add_width(control_points, width): - """Adds the width parameter for the whole list of control points. - :param control_points: List of control points. - :return: List of control points with the width parameter added. - """ - new_list = [] - iterator = 0 - while iterator < len(control_points): - new_list.append([control_points[iterator][0], control_points[iterator][1], width]) - iterator += 1 - return new_list - - -def build_xml(control_points, name, width): +def build_xml(individual, iterator: int = 0): """Builds an environment and criteria xml file out of a list of control points. - :param width: Desired width of each road segment. - :param control_points: List of control points as tuples. - :param name: Name of this file. + :param individual: obstacles (list), number of right lanes (int), number of left lanes (int), + control points (list), file name (string), participants (list) + :param iterator: Unique index of a population. :return: Void. """ - temp_list = _add_width(control_points, width) - build_environment_xml(temp_list, name) - - init_state = [temp_list[0][0] + 3, temp_list[0][1], 0, "AUTONOMOUS", 50] - - waypoints = [] - - # Comment this block in to add waypoints for every point in the list, e.g. for beamng ai. - """ - waypoint = temp_list.pop(0) - waypoint = [waypoint[0] + 3, waypoint[1], 4, "AUTONOMOUS"] - waypoints.append(waypoint) - if len(temp_list) > 20: - for x in range(0, 5): - temp_list.pop(0) - for point in temp_list: - waypoint = [point[0], point[1], 4, "AUTONOMOUS"] - waypoints.append(waypoint) - """ - waypoints.append([temp_list[-1][0], temp_list[-1][1], 4, "AUTONOMOUS"]) - car = [init_state, waypoints] - build_criteria_xml(temp_list[0], car, name, name) - - -def build_all_xml(population, width, files_name="exampleXML"): + obstacles = individual.get("obstacles") + right_lanes = individual.get("right_lanes") + left_lanes = individual.get("left_lanes") + control_points = individual.get("control_points") + file_name = individual.get("file_name") + participants = individual.get("participants") + file_name = file_name + str(iterator) + success_point = {"x": control_points[-1].get("x"), + "y": control_points[-1].get("y"), + "tolerance": control_points[-1].get("width") / 2} + success_points = [success_point] + ego = None + for participant in participants: + if participant.get("id") is "ego": + ego = participant + break + vc_pos = {"id": ego.get("id"), + "tolerance": 3, + "x": control_points[1].get("x"), + "y": control_points[1].get("y")} + sc_speed = 10 + build_environment_xml(control_points=control_points, file_name=file_name, left_lanes=left_lanes, + right_lanes=right_lanes, obstacles=obstacles) + build_criteria_xml(participants=participants, ego_car=ego, success_points=success_points, + file_name=file_name, vc_pos=vc_pos, sc_speed=sc_speed) + + +def build_all_xml(population): """Calls the build_xml method for each individual. - :param width: Desired width of each road segment. :param population: List of individuals containing control points and a fitness value for each one. - :param files_name: Name of this file. :return: Void. """ iterator = 0 while iterator < len(population): - file_name = files_name + str(iterator) - build_xml(population[iterator][0], file_name, width) + build_xml(population[iterator], iterator) iterator += 1 diff --git a/utils/xml_to_bng_files.py b/utils/xml_to_bng_files.py index edaf9de..e96e24c 100644 --- a/utils/xml_to_bng_files.py +++ b/utils/xml_to_bng_files.py @@ -6,6 +6,8 @@ from subprocess import call from termcolor import colored import os +import shutil +from time import sleep def get_next_test(files_name): @@ -21,15 +23,36 @@ def get_next_test(files_name): return [matches[0], matches[1]] -def convert_test(files_name): - """Starts a test in DriveBuild to convert xml files to prefab and json files. +def add_prefab_files(): + pass + # TODO + + +def convert_test(dbc, dbe): + """Starts a test in DriveBuild to convert xml files to prefab, json and lua files. Moves them automatically to + the scenario folder in the BeamNG trunk folder. :return: Void. """ - next_test = get_next_test(files_name) - dbc = next_test[0] - dbe = next_test[1] service = AIExchangeService("localhost", 8383) service.run_tests("test", "test", Path(dbe), Path(dbc)) - print(colored("Starting DriveBuild to generate BeamNG files...", "blue")) - # Close BeamNG after converting - call("taskkill /f /im BeamNG.research.x64.exe", shell=True) + print(colored("Converting XML files to BNG files. Moving to scenarios folder...", "blue")) + + # Close BeamNG after converting. If you don't close it, BeamNG will load. + call("C:\\Windows\\System32\\taskkill.exe /f /im BeamNG.research.x64.exe", shell=True) + + # Change it to YOUR DriveBuild user path. + destination_path = "C:\\BeamNG.research_userpath\\drivebuild_*" + matches = glob(destination_path) + if len(matches) > 0: + latest_folder = max(matches, key=os.path.getmtime) + latest_folder = latest_folder + "\\levels\\drivebuild\\scenarios\\*" + matches = glob(latest_folder) + if len(matches) != 0: + latest_file = max(matches, key=os.path.getmtime) + elements = latest_file.split("\\") + filename = elements[-1].split(".")[0] + destination_path = latest_folder[:-1] + filename + "*" + matches = glob(destination_path) + for match in matches: + # Change it to YOUR DriveBuild scenario folder in the BNG trunk folder. + shutil.move(match, "D:\\Program Files (x86)\\BeamNG\\levels\\drivebuild\\scenarios")