mirror of
https://github.com/YunoHost/yunohost.git
synced 2024-09-03 20:06:10 +02:00
Revamp of yunomdns :
* Use ifaddr (also used by zeroconf) to find ip addresses * Use python type hinting * small cleanups
This commit is contained in:
parent
2edca5bd22
commit
919ec75877
1 changed files with 53 additions and 113 deletions
166
bin/yunomdns
166
bin/yunomdns
|
@ -4,104 +4,42 @@
|
|||
Pythonic declaration of mDNS .local domains for YunoHost
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import re
|
||||
import sys
|
||||
import yaml
|
||||
|
||||
import socket
|
||||
from time import sleep
|
||||
from typing import List, Dict
|
||||
|
||||
import ifaddr
|
||||
from zeroconf import Zeroconf, ServiceInfo
|
||||
|
||||
# Helper command taken from Moulinette
|
||||
def check_output(args, stderr=subprocess.STDOUT, shell=True, **kwargs):
|
||||
"""Run command with arguments and return its output as a byte string
|
||||
Overwrite some of the arguments to capture standard error in the result
|
||||
and use shell by default before calling subprocess.check_output.
|
||||
|
||||
def get_network_local_interfaces() -> Dict[str, Dict[str, List[str]]]:
|
||||
"""
|
||||
return (
|
||||
subprocess.check_output(args, stderr=stderr, shell=shell, **kwargs)
|
||||
.decode("utf-8")
|
||||
.strip()
|
||||
)
|
||||
|
||||
# Helper command taken from Moulinette
|
||||
def _extract_inet(string, skip_netmask=False, skip_loopback=True):
|
||||
Returns interfaces with their associated local IPs
|
||||
"""
|
||||
Extract IP addresses (v4 and/or v6) from a string limited to one
|
||||
address by protocol
|
||||
|
||||
Keyword argument:
|
||||
string -- String to search in
|
||||
skip_netmask -- True to skip subnet mask extraction
|
||||
skip_loopback -- False to include addresses reserved for the
|
||||
loopback interface
|
||||
def islocal(ip: str) -> bool:
|
||||
local_prefixes = ["192.168.", "10.", "172.16.", "fc00:"]
|
||||
return any(ip.startswith(prefix) for prefix in local_prefixes)
|
||||
|
||||
Returns:
|
||||
A dict of {protocol: address} with protocol one of 'ipv4' or 'ipv6'
|
||||
|
||||
"""
|
||||
ip4_pattern = (
|
||||
r"((25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}"
|
||||
)
|
||||
ip6_pattern = r"(((?:[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4})*)?)::?((?:[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4})*)?)"
|
||||
ip4_pattern += r"/[0-9]{1,2})" if not skip_netmask else ")"
|
||||
ip6_pattern += r"/[0-9]{1,3})" if not skip_netmask else ")"
|
||||
result = {}
|
||||
|
||||
for m in re.finditer(ip4_pattern, string):
|
||||
addr = m.group(1)
|
||||
if skip_loopback and addr.startswith("127."):
|
||||
continue
|
||||
|
||||
# Limit to only one result
|
||||
result["ipv4"] = addr
|
||||
break
|
||||
|
||||
for m in re.finditer(ip6_pattern, string):
|
||||
addr = m.group(1)
|
||||
if skip_loopback and addr == "::1":
|
||||
continue
|
||||
|
||||
# Limit to only one result
|
||||
result["ipv6"] = addr
|
||||
break
|
||||
|
||||
return result
|
||||
|
||||
# Helper command taken from Moulinette
|
||||
def get_network_interfaces():
|
||||
|
||||
# Get network devices and their addresses (raw infos from 'ip addr')
|
||||
devices_raw = {}
|
||||
output = check_output("ip --brief a").split("\n")
|
||||
for line in output:
|
||||
line = line.split()
|
||||
iname = line[0]
|
||||
ips = ' '.join(line[2:])
|
||||
|
||||
devices_raw[iname] = ips
|
||||
|
||||
# Parse relevant informations for each of them
|
||||
devices = {
|
||||
name: _extract_inet(addrs)
|
||||
for name, addrs in devices_raw.items()
|
||||
if name != "lo"
|
||||
interfaces = {
|
||||
adapter.name: {
|
||||
"ipv4": [ip.ip for ip in adapter.ips if ip.is_IPv4 and islocal(ip.ip)],
|
||||
"ipv6": [ip.ip[0] for ip in adapter.ips if ip.is_IPv6 and islocal(ip.ip[0])],
|
||||
}
|
||||
for adapter in ifaddr.get_adapters()
|
||||
if adapter.name != "lo"
|
||||
}
|
||||
return interfaces
|
||||
|
||||
return devices
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
def main() -> bool:
|
||||
###
|
||||
# CONFIG
|
||||
###
|
||||
|
||||
with open('/etc/yunohost/mdns.yml', 'r') as f:
|
||||
config = yaml.safe_load(f) or {}
|
||||
updated = False
|
||||
|
||||
required_fields = ["interfaces", "domains"]
|
||||
missing_fields = [field for field in required_fields if field not in config]
|
||||
|
@ -111,47 +49,44 @@ if __name__ == '__main__':
|
|||
|
||||
if config['interfaces'] is None:
|
||||
print('No interface listed for broadcast.')
|
||||
sys.exit(0)
|
||||
return True
|
||||
|
||||
if 'yunohost.local' not in config['domains']:
|
||||
config['domains'].append('yunohost.local')
|
||||
|
||||
zcs = {}
|
||||
interfaces = get_network_interfaces()
|
||||
zcs: Dict[Zeroconf, List[ServiceInfo]] = {}
|
||||
|
||||
interfaces = get_network_local_interfaces()
|
||||
for interface in config['interfaces']:
|
||||
infos = [] # List of ServiceInfo objects, to feed Zeroconf
|
||||
ips = [] # Human-readable IPs
|
||||
b_ips = [] # Binary-convered IPs
|
||||
if interface not in interfaces:
|
||||
print(f'Interface {interface} of config file is not present on system.')
|
||||
continue
|
||||
|
||||
ipv4 = interfaces[interface]['ipv4'].split('/')[0]
|
||||
if ipv4:
|
||||
ips.append(ipv4)
|
||||
b_ips.append(socket.inet_pton(socket.AF_INET, ipv4))
|
||||
|
||||
ipv6 = interfaces[interface]['ipv6'].split('/')[0]
|
||||
if ipv6:
|
||||
ips.append(ipv6)
|
||||
b_ips.append(socket.inet_pton(socket.AF_INET6, ipv6))
|
||||
ips: List[str] = interfaces[interface]['ipv4'] + interfaces[interface]['ipv6']
|
||||
|
||||
# If at least one IP is listed
|
||||
if ips:
|
||||
# Create a Zeroconf object, and store the ServiceInfos
|
||||
zc = Zeroconf(interfaces=ips)
|
||||
zcs[zc]=[]
|
||||
for d in config['domains']:
|
||||
d_domain=d.replace('.local','')
|
||||
if '.' in d_domain:
|
||||
print(d_domain+'.local: subdomains are not supported.')
|
||||
else:
|
||||
# Create a ServiceInfo object for each .local domain
|
||||
zcs[zc].append(ServiceInfo(
|
||||
type_='_device-info._tcp.local.',
|
||||
name=interface+': '+d_domain+'._device-info._tcp.local.',
|
||||
addresses=b_ips,
|
||||
port=80,
|
||||
server=d+'.',
|
||||
))
|
||||
print('Adding '+d+' with addresses '+str(ips)+' on interface '+interface)
|
||||
if not ips:
|
||||
continue
|
||||
# Create a Zeroconf object, and store the ServiceInfos
|
||||
zc = Zeroconf(interfaces=ips) # type: ignore
|
||||
zcs[zc] = []
|
||||
|
||||
for d in config['domains']:
|
||||
d_domain = d.replace('.local', '')
|
||||
if '.' in d_domain:
|
||||
print(f'{d_domain}.local: subdomains are not supported.')
|
||||
continue
|
||||
# Create a ServiceInfo object for each .local domain
|
||||
zcs[zc].append(
|
||||
ServiceInfo(
|
||||
type_='_device-info._tcp.local.',
|
||||
name=f'{interface}: {d_domain}._device-info._tcp.local.',
|
||||
parsed_addresses=ips,
|
||||
port=80,
|
||||
server=f'{d}.',
|
||||
)
|
||||
)
|
||||
print(f'Adding {d} with addresses {ips} on interface {interface}')
|
||||
|
||||
# Run registration
|
||||
print("Registering...")
|
||||
|
@ -168,6 +103,11 @@ if __name__ == '__main__':
|
|||
finally:
|
||||
print("Unregistering...")
|
||||
for zc, infos in zcs.items():
|
||||
for info in infos:
|
||||
zc.unregister_service(info)
|
||||
zc.unregister_all_services()
|
||||
zc.close()
|
||||
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(0 if main() else 1)
|
||||
|
|
Loading…
Add table
Reference in a new issue