From defe336751cadcdbe9f7559cee59a6942c5d2a64 Mon Sep 17 00:00:00 2001 From: Tom Marshall Date: Mon, 6 Jan 2020 15:29:18 -0800 Subject: [PATCH] Initial nc-api script --- nc-api | 381 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 381 insertions(+) create mode 100755 nc-api diff --git a/nc-api b/nc-api new file mode 100755 index 0000000..dad7124 --- /dev/null +++ b/nc-api @@ -0,0 +1,381 @@ +#!/usr/bin/python + +# TODO: +# - Add support for TTL, MXPref, ... + +import os +import sys +import pwd +import getopt +import requests +from xml.etree import ElementTree +import re + +def cgi_method(): + return os.environ.get('REQUEST_METHOD') + +def write_response_cgi(text, err=False): + headers = "" + headers += "Content-Type: text/html\n" + + content = "" + content += "\n" + content += " \n" + content += " NameCheap Gateway\n" + content += " \n" + content += " \n" + content += "

NameCheap Gateway

\n" + content += "
\n" + if config['debug']: + for k in sorted(os.environ): + content += "%s=%s
\n" % (k, os.environ[k]) + content += "
\n" + if err: + content += "Error
\n" + for line in text.rstrip('\n').split('\n'): + content += "%s
\n" % (line) + content += " \n" + content += "\n" + sys.stdout.write("%s\n%s" % (headers, content)) + if err: + sys.exit(0) + +def write_response_cli(text, err=False): + for line in text.rstrip('\n').split('\n'): + if err: + sys.stderr.write("%s\n" % (line)) + else: + sys.stdout.write("%s\n" % (line)) + if err: + sys.exit(1) + +def write_response(text, err=False): + if cgi_method(): + write_response_cgi(text, err) + else: + write_response_cli(text, err) + +opt_flags = 'dhv' +opt_longflags = ['debug', 'help', 'verbose'] + +def usage(errtext): + prog = os.path.basename(sys.argv[0]) + text = "%s\n" % (errtext) + text += "\n" + text += "Usage: %s [-d] [-h] [-v] [args]\n" % (prog) + text += "\n" + text += " Commands:\n" + text += " get-hosts \n" + text += " Get host list for domain\n" + text += " set-hosts [--replace-all] ...\n" + text += " Set host list for domain\n" + text += " Use --replace-all to replace all hosts\n" + text += " update-hosts ...\n" + text += " Alias for set-hosts\n" + text += " replace-hosts ...\n" + text += " Alias for set-hosts --replace-all\n" + text += "\n" + text += " host-spec -> name[:type]=address\n" + text += " type -> A, AAAA, TXT, ...\n" + text += " ipv4, ipv6\n" + text += " none, delete\n" + write_response(text, True) + +def config_load(pathname): + required_keys = ['nc_user', 'nc_apikey', 'nc_clientip'] + if not os.path.exists(pathname): + raise RuntimeError("Configuration file %s does not exist" % (pathname)) + st = os.stat(pathname) + if (st.st_mode & 077) != 0: + raise RuntimeError("Configuration file %s has incorrect permissions" % (pathname)) + config['debug'] = False + config['verbose'] = False + config['server'] = 'api.namecheap.com' + with open(pathname, 'r') as f: + for line in f: + line = line.rstrip('\n') + if len(line) == 0 or line.startswith('#'): + continue + (k, v) = line.split('=', 1) + config[k] = v + for k in required_keys: + if not k in config: + raise RuntimeError("Configuration missing required key \"%s\"" % (k)) + +def args_parse_cgi(): + import cgi + argv = [] + opts = [] + + if not 'HTTP_X_SSL_CIPHER' in os.environ: + raise RuntimeError('Insecure request denied') + try: + values = cgi.parse() + except Exception as e: + raise RuntimeError(str(e)) + + arg = values.get('username', []) + if len(arg) != 1: + raise RuntimeError("No username specified") + username = arg[0] + del values['username'] + arg = values.get('password', []) + if len(arg) != 1: + raise RuntimeError("No password specified") + password = arg[0] + del values['password'] + + allowed = False + allowed_users = config.get('allowed_users', '') + for v in allowed_users.split(','): + fields = v.split(':', 1) + if len(fields) != 2: + raise RuntimeError("Invalid allowed_users") + if username == fields[0] and password == fields[1]: + allowed = True + if not allowed: + raise RuntimeError('Access denied') + + for k in ['cmd', 'domain', 'hosts']: + arg = values.get(k, []) + if len(arg) != 1: + raise RuntimeError("Bad parameter %s" % (k)) + argv.append(arg[0]) + del values[k] + for k in values: + if not k in opt_longflags: + raise RuntimeError("Unknown option %s" % (k)) + for v in values[k]: + opts.append(("--%s" % (k), v)) + + return opts, argv + +def args_parse_cli(): + try: + opts, argv = getopt.getopt(sys.argv[1:], opt_flags, opt_longflags) + except getopt.GetoptError as e: + raise RuntimeError("Bad arguments: %s\n" % (str(e))) + + return opts, argv + +def args_parse(): + if cgi_method(): + opts, argv = args_parse_cgi() + else: + opts, argv = args_parse_cli() + for k, v in opts: + if k in ('-d', '--debug'): + config['debug'] = True + if k in ('-h', '--help'): + config['help'] = True + if k in ('-v', '--verbose'): + config['verbose'] = True + + return argv + +def domain_parse(text): + parts = text.split('.') + if len(parts) != 2: + raise RuntimeError("Invalid domain: %s" % (text)) + return parts + +def address_is_ipv4(text): + result = re.match('^[0-9.]{7,15}$', text) + return result != None + +def address_is_ipv6(text): + result = re.match('^[0-9A-Fa-f:]{3,39}$', text) + return result != None + +def address_type(text): + if address_is_ipv4(text): + return 'A' + if address_is_ipv6(text): + return 'AAAA' + return 'TXT' + +class hostent: + def __init__(self, name, type): + self._name = name + self._type = type + + def name(self): + return self._name + def type(self): + return self._type + + def __str__(self): + return "%s:%s" % (self.name(), self.type()) + def __eq__(self, other): + return str(self) == str(other) + def __hash__(self): + return hash(str(self)) + +def hostspec_parse(text): + (k, v) = text.split('=') + if k.find(':') == -1: + hn = k + ht = address_type(v) + else: + (hn, ht) = k.split(':') + if v.lower() == 'none' or v.lower() == 'delete': + v = '' + return (hostent(hn, ht.upper()), v) + +def nc_api_url(server): + return "https://%s/xml.response" % (server) + +def nc_parse_response(text): + return ElementTree.fromstring(re.sub('xmlns="([^"]*)"', '', text)) + +def nc_api_get(cmd, args): + api_url = nc_api_url(config['server']) + params = args.copy() + params['apiuser'] = config['nc_user'] + params['apikey'] = config['nc_apikey'] + params['Command'] = cmd + params['UserName'] = config['nc_user'] + params['ClientIp'] = config['nc_clientip'] + r = rs.get(api_url, params=params) + r.raise_for_status() + root = nc_parse_response(r.text) + if root.attrib.get('Status') != 'OK': + elem = root.find('Errors').find('Error') + raise RuntimeError("API call failed: " + elem.text) + return root + +def nc_api_post(cmd, args): + api_url = nc_api_url(config['server']) + params = args.copy() + params['apiuser'] = config['nc_user'] + params['apikey'] = config['nc_apikey'] + params['Command'] = cmd + params['UserName'] = config['nc_user'] + params['ClientIp'] = config['nc_clientip'] + r = rs.post(api_url, data=params) + r.raise_for_status() + root = nc_parse_response(r.text) + if root.attrib.get('Status') != 'OK': + elem = root.find('Errors').find('Error') + raise RuntimeError("API call failed: " + elem.text) + return root + +def nc_get_hosts(domain): + cmd = 'namecheap.domains.dns.getHosts' + (sld, tld) = domain_parse(domain) + args = dict() + args['TLD'] = tld + args['SLD'] = sld + root = nc_api_get(cmd, args) + cr = root.find('CommandResponse') + hr = cr.find('DomainDNSGetHostsResult') + hosts = dict() + for h in list(hr): + hn = h.attrib.get('Name') + ht = h.attrib.get('Type') + ha = h.attrib.get('Address') + hosts[hostent(hn, ht)] = ha + return hosts + +def nc_set_hosts(domain, hosts): + cmd = 'namecheap.domains.dns.setHosts' + (sld, tld) = domain_parse(domain) + args = dict() + args['TLD'] = tld + args['SLD'] = sld + n = 1 + for k, v in hosts.items(): + args["HostName%d" % (n)] = k.name() + args["RecordType%d" % (n)] = k.type() + args["Address%d" % (n)] = v + n = n + 1 + nc_api_post(cmd, args) + +def cmd_get_hosts(argv): + if len(argv) != 1: + usage("get-hosts: Expected exactly one argument") + domain = argv.pop(0) + hosts = nc_get_hosts(domain) + text = '' + if config['verbose']: + text += "%-32s%-8s%s\n" % ('Host', 'Type', 'Address') + text += "%-32s%-8s%s\n" % ('====', '====', '=======') + for h, a in hosts.items(): + if config['verbose']: + text += "%-32s%-8s%s\n" % (h.name(), h.type(), a) + else: + text += "%s:%s=%s\n" % (h.name(), h.type(), a) + return text + +def cmd_set_hosts(argv): + replace_all = False + if len(argv) > 1 and argv[0] == '--replace-all': + replace_all = True + argv.pop(0) + if len(argv) < 2: + usage("set-hosts: Expected at least two arguments") + domain = argv.pop(0) + if not replace_all: + hosts = nc_get_hosts(domain) + for arg in argv: + (h, a) = hostspec_parse(arg) + if a: + hosts[h] = a + else: + if h in hosts: + del hosts[h] + nc_set_hosts(domain, hosts) + text = '' + if config['verbose']: + text += "Success\n" + return text + +def cmd_update_hosts(argv): + return cmd_set_hosts(argv) + +def cmd_replace_hosts(argv): + return cmd_set_hosts(['--replace-all'] + argv) + +### Begin main script ### + +uid = os.getuid() +pwent = pwd.getpwuid(uid) +home = pwent[5] +config = dict() +rs = requests.Session() + +try: + config_load("%s/.ncapirc" % (home)) +except RuntimeError as e: + write_response("Failed to load configuration: %s" % (str(e)), True) + +try: + argv = args_parse() +except RuntimeError as e: + write_response("Failed to parse arguments: %s\n" % (str(e)), True) + +cmd_actions = { + 'get-hosts': cmd_get_hosts, + 'set-hosts': cmd_set_hosts, + 'update-hosts': cmd_update_hosts, + 'replace-hosts': cmd_replace_hosts +} + +if len(argv) == 0: + usage("No action specified") +action = argv[0] +if not action in cmd_actions: + usage("Unrecognized action \"%s\"" % (action)) + +method = cgi_method() +if method: + if method != "GET" and method != "POST": + write_response("Unrecognized method", True) + if method == "GET" and not action.startswith("get"): + write_response("Invalid method", True) + +try: + text = cmd_actions[action](argv[1:]) + write_response(text) +except RuntimeError as e: + write_response("Error: %s" % (str(e)), True)