Link.py plik
This commit is contained in:
commit
1c174b0c98
429
link.py
Normal file
429
link.py
Normal file
@ -0,0 +1,429 @@
|
||||
import argparse
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from pyroute2 import IPRoute, NetNS
|
||||
|
||||
class NetworkNamespaceManager:
|
||||
"""
|
||||
Provides functionalities to manage network namespaces.
|
||||
|
||||
Network namespaces partition network resources such as network links,
|
||||
IP addresses, and port numbers into disjoint sets.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def list_netns():
|
||||
"""
|
||||
List all available network namespaces.
|
||||
|
||||
Scans the '/var/run/netns' directory and prints out all
|
||||
the network namespace files present. If no namespaces are found, it
|
||||
prints a message indicating that.
|
||||
"""
|
||||
netns_dir = "/var/run/netns"
|
||||
try:
|
||||
netns_files = os.listdir(netns_dir)
|
||||
if netns_files:
|
||||
print("List of available network namespaces:")
|
||||
for netns_file in netns_files:
|
||||
print(f"- {netns_file}")
|
||||
else:
|
||||
print("No network namespaces found.")
|
||||
except OSError as e:
|
||||
print(f"Error listing network namespaces: {e}")
|
||||
|
||||
class ContainerNetnsExposer:
|
||||
"""
|
||||
Responsible for exposing network namespaces of containers.
|
||||
|
||||
Supports Docker, LXC, LXD, and Incus containers, allowing the user to interact
|
||||
with their network namespaces.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initializes the ContainerNetnsExposer instance.
|
||||
"""
|
||||
self.netns_pid = None
|
||||
self.netns_path = None
|
||||
|
||||
def expose_container_netns(self, container_id_or_name, container_type='docker'):
|
||||
"""
|
||||
Expose the network namespace of a specified container.
|
||||
|
||||
Parameters:
|
||||
container_id_or_name (str): The identifier or name of the container.
|
||||
container_type (str): The type of the container ('docker', 'lxc', 'lxd', or 'incus').
|
||||
|
||||
Returns:
|
||||
str: The PID of the container as a string.
|
||||
|
||||
Raises:
|
||||
SystemExit: If the container type is unsupported or if there is an error
|
||||
in retrieving the container's PID.
|
||||
"""
|
||||
if container_type == 'docker':
|
||||
self.get_docker_container_pid(container_id_or_name)
|
||||
elif container_type in ('lxc', 'lxd', 'incus'):
|
||||
self.get_lxc_container_pid(container_id_or_name, container_type)
|
||||
else:
|
||||
print("Unsupported container type. Only 'docker', 'lxc', 'lxd', and 'incus' are supported.")
|
||||
sys.exit(1)
|
||||
|
||||
self.create_netns_directory()
|
||||
self.create_or_remove_netns_symlink()
|
||||
return str(self.netns_pid)
|
||||
|
||||
def get_docker_container_pid(self, container_id_or_name):
|
||||
"""
|
||||
Retrieve the PID of a Docker container.
|
||||
|
||||
Parameters:
|
||||
container_id_or_name (str): The identifier or name of the Docker container.
|
||||
|
||||
Raises:
|
||||
SystemExit: If there is an error in retrieving the Docker container's PID.
|
||||
"""
|
||||
try:
|
||||
self.netns_pid = subprocess.check_output(
|
||||
["sudo", "docker", "inspect", "-f", "{{.State.Pid}}", container_id_or_name],
|
||||
universal_newlines=True
|
||||
).strip()
|
||||
except subprocess.CalledProcessError:
|
||||
print("Error retrieving Docker container PID. Make sure the container exists and is running.")
|
||||
sys.exit(1)
|
||||
|
||||
def get_lxc_container_pid(self, container_name, container_type='lxc'):
|
||||
"""
|
||||
Retrieve the PID of an LXC, LXD, or Incus container.
|
||||
|
||||
Parameters:
|
||||
container_name (str): The name of the container.
|
||||
container_type (str): The type of the container ('lxc', 'lxd', or 'incus').
|
||||
|
||||
Raises:
|
||||
SystemExit: If there is an error in retrieving the container's PID.
|
||||
"""
|
||||
try:
|
||||
if container_type == 'lxc':
|
||||
output = subprocess.check_output(
|
||||
["lxc-info", "-n", container_name, "-p"],
|
||||
universal_newlines=True
|
||||
)
|
||||
self.netns_pid = output.strip().split()[-1]
|
||||
elif container_type == 'lxd':
|
||||
output = subprocess.check_output(
|
||||
["sudo", "lxc", "info", container_name],
|
||||
universal_newlines=True
|
||||
)
|
||||
for line in output.splitlines():
|
||||
if line.strip().startswith("PID:"):
|
||||
self.netns_pid = line.split(':')[1].strip()
|
||||
break
|
||||
else:
|
||||
print(f"PID not found in 'lxc info' output for container '{container_name}'.")
|
||||
sys.exit(1)
|
||||
elif container_type == 'incus':
|
||||
output = subprocess.check_output(
|
||||
["sudo", "incus", "info", container_name],
|
||||
universal_newlines=True
|
||||
)
|
||||
for line in output.splitlines():
|
||||
if line.strip().startswith("PID:"):
|
||||
self.netns_pid = line.split(':')[1].strip()
|
||||
break
|
||||
else:
|
||||
print(f"PID not found in 'incus info' output for container '{container_name}'.")
|
||||
sys.exit(1)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error retrieving {container_type.upper()} container PID for '{container_name}'. Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
def create_netns_directory(self):
|
||||
"""
|
||||
Create the network namespace directory if it does not exist.
|
||||
|
||||
Ensures that the directory '/var/run/netns' exists, which is used
|
||||
to store network namespace symlinks.
|
||||
"""
|
||||
try:
|
||||
subprocess.run(["sudo", "mkdir", "-p", "/var/run/netns"], check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error creating network namespace directory: {e.stderr.strip()}")
|
||||
sys.exit(1)
|
||||
|
||||
def create_or_remove_netns_symlink(self):
|
||||
"""
|
||||
Create or remove a symlink to the network namespace of a container.
|
||||
|
||||
Sets up a symlink in '/var/run/netns', pointing to the network namespace
|
||||
of the container identified by its PID. If a symlink with the same name
|
||||
already exists, it is removed before creating a new one.
|
||||
"""
|
||||
self.netns_path = f"/var/run/netns/{self.netns_pid}"
|
||||
|
||||
try:
|
||||
if os.path.exists(self.netns_path):
|
||||
subprocess.run(["sudo", "rm", "-rf", self.netns_path], check=True)
|
||||
subprocess.run(["sudo", "ln", "-s", f"/proc/{self.netns_pid}/ns/net", self.netns_path], check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error creating or removing symlink: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
class IfaceManager:
|
||||
"""
|
||||
Manages network interfaces, including creation, deletion, and configuration
|
||||
of veth pairs, VLANs, and bridges.
|
||||
"""
|
||||
|
||||
def delete(self, iface_name, namespace=None):
|
||||
"""
|
||||
Delete a network interface.
|
||||
|
||||
Parameters:
|
||||
iface_name (str): The name of the interface to delete.
|
||||
namespace (str, optional): The network namespace where the interface exists.
|
||||
|
||||
Raises:
|
||||
Exception: If the interface cannot be deleted.
|
||||
"""
|
||||
try:
|
||||
if namespace:
|
||||
context_manager = NetNS(namespace)
|
||||
else:
|
||||
context_manager = IPRoute()
|
||||
|
||||
with context_manager as ns:
|
||||
iface_idx_list = ns.link_lookup(ifname=iface_name)
|
||||
if not iface_idx_list:
|
||||
raise ValueError(f"Interface {iface_name} not found.")
|
||||
iface_idx = iface_idx_list[0]
|
||||
ns.link('del', index=iface_idx)
|
||||
except Exception as e:
|
||||
print(f"Error deleting interface {iface_name} in namespace {namespace}: {e}")
|
||||
|
||||
def create_veth(self, iface1, iface2, namespace=None):
|
||||
"""
|
||||
Create a veth pair.
|
||||
|
||||
Parameters:
|
||||
iface1 (str): The name of the first interface.
|
||||
iface2 (str): The name of the second interface.
|
||||
namespace (str, optional): The network namespace where to create the veth pair.
|
||||
|
||||
Raises:
|
||||
Exception: If the veth pair cannot be created.
|
||||
"""
|
||||
try:
|
||||
if namespace:
|
||||
context_manager = NetNS(namespace)
|
||||
else:
|
||||
context_manager = IPRoute()
|
||||
|
||||
with context_manager as ns:
|
||||
ns.link('add', ifname=iface1, peer={'ifname': iface2}, kind='veth')
|
||||
except Exception as e:
|
||||
print(f"Error creating veth pair {iface1} and {iface2}: {e}")
|
||||
|
||||
def create_vlan(self, base_iface, vlan_id, namespace=None):
|
||||
"""
|
||||
Create a VLAN interface.
|
||||
|
||||
Parameters:
|
||||
base_iface (str): The base interface name.
|
||||
vlan_id (int): The VLAN ID.
|
||||
namespace (str, optional): The network namespace where to create the VLAN interface.
|
||||
|
||||
Raises:
|
||||
Exception: If the VLAN interface cannot be created.
|
||||
"""
|
||||
try:
|
||||
if namespace:
|
||||
context_manager = NetNS(namespace)
|
||||
else:
|
||||
context_manager = IPRoute()
|
||||
|
||||
with context_manager as ns:
|
||||
base_iface_idx_list = ns.link_lookup(ifname=base_iface)
|
||||
if not base_iface_idx_list:
|
||||
raise ValueError(f"Base interface {base_iface} not found.")
|
||||
base_iface_idx = base_iface_idx_list[0]
|
||||
|
||||
vlan_iface = f"{base_iface}.{vlan_id}"
|
||||
ns.link('add', ifname=vlan_iface, link=base_iface_idx, kind='vlan', vlan_info={'id': vlan_id})
|
||||
except Exception as e:
|
||||
print(f"Error creating VLAN on interface {base_iface}: {e}")
|
||||
|
||||
def create_bridge(self, bridge_name, namespace=None):
|
||||
"""
|
||||
Create a bridge interface.
|
||||
|
||||
Parameters:
|
||||
bridge_name (str): The name of the bridge.
|
||||
namespace (str, optional): The network namespace where to create the bridge.
|
||||
|
||||
Raises:
|
||||
Exception: If the bridge cannot be created.
|
||||
"""
|
||||
try:
|
||||
if namespace:
|
||||
context_manager = NetNS(namespace)
|
||||
else:
|
||||
context_manager = IPRoute()
|
||||
|
||||
with context_manager as ns:
|
||||
ns.link('add', ifname=bridge_name, kind='bridge')
|
||||
except Exception as e:
|
||||
print(f"Error creating bridge {bridge_name}: {e}")
|
||||
|
||||
def move(self, iface, namespace):
|
||||
"""
|
||||
Move a network interface to another namespace.
|
||||
|
||||
Parameters:
|
||||
iface (str): The interface name to move.
|
||||
namespace (str): The target network namespace.
|
||||
|
||||
Raises:
|
||||
Exception: If the interface cannot be moved.
|
||||
"""
|
||||
try:
|
||||
ipr = IPRoute()
|
||||
idx_list = ipr.link_lookup(ifname=iface)
|
||||
if not idx_list:
|
||||
raise ValueError(f"Interface {iface} not found.")
|
||||
idx = idx_list[0]
|
||||
ipr.link('set', index=idx, net_ns_fd=namespace)
|
||||
except Exception as e:
|
||||
print(f"Error moving interface {iface} to namespace {namespace}: {e}")
|
||||
|
||||
def set_interface_up(self, iface_name, namespace=None):
|
||||
"""
|
||||
Set a network interface up.
|
||||
|
||||
Parameters:
|
||||
iface_name (str): The name of the interface.
|
||||
namespace (str, optional): The network namespace where the interface exists.
|
||||
|
||||
Raises:
|
||||
Exception: If the interface cannot be set up.
|
||||
"""
|
||||
try:
|
||||
if namespace:
|
||||
context_manager = NetNS(namespace)
|
||||
else:
|
||||
context_manager = IPRoute()
|
||||
|
||||
with context_manager as ns:
|
||||
iface_idx_list = ns.link_lookup(ifname=iface_name)
|
||||
if not iface_idx_list:
|
||||
raise ValueError(f"Interface {iface_name} not found.")
|
||||
iface_idx = iface_idx_list[0]
|
||||
ns.link("set", index=iface_idx, state="up")
|
||||
except Exception as e:
|
||||
print(f"Error setting up interface {iface_name} in namespace {namespace}: {e}")
|
||||
|
||||
def attach_to_bridge(self, iface_name, bridge_name, namespace=None):
|
||||
"""
|
||||
Attach an interface to a bridge.
|
||||
|
||||
Parameters:
|
||||
iface_name (str): The name of the interface.
|
||||
bridge_name (str): The name of the bridge.
|
||||
namespace (str, optional): The network namespace where the interface and bridge exist.
|
||||
|
||||
Raises:
|
||||
Exception: If the interface cannot be attached to the bridge.
|
||||
"""
|
||||
try:
|
||||
if namespace:
|
||||
context_manager = NetNS(namespace)
|
||||
else:
|
||||
context_manager = IPRoute()
|
||||
|
||||
with context_manager as ns:
|
||||
iface_idx_list = ns.link_lookup(ifname=iface_name)
|
||||
if not iface_idx_list:
|
||||
raise ValueError(f"Interface {iface_name} not found.")
|
||||
iface_idx = iface_idx_list[0]
|
||||
|
||||
bridge_idx_list = ns.link_lookup(ifname=bridge_name)
|
||||
if not bridge_idx_list:
|
||||
raise ValueError(f"Bridge {bridge_name} not found.")
|
||||
bridge_idx = bridge_idx_list[0]
|
||||
|
||||
ns.link("set", index=iface_idx, master=bridge_idx)
|
||||
except Exception as e:
|
||||
print(f"Error attaching interface {iface_name} to bridge {bridge_name} in namespace {namespace}: {e}")
|
||||
|
||||
def interpret_namespace(namespace_arg):
|
||||
"""
|
||||
Interpret the namespace argument.
|
||||
|
||||
If the argument is '1', converts it to None (representing the default namespace),
|
||||
otherwise returns the argument as is.
|
||||
|
||||
Parameters:
|
||||
namespace_arg (str): The namespace argument.
|
||||
|
||||
Returns:
|
||||
str or None: The interpreted namespace.
|
||||
"""
|
||||
return None if namespace_arg == '1' else namespace_arg
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Create veth pairs between containers with optional bridge attachment.")
|
||||
parser.add_argument("-ns1", "--namespace1", default=None, help="Name of the first namespace or container, or '1' for the default namespace.")
|
||||
parser.add_argument("-ns2", "--namespace2", default=None, help="Name of the second namespace or container, or '1' for the default namespace.")
|
||||
parser.add_argument("-n1", "--name1", required=True, help="Name of the first veth interface.")
|
||||
parser.add_argument("-n2", "--name2", required=True, help="Name of the second veth interface.")
|
||||
parser.add_argument("-b1", "--bridge1", default=None, help="Name of the network bridge for ns1.")
|
||||
parser.add_argument("-b2", "--bridge2", default=None, help="Name of the network bridge for ns2.")
|
||||
parser.add_argument("-t1", "--type1", default=None, help="Container type for ns1 ('docker', 'lxc', 'lxd', 'incus', or 'None' for the default namespace).")
|
||||
parser.add_argument("-t2", "--type2", default=None, help="Container type for ns2 ('docker', 'lxc', 'lxd', 'incus', or 'None' for the default namespace).")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Processing namespace arguments and container types
|
||||
ns1, type1 = interpret_namespace(args.namespace1), args.type1
|
||||
ns2, type2 = interpret_namespace(args.namespace2), args.type2
|
||||
|
||||
# Instantiate management classes
|
||||
iface_manager = IfaceManager()
|
||||
container_exposer = ContainerNetnsExposer()
|
||||
|
||||
# Expose container network namespaces if applicable
|
||||
if type1 and ns1:
|
||||
ns1_pid = container_exposer.expose_container_netns(ns1, type1)
|
||||
ns1 = ns1_pid
|
||||
if type2 and ns2:
|
||||
ns2_pid = container_exposer.expose_container_netns(ns2, type2)
|
||||
ns2 = ns2_pid
|
||||
|
||||
# Create veth pair
|
||||
iface_manager.create_veth(args.name1, args.name2)
|
||||
|
||||
# Move ends of the veth pair to appropriate namespaces if required
|
||||
if ns1:
|
||||
iface_manager.move(args.name1, ns1)
|
||||
if ns2:
|
||||
iface_manager.move(args.name2, ns2)
|
||||
|
||||
# Optional: Attach to network bridge and bring interfaces up
|
||||
if args.bridge1 and args.name1:
|
||||
iface_manager.attach_to_bridge(args.name1, args.bridge1, ns1)
|
||||
iface_manager.set_interface_up(args.name1, ns1)
|
||||
if args.bridge2 and args.name2:
|
||||
iface_manager.attach_to_bridge(args.name2, args.bridge2, ns2)
|
||||
iface_manager.set_interface_up(args.name2, ns2)
|
||||
|
||||
# Bring up interfaces if not already up
|
||||
if not args.bridge1 and args.name1:
|
||||
iface_manager.set_interface_up(args.name1, ns1)
|
||||
if not args.bridge2 and args.name2:
|
||||
iface_manager.set_interface_up(args.name2, ns2)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
Loading…
Reference in New Issue
Block a user