From 1c174b0c98a7a1684a683f12965d9c633ae50cd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Twoje=20Imi=C4=99=20Nazwisko?= Date: Thu, 20 Mar 2025 11:11:42 +0100 Subject: [PATCH] Link.py plik --- link.py | 429 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 429 insertions(+) create mode 100644 link.py diff --git a/link.py b/link.py new file mode 100644 index 0000000..8799c19 --- /dev/null +++ b/link.py @@ -0,0 +1,429 @@ +import argparse +import os +import subprocess +import sys +from pyroute2 import IPRoute, NetNS + +class NetworkNamespaceManager: + """ + Provides functionalities to manage network namespaces. + + Network namespaces partition network resources such as network links, + IP addresses, and port numbers into disjoint sets. + """ + + @staticmethod + def list_netns(): + """ + List all available network namespaces. + + Scans the '/var/run/netns' directory and prints out all + the network namespace files present. If no namespaces are found, it + prints a message indicating that. + """ + netns_dir = "/var/run/netns" + try: + netns_files = os.listdir(netns_dir) + if netns_files: + print("List of available network namespaces:") + for netns_file in netns_files: + print(f"- {netns_file}") + else: + print("No network namespaces found.") + except OSError as e: + print(f"Error listing network namespaces: {e}") + +class ContainerNetnsExposer: + """ + Responsible for exposing network namespaces of containers. + + Supports Docker, LXC, LXD, and Incus containers, allowing the user to interact + with their network namespaces. + """ + + def __init__(self): + """ + Initializes the ContainerNetnsExposer instance. + """ + self.netns_pid = None + self.netns_path = None + + def expose_container_netns(self, container_id_or_name, container_type='docker'): + """ + Expose the network namespace of a specified container. + + Parameters: + container_id_or_name (str): The identifier or name of the container. + container_type (str): The type of the container ('docker', 'lxc', 'lxd', or 'incus'). + + Returns: + str: The PID of the container as a string. + + Raises: + SystemExit: If the container type is unsupported or if there is an error + in retrieving the container's PID. + """ + if container_type == 'docker': + self.get_docker_container_pid(container_id_or_name) + elif container_type in ('lxc', 'lxd', 'incus'): + self.get_lxc_container_pid(container_id_or_name, container_type) + else: + print("Unsupported container type. Only 'docker', 'lxc', 'lxd', and 'incus' are supported.") + sys.exit(1) + + self.create_netns_directory() + self.create_or_remove_netns_symlink() + return str(self.netns_pid) + + def get_docker_container_pid(self, container_id_or_name): + """ + Retrieve the PID of a Docker container. + + Parameters: + container_id_or_name (str): The identifier or name of the Docker container. + + Raises: + SystemExit: If there is an error in retrieving the Docker container's PID. + """ + try: + self.netns_pid = subprocess.check_output( + ["sudo", "docker", "inspect", "-f", "{{.State.Pid}}", container_id_or_name], + universal_newlines=True + ).strip() + except subprocess.CalledProcessError: + print("Error retrieving Docker container PID. Make sure the container exists and is running.") + sys.exit(1) + + def get_lxc_container_pid(self, container_name, container_type='lxc'): + """ + Retrieve the PID of an LXC, LXD, or Incus container. + + Parameters: + container_name (str): The name of the container. + container_type (str): The type of the container ('lxc', 'lxd', or 'incus'). + + Raises: + SystemExit: If there is an error in retrieving the container's PID. + """ + try: + if container_type == 'lxc': + output = subprocess.check_output( + ["lxc-info", "-n", container_name, "-p"], + universal_newlines=True + ) + self.netns_pid = output.strip().split()[-1] + elif container_type == 'lxd': + output = subprocess.check_output( + ["sudo", "lxc", "info", container_name], + universal_newlines=True + ) + for line in output.splitlines(): + if line.strip().startswith("PID:"): + self.netns_pid = line.split(':')[1].strip() + break + else: + print(f"PID not found in 'lxc info' output for container '{container_name}'.") + sys.exit(1) + elif container_type == 'incus': + output = subprocess.check_output( + ["sudo", "incus", "info", container_name], + universal_newlines=True + ) + for line in output.splitlines(): + if line.strip().startswith("PID:"): + self.netns_pid = line.split(':')[1].strip() + break + else: + print(f"PID not found in 'incus info' output for container '{container_name}'.") + sys.exit(1) + except subprocess.CalledProcessError as e: + print(f"Error retrieving {container_type.upper()} container PID for '{container_name}'. Error: {e}") + sys.exit(1) + + def create_netns_directory(self): + """ + Create the network namespace directory if it does not exist. + + Ensures that the directory '/var/run/netns' exists, which is used + to store network namespace symlinks. + """ + try: + subprocess.run(["sudo", "mkdir", "-p", "/var/run/netns"], check=True) + except subprocess.CalledProcessError as e: + print(f"Error creating network namespace directory: {e.stderr.strip()}") + sys.exit(1) + + def create_or_remove_netns_symlink(self): + """ + Create or remove a symlink to the network namespace of a container. + + Sets up a symlink in '/var/run/netns', pointing to the network namespace + of the container identified by its PID. If a symlink with the same name + already exists, it is removed before creating a new one. + """ + self.netns_path = f"/var/run/netns/{self.netns_pid}" + + try: + if os.path.exists(self.netns_path): + subprocess.run(["sudo", "rm", "-rf", self.netns_path], check=True) + subprocess.run(["sudo", "ln", "-s", f"/proc/{self.netns_pid}/ns/net", self.netns_path], check=True) + except subprocess.CalledProcessError as e: + print(f"Error creating or removing symlink: {e}") + sys.exit(1) + +class IfaceManager: + """ + Manages network interfaces, including creation, deletion, and configuration + of veth pairs, VLANs, and bridges. + """ + + def delete(self, iface_name, namespace=None): + """ + Delete a network interface. + + Parameters: + iface_name (str): The name of the interface to delete. + namespace (str, optional): The network namespace where the interface exists. + + Raises: + Exception: If the interface cannot be deleted. + """ + try: + if namespace: + context_manager = NetNS(namespace) + else: + context_manager = IPRoute() + + with context_manager as ns: + iface_idx_list = ns.link_lookup(ifname=iface_name) + if not iface_idx_list: + raise ValueError(f"Interface {iface_name} not found.") + iface_idx = iface_idx_list[0] + ns.link('del', index=iface_idx) + except Exception as e: + print(f"Error deleting interface {iface_name} in namespace {namespace}: {e}") + + def create_veth(self, iface1, iface2, namespace=None): + """ + Create a veth pair. + + Parameters: + iface1 (str): The name of the first interface. + iface2 (str): The name of the second interface. + namespace (str, optional): The network namespace where to create the veth pair. + + Raises: + Exception: If the veth pair cannot be created. + """ + try: + if namespace: + context_manager = NetNS(namespace) + else: + context_manager = IPRoute() + + with context_manager as ns: + ns.link('add', ifname=iface1, peer={'ifname': iface2}, kind='veth') + except Exception as e: + print(f"Error creating veth pair {iface1} and {iface2}: {e}") + + def create_vlan(self, base_iface, vlan_id, namespace=None): + """ + Create a VLAN interface. + + Parameters: + base_iface (str): The base interface name. + vlan_id (int): The VLAN ID. + namespace (str, optional): The network namespace where to create the VLAN interface. + + Raises: + Exception: If the VLAN interface cannot be created. + """ + try: + if namespace: + context_manager = NetNS(namespace) + else: + context_manager = IPRoute() + + with context_manager as ns: + base_iface_idx_list = ns.link_lookup(ifname=base_iface) + if not base_iface_idx_list: + raise ValueError(f"Base interface {base_iface} not found.") + base_iface_idx = base_iface_idx_list[0] + + vlan_iface = f"{base_iface}.{vlan_id}" + ns.link('add', ifname=vlan_iface, link=base_iface_idx, kind='vlan', vlan_info={'id': vlan_id}) + except Exception as e: + print(f"Error creating VLAN on interface {base_iface}: {e}") + + def create_bridge(self, bridge_name, namespace=None): + """ + Create a bridge interface. + + Parameters: + bridge_name (str): The name of the bridge. + namespace (str, optional): The network namespace where to create the bridge. + + Raises: + Exception: If the bridge cannot be created. + """ + try: + if namespace: + context_manager = NetNS(namespace) + else: + context_manager = IPRoute() + + with context_manager as ns: + ns.link('add', ifname=bridge_name, kind='bridge') + except Exception as e: + print(f"Error creating bridge {bridge_name}: {e}") + + def move(self, iface, namespace): + """ + Move a network interface to another namespace. + + Parameters: + iface (str): The interface name to move. + namespace (str): The target network namespace. + + Raises: + Exception: If the interface cannot be moved. + """ + try: + ipr = IPRoute() + idx_list = ipr.link_lookup(ifname=iface) + if not idx_list: + raise ValueError(f"Interface {iface} not found.") + idx = idx_list[0] + ipr.link('set', index=idx, net_ns_fd=namespace) + except Exception as e: + print(f"Error moving interface {iface} to namespace {namespace}: {e}") + + def set_interface_up(self, iface_name, namespace=None): + """ + Set a network interface up. + + Parameters: + iface_name (str): The name of the interface. + namespace (str, optional): The network namespace where the interface exists. + + Raises: + Exception: If the interface cannot be set up. + """ + try: + if namespace: + context_manager = NetNS(namespace) + else: + context_manager = IPRoute() + + with context_manager as ns: + iface_idx_list = ns.link_lookup(ifname=iface_name) + if not iface_idx_list: + raise ValueError(f"Interface {iface_name} not found.") + iface_idx = iface_idx_list[0] + ns.link("set", index=iface_idx, state="up") + except Exception as e: + print(f"Error setting up interface {iface_name} in namespace {namespace}: {e}") + + def attach_to_bridge(self, iface_name, bridge_name, namespace=None): + """ + Attach an interface to a bridge. + + Parameters: + iface_name (str): The name of the interface. + bridge_name (str): The name of the bridge. + namespace (str, optional): The network namespace where the interface and bridge exist. + + Raises: + Exception: If the interface cannot be attached to the bridge. + """ + try: + if namespace: + context_manager = NetNS(namespace) + else: + context_manager = IPRoute() + + with context_manager as ns: + iface_idx_list = ns.link_lookup(ifname=iface_name) + if not iface_idx_list: + raise ValueError(f"Interface {iface_name} not found.") + iface_idx = iface_idx_list[0] + + bridge_idx_list = ns.link_lookup(ifname=bridge_name) + if not bridge_idx_list: + raise ValueError(f"Bridge {bridge_name} not found.") + bridge_idx = bridge_idx_list[0] + + ns.link("set", index=iface_idx, master=bridge_idx) + except Exception as e: + print(f"Error attaching interface {iface_name} to bridge {bridge_name} in namespace {namespace}: {e}") + +def interpret_namespace(namespace_arg): + """ + Interpret the namespace argument. + + If the argument is '1', converts it to None (representing the default namespace), + otherwise returns the argument as is. + + Parameters: + namespace_arg (str): The namespace argument. + + Returns: + str or None: The interpreted namespace. + """ + return None if namespace_arg == '1' else namespace_arg + +def main(): + parser = argparse.ArgumentParser(description="Create veth pairs between containers with optional bridge attachment.") + parser.add_argument("-ns1", "--namespace1", default=None, help="Name of the first namespace or container, or '1' for the default namespace.") + parser.add_argument("-ns2", "--namespace2", default=None, help="Name of the second namespace or container, or '1' for the default namespace.") + parser.add_argument("-n1", "--name1", required=True, help="Name of the first veth interface.") + parser.add_argument("-n2", "--name2", required=True, help="Name of the second veth interface.") + parser.add_argument("-b1", "--bridge1", default=None, help="Name of the network bridge for ns1.") + parser.add_argument("-b2", "--bridge2", default=None, help="Name of the network bridge for ns2.") + parser.add_argument("-t1", "--type1", default=None, help="Container type for ns1 ('docker', 'lxc', 'lxd', 'incus', or 'None' for the default namespace).") + parser.add_argument("-t2", "--type2", default=None, help="Container type for ns2 ('docker', 'lxc', 'lxd', 'incus', or 'None' for the default namespace).") + + args = parser.parse_args() + + # Processing namespace arguments and container types + ns1, type1 = interpret_namespace(args.namespace1), args.type1 + ns2, type2 = interpret_namespace(args.namespace2), args.type2 + + # Instantiate management classes + iface_manager = IfaceManager() + container_exposer = ContainerNetnsExposer() + + # Expose container network namespaces if applicable + if type1 and ns1: + ns1_pid = container_exposer.expose_container_netns(ns1, type1) + ns1 = ns1_pid + if type2 and ns2: + ns2_pid = container_exposer.expose_container_netns(ns2, type2) + ns2 = ns2_pid + + # Create veth pair + iface_manager.create_veth(args.name1, args.name2) + + # Move ends of the veth pair to appropriate namespaces if required + if ns1: + iface_manager.move(args.name1, ns1) + if ns2: + iface_manager.move(args.name2, ns2) + + # Optional: Attach to network bridge and bring interfaces up + if args.bridge1 and args.name1: + iface_manager.attach_to_bridge(args.name1, args.bridge1, ns1) + iface_manager.set_interface_up(args.name1, ns1) + if args.bridge2 and args.name2: + iface_manager.attach_to_bridge(args.name2, args.bridge2, ns2) + iface_manager.set_interface_up(args.name2, ns2) + + # Bring up interfaces if not already up + if not args.bridge1 and args.name1: + iface_manager.set_interface_up(args.name1, ns1) + if not args.bridge2 and args.name2: + iface_manager.set_interface_up(args.name2, ns2) + +if __name__ == "__main__": + main() +