Rework async file copy and rename to acp_*

This commit is contained in:
Tom Marshall 2021-04-27 07:58:32 -07:00
parent 8cb0308349
commit 476ece4388
1 changed files with 73 additions and 40 deletions

113
vmmd
View File

@ -736,7 +736,7 @@ class Image(DbObject):
pathname = "%s/%s" % (config['image.storage.location'], os.path.basename(url))
img = Image({'name': name, 'pathname': pathname, 'owner': user['name'], 'public': public})
print("Image: add %s to fetch queue" % (url))
file_copy_async(url, pathname)
acp_queue(url, pathname)
return img
@staticmethod
@ -744,11 +744,9 @@ class Image(DbObject):
filename = os.path.basename(vm.disk_pathname())
pathname = "%s/%s" % (config['image.storage.location'], filename)
img = Image({'name': name, 'pathname': pathname, 'owner': user['name'], 'public': public})
file_copy_async(vm.disk_pathname(), img.pathname())
acp_queue(vm.disk_pathname(), img.pathname())
return img
def copying(self):
return self['pathname'] in file_copy_map
def fmt(self):
return self._fmt
def physical_size(self):
@ -821,7 +819,7 @@ class VirtualMachine(DbObject):
argv = [find_in_path('qemu-img'), 'create', '-f', 'qcow2', '-b', img['pathname'], diskpath]
cmd_run(argv)
else:
file_copy_async(img['pathname'], diskpath)
acp_queue(img['pathname'], diskpath)
return VirtualMachine({'name': name, 'owner': owner['name'],
'arch': arch, 'cpus': cpus, 'mem': mem,
'vncpass': None, 'macaddr': None,
@ -848,7 +846,7 @@ class VirtualMachine(DbObject):
(root, ext) = os.path.splitext(image_url)
disk_pathname = VirtualMachine.pathname_for_disk(owner.name(), name, ext)
vm = VirtualMachine(None, name, owner, arch, cpus, mem, None, None, disk_pathname)
file_copy_async(image_url, disk_pathname)
acp_queue(image_url, disk_pathname)
return vm
def _qemu_pidfile(self):
@ -908,8 +906,6 @@ class VirtualMachine(DbObject):
return self._disk_psize
def disk_virtual_size(self):
return self._disk_vsize
def copying(self):
return self['diskpath'] in file_copy_map
def running(self):
return not self._pid is None
@ -930,7 +926,7 @@ class VirtualMachine(DbObject):
def start(self, **kwargs):
if not self.disk_exists():
raise RuntimeError("Cannot start without disk")
if self.copying():
if not acp_progress(self['diskpath']) is None:
raise RuntimeError("Cannot start while copying")
force_readonly = self['diskpath'].endswith('.vmdk')
readonly = force_readonly or kwargs.get('readonly', False)
@ -1510,10 +1506,11 @@ class HttpClientRequestHandler(http.server.BaseHTTPRequestHandler):
r += " <tr><td style=\"font-weight:bold\">Virtual Size<td>%s\n" % (readable_size(img.virtual_size(), ONE_MB))
r += " <tr><td style=\"font-weight:bold\">Physical Size<td>%s\n" % (readable_size(img.physical_size(), ONE_MB))
r += ' <tr><td>&nbsp;<td>&nbsp;\n'
if img.copying():
r += ' <tr><td style="font-weight:bold">Copying<td>&nbsp;\n'
else:
pct = acp_progress(img['pathname'])
if pct is None:
r += ' <tr><td><input style="color:red" type="submit" name="action" value="Delete"><td>&nbsp;'
else:
r += " <tr><td style=\"font-weight:bold\">Copying<td>%d%%\n" % (pct)
r += ' </table>\n'
r += ' </form>\n'
else:
@ -1542,13 +1539,14 @@ class HttpClientRequestHandler(http.server.BaseHTTPRequestHandler):
r += "<td><a href=\"/ui/image?type=%s&id=%d\">%s</a>" % (img_type, img['id'], img['name'])
r += "<td>%s" % (img['owner'])
r += "<td>%s" % ('Public' if img['public'] else 'Private')
if not img.copying():
pct = acp_progress(img['pathname'])
if pct is None:
if img.type() == 'disk':
r += "<td><a href=\"/ui/vm/create?img_id=%d\">Launch</a>" % (img['id'])
else:
r += "<td>&nbsp;"
else:
r += "<td>Creating..."
r += "<td>Creating %d%%" % (pct)
r += '</tr>\n'
r += ' </table>\n'
r += self._html_foot(user)
@ -1868,12 +1866,13 @@ class HttpClientRequestHandler(http.server.BaseHTTPRequestHandler):
else:
r += "<td>%s<input type=\"submit\" name=\"action\" value=\"Insert\">\n" % (self._iso_image_select())
r += ' <tr><td>&nbsp;<td>&nbsp;\n'
pct = acp_progress(vm['diskpath'])
if vm.running():
r += ' <tr><td colspan="2"><input type="submit" name="action" value="Suspend">\n'
r += ' <tr><td colspan="2"><input type="submit" name="action" value="Power Off">\n'
r += ' <tr><td colspan="2"><input type="submit" name="action" value="Kill">\n'
elif vm.copying():
r += ' <tr><td style="font-weight:bold">Copying<td>&nbsp;\n'
elif not pct is None:
r += " <tr><td style=\"font-weight:bold\">Copying<td>%d%%\n" % (pct)
else:
r += ' <tr><td colspan="2"><input type="submit" name="action" value="Start">'
# XXX: vm.disktype()
@ -2127,18 +2126,33 @@ def ssl_http_listener():
### File copier ###
file_copy_lock = threading.Lock()
file_copy_queue = []
file_copy_map = {}
_acp_lock = threading.Lock()
_acp_queue = []
_acp_map = {}
_acp_src = None
_acp_dst = None
_acp_pct = None
def file_copy_async(src, dst):
file_copy_lock.acquire()
file_copy_queue.append(dst)
file_copy_map[dst] = src
file_copy_lock.release()
def acp_queue(src, dst):
_acp_lock.acquire()
_acp_queue.append(dst)
_acp_map[dst] = src
_acp_lock.release()
def file_copy_simple(srcfile, dstfile, file_size, watcher):
def acp_progress(dst):
pct = None
_acp_lock.acquire()
if dst in _acp_map:
pct = 0
if _acp_dst == dst:
pct = _acp_pct
_acp_lock.release()
return pct
def acp_simple(srcfile, dstfile, file_size):
global _acp_pct
off = 0
pct = 0
while off < file_size:
chunk = min(1024*1024, file_size - off)
buf = srcfile.read(chunk)
@ -2146,12 +2160,19 @@ def file_copy_simple(srcfile, dstfile, file_size, watcher):
raise RuntimeError("Failed to read")
dstfile.write(buf)
off += len(buf)
if watcher:
watcher.copy_pct((int)(100 * off / file_size))
curpct = int(100 * off / file_size)
if curpct != pct:
print("acp_simple: dstfile=%s curpct=%d" % (dstfile, curpct))
pct = curpct
_acp_lock.acquire()
_acp_pct = pct
_acp_lock.release()
# XXX: could provide better status with disk blocks and copied size
def file_copy_sparse(srcfile, dstfile, file_size, watcher):
def acp_sparse(srcfile, dstfile, file_size):
global _acp_pct
off = 0
pct = 0
while off < file_size:
off = srcfile.seek(off, os.SEEK_DATA)
end = srcfile.seek(off, os.SEEK_HOLE)
@ -2164,32 +2185,44 @@ def file_copy_sparse(srcfile, dstfile, file_size, watcher):
raise RuntimeError("Failed to read")
dstfile.write(buf)
off += len(buf)
if watcher:
watcher.copy_pct((int)(100 * off / file_size))
curpct = int(100 * off / file_size)
if curpct != pct:
pct = curpct
_acp_lock.acquire()
_acp_pct = pct
_acp_lock.release()
def file_copier():
global _acp_src
global _acp_dst
global _acp_pct
while True:
try:
src = None
file_copy_lock.acquire()
if file_copy_queue:
dst = file_copy_queue.pop(0)
src = file_copy_map[dst]
file_copy_lock.release()
_acp_lock.acquire()
if _acp_queue:
dst = _acp_queue.pop(0)
src = _acp_map[dst]
_acp_lock.release()
if src:
_acp_src = src
_acp_dst = dst
_acp_pct = 0
if src.startswith('/'):
srcfile = open(src, 'rb')
dstfile = open(dst, 'wb')
size = os.stat(src).st_size
file_copy_sparse(srcfile, dstfile, size, None)
acp_sparse(srcfile, dstfile, size)
else:
srcfile = urllib.request.urlopen(src)
dstfile = open(dst, 'wb')
size = int(srcfile.headers.get('Content-Length', '0'))
file_copy_simple(srcfile, dstfile, size, None)
file_copy_lock.acquire()
del file_copy_map[dst]
file_copy_lock.release()
acp_simple(srcfile, dstfile, size)
_acp_src = None
_acp_dst = None
_acp_lock.acquire()
del _acp_map[dst]
_acp_lock.release()
except BaseException as e:
loge("Exception in file_copy thread: %s" % (e))
time.sleep(1)