ncapi/nc-api

390 lines
11 KiB
Python
Executable File

#!/usr/bin/python3
# TODO:
# - Add support for TTL, MXPref, ...
import os
import sys
import pwd
import getopt
import requests
from xml.etree import ElementTree
import re
def cgi_method():
return os.environ.get('REQUEST_METHOD')
def write_response_cgi(text, err=False):
headers = ""
headers += "Content-Type: text/html\n"
content = ""
content += "<html>\n"
content += " <head>\n"
content += " <title>NameCheap Gateway</title>\n"
content += " </head>\n"
content += " <body>\n"
content += " <h1>NameCheap Gateway</h1>\n"
content += " <hr noshade>\n"
if config['debug']:
for k in sorted(os.environ):
content += "%s=%s<br>\n" % (k, os.environ[k])
content += " <hr noshade>\n"
if err:
content += "<font color=\"red\">Error</font><br>\n"
for line in text.rstrip('\n').split('\n'):
content += "%s<br>\n" % (line)
content += " </body>\n"
content += "</html>\n"
sys.stdout.write("%s\n%s" % (headers, content))
if err:
sys.exit(0)
def write_response_cli(text, err=False):
for line in text.rstrip('\n').split('\n'):
if err:
sys.stderr.write("%s\n" % (line))
else:
sys.stdout.write("%s\n" % (line))
if err:
sys.exit(1)
def write_response(text, err=False):
if cgi_method():
write_response_cgi(text, err)
else:
write_response_cli(text, err)
opt_flags = 'dhv'
opt_longflags = ['debug', 'help', 'verbose']
def usage(errtext):
prog = os.path.basename(sys.argv[0])
text = "%s\n" % (errtext)
text += "\n"
text += "Usage: %s [-d] [-h] [-v] <command> [args]\n" % (prog)
text += "\n"
text += " Commands:\n"
text += " get-hosts <domain>\n"
text += " Get host list for domain\n"
text += " set-hosts [--replace-all] <domain> <host-spec> ...\n"
text += " Set host list for domain\n"
text += " Use --replace-all to replace all hosts\n"
text += " update-hosts <domain> <host-spec> ...\n"
text += " Alias for set-hosts\n"
text += " replace-hosts <domain> <host-spec> ...\n"
text += " Alias for set-hosts --replace-all\n"
text += "\n"
text += " host-spec -> name[:type]=address\n"
text += " type -> A, AAAA, TXT, ...\n"
text += " ipv4, ipv6\n"
text += " none, delete\n"
write_response(text, True)
def config_load(pathname):
required_keys = ['nc_user', 'nc_apikey', 'nc_clientip']
if not os.path.exists(pathname):
raise RuntimeError("Configuration file %s does not exist" % (pathname))
st = os.stat(pathname)
if (st.st_mode & 0o077) != 0:
raise RuntimeError("Configuration file %s has incorrect permissions" % (pathname))
config['debug'] = False
config['verbose'] = False
config['server'] = 'api.namecheap.com'
with open(pathname, 'r') as f:
for line in f:
line = line.rstrip('\n')
if len(line) == 0 or line.startswith('#'):
continue
(k, v) = line.split('=', 1)
config[k] = v
for k in required_keys:
if not k in config:
raise RuntimeError("Configuration missing required key \"%s\"" % (k))
def args_parse_cgi():
import cgi
argv = []
opts = []
if not 'HTTP_X_SSL_CIPHER' in os.environ:
raise RuntimeError('Insecure request denied')
try:
values = cgi.parse()
except Exception as e:
raise RuntimeError(str(e))
arg = values.get('username', [])
if len(arg) != 1:
raise RuntimeError("No username specified")
username = arg[0]
del values['username']
arg = values.get('password', [])
if len(arg) != 1:
raise RuntimeError("No password specified")
password = arg[0]
del values['password']
allowed = False
allowed_users = config.get('allowed_users', '')
for v in allowed_users.split(','):
fields = v.split(':', 1)
if len(fields) != 2:
raise RuntimeError("Invalid allowed_users")
if username == fields[0] and password == fields[1]:
allowed = True
if not allowed:
raise RuntimeError('Access denied')
for k in ['cmd', 'domain']:
arg = values.get(k, [])
if len(arg) != 1:
raise RuntimeError("Bad parameter %s" % (k))
argv.append(arg[0])
del values[k]
arg = values.get('hosts', [])
if argv[0] == 'get-hosts':
if arg:
raise RuntimeError("Unexpected hosts")
else:
if len(arg) != 1:
raise RuntimeError("Bad parameter hosts")
argv.append(arg[0])
for k in values:
if not k in opt_longflags:
raise RuntimeError("Unknown option %s" % (k))
for v in values[k]:
opts.append(("--%s" % (k), v))
return opts, argv
def args_parse_cli():
try:
opts, argv = getopt.getopt(sys.argv[1:], opt_flags, opt_longflags)
except getopt.GetoptError as e:
raise RuntimeError("Bad arguments: %s\n" % (str(e)))
return opts, argv
def args_parse():
if cgi_method():
opts, argv = args_parse_cgi()
else:
opts, argv = args_parse_cli()
for k, v in opts:
if k in ('-d', '--debug'):
config['debug'] = True
if k in ('-h', '--help'):
config['help'] = True
if k in ('-v', '--verbose'):
config['verbose'] = True
return argv
def domain_parse(text):
parts = text.split('.')
if len(parts) != 2:
raise RuntimeError("Invalid domain: %s" % (text))
return parts
def address_is_ipv4(text):
result = re.match('^[0-9.]{7,15}$', text)
return result != None
def address_is_ipv6(text):
result = re.match('^[0-9A-Fa-f:]{3,39}$', text)
return result != None
def address_type(text):
if address_is_ipv4(text):
return 'A'
if address_is_ipv6(text):
return 'AAAA'
return 'TXT'
class hostent:
def __init__(self, name, type):
self._name = name
self._type = type
def name(self):
return self._name
def type(self):
return self._type
def __str__(self):
return "%s:%s" % (self.name(), self.type())
def __eq__(self, other):
return str(self) == str(other)
def __hash__(self):
return hash(str(self))
def hostspec_parse(text):
(k, v) = text.split('=')
if k.find(':') == -1:
hn = k
ht = address_type(v)
else:
(hn, ht) = k.split(':')
if v.lower() == 'none' or v.lower() == 'delete':
v = ''
return (hostent(hn, ht.upper()), v)
def nc_api_url(server):
return "https://%s/xml.response" % (server)
def nc_parse_response(text):
return ElementTree.fromstring(re.sub('xmlns="([^"]*)"', '', text))
def nc_api_get(cmd, args):
api_url = nc_api_url(config['server'])
params = args.copy()
params['apiuser'] = config['nc_user']
params['apikey'] = config['nc_apikey']
params['Command'] = cmd
params['UserName'] = config['nc_user']
params['ClientIp'] = config['nc_clientip']
r = rs.get(api_url, params=params)
r.raise_for_status()
root = nc_parse_response(r.text)
if root.attrib.get('Status') != 'OK':
elem = root.find('Errors').find('Error')
raise RuntimeError("API call failed: " + elem.text)
return root
def nc_api_post(cmd, args):
api_url = nc_api_url(config['server'])
params = args.copy()
params['apiuser'] = config['nc_user']
params['apikey'] = config['nc_apikey']
params['Command'] = cmd
params['UserName'] = config['nc_user']
params['ClientIp'] = config['nc_clientip']
r = rs.post(api_url, data=params)
r.raise_for_status()
root = nc_parse_response(r.text)
if root.attrib.get('Status') != 'OK':
elem = root.find('Errors').find('Error')
raise RuntimeError("API call failed: " + elem.text)
return root
def nc_get_hosts(domain):
cmd = 'namecheap.domains.dns.getHosts'
(sld, tld) = domain_parse(domain)
args = dict()
args['TLD'] = tld
args['SLD'] = sld
root = nc_api_get(cmd, args)
cr = root.find('CommandResponse')
hr = cr.find('DomainDNSGetHostsResult')
hosts = dict()
for h in list(hr):
hn = h.attrib.get('Name')
ht = h.attrib.get('Type')
ha = h.attrib.get('Address')
hosts[hostent(hn, ht)] = ha
return hosts
def nc_set_hosts(domain, hosts):
cmd = 'namecheap.domains.dns.setHosts'
(sld, tld) = domain_parse(domain)
args = dict()
args['TLD'] = tld
args['SLD'] = sld
n = 1
for k, v in hosts.items():
args["HostName%d" % (n)] = k.name()
args["RecordType%d" % (n)] = k.type()
args["Address%d" % (n)] = v
n = n + 1
nc_api_post(cmd, args)
def cmd_get_hosts(argv):
if len(argv) != 1:
usage("get-hosts: Expected exactly one argument")
domain = argv.pop(0)
hosts = nc_get_hosts(domain)
text = ''
if config['verbose']:
text += "%-32s%-8s%s\n" % ('Host', 'Type', 'Address')
text += "%-32s%-8s%s\n" % ('====', '====', '=======')
for h, a in hosts.items():
if config['verbose']:
text += "%-32s%-8s%s\n" % (h.name(), h.type(), a)
else:
text += "%s:%s=%s\n" % (h.name(), h.type(), a)
return text
def cmd_set_hosts(argv):
replace_all = False
if len(argv) > 1 and argv[0] == '--replace-all':
replace_all = True
argv.pop(0)
if len(argv) < 2:
usage("set-hosts: Expected at least two arguments")
domain = argv.pop(0)
if not replace_all:
hosts = nc_get_hosts(domain)
for arg in argv:
(h, a) = hostspec_parse(arg)
if a:
hosts[h] = a
else:
if h in hosts:
del hosts[h]
nc_set_hosts(domain, hosts)
text = ''
if config['verbose']:
text += "Success\n"
return text
def cmd_update_hosts(argv):
return cmd_set_hosts(argv)
def cmd_replace_hosts(argv):
return cmd_set_hosts(['--replace-all'] + argv)
### Begin main script ###
uid = os.getuid()
pwent = pwd.getpwuid(uid)
home = pwent[5]
config = dict()
rs = requests.Session()
try:
config_load("%s/.ncapirc" % (home))
except RuntimeError as e:
write_response("Failed to load configuration: %s" % (str(e)), True)
try:
argv = args_parse()
except RuntimeError as e:
write_response("Failed to parse arguments: %s\n" % (str(e)), True)
cmd_actions = {
'get-hosts': cmd_get_hosts,
'set-hosts': cmd_set_hosts,
'update-hosts': cmd_update_hosts,
'replace-hosts': cmd_replace_hosts
}
if len(argv) == 0:
usage("No action specified")
action = argv[0]
if not action in cmd_actions:
usage("Unrecognized action \"%s\"" % (action))
method = cgi_method()
if method:
if method != "GET" and method != "POST":
write_response("Unrecognized method", True)
if method == "GET" and not action.startswith("get"):
write_response("Invalid method", True)
try:
text = cmd_actions[action](argv[1:])
write_response(text)
except RuntimeError as e:
write_response("Error: %s" % (str(e)), True)