365 lines
14 KiB
Python
Executable File
365 lines
14 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
from __future__ import print_function
|
|
|
|
import apt_pkg
|
|
import io
|
|
import os
|
|
import re
|
|
import sys
|
|
import gettext
|
|
import locale
|
|
import argparse
|
|
import subprocess
|
|
|
|
from softwareproperties.shortcuthandler import ShortcutException
|
|
from softwareproperties.shortcuts import shortcut_handler
|
|
from softwareproperties.ppa import PPAShortcutHandler
|
|
from softwareproperties.cloudarchive import CloudArchiveShortcutHandler
|
|
from softwareproperties.sourceslist import SourcesListShortcutHandler
|
|
from softwareproperties.uri import URIShortcutHandler
|
|
|
|
from softwareproperties.extendedsourceslist import (SourceEntry,
|
|
SourcesList,
|
|
CollapsedSourcesList)
|
|
|
|
from aptsources.distro import get_distro
|
|
from copy import copy
|
|
from gettext import gettext as _
|
|
|
|
|
|
# apt_pkg must be initialised before its configuration can be queried.
apt_pkg.init()

# Location of the main sources.list file, looked up in the apt configuration
# under the "Dir::Etc::sourcelist" key.
SOURCESLIST = apt_pkg.config.find_file("Dir::Etc::sourcelist")
|
|
|
|
|
|
class AddAptRepository(object):
    """Command-line front end for adding, removing and listing apt sources.

    Construction builds a ``SourcesList`` and asks the distro object to
    populate it.  :meth:`main` parses the command line and then either acts
    on one repository (PPA, cloud archive, URI or sources.list line) through
    a shortcut handler, or applies a global change (components, pocket,
    source entries) to the configured repositories.
    """

    def __init__(self):
        """Select the translation domain and load the distro's sources."""
        gettext.textdomain("software-properties")
        self.distro = get_distro()
        self.sourceslist = SourcesList()
        self.distro.get_sources(self.sourceslist)

    @staticmethod
    def _comps_in(comps, others):
        """Return the unique items of *comps* that are in *others*.

        The order of *comps* is kept so that output and saved entries do not
        depend on set iteration order.
        """
        others = set(others)
        return [c for c in dict.fromkeys(comps) if c in others]

    @staticmethod
    def _comps_not_in(comps, others):
        """Return the unique items of *comps* that are not in *others*.

        The order of *comps* is kept so that output and saved entries do not
        depend on set iteration order.
        """
        others = set(others)
        return [c for c in dict.fromkeys(comps) if c not in others]

    def parse_args(self, args):
        """Parse *args* and store the result on the instance.

        Sets ``self.parser`` (the ``ArgumentParser``) and ``self.options``
        (the parsed namespace).  Passing ``None`` makes argparse read
        ``sys.argv[1:]``.  The repository selectors (-L, -P, -C, -U, -S and
        the positional line) are mutually exclusive.
        """
        description = "Only ONE of -P, -C, -U, -S, or old-style 'line' can be specified"

        parser = argparse.ArgumentParser(description=description)
        parser.add_argument("-d", "--debug", action="store_true",
                            help=_("Print debug"))
        parser.add_argument("-r", "--remove", action="store_true",
                            help=_("Disable repository"))
        # Counted: a single -s only enables/warns, repeated -s also adds
        # missing source entries (see _enable_source).
        parser.add_argument("-s", "--enable-source", action="count", default=0,
                            help=_("Allow downloading of the source packages from the repository"))
        parser.add_argument("-c", "--component", action="append", default=[],
                            help=_("Components to use with the repository"))
        parser.add_argument("-p", "--pocket",
                            help=_("Add entry for this pocket"))
        parser.add_argument("-y", "--yes", action="store_true",
                            help=_("Assume yes to all queries"))
        # -n and -u share the "update" destination; updating is the default.
        parser.add_argument("-n", "--no-update", dest="update", action="store_false",
                            help=_("Do not update package cache after adding"))
        parser.add_argument("-u", "--update", action="store_true", default=True,
                            help=argparse.SUPPRESS)
        parser.add_argument("-l", "--login", action="store_true",
                            help=_("Login to Launchpad."))
        parser.add_argument("--dry-run", action="store_true",
                            help=_("Don't actually make any changes."))

        group = parser.add_mutually_exclusive_group()
        group.add_argument("-L", "--list", action="store_true",
                           help=_("List currently configured repositories"))
        group.add_argument("-P", "--ppa",
                           help=_("PPA to add"))
        group.add_argument("-C", "--cloud",
                           help=_("Cloud Archive to add"))
        group.add_argument("-U", "--uri",
                           help=_("Archive URI to add"))
        group.add_argument("-S", "--sourceslist", nargs='+', default=[],
                           help=_("Full sources.list entry line to add"))
        group.add_argument("line", nargs='*', default=[],
                           help=_("sources.list line to add (deprecated)"))

        self.parser = parser
        self.options = self.parser.parse_args(args)

    @property
    def dry_run(self):
        """True when --dry-run was given."""
        return self.options.dry_run

    @property
    def enable_source(self):
        """True when -s/--enable-source was given at least once."""
        return self.options.enable_source > 0

    @property
    def components(self):
        """List of components collected from -c/--component."""
        return self.options.component

    @property
    def pocket(self):
        """The requested pocket in lower case, or None when not given."""
        if self.options.pocket:
            return self.options.pocket.lower()
        return None

    @property
    def source_type(self):
        """The distro's source entry type."""
        return self.distro.source_type

    @property
    def binary_type(self):
        """The distro's binary entry type."""
        return self.distro.binary_type

    def is_components(self, comps):
        """Return True if *comps* consists only of known component names.

        *comps* is a whitespace-separated string; an empty string yields
        False.  Known names are those of the distro source template's
        components.
        """
        if not comps:
            return False
        known = {comp.name for comp in self.distro.source_template.components}
        return set(comps.split()) <= known

    def apt_update(self):
        """Run ``apt-get update`` unless disabled by -n or --dry-run."""
        if self.options.update and not self.dry_run:
            # We prefer to run apt-get update here. The built-in update support
            # does not have any progress, and only works for shortcuts. Moving
            # it to something like save() and using apt.progress.text would
            # solve the problem, but the new errors might cause problems with
            # the dbus server or other users of the API. Also, it's unclear
            # how good the text progress is or how to pass it best.
            subprocess.run(['apt-get', 'update'])

    def prompt_user(self):
        """Ask for confirmation before making changes.

        Nothing is asked in dry-run mode (a notice is printed instead), when
        -y was given, when stdin is not a terminal, or when the
        FORCE_ADD_APT_REPOSITORY environment variable is set.  Ctrl-c or
        end-of-file at the prompt aborts the program with exit status 1.
        """
        if self.dry_run:
            print(_("DRY-RUN mode: no modifications will be made"))
            return
        if self.options.yes or not sys.stdin.isatty():
            return
        if "FORCE_ADD_APT_REPOSITORY" in os.environ:
            return
        try:
            input(_("Press [ENTER] to continue or Ctrl-c to cancel."))
        except (KeyboardInterrupt, EOFError):
            # EOF (Ctrl-d) is not a confirmation either; abort cleanly
            # instead of dying with a traceback.
            print(_("Aborted."))
            sys.exit(1)

    def prompt_user_shortcut(self, shortcut):
        """Display information about *shortcut*, then ask for confirmation.

        Prints the repository line, the description (if any) and the web
        link (if any), announces whether the repository is being added or
        removed, and finally calls :meth:`prompt_user`.
        """
        print(_("Repository: '%s'") % shortcut.SourceEntry().line)
        if shortcut.description:
            # strip ANSI escape sequences
            description = re.sub(r"(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]",
                                 "", shortcut.description)
            print(_("Description:"))
            print(description)
        if shortcut.web_link:
            print(_("More info: %s") % shortcut.web_link)
        if self.options.remove:
            print(_("Removing repository."))
        else:
            print(_("Adding repository."))
        self.prompt_user()

    def change_components(self):
        """Add or remove the requested components on enabled entries.

        Only valid, enabled entries from the main sources.list file are
        considered.  With --remove the components are dropped, otherwise
        they are appended.  The list is saved unless in dry-run mode.
        """
        collapsedlist = CollapsedSourcesList(self.sourceslist, files=[SOURCESLIST])

        for entry in collapsedlist.filter(invalid=False, disabled=False):
            if self.options.remove:
                removed = self._comps_in(entry.comps, self.components)
                if removed:
                    entry.comps = self._comps_not_in(entry.comps, self.components)
                    print(_("Removed %s from: %s") % (' '.join(removed), str(entry)))
            else:
                added = self._comps_not_in(self.components, entry.comps)
                if added:
                    # Existing components keep their order; new ones follow.
                    entry.comps = list(dict.fromkeys(entry.comps)) + added
                    print(_("Added %s to: %s") % (' '.join(added), str(entry)))

        if not self.dry_run:
            self.sourceslist.save()

    def _add_pocket(self, collapsedlist):
        """Add an entry for the requested pocket for each binary URI.

        For every URI with enabled binary entries: if the pocket already
        exists it is reported; otherwise the new entry is derived from the
        'release' pocket entry, or, when there is none, from any entry of
        that URI with the default components.
        """
        # Materialise once: the entries are iterated several times below.
        binary_entries = list(collapsedlist.filter(invalid=False, disabled=False,
                                                   type=self.binary_type))

        # dict.fromkeys de-duplicates while keeping first-seen URI order.
        for uri in dict.fromkeys(e.uri for e in binary_entries):
            pockets = {e.pocket: e for e in binary_entries if e.uri == uri}
            if self.pocket in pockets:
                print(_("Existing: %s") % str(pockets[self.pocket]))
                continue
            if 'release' in pockets:
                entry = copy(pockets['release'])
            else:
                # without existing release pocket, use default comps
                comps = ['main', 'restricted']
                entry = next(iter(pockets.values()))._replace(comps=comps)
            entry = entry._replace(pocket=self.pocket)
            print(_("Adding: %s") % str(entry))
            collapsedlist.add_entry(entry)

    def _remove_pocket(self, collapsedlist):
        """Disable every valid, enabled entry of the requested pocket."""
        for entry in collapsedlist.filter(invalid=False, disabled=False,
                                          pocket=self.pocket):
            entry.set_enabled(False)
            print(_("Disabled: %s") % str(entry))

    def change_pocket(self):
        """Add or remove (with --remove) the requested pocket.

        Works on the main sources.list file and saves the list unless in
        dry-run mode.
        """
        collapsedlist = CollapsedSourcesList(self.sourceslist, files=[SOURCESLIST])

        if self.options.remove:
            self._remove_pocket(collapsedlist)
        else:
            self._add_pocket(collapsedlist)

        if not self.dry_run:
            self.sourceslist.save()

    def _enable_source(self):
        """Enable source entries that have a matching binary entry.

        Disabled source entries are enabled for the components their
        matching binary entry provides; remaining components are split off
        into a separate entry that stays disabled.  Binary entries lacking
        source components are reported, or, when -s was given more than
        once, get a new source entry added after them.
        """
        collapsedlist = CollapsedSourcesList(self.sourceslist)

        # Check each disabled deb-src line, enable if matching deb line exists
        for s in self.sourceslist.filter(invalid=False, disabled=True, type=self.source_type):
            b_merged_entry = collapsedlist.get_merged_entry(s._replace(type=self.binary_type, disabled=False))
            if not b_merged_entry:
                # no matching binary lines, leave the source line disabled
                continue
            disabled_comps = self._comps_not_in(s.comps, b_merged_entry.comps)
            enabled_comps = self._comps_in(s.comps, b_merged_entry.comps)
            if not enabled_comps:
                # we can't enable any of the line
                continue
            if disabled_comps:
                # keep the components we cannot enable in their own
                # (still disabled) entry right after this one
                index = self.sourceslist.list.index(s)
                self.sourceslist.list.insert(index + 1, s._replace(comps=disabled_comps))
            s.comps = enabled_comps
            s.set_enabled(True)
            collapsedlist.refresh()
            print(_("Enabled: %s") % str(s).strip())

        # Check each enabled deb line, to warn about missing deb-src lines, or add one if -ss
        for b in self.sourceslist.filter(invalid=False, disabled=False, type=self.binary_type):
            s = b._replace(type=self.source_type)
            s_merged_entry = collapsedlist.get_merged_entry(s)
            existing_comps = s_merged_entry.comps if s_merged_entry else []
            missing_comps = self._comps_not_in(b.comps, existing_comps)
            if not missing_comps:
                continue
            s.comps = missing_comps
            if self.options.enable_source > 1:
                # with multiple -s, add new deb-src entries if needed for all deb entries
                collapsedlist.add_entry(s, after=b)
                print(_("Added: %s") % str(s).strip())
            else:
                # if only one -s used, don't add missing deb-src, just notify
                print(_("Warning, missing deb-src for: %s") % str(s).strip())

    def _disable_source(self):
        """Disable every valid, enabled source entry."""
        for s in self.sourceslist.filter(invalid=False, disabled=False, type=self.source_type):
            s.set_enabled(False)
            print(_("Disabled: %s") % str(s).strip())

    def change_source(self):
        """Enable or disable (with --remove) source entries, then save.

        Nothing is saved in dry-run mode.
        """
        if self.options.remove:
            self._disable_source()
        else:
            self._enable_source()
        if not self.dry_run:
            self.sourceslist.save()

    def global_change(self):
        """Apply component, pocket and source changes to all repositories.

        All requested changes are announced first, the user is asked for
        confirmation once, and then each requested change is carried out.
        """
        if self.components:
            if self.options.remove:
                print(_("Removing component(s) '%s' from all repositories.") % ', '.join(self.components))
            else:
                print(_("Adding component(s) '%s' to all repositories.") % ', '.join(self.components))
        if self.pocket:
            if self.options.remove:
                print(_("Removing pocket %s for all repositories.") % self.pocket)
            else:
                print(_("Adding pocket %s for all repositories.") % self.pocket)
        if self.enable_source:
            if self.options.remove:
                print(_("Disabling %s for all repositories.") % self.source_type)
            else:
                print(_("Enabling %s for all repositories.") % self.source_type)
        self.prompt_user()
        if self.components:
            self.change_components()
        if self.pocket:
            self.change_pocket()
        if self.enable_source:
            self.change_source()

    def show_list(self):
        """Print the valid, enabled entries.

        Source entries are only shown when -s/--enable-source was given.
        """
        for s in CollapsedSourcesList(self.sourceslist):
            if s.invalid or s.disabled:
                continue
            if not self.enable_source and s.type == self.source_type:
                continue
            print(s)

    def main(self, args=None):
        """Parse the command line and perform the requested action.

        *args* is the argument list without the program name; ``None`` (the
        default) makes argparse use ``sys.argv[1:]`` at call time.

        Returns True on success and False on error (not running as root,
        no action requested, or the shortcut handler rejected the input).
        """
        self.parse_args(args)

        # Only dry runs and listing are allowed without root privileges.
        if not any((self.dry_run, self.options.list, os.geteuid() == 0)):
            print(_("Error: must run as root"))
            return False

        line = ' '.join(self.options.line)
        if line == '-':
            # a lone '-' means: read the line from standard input
            line = sys.stdin.readline().strip()

        # if 'line' is only (valid) components, handle as if only -c was used with no line
        if self.is_components(line):
            self.options.component += line.split()
            line = ''

        if self.options.ppa:
            source = self.options.ppa
            if ':' not in source:
                source = 'ppa:' + source
            handler = PPAShortcutHandler
        elif self.options.cloud:
            source = self.options.cloud
            if ':' not in source:
                source = 'uca:' + source
            handler = CloudArchiveShortcutHandler
        elif self.options.uri:
            source = self.options.uri
            handler = URIShortcutHandler
        elif self.options.sourceslist:
            source = ' '.join(self.options.sourceslist)
            handler = SourcesListShortcutHandler
        elif line:
            source = line
            handler = shortcut_handler
        elif self.options.list:
            self.show_list()
            return True
        elif any((self.enable_source, self.components, self.pocket)):
            self.global_change()
            self.apt_update()
            return True
        else:
            print(_("Error: no actions requested."))
            self.parser.print_help()
            return False

        try:
            shortcut_params = {
                'login': self.options.login,
                'enable_source': self.enable_source,
                'dry_run': self.dry_run,
                'components': self.components,
                'pocket': self.pocket,
            }
            shortcut = handler(source, **shortcut_params)
        except ShortcutException as e:
            print(e)
            return False

        self.prompt_user_shortcut(shortcut)

        if self.options.remove:
            shortcut.remove()
        else:
            shortcut.add()

        self.apt_update()
        return True
|
|
|
|
if __name__ == '__main__':
    # Script entry point: the process exit status is 0 when main() reports
    # success and 1 otherwise.
    addaptrepo = AddAptRepository()
    exit_status = 0 if addaptrepo.main() else 1
    sys.exit(exit_status)
|