235 lines
9.3 KiB
Python
Executable File
235 lines
9.3 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
# Copyright (C) 2009 Canonical Ltd.
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 3,
|
|
# as published by the Free Software Foundation.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import dbus
|
|
from gi.repository import GObject, GLib, UDisks
|
|
import dbus.service
|
|
import logging
|
|
import os
|
|
import time
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
from dbus.mainloop.glib import DBusGMainLoop
|
|
from usbcreator.misc import (
|
|
USBCreatorProcessException,
|
|
find_on_path,
|
|
popen,
|
|
sane_path,
|
|
)
|
|
|
|
USBCREATOR_IFACE = 'com.ubuntu.USBCreator'
|
|
PROPS_IFACE = 'org.freedesktop.DBus.Properties'
|
|
|
|
no_options = GLib.Variant('a{sv}', {})
|
|
|
|
loop_prefix = '/org/freedesktop/UDisks2/block_devices/loop'
|
|
|
|
sane_path()
|
|
|
|
def _get_object_path_from_device(device_name):
|
|
if device_name.startswith('/dev/'):
|
|
return '/org/freedesktop/UDisks2/block_devices/' + device_name[5:]
|
|
return device_name
|
|
|
|
def _get_parent_object(udisks, device_name):
|
|
obj = udisks.get_object(_get_object_path_from_device(device_name))
|
|
if obj.get_partition_table():
|
|
return obj
|
|
partition = obj.get_partition()
|
|
if not partition:
|
|
return obj
|
|
parent = partition.get_cached_property('Table').get_string()
|
|
return udisks.get_object(parent)
|
|
|
|
def unmount_all(udisks, parent):
|
|
'''Unmounts the device or any partitions of the device.'''
|
|
parent_path = parent.get_object_path()
|
|
manager = udisks.get_object_manager()
|
|
for obj in manager.get_objects():
|
|
block = obj.get_block()
|
|
partition = obj.get_partition()
|
|
fs = obj.get_filesystem()
|
|
if not (block and partition and fs):
|
|
continue
|
|
block_name = block.get_cached_property('Device').get_bytestring().decode('utf-8')
|
|
table = partition.get_cached_property('Table').get_string()
|
|
mounts = fs.get_cached_property('MountPoints').get_bytestring_array()
|
|
if table == parent_path and len(mounts):
|
|
logging.debug('Unmounting %s' % block_name)
|
|
# We explictly avoid catching errors here so that failure to
|
|
# unmount a partition causes the format method to fail with the
|
|
# error floating up to the frontend.
|
|
fs.call_unmount_sync(no_options, None)
|
|
|
|
fs = parent.get_filesystem()
|
|
if not fs:
|
|
return
|
|
dev_name = parent.get_block().get_cached_property('Device').get_bytestring().decode('utf-8')
|
|
mounts = fs.get_cached_property('MountPoints').get_bytestring_array()
|
|
if len(mounts):
|
|
logging.debug('Unmounting %s' % dev_name)
|
|
fs.call_unmount_sync(no_options, None)
|
|
|
|
def check_system_internal(device):
|
|
block = device.get_block()
|
|
is_system = block.get_cached_property('HintSystem').get_boolean()
|
|
is_loop = block.get_object_path().startswith(loop_prefix) and not block.get_cached_property('ReadOnly').get_boolean()
|
|
if is_system and not is_loop:
|
|
raise dbus.DBusException('com.ubuntu.USBCreator.Error.SystemInternal')
|
|
|
|
def mem_free():
|
|
# Largely copied from partman-base.
|
|
free = 0
|
|
with open('/proc/meminfo') as meminfo:
|
|
for line in meminfo:
|
|
if line.startswith('MemFree:'):
|
|
free += int(line.split()[1]) / 1024.0
|
|
if line.startswith('Buffers:'):
|
|
free += int(line.split()[1]) / 1024.0
|
|
return free
|
|
|
|
class USBCreator(dbus.service.Object):
|
|
def __init__(self):
|
|
bus_name = dbus.service.BusName(USBCREATOR_IFACE, bus=dbus.SystemBus())
|
|
dbus.service.Object.__init__(self, bus_name, '/com/ubuntu/USBCreator')
|
|
self.dbus_info = None
|
|
self.polkit = None
|
|
|
|
def _builtin_dd(self, source, target, block_size=8388608):
|
|
src_size = os.stat(source).st_size
|
|
src = open(source, 'rb')
|
|
dst = open(target, 'wb')
|
|
written = 0
|
|
current_progress = 0
|
|
self.Progress(0)
|
|
|
|
data = src.read(block_size)
|
|
while(data):
|
|
dst.write(data)
|
|
written += len(data)
|
|
new_progress = int(written / src_size * 100.0)
|
|
if new_progress != current_progress:
|
|
# flush buffers and sync before notifying the new progress
|
|
dst.flush()
|
|
os.fsync(dst.fileno())
|
|
|
|
self.Progress(new_progress)
|
|
current_progress = new_progress
|
|
data = src.read(block_size)
|
|
|
|
src.close()
|
|
dst.close()
|
|
|
|
@dbus.service.method(USBCREATOR_IFACE, in_signature='', out_signature='b')
|
|
def KVMOk(self):
|
|
mem = mem_free()
|
|
logging.debug('Asked to run KVM with %f M free' % mem)
|
|
if mem >= 768 and find_on_path('kvm-ok') and find_on_path('kvm'):
|
|
import subprocess
|
|
if subprocess.call(['kvm-ok']) == 0:
|
|
return True
|
|
return False
|
|
|
|
@dbus.service.method(USBCREATOR_IFACE, in_signature='sa{ss}', out_signature='',
|
|
sender_keyword='sender', connection_keyword='conn')
|
|
def KVMTest(self, device, env, sender=None, conn=None):
|
|
'''Run KVM with the freshly created device as the first disk.'''
|
|
self.check_polkit(sender, conn, 'com.ubuntu.usbcreator.kvm')
|
|
for key in ('DISPLAY', 'XAUTHORITY'):
|
|
if key not in env:
|
|
logging.debug('Missing %s' % key)
|
|
return
|
|
udisks = UDisks.Client.new_sync(None)
|
|
obj = _get_parent_object(udisks, device)
|
|
# TODO unmount all the partitions.
|
|
dev_file = obj.get_block().get_cached_property('Device').get_bytestring().decode('utf-8')
|
|
if mem_free() >= 1280:
|
|
envp = []
|
|
for k, v in env.items():
|
|
envp.append('%s=%s' % (str(k), str(v)))
|
|
cmd = ('kvm', '-m', '1024', '-hda', str(dev_file))
|
|
flags = (GObject.SPAWN_SEARCH_PATH)
|
|
# Don't let SIGINT propagate to the child.
|
|
GObject.spawn_async(cmd, envp=envp, flags=flags, child_setup=os.setsid)
|
|
|
|
@dbus.service.method(USBCREATOR_IFACE, in_signature='ssb', out_signature='',
|
|
sender_keyword='sender', connection_keyword='conn')
|
|
def Image(self, source, target, allow_system_internal,
|
|
sender=None, conn=None):
|
|
self.check_polkit(sender, conn, 'com.ubuntu.usbcreator.image')
|
|
|
|
udisks = UDisks.Client.new_sync(None)
|
|
obj = udisks.get_object(_get_object_path_from_device(target))
|
|
logging.debug('Using target: %s' % target)
|
|
if not allow_system_internal:
|
|
check_system_internal(obj)
|
|
|
|
unmount_all(udisks, obj)
|
|
start_time = time.time()
|
|
self._builtin_dd(source.encode(), target.encode())
|
|
logging.debug('Wrote image in %s seconds' % str(int(time.time() - start_time)))
|
|
|
|
@dbus.service.signal(USBCREATOR_IFACE, signature='u')
|
|
def Progress(self, value):
|
|
pass
|
|
|
|
@dbus.service.method(USBCREATOR_IFACE, in_signature='', out_signature='',
|
|
sender_keyword='sender', connection_keyword='conn')
|
|
def Shutdown(self, sender=None, conn=None):
|
|
logging.debug('Shutting down.')
|
|
loop.quit()
|
|
|
|
# Taken from Jockey 0.5.3.
|
|
def check_polkit(self, sender, conn, priv):
|
|
if sender is None and conn is None:
|
|
return
|
|
if self.dbus_info is None:
|
|
self.dbus_info = dbus.Interface(conn.get_object(
|
|
'org.freedesktop.DBus',
|
|
'/org/freedesktop/DBus/Bus',
|
|
False), 'org.freedesktop.DBus')
|
|
pid = self.dbus_info.GetConnectionUnixProcessID(sender)
|
|
if self.polkit is None:
|
|
self.polkit = dbus.Interface(dbus.SystemBus().get_object(
|
|
'org.freedesktop.PolicyKit1',
|
|
'/org/freedesktop/PolicyKit1/Authority',
|
|
False), 'org.freedesktop.PolicyKit1.Authority')
|
|
try:
|
|
# we don't need is_challenge return here, since we call with
|
|
# AllowUserInteraction
|
|
(is_auth, _, details) = self.polkit.CheckAuthorization(
|
|
('system-bus-name', {'name': dbus.String(sender,
|
|
variant_level = 1)}), priv, {'': ''},
|
|
dbus.UInt32(1), '', timeout=600)
|
|
except dbus.DBusException as e:
|
|
if e._dbus_error_name == 'org.freedesktop.DBus.Error.ServiceUnknown':
|
|
# polkitd timed out, connect again
|
|
self.polkit = None
|
|
return self.check_polkit(sender, conn, priv)
|
|
else:
|
|
raise
|
|
|
|
if not is_auth:
|
|
logging.debug('_check_polkit_privilege: sender %s on connection %s '
|
|
'pid %i is not authorized for %s: %s' %
|
|
(sender, conn, pid, priv, str(details)))
|
|
raise dbus.DBusException('com.ubuntu.USBCreator.Error.NotAuthorized')
|
|
|
|
DBusGMainLoop(set_as_default=True)
|
|
helper = USBCreator()
|
|
loop = GLib.MainLoop()
|
|
loop.run()
|