Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added enum switch node #88

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions ros_bt_py/ros_bt_py/nodes/fallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,109 @@
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from typing import Optional, Dict
from rclpy.node import Node
from ros_bt_py_interfaces.msg import Node as NodeMsg
from ros_bt_py_interfaces.msg import UtilityBounds

from ros_bt_py.debug_manager import DebugManager
from ros_bt_py.node import FlowControl, define_bt_node
from ros_bt_py.node_config import NodeConfig

from ros_bt_py.ros_helpers import EnumValue, get_message_constant_fields
from ros_bt_py.exceptions import BehaviorTreeException

@define_bt_node(
NodeConfig(
version="0.1.0",
options={"ros_message_type": type},
inputs={"case": int},
outputs={},
max_children=None,
)
)
class EnumSwitch(FlowControl):
def __init__(
self,
options: Optional[Dict] = None,
debug_manager: Optional[DebugManager] = None,
name: Optional[str] = None,
ros_node: Optional[Node] = None,
succeed_always: bool = False,
simulate_tick: bool = False,
):
super(EnumSwitch, self).__init__(
options=options,
debug_manager=debug_manager,
name=name,
ros_node=ros_node,
succeed_always=succeed_always,
simulate_tick=simulate_tick,
)

self.possible_children = get_message_constant_fields(
self.options["ros_message_type"]
)

if not self.possible_children:
raise BehaviorTreeException(
"%s has no constant fields" % (self.options["ros_message_type"])
)

self.msg = self.options["ros_message_type"]()

self.pchild_dict = dict(
zip(
[getattr(self.msg, entry) for entry in self.possible_children],
self.possible_children,
)
)

def _do_setup(self):
self.child_map = {child.name.split(".")[-1]: child for child in self.children}
for child in self.children:
if child.name.split(".")[-1] not in self.possible_children:
self.logerr("Unwanted child detected please fix name")
child.setup()

def _do_tick(self):
name = self.inputs["case"]

if name in self.pchild_dict.keys():
e_name = self.pchild_dict[name]
else:
self.logwarn("Input did not match possible children.")
return NodeMsg.FAILED

if e_name not in self.child_map:
self.logwarn("Ticking without children. Is this really what you want?")
return NodeMsg.FAILED

# If we've previously succeeded or failed, untick all children
if self.state in [NodeMsg.SUCCEEDED, NodeMsg.FAILED]:
for child in self.children:
child.reset()

for child_name, child in self.child_map.items():
if not child_name == e_name:
child.untick()

return self.child_map[e_name].tick()

def _do_untick(self):
for child in self.children:
child.untick()
return NodeMsg.IDLE

def _do_reset(self):
for child in self.children:
child.reset()
return NodeMsg.IDLE

def _do_shutdown(self):
for child in self.children:
child.shutdown()


@define_bt_node(
NodeConfig(
Expand Down
Loading