#!/usr/bin/env python3 """ WIP Pythonic declaration of mDNS .local domains for YunoHost """ import subprocess import os import re import sys import argparse import yaml import socket from time import sleep from typing import List, Dict from zeroconf import Zeroconf, ServiceInfo # Helper command taken from Moulinette def check_output(args, stderr=subprocess.STDOUT, shell=True, **kwargs): """Run command with arguments and return its output as a byte string Overwrite some of the arguments to capture standard error in the result and use shell by default before calling subprocess.check_output. """ return ( subprocess.check_output(args, stderr=stderr, shell=shell, **kwargs) .decode("utf-8") .strip() ) # Helper command taken from Moulinette def _extract_inet(string, skip_netmask=False, skip_loopback=True): """ Extract IP addresses (v4 and/or v6) from a string limited to one address by protocol Keyword argument: string -- String to search in skip_netmask -- True to skip subnet mask extraction skip_loopback -- False to include addresses reserved for the loopback interface Returns: A dict of {protocol: address} with protocol one of 'ipv4' or 'ipv6' """ ip4_pattern = ( r"((25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}" ) ip6_pattern = r"(((?:[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4})*)?)::((?:[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4})*)?)" ip4_pattern += r"/[0-9]{1,2})" if not skip_netmask else ")" ip6_pattern += r"/[0-9]{1,3})" if not skip_netmask else ")" result = {} for m in re.finditer(ip4_pattern, string): addr = m.group(1) if skip_loopback and addr.startswith("127."): continue # Limit to only one result result["ipv4"] = addr break for m in re.finditer(ip6_pattern, string): addr = m.group(1) if skip_loopback and addr == "::1": continue # Limit to only one result result["ipv6"] = addr break return result # Helper command taken from Moulinette def get_network_interfaces(): # Get network devices and their addresses (raw infos from 'ip addr') devices_raw = {} output = check_output("ip addr show") for d in re.split(r"^(?:[0-9]+: )", output, flags=re.MULTILINE): # Extract device name (1) and its addresses (2) m = re.match(r"([^\s@]+)(?:@[\S]+)?: (.*)", d, flags=re.DOTALL) if m: devices_raw[m.group(1)] = m.group(2) # Parse relevant informations for each of them devices = { name: _extract_inet(addrs) for name, addrs in devices_raw.items() if name != "lo" } return devices if __name__ == '__main__': ### # ARGUMENTS ### parser = argparse.ArgumentParser(description=''' mDNS broadcast for .local domains. Configuration file: /etc/yunohost/mdns.yml Subdomains are not supported. ''') parser.add_argument('--regen', nargs='?', const='as_stored', choices=['domains', 'interfaces', 'all', 'as_stored'], help=''' Regenerates selection into the configuration file then starts mDNS broadcasting. ''') parser.add_argument('--set-regen', choices=['domains', 'interfaces', 'all', 'none'], help=''' Set which part of the configuration to be regenerated. Implies --regen as_stored, with newly stored parameter. ''') able = parser.add_mutually_exclusive_group() able.add_argument('--enable', action='store_true', help='Enables mDNS broadcast, and regenerates the configuration') able.add_argument('--disable', action='store_true') args = parser.parse_args() ### # CONFIG ### with open('/etc/yunohost/mdns.yml', 'r') as f: config = yaml.load(f) or {} updated = False if args.enable: config['enabled'] = True args.regen = 'as_stored' updated = True if args.disable: config['enabled'] = False updated = True if args.set_regen: config['regen'] = args.set_regen args.regen = 'as_stored' updated = True if args.regen: if args.regen == 'as_stored': r = config['regen'] else: r = args.regen if r == 'none': print('Regeneration disabled.') if r == 'interfaces' or r == 'all': config['interfaces'] = [ i for i in get_network_interfaces() ] print('Regenerated interfaces list: ' + str(config['interfaces'])) if r == 'domains' or r == 'all': import glob config['domains'] = [ d.rsplit('/',1)[1][:-2] for d in glob.glob('/etc/nginx/conf.d/*.local.d') ] if 'yunohost.local' not in config['domains']: config['domains'].append('yunohost.local') print('Regenerated domains list: ' + str(config['domains'])) updated = True if updated: with open('/etc/yunohost/mdns.yml', 'w') as f: yaml.safe_dump(config, f, default_flow_style=False) print('Configuration file updated.') ### # MAIN SCRIPT ### if config['enabled'] is not True: print('YunomDNS is disabled.') sys.exit(0) if config['interfaces'] is None: print('No interface listed for broadcast.') sys.exit(0) zcs = {} interfaces = get_network_interfaces() for interface in config['interfaces']: infos = [] # List of ServiceInfo objects, to feed Zeroconf ips = [] # Human-readable IPs b_ips = [] # Binary-convered IPs # Parse the IPs and prepare their binary version addressed = False try: ip = interfaces[interface]['ipv4'].split('/')[0] if len(ip)>0: addressed = True ips.append(ip) b_ips.append(socket.inet_pton(socket.AF_INET, ip)) except: pass try: ip = interfaces[interface]['ipv6'].split('/')[0] if len(ip)>0: addressed = True ips.append(ip) b_ips.append(socket.inet_pton(socket.AF_INET6, ip)) except: pass # If at least one IP is listed if addressed: # Create a Zeroconf object, and store the ServiceInfos zc = Zeroconf(interfaces=ips) zcs[zc]=[] for d in config['domains']: d_domain=d.replace('.local','') if '.' in d_domain: print(d_domain+'.local: subdomains are not supported.') else: # Create a ServiceInfo object for each .local domain zcs[zc].append(ServiceInfo( type_='_device-info._tcp.local.', name=interface+': '+d_domain+'._device-info._tcp.local.', addresses=b_ips, port=80, server=d+'.', )) print('Adding '+d+' with addresses '+str(ips)+' on interface '+interface) # Run registration print("Registering...") for zc, infos in zcs.items(): for info in infos: zc.register_service(info) try: print("Registered. Press Ctrl+C or stop service to stop.") while True: sleep(1) except KeyboardInterrupt: pass finally: print("Unregistering...") for zc, infos in zcs.items(): for info in infos: zc.unregister_service(info) zc.close()