Initial nc-api script
This commit is contained in:
parent
1339b438ce
commit
defe336751
|
@ -0,0 +1,381 @@
|
||||||
|
#!/usr/bin/python
|
||||||
|
|
||||||
|
# TODO:
|
||||||
|
# - Add support for TTL, MXPref, ...
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import pwd
|
||||||
|
import getopt
|
||||||
|
import requests
|
||||||
|
from xml.etree import ElementTree
|
||||||
|
import re
|
||||||
|
|
||||||
|
def cgi_method():
|
||||||
|
return os.environ.get('REQUEST_METHOD')
|
||||||
|
|
||||||
|
def write_response_cgi(text, err=False):
|
||||||
|
headers = ""
|
||||||
|
headers += "Content-Type: text/html\n"
|
||||||
|
|
||||||
|
content = ""
|
||||||
|
content += "<html>\n"
|
||||||
|
content += " <head>\n"
|
||||||
|
content += " <title>NameCheap Gateway</title>\n"
|
||||||
|
content += " </head>\n"
|
||||||
|
content += " <body>\n"
|
||||||
|
content += " <h1>NameCheap Gateway</h1>\n"
|
||||||
|
content += " <hr noshade>\n"
|
||||||
|
if config['debug']:
|
||||||
|
for k in sorted(os.environ):
|
||||||
|
content += "%s=%s<br>\n" % (k, os.environ[k])
|
||||||
|
content += " <hr noshade>\n"
|
||||||
|
if err:
|
||||||
|
content += "<font color=\"red\">Error</font><br>\n"
|
||||||
|
for line in text.rstrip('\n').split('\n'):
|
||||||
|
content += "%s<br>\n" % (line)
|
||||||
|
content += " </body>\n"
|
||||||
|
content += "</html>\n"
|
||||||
|
sys.stdout.write("%s\n%s" % (headers, content))
|
||||||
|
if err:
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
def write_response_cli(text, err=False):
|
||||||
|
for line in text.rstrip('\n').split('\n'):
|
||||||
|
if err:
|
||||||
|
sys.stderr.write("%s\n" % (line))
|
||||||
|
else:
|
||||||
|
sys.stdout.write("%s\n" % (line))
|
||||||
|
if err:
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
def write_response(text, err=False):
|
||||||
|
if cgi_method():
|
||||||
|
write_response_cgi(text, err)
|
||||||
|
else:
|
||||||
|
write_response_cli(text, err)
|
||||||
|
|
||||||
|
opt_flags = 'dhv'
|
||||||
|
opt_longflags = ['debug', 'help', 'verbose']
|
||||||
|
|
||||||
|
def usage(errtext):
|
||||||
|
prog = os.path.basename(sys.argv[0])
|
||||||
|
text = "%s\n" % (errtext)
|
||||||
|
text += "\n"
|
||||||
|
text += "Usage: %s [-d] [-h] [-v] <command> [args]\n" % (prog)
|
||||||
|
text += "\n"
|
||||||
|
text += " Commands:\n"
|
||||||
|
text += " get-hosts <domain>\n"
|
||||||
|
text += " Get host list for domain\n"
|
||||||
|
text += " set-hosts [--replace-all] <domain> <host-spec> ...\n"
|
||||||
|
text += " Set host list for domain\n"
|
||||||
|
text += " Use --replace-all to replace all hosts\n"
|
||||||
|
text += " update-hosts <domain> <host-spec> ...\n"
|
||||||
|
text += " Alias for set-hosts\n"
|
||||||
|
text += " replace-hosts <domain> <host-spec> ...\n"
|
||||||
|
text += " Alias for set-hosts --replace-all\n"
|
||||||
|
text += "\n"
|
||||||
|
text += " host-spec -> name[:type]=address\n"
|
||||||
|
text += " type -> A, AAAA, TXT, ...\n"
|
||||||
|
text += " ipv4, ipv6\n"
|
||||||
|
text += " none, delete\n"
|
||||||
|
write_response(text, True)
|
||||||
|
|
||||||
|
def config_load(pathname):
|
||||||
|
required_keys = ['nc_user', 'nc_apikey', 'nc_clientip']
|
||||||
|
if not os.path.exists(pathname):
|
||||||
|
raise RuntimeError("Configuration file %s does not exist" % (pathname))
|
||||||
|
st = os.stat(pathname)
|
||||||
|
if (st.st_mode & 077) != 0:
|
||||||
|
raise RuntimeError("Configuration file %s has incorrect permissions" % (pathname))
|
||||||
|
config['debug'] = False
|
||||||
|
config['verbose'] = False
|
||||||
|
config['server'] = 'api.namecheap.com'
|
||||||
|
with open(pathname, 'r') as f:
|
||||||
|
for line in f:
|
||||||
|
line = line.rstrip('\n')
|
||||||
|
if len(line) == 0 or line.startswith('#'):
|
||||||
|
continue
|
||||||
|
(k, v) = line.split('=', 1)
|
||||||
|
config[k] = v
|
||||||
|
for k in required_keys:
|
||||||
|
if not k in config:
|
||||||
|
raise RuntimeError("Configuration missing required key \"%s\"" % (k))
|
||||||
|
|
||||||
|
def args_parse_cgi():
|
||||||
|
import cgi
|
||||||
|
argv = []
|
||||||
|
opts = []
|
||||||
|
|
||||||
|
if not 'HTTP_X_SSL_CIPHER' in os.environ:
|
||||||
|
raise RuntimeError('Insecure request denied')
|
||||||
|
try:
|
||||||
|
values = cgi.parse()
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(str(e))
|
||||||
|
|
||||||
|
arg = values.get('username', [])
|
||||||
|
if len(arg) != 1:
|
||||||
|
raise RuntimeError("No username specified")
|
||||||
|
username = arg[0]
|
||||||
|
del values['username']
|
||||||
|
arg = values.get('password', [])
|
||||||
|
if len(arg) != 1:
|
||||||
|
raise RuntimeError("No password specified")
|
||||||
|
password = arg[0]
|
||||||
|
del values['password']
|
||||||
|
|
||||||
|
allowed = False
|
||||||
|
allowed_users = config.get('allowed_users', '')
|
||||||
|
for v in allowed_users.split(','):
|
||||||
|
fields = v.split(':', 1)
|
||||||
|
if len(fields) != 2:
|
||||||
|
raise RuntimeError("Invalid allowed_users")
|
||||||
|
if username == fields[0] and password == fields[1]:
|
||||||
|
allowed = True
|
||||||
|
if not allowed:
|
||||||
|
raise RuntimeError('Access denied')
|
||||||
|
|
||||||
|
for k in ['cmd', 'domain', 'hosts']:
|
||||||
|
arg = values.get(k, [])
|
||||||
|
if len(arg) != 1:
|
||||||
|
raise RuntimeError("Bad parameter %s" % (k))
|
||||||
|
argv.append(arg[0])
|
||||||
|
del values[k]
|
||||||
|
for k in values:
|
||||||
|
if not k in opt_longflags:
|
||||||
|
raise RuntimeError("Unknown option %s" % (k))
|
||||||
|
for v in values[k]:
|
||||||
|
opts.append(("--%s" % (k), v))
|
||||||
|
|
||||||
|
return opts, argv
|
||||||
|
|
||||||
|
def args_parse_cli():
|
||||||
|
try:
|
||||||
|
opts, argv = getopt.getopt(sys.argv[1:], opt_flags, opt_longflags)
|
||||||
|
except getopt.GetoptError as e:
|
||||||
|
raise RuntimeError("Bad arguments: %s\n" % (str(e)))
|
||||||
|
|
||||||
|
return opts, argv
|
||||||
|
|
||||||
|
def args_parse():
|
||||||
|
if cgi_method():
|
||||||
|
opts, argv = args_parse_cgi()
|
||||||
|
else:
|
||||||
|
opts, argv = args_parse_cli()
|
||||||
|
for k, v in opts:
|
||||||
|
if k in ('-d', '--debug'):
|
||||||
|
config['debug'] = True
|
||||||
|
if k in ('-h', '--help'):
|
||||||
|
config['help'] = True
|
||||||
|
if k in ('-v', '--verbose'):
|
||||||
|
config['verbose'] = True
|
||||||
|
|
||||||
|
return argv
|
||||||
|
|
||||||
|
def domain_parse(text):
|
||||||
|
parts = text.split('.')
|
||||||
|
if len(parts) != 2:
|
||||||
|
raise RuntimeError("Invalid domain: %s" % (text))
|
||||||
|
return parts
|
||||||
|
|
||||||
|
def address_is_ipv4(text):
|
||||||
|
result = re.match('^[0-9.]{7,15}$', text)
|
||||||
|
return result != None
|
||||||
|
|
||||||
|
def address_is_ipv6(text):
|
||||||
|
result = re.match('^[0-9A-Fa-f:]{3,39}$', text)
|
||||||
|
return result != None
|
||||||
|
|
||||||
|
def address_type(text):
|
||||||
|
if address_is_ipv4(text):
|
||||||
|
return 'A'
|
||||||
|
if address_is_ipv6(text):
|
||||||
|
return 'AAAA'
|
||||||
|
return 'TXT'
|
||||||
|
|
||||||
|
class hostent:
|
||||||
|
def __init__(self, name, type):
|
||||||
|
self._name = name
|
||||||
|
self._type = type
|
||||||
|
|
||||||
|
def name(self):
|
||||||
|
return self._name
|
||||||
|
def type(self):
|
||||||
|
return self._type
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "%s:%s" % (self.name(), self.type())
|
||||||
|
def __eq__(self, other):
|
||||||
|
return str(self) == str(other)
|
||||||
|
def __hash__(self):
|
||||||
|
return hash(str(self))
|
||||||
|
|
||||||
|
def hostspec_parse(text):
|
||||||
|
(k, v) = text.split('=')
|
||||||
|
if k.find(':') == -1:
|
||||||
|
hn = k
|
||||||
|
ht = address_type(v)
|
||||||
|
else:
|
||||||
|
(hn, ht) = k.split(':')
|
||||||
|
if v.lower() == 'none' or v.lower() == 'delete':
|
||||||
|
v = ''
|
||||||
|
return (hostent(hn, ht.upper()), v)
|
||||||
|
|
||||||
|
def nc_api_url(server):
|
||||||
|
return "https://%s/xml.response" % (server)
|
||||||
|
|
||||||
|
def nc_parse_response(text):
|
||||||
|
return ElementTree.fromstring(re.sub('xmlns="([^"]*)"', '', text))
|
||||||
|
|
||||||
|
def nc_api_get(cmd, args):
|
||||||
|
api_url = nc_api_url(config['server'])
|
||||||
|
params = args.copy()
|
||||||
|
params['apiuser'] = config['nc_user']
|
||||||
|
params['apikey'] = config['nc_apikey']
|
||||||
|
params['Command'] = cmd
|
||||||
|
params['UserName'] = config['nc_user']
|
||||||
|
params['ClientIp'] = config['nc_clientip']
|
||||||
|
r = rs.get(api_url, params=params)
|
||||||
|
r.raise_for_status()
|
||||||
|
root = nc_parse_response(r.text)
|
||||||
|
if root.attrib.get('Status') != 'OK':
|
||||||
|
elem = root.find('Errors').find('Error')
|
||||||
|
raise RuntimeError("API call failed: " + elem.text)
|
||||||
|
return root
|
||||||
|
|
||||||
|
def nc_api_post(cmd, args):
|
||||||
|
api_url = nc_api_url(config['server'])
|
||||||
|
params = args.copy()
|
||||||
|
params['apiuser'] = config['nc_user']
|
||||||
|
params['apikey'] = config['nc_apikey']
|
||||||
|
params['Command'] = cmd
|
||||||
|
params['UserName'] = config['nc_user']
|
||||||
|
params['ClientIp'] = config['nc_clientip']
|
||||||
|
r = rs.post(api_url, data=params)
|
||||||
|
r.raise_for_status()
|
||||||
|
root = nc_parse_response(r.text)
|
||||||
|
if root.attrib.get('Status') != 'OK':
|
||||||
|
elem = root.find('Errors').find('Error')
|
||||||
|
raise RuntimeError("API call failed: " + elem.text)
|
||||||
|
return root
|
||||||
|
|
||||||
|
def nc_get_hosts(domain):
|
||||||
|
cmd = 'namecheap.domains.dns.getHosts'
|
||||||
|
(sld, tld) = domain_parse(domain)
|
||||||
|
args = dict()
|
||||||
|
args['TLD'] = tld
|
||||||
|
args['SLD'] = sld
|
||||||
|
root = nc_api_get(cmd, args)
|
||||||
|
cr = root.find('CommandResponse')
|
||||||
|
hr = cr.find('DomainDNSGetHostsResult')
|
||||||
|
hosts = dict()
|
||||||
|
for h in list(hr):
|
||||||
|
hn = h.attrib.get('Name')
|
||||||
|
ht = h.attrib.get('Type')
|
||||||
|
ha = h.attrib.get('Address')
|
||||||
|
hosts[hostent(hn, ht)] = ha
|
||||||
|
return hosts
|
||||||
|
|
||||||
|
def nc_set_hosts(domain, hosts):
|
||||||
|
cmd = 'namecheap.domains.dns.setHosts'
|
||||||
|
(sld, tld) = domain_parse(domain)
|
||||||
|
args = dict()
|
||||||
|
args['TLD'] = tld
|
||||||
|
args['SLD'] = sld
|
||||||
|
n = 1
|
||||||
|
for k, v in hosts.items():
|
||||||
|
args["HostName%d" % (n)] = k.name()
|
||||||
|
args["RecordType%d" % (n)] = k.type()
|
||||||
|
args["Address%d" % (n)] = v
|
||||||
|
n = n + 1
|
||||||
|
nc_api_post(cmd, args)
|
||||||
|
|
||||||
|
def cmd_get_hosts(argv):
|
||||||
|
if len(argv) != 1:
|
||||||
|
usage("get-hosts: Expected exactly one argument")
|
||||||
|
domain = argv.pop(0)
|
||||||
|
hosts = nc_get_hosts(domain)
|
||||||
|
text = ''
|
||||||
|
if config['verbose']:
|
||||||
|
text += "%-32s%-8s%s\n" % ('Host', 'Type', 'Address')
|
||||||
|
text += "%-32s%-8s%s\n" % ('====', '====', '=======')
|
||||||
|
for h, a in hosts.items():
|
||||||
|
if config['verbose']:
|
||||||
|
text += "%-32s%-8s%s\n" % (h.name(), h.type(), a)
|
||||||
|
else:
|
||||||
|
text += "%s:%s=%s\n" % (h.name(), h.type(), a)
|
||||||
|
return text
|
||||||
|
|
||||||
|
def cmd_set_hosts(argv):
|
||||||
|
replace_all = False
|
||||||
|
if len(argv) > 1 and argv[0] == '--replace-all':
|
||||||
|
replace_all = True
|
||||||
|
argv.pop(0)
|
||||||
|
if len(argv) < 2:
|
||||||
|
usage("set-hosts: Expected at least two arguments")
|
||||||
|
domain = argv.pop(0)
|
||||||
|
if not replace_all:
|
||||||
|
hosts = nc_get_hosts(domain)
|
||||||
|
for arg in argv:
|
||||||
|
(h, a) = hostspec_parse(arg)
|
||||||
|
if a:
|
||||||
|
hosts[h] = a
|
||||||
|
else:
|
||||||
|
if h in hosts:
|
||||||
|
del hosts[h]
|
||||||
|
nc_set_hosts(domain, hosts)
|
||||||
|
text = ''
|
||||||
|
if config['verbose']:
|
||||||
|
text += "Success\n"
|
||||||
|
return text
|
||||||
|
|
||||||
|
def cmd_update_hosts(argv):
|
||||||
|
return cmd_set_hosts(argv)
|
||||||
|
|
||||||
|
def cmd_replace_hosts(argv):
|
||||||
|
return cmd_set_hosts(['--replace-all'] + argv)
|
||||||
|
|
||||||
|
### Begin main script ###
|
||||||
|
|
||||||
|
uid = os.getuid()
|
||||||
|
pwent = pwd.getpwuid(uid)
|
||||||
|
home = pwent[5]
|
||||||
|
config = dict()
|
||||||
|
rs = requests.Session()
|
||||||
|
|
||||||
|
try:
|
||||||
|
config_load("%s/.ncapirc" % (home))
|
||||||
|
except RuntimeError as e:
|
||||||
|
write_response("Failed to load configuration: %s" % (str(e)), True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
argv = args_parse()
|
||||||
|
except RuntimeError as e:
|
||||||
|
write_response("Failed to parse arguments: %s\n" % (str(e)), True)
|
||||||
|
|
||||||
|
cmd_actions = {
|
||||||
|
'get-hosts': cmd_get_hosts,
|
||||||
|
'set-hosts': cmd_set_hosts,
|
||||||
|
'update-hosts': cmd_update_hosts,
|
||||||
|
'replace-hosts': cmd_replace_hosts
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(argv) == 0:
|
||||||
|
usage("No action specified")
|
||||||
|
action = argv[0]
|
||||||
|
if not action in cmd_actions:
|
||||||
|
usage("Unrecognized action \"%s\"" % (action))
|
||||||
|
|
||||||
|
method = cgi_method()
|
||||||
|
if method:
|
||||||
|
if method != "GET" and method != "POST":
|
||||||
|
write_response("Unrecognized method", True)
|
||||||
|
if method == "GET" and not action.startswith("get"):
|
||||||
|
write_response("Invalid method", True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
text = cmd_actions[action](argv[1:])
|
||||||
|
write_response(text)
|
||||||
|
except RuntimeError as e:
|
||||||
|
write_response("Error: %s" % (str(e)), True)
|
Loading…
Reference in New Issue