import logging
import os
import shutil
from itertools import chain
from .helpers import (TEMPLATE_DIR, TEMPORARY_OUTPUT_DIR, Params,
render_to_template, run_sys_cmd)
[docs]def _build_packages_with_colcon(dev_ws, packages_list):
"""Generate a Vivado block design
Parameters
----------
dev_ws: str
Workspace of ROS2 packages
packages_list: List[str]
Names of ROS2 package to build
Returns
-------
None
"""
run_sys_cmd(
["colcon build --packages-select " + " ".join(packages_list)],
cwd=dev_ws,
)
[docs]class MessagePackage:
"""Class to create a package of ROS2 custom message"""
[docs] def _get_msg_files(self, ip):
"""Get ROS2 message file names
Parameters
----------
ip: List
Returns
-------
msg_files: List[str]
List of ROS2 message file names
"""
msg_files = []
for map_num in range(1, len(ip) + 1):
msg_files.append(os.path.join("msg", f"FpgaIn{map_num}.msg"))
msg_files.append(os.path.join("msg", f"FpgaOut{map_num}.msg"))
return msg_files
[docs] def _process_msg_file(self, filename, io, io_type):
"""Create the contents of custom messages
Parameters
----------
filename: str
io: List
io_type: List
Returns
-------
None
"""
ros2_type = ""
for i in range(len(io)):
signal_name = io[i]
signal_type = io_type[i]
ros2_type += f"{signal_type} {signal_name}\n"
f = open(
os.path.join(TEMPORARY_OUTPUT_DIR, "%s-int.msg" % filename),
"w",
)
f.write(ros2_type)
f.close()
[docs] def _create_msg_file(self, params):
"""Create custom messages
Parameters
----------
params: Params
Returns
-------
None
"""
msg_dir = os.path.join(params.dev_ws, "src", params.project, "msg")
if not os.path.exists(msg_dir):
os.makedirs(msg_dir)
for map_num, io_map in enumerate(params.io_maps, 1):
fpga_in_msg = f"FpgaIn{map_num}"
fpga_out_msg = f"FpgaOut{map_num}"
self._process_msg_file(fpga_in_msg, io_map.input, io_map.input_type)
self._process_msg_file(fpga_out_msg, io_map.output, io_map.output_type)
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, f"{fpga_in_msg}-int.msg"),
os.path.join(msg_dir, fpga_in_msg + ".msg"),
)
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, f"{fpga_out_msg}-int.msg"),
os.path.join(msg_dir, fpga_out_msg + ".msg"),
)
[docs] def _render(self, params):
"""Render custom message packages
Parameters
----------
params: Params
Returns
-------
None
"""
if not os.path.exists(TEMPORARY_OUTPUT_DIR):
os.makedirs(TEMPORARY_OUTPUT_DIR)
render_to_template(
"package-int.xml.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "package-int.xml"),
params,
)
render_to_template(
"CMakeLists-int.txt.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "CMakeLists-int.txt"),
params,
)
[docs] def _create(self, params):
"""Create a custom message package
Parameters
----------
params: Params
Returns
-------
None
"""
src_dir = os.path.join(params.dev_ws, "src")
if not os.path.exists(src_dir):
os.makedirs(src_dir)
run_sys_cmd(
["ros2 pkg create --build-type ament_cmake " + params.project],
cwd=src_dir,
)
self._render(params)
self._create_msg_file(params)
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, "CMakeLists-int.txt"),
os.path.join(params.dev_ws, "src", params.project, "CMakeLists.txt"),
)
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, "package-int.xml"),
os.path.join(params.dev_ws, "src", params.project, "package.xml"),
)
[docs]class NodePackage:
"""Class to create a packages of ROS2-FPGA nodes"""
[docs] def _get_all_ip_names(self, io_maps):
"""Get IP core names with suffix
Parameters
----------
io_maps: List
Returns
-------
ip_names: List
IP core names
"""
ip_names = []
for i in range(len(io_maps)):
name = io_maps[i].ip[0]
count = io_maps[i].count[0]
ip = [f"{name}_{x}" for x in range(count)]
ip_names.append(ip)
return ip_names
[docs] def _get_ip_msg_table(self, ip_names_2d):
"""Table containing IP core names and ROS2 message file suffix
Parameters
----------
ip_names_2d: List
Returns
-------
table: Dict
"""
table = {}
for i in range(1, len(ip_names_2d) + 1):
ip_names = ip_names_2d[i - 1]
for ip_name in ip_names:
table[ip_name] = i
return table
[docs] def _render(self, params):
"""Render ROS2-FPGA nodes packages
Parameters
----------
params: Params
Returns
-------
None
"""
if not os.path.exists(TEMPORARY_OUTPUT_DIR):
os.makedirs(TEMPORARY_OUTPUT_DIR)
render_to_template(
"package-node.xml.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "package-node.xml"),
params,
)
render_to_template(
"setup.py.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "setup-node.py"),
params,
)
render_to_template(
"fpga_node.py.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "fpga_node-node.py"),
params,
)
render_to_template(
"fpga_node_launch.py.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "fpga_node_launch.py"),
params,
)
if not params.test_enabled:
return
render_to_template(
"talker.py.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "talker-node.py"),
params,
)
render_to_template(
"talker_launch.py.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "talker_launch.py"),
params,
)
render_to_template(
"listener.py.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "listener-node.py"),
params,
)
render_to_template(
"listener_launch.py.jinja2",
os.path.join(TEMPORARY_OUTPUT_DIR, "listener_launch.py"),
params,
)
[docs] def _create(self, params):
"""Create a ROS2-FPGA nodes package
Parameters
----------
params: Params
Returns
-------
None
"""
src_dir = os.path.join(params.dev_ws, "src")
if not os.path.exists(src_dir):
os.makedirs(src_dir)
run_sys_cmd(
["ros2 pkg create --build-type ament_python " + params.project],
cwd=src_dir,
)
self._render(params)
# Copy modified node package.xml
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, "package-node.xml"),
os.path.join(params.dev_ws, "src", params.project, "package.xml"),
)
# Copy modified node setup.py
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, "setup-node.py"),
os.path.join(params.dev_ws, "src", params.project, "setup.py"),
)
# Copy modified node fpga_node.py
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, "fpga_node-node.py"),
os.path.join(
params.dev_ws,
"src",
params.project,
params.project,
"fpga_node.py",
),
)
# Copy modified node ros_fpga_lib.py
shutil.copy(
os.path.join(TEMPLATE_DIR, "pynq_driver.py"),
os.path.join(
params.dev_ws,
"src",
params.project,
params.project,
"pynq_driver.py",
),
)
shutil.copy(
os.path.join(TEMPLATE_DIR, "io_maps.py"),
os.path.join(
params.dev_ws,
"src",
params.project,
params.project,
"io_maps.py",
),
)
# Copy modified node fpga_node_launch.py
launch_dir = os.path.join(params.dev_ws, "src", params.project, "launch")
if not os.path.exists(launch_dir):
os.makedirs(launch_dir)
launch_file_name = "fpga_node_launch.py"
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, launch_file_name),
os.path.join(launch_dir, launch_file_name),
)
if not params.test_enabled:
return
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, "talker-node.py"),
os.path.join(
params.dev_ws,
"src",
params.project,
params.project,
"talker.py",
),
)
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, "listener-node.py"),
os.path.join(
params.dev_ws,
"src",
params.project,
params.project,
"listener.py",
),
)
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, "talker_launch.py"),
os.path.join(launch_dir, "talker_launch.py"),
)
shutil.copy(
os.path.join(TEMPORARY_OUTPUT_DIR, "listener_launch.py"),
os.path.join(launch_dir, "listener_launch.py"),
)
[docs]def generate_packages(args):
"""Generate ROS2 package
Parameters
----------
args: argparse.Namespace
Returns
-------
None
"""
logger = logging.getLogger("meta-FOrEST")
message_package = MessagePackage()
node_package = NodePackage()
message_package_params = message_package._configure_params(args)
node_package_params = node_package._configure_params(args)
logger.info("Generating the ROS2 package for the FPGA node messages")
message_package._create(message_package_params)
logger.info("Generating the ROS2 package for the FPGA node")
node_package._create(node_package_params)
logger.info("Building the ROS2 packages")
if args.step_to == "build":
_build_packages_with_colcon(
args.workspace,
[message_package_params.project, node_package_params.project],
)