�&ǐk�@'bJ�h�ۊL'}T� :��'2�Z#$��n�a��� �>a��`��_3d�Qpt�/�P -��#5�,�M��� �pA:©�q�����NW��ډ�A���� �9nʺج���� �TSM��{J6?7��r�@�\����D��� �׶���s�f�TJj?"��D��`?��̒� b�#�%�C*v�$�{�$����5Ծ�F�s��y�e/8��h-�f�̰&(����Gj�L:U� 2�� ����v�_k����Y��gp,�k�WF�R������_C�R��N@���R�@�ߔ?A�w9���F("iNa-S���Q�o�3tDMLh*�#4k�T/iQ��Y*�G��m����)��8�hBm/�I�,g�ﯖ���Z��}�Cz�q@´��d.����L�ŕ�,��1�Z�܌�: ̪���F+J-'��c�tvJ8��]Q-��b��y �6;*J`r_�d ��'�G ~p��)'�C,�%F��E(��2�k�����lР�z�!�=t ��_�0��f7��� ;�p�|�U �% max_count: max_count = count max_ip = ip return max_ip def _get_domain_ips(self): data = {} passwd = self._get_plesk_passwd() if not passwd: return data env = {'MYSQL_PWD': passwd} query = ("""SELECT dom.name, iad.ip_address FROM domains """ """dom LEFT JOIN DomainServices d ON (dom.id = d.dom_id """ """AND d.type = 'web') LEFT JOIN IpAddressesCollections ia """ """ON ia.ipCollectionId = d.ipCollectionId LEFT JOIN """ """IP_Addresses iad ON iad.id = ia.ipAddressId""") cmd = ['mysql', '-u', 'admin', '-Dpsa', '-e', query] try: p = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, env=env) out, err = p.communicate() except (subprocess.CalledProcessError, FileNotFoundError): return data if not out: return data for line in out.splitlines(): domain, ip = line.split() if not ip: continue if self.host_addresses and ip not in self.host_addresses: continue if ip not in data: data[ip] = [] data[ip].append(domain) return data @classmethod def _get_ip_info(cls): data = {} patt = re.compile( r"""[^:]+: # Interface name, e.g 'eth0:' (?P[^/]+) # all symbols before forward slash """, re.VERBOSE) cmd = ['plesk', 'bin', 'ipmanage', '--ip_list'] try: out = subprocess.check_output( cmd, stderr=subprocess.DEVNULL, universal_newlines=True) except subprocess.CalledProcessError: return data if not out: return data for line in out.splitlines(): splitted = line.split() if not splitted: continue if len(splitted) < 3: continue match = patt.match(splitted[2]) # third field contains IP if match and splitted[1] in ('S', 'E'): data[match.group('ip')] = True if splitted[1] == 'S' else False return data def prepare(self): all_domains = self._get_domain_ips() self.main_ip = self._find_main_ip(all_domains) if not all_domains or not self.main_ip: self.first_domain = None self.additional = {} return shared_ips = sorted(all_domains.get(self.main_ip, tuple())) self.first_domain = shared_ips[0] if shared_ips else None self.additional = {k: v for k, v in all_domains.items() if k != self.main_ip} class DirectAdmin(Panel): detect_path = '/usr/local/directadmin/custombuild/build' glob_path = '/usr/local/directadmin/data/users/*/domains/*.conf' @classmethod def _get_paths(cls): paths = [] for path in glob.iglob(cls.glob_path): paths.append(path) return paths @staticmethod def _read_config(path): domain, ip = None, None domain_found, ip_found = False, False with open(path) as f: for line in f: if domain_found and ip_found: break if line.startswith('#'): continue if '=' not in line: continue key, value = [i.strip() for i in line.split('=', 1)] if key == 'domain': domain = value domain_found = True elif key == 'ip': ip = value ip_found = True else: continue return ip, domain @staticmethod def _find_main_ip(ip_map): """ The IP address with maximum domains is the main one """ max_count = 0 max_ip = None for ip, domains in ip_map.items(): count = len(domains) if count > max_count: max_count = count max_ip = ip return max_ip @classmethod def _get_domains(cls): domains = {} for path in cls._get_paths(): try: ip, domain = cls._read_config(path) except UnicodeDecodeError: continue if ip and domain: if ip not in domains: domains[ip] = [] domains[ip].append(domain) return domains def prepare(self): all_domains = self._get_domains() self.main_ip = self._find_main_ip(all_domains) if not all_domains or not self.main_ip: self.first_domain = None self.additional = {} return shared_ips = sorted(all_domains.get(self.main_ip, tuple())) self.first_domain = shared_ips[0] if shared_ips else None self.additional = {k: v for k, v in all_domains.items() if k != self.main_ip} class AddressHandler: map_conf = '/etc/imunify360-webshield/backend-destinations.conf' map_file = '/etc/imunify360-webshield/default-destinations.dat' @staticmethod def _generate(length=8): sample = string.ascii_letters + string.digits return ''.join(random.sample(sample, length)) @staticmethod def _get_panel(): for panel in cPanel, Plesk, DirectAdmin: if panel.detect(): return panel @staticmethod def _get_ip_addresses(): addresses = set() patt = re.compile( r"""(?:\d+:\s?)? # number (e.g. '1:') and optional space (?P\S+) # interface name (e.g. 'eth0') \s+? # space(s) inet6?\s # word 'inet' or 'inet6' (?P(?: # start IP capturing \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3} # for IPv4 | # or [0-9a-fA-F:]+) # for IPv6 ) # end capturing (?:/(?P\d{1,3}))? # capture mask (e.g.'/24'), if any """, re.VERBOSE) p = subprocess.Popen(['ip', '-o', 'address', 'show'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) out, err = p.communicate() if not out: return for line in out.splitlines(): m = patt.match(line) if not m: continue iface, ip, mask = m.group('if', 'ip', 'mask') if iface == 'lo': continue if ':' in ip and ipaddress.IPv6Address(ip).is_link_local: continue addresses.add(ip) return addresses @classmethod def _save(cls, data, path, semicolon=True): if not data: return fmt = "{} {};\n" if semicolon else "{} {}\n" tmp_path = '.'.join([path, cls._generate()]) with open(tmp_path, 'w') as f: for key, val in data.items(): f.write(fmt.format(key, val)) shutil.move(tmp_path, path) @classmethod def run(cls): addresses = cls._get_ip_addresses() if len(addresses) < 2: sys.stderr.write('No additional IP addresses found\n') return panel = cls._get_panel() if panel is None: sys.stderr.write('We cannot use unknown hosting panels. Skip\n') return p = panel(addresses) p.prepare() cls._save(p.get_ssl_mapping(), cls.map_file, False) if __name__ == '__main__': AddressHandler.run()