917 lines
30 KiB
Python
917 lines
30 KiB
Python
## system-config-printer
|
|
|
|
## Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015 Red Hat, Inc.
|
|
## Authors:
|
|
## Florian Festi <ffesti@redhat.com>
|
|
## Tim Waugh <twaugh@redhat.com>
|
|
|
|
## This program is free software; you can redistribute it and/or modify
|
|
## it under the terms of the GNU General Public License as published by
|
|
## the Free Software Foundation; either version 2 of the License, or
|
|
## (at your option) any later version.
|
|
|
|
## This program is distributed in the hope that it will be useful,
|
|
## but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
## GNU General Public License for more details.
|
|
|
|
## You should have received a copy of the GNU General Public License
|
|
## along with this program; if not, write to the Free Software
|
|
## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
|
|
import cups, pprint, os, tempfile, re, string
|
|
import locale
|
|
from . import _debugprint
|
|
from . import config
|
|
from functools import reduce
|
|
|
|
class Printer:
|
|
_flags_blacklist = ["options", "local"]
|
|
|
|
def __init__(self, name, connection, **kw):
|
|
"""
|
|
@param name: printer name
|
|
@type name: string
|
|
@param connection: CUPS connection
|
|
@type connection: CUPS.Connection object
|
|
@param kw: printer attributes
|
|
@type kw: dict indexed by string
|
|
"""
|
|
self.name = name
|
|
self.connection = connection
|
|
self.class_members = []
|
|
have_kw = len (kw) > 0
|
|
fetch_attrs = True
|
|
if have_kw:
|
|
self.update (**kw)
|
|
if self.is_class:
|
|
fetch_attrs = True
|
|
else:
|
|
fetch_attrs = False
|
|
|
|
if fetch_attrs:
|
|
self.getAttributes ()
|
|
|
|
self._ppd = None # load on demand
|
|
|
|
def __del__ (self):
|
|
if self._ppd is not None:
|
|
os.unlink(self._ppd)
|
|
|
|
def __repr__ (self):
|
|
return "<cupshelpers.Printer \"%s\">" % self.name
|
|
|
|
def _expand_flags(self):
|
|
|
|
def _ascii_lower(str):
|
|
return str.lower();
|
|
|
|
prefix = "CUPS_PRINTER_"
|
|
prefix_length = len(prefix)
|
|
|
|
# loop over cups constants
|
|
for name in cups.__dict__:
|
|
if name.startswith(prefix):
|
|
attr_name = \
|
|
_ascii_lower(name[prefix_length:])
|
|
if attr_name in self._flags_blacklist: continue
|
|
if attr_name == "class": attr_name = "is_class"
|
|
# set as attribute
|
|
setattr(self, attr_name,
|
|
bool(self.type & getattr(cups, name)))
|
|
|
|
def update(self, **kw):
|
|
"""
|
|
Update object from printer attributes.
|
|
|
|
@param kw: printer attributes
|
|
@type kw: dict indexed by string
|
|
"""
|
|
self.state = kw.get('printer-state', 0)
|
|
self.enabled = self.state != cups.IPP_PRINTER_STOPPED
|
|
self.device_uri = kw.get('device-uri', "")
|
|
self.info = kw.get('printer-info', "")
|
|
self.is_shared = kw.get('printer-is-shared', None)
|
|
self.location = kw.get('printer-location', "")
|
|
self.make_and_model = kw.get('printer-make-and-model', "")
|
|
self.type = kw.get('printer-type', 0)
|
|
self.uri_supported = kw.get('printer-uri-supported', "")
|
|
if type (self.uri_supported) != list:
|
|
self.uri_supported = [self.uri_supported]
|
|
self._expand_flags()
|
|
if self.is_shared is None:
|
|
self.is_shared = not self.not_shared
|
|
del self.not_shared
|
|
self.class_members = kw.get('member-names', [])
|
|
if type (self.class_members) != list:
|
|
self.class_members = [self.class_members]
|
|
self.class_members.sort ()
|
|
self.other_attributes = kw
|
|
|
|
def getAttributes(self):
|
|
"""
|
|
Fetch further attributes for the printer.
|
|
|
|
Normally only a small set of attributes is fetched. This
|
|
method is for fetching more.
|
|
"""
|
|
attrs = self.connection.getPrinterAttributes(self.name)
|
|
self.attributes = {}
|
|
self.other_attributes = {}
|
|
self.possible_attributes = {
|
|
'landscape' : ('False', ['True', 'False']),
|
|
'page-border' : ('none', ['none', 'single', 'single-thick',
|
|
'double', 'double-thick']),
|
|
}
|
|
|
|
for key, value in attrs.items():
|
|
if key.endswith("-default"):
|
|
name = key[:-len("-default")]
|
|
if name in ["job-sheets", "printer-error-policy",
|
|
"printer-op-policy", # handled below
|
|
"notify-events", # cannot be set
|
|
"document-format", # cannot be set
|
|
"notify-lease-duration"]: # cannot be set
|
|
continue
|
|
|
|
supported = attrs.get(name + "-supported", None) or \
|
|
self.possible_attributes.get(name, None) or \
|
|
""
|
|
|
|
# Convert a list into a comma-separated string, since
|
|
# it can only really have been misinterpreted as a list
|
|
# by CUPS.
|
|
if isinstance (value, list):
|
|
value = reduce (lambda x, y: x+','+y, value)
|
|
|
|
self.attributes[name] = value
|
|
|
|
if name+"-supported" in attrs:
|
|
supported = attrs[name+"-supported"]
|
|
self.possible_attributes[name] = (value, supported)
|
|
elif (not key.endswith ("-supported") and
|
|
key != 'job-sheets-default' and
|
|
key != 'printer-error-policy' and
|
|
key != 'printer-op-policy' and
|
|
not key.startswith ('requesting-user-name-')):
|
|
self.other_attributes[key] = value
|
|
|
|
self.job_sheet_start, self.job_sheet_end = attrs.get(
|
|
'job-sheets-default', ('none', 'none'))
|
|
self.job_sheets_supported = attrs.get('job-sheets-supported', ['none'])
|
|
self.error_policy = attrs.get('printer-error-policy', 'none')
|
|
self.error_policy_supported = attrs.get(
|
|
'printer-error-policy-supported', ['none'])
|
|
self.op_policy = attrs.get('printer-op-policy', "") or "default"
|
|
self.op_policy_supported = attrs.get(
|
|
'printer-op-policy-supported', ["default"])
|
|
|
|
self.default_allow = True
|
|
self.except_users = []
|
|
if 'requesting-user-name-allowed' in attrs:
|
|
self.except_users = attrs['requesting-user-name-allowed']
|
|
self.default_allow = False
|
|
elif 'requesting-user-name-denied' in attrs:
|
|
self.except_users = attrs['requesting-user-name-denied']
|
|
self.except_users_string = ', '.join(self.except_users)
|
|
self.update (**attrs)
|
|
|
|
def getServer(self):
|
|
"""
|
|
Find out which server defines this printer.
|
|
|
|
@returns: server URI or None
|
|
"""
|
|
if not self.uri_supported[0].startswith('ipp://'):
|
|
return None
|
|
uri = self.uri_supported[0][6:]
|
|
uri = uri.split('/')[0]
|
|
uri = uri.split(':')[0]
|
|
if uri == "localhost.localdomain":
|
|
uri = "localhost"
|
|
return uri
|
|
|
|
def getPPD(self):
|
|
"""
|
|
Obtain the printer's PPD.
|
|
|
|
@returns: cups.PPD object, or False for raw queues
|
|
@raise cups.IPPError: IPP error
|
|
"""
|
|
result = None
|
|
if self._ppd is None:
|
|
try:
|
|
self._ppd = self.connection.getPPD(self.name)
|
|
result = cups.PPD (self._ppd)
|
|
except cups.IPPError as emargs:
|
|
(e, m) = emargs.args
|
|
if e == cups.IPP_NOT_FOUND:
|
|
result = False
|
|
else:
|
|
raise
|
|
|
|
if result is None and self._ppd is not None:
|
|
result = cups.PPD (self._ppd)
|
|
|
|
return result
|
|
|
|
def setOption(self, name, value):
|
|
"""
|
|
Set a printer's option.
|
|
|
|
@param name: option name
|
|
@type name: string
|
|
@param value: option value
|
|
@type value: option-specific
|
|
"""
|
|
if isinstance (value, float):
|
|
radixchar = locale.nl_langinfo (locale.RADIXCHAR)
|
|
if radixchar != '.':
|
|
# Convert floats to strings, being careful with decimal points.
|
|
value = str (value).replace (radixchar, '.')
|
|
self.connection.addPrinterOptionDefault(self.name, name, value)
|
|
|
|
def unsetOption(self, name):
|
|
"""
|
|
Unset a printer's option.
|
|
|
|
@param name: option name
|
|
@type name: string
|
|
"""
|
|
self.connection.deletePrinterOptionDefault(self.name, name)
|
|
|
|
def setEnabled(self, on, reason=None):
|
|
"""
|
|
Set the printer's enabled state.
|
|
|
|
@param on: whether it will be enabled
|
|
@type on: bool
|
|
@param reason: reason for this state
|
|
@type reason: string
|
|
"""
|
|
if on:
|
|
self.connection.enablePrinter(self.name)
|
|
else:
|
|
if reason:
|
|
self.connection.disablePrinter(self.name, reason=reason)
|
|
else:
|
|
self.connection.disablePrinter(self.name)
|
|
|
|
def setAccepting(self, on, reason=None):
|
|
"""
|
|
Set the printer's accepting state.
|
|
|
|
@param on: whether it will be accepting
|
|
@type on: bool
|
|
@param reason: reason for this state
|
|
@type reason: string
|
|
"""
|
|
if on:
|
|
self.connection.acceptJobs(self.name)
|
|
else:
|
|
if reason:
|
|
self.connection.rejectJobs(self.name, reason=reason)
|
|
else:
|
|
self.connection.rejectJobs(self.name)
|
|
|
|
def setShared(self,on):
|
|
"""
|
|
Set the printer's shared state.
|
|
|
|
@param on: whether it will be accepting
|
|
@type on: bool
|
|
"""
|
|
self.connection.setPrinterShared(self.name, on)
|
|
|
|
def setErrorPolicy (self, policy):
|
|
"""
|
|
Set the printer's error policy.
|
|
|
|
@param policy: error policy
|
|
@type policy: string
|
|
"""
|
|
self.connection.setPrinterErrorPolicy(self.name, policy)
|
|
|
|
def setOperationPolicy(self, policy):
|
|
"""
|
|
Set the printer's operation policy.
|
|
|
|
@param policy: operation policy
|
|
@type policy: string
|
|
"""
|
|
self.connection.setPrinterOpPolicy(self.name, policy)
|
|
|
|
def setJobSheets(self, start, end):
|
|
"""
|
|
Set the printer's job sheets.
|
|
|
|
@param start: start sheet
|
|
@type start: string
|
|
@param end: end sheet
|
|
@type end: string
|
|
"""
|
|
self.connection.setPrinterJobSheets(self.name, start, end)
|
|
|
|
def setAccess(self, allow, except_users):
|
|
"""
|
|
Set access control list.
|
|
|
|
@param allow: whether to allow by default, otherwise deny
|
|
@type allow: bool
|
|
@param except_users: exception list
|
|
@type except_users: string list
|
|
"""
|
|
if isinstance(except_users, str):
|
|
users = except_users.split()
|
|
users = [u.split(",") for u in users]
|
|
except_users = []
|
|
for u in users:
|
|
except_users.extend(u)
|
|
except_users = [u.strip() for u in except_users]
|
|
except_users = [_f for _f in except_users if _f]
|
|
|
|
if allow:
|
|
self.connection.setPrinterUsersDenied(self.name, except_users)
|
|
else:
|
|
self.connection.setPrinterUsersAllowed(self.name, except_users)
|
|
|
|
def jobsQueued(self, only_tests=False, limit=None):
|
|
"""
|
|
Find out whether jobs are queued for this printer.
|
|
|
|
@param only_tests: whether to restrict search to test pages
|
|
@type only_tests: bool
|
|
@returns: list of job IDs
|
|
"""
|
|
ret = []
|
|
try:
|
|
try:
|
|
r = ['job-id', 'job-printer-uri', 'job-name']
|
|
jobs = self.connection.getJobs (requested_attributes=r)
|
|
except TypeError:
|
|
# requested_attributes requires pycups 1.9.50
|
|
jobs = self.connection.getJobs ()
|
|
except cups.IPPError:
|
|
return ret
|
|
|
|
for id, attrs in jobs.items():
|
|
try:
|
|
uri = attrs['job-printer-uri']
|
|
uri = uri[uri.rindex ('/') + 1:]
|
|
except:
|
|
continue
|
|
if uri != self.name:
|
|
continue
|
|
|
|
if (not only_tests or
|
|
('job-name' in attrs and
|
|
attrs['job-name'] == 'Test Page')):
|
|
ret.append (id)
|
|
|
|
if limit is not None and len (ret) == limit:
|
|
break
|
|
return ret
|
|
|
|
def jobsPreserved(self, limit=None):
|
|
"""
|
|
Find out whether there are preserved jobs for this printer.
|
|
|
|
@return: list of job IDs
|
|
"""
|
|
ret = []
|
|
try:
|
|
try:
|
|
r = ['job-id', 'job-printer-uri', 'job-state']
|
|
jobs = self.connection.getJobs (which_jobs='completed',
|
|
requested_attributes=r)
|
|
except TypeError:
|
|
# requested_attributes requires pycups 1.9.50
|
|
jobs = self.connection.getJobs (which_jobs='completed')
|
|
except cups.IPPError:
|
|
return ret
|
|
|
|
for id, attrs in jobs.items():
|
|
try:
|
|
uri = attrs['job-printer-uri']
|
|
uri = uri[uri.rindex ('/') + 1:]
|
|
except:
|
|
continue
|
|
if uri != self.name:
|
|
continue
|
|
if (attrs.get ('job-state',
|
|
cups.IPP_JOB_PENDING) < cups.IPP_JOB_COMPLETED):
|
|
continue
|
|
ret.append (id)
|
|
if limit is not None and len (ret) == limit:
|
|
break
|
|
|
|
return ret
|
|
|
|
def testsQueued(self, limit=None):
|
|
"""
|
|
Find out whether test jobs are queued for this printer.
|
|
|
|
@returns: list of job IDs
|
|
"""
|
|
return self.jobsQueued (only_tests=True, limit=limit)
|
|
|
|
def setAsDefault(self):
|
|
"""
|
|
Set this printer as the system default.
|
|
"""
|
|
self.connection.setDefault(self.name)
|
|
|
|
# Also need to check system-wide lpoptions because that's how
|
|
# previous Fedora versions set the default (bug #217395).
|
|
with tempfile.TemporaryFile () as f:
|
|
try:
|
|
resource = "/admin/conf/lpoptions"
|
|
self.connection.getFile(resource, fd=f.fileno ())
|
|
except cups.HTTPError as e:
|
|
(s,) = e.args
|
|
if s in [cups.HTTP_NOT_FOUND, cups.HTTP_AUTHORIZATION_CANCELED]:
|
|
return False
|
|
|
|
raise cups.HTTPError (s)
|
|
|
|
f.seek (0)
|
|
lines = [ line.decode('UTF-8') for line in f.readlines () ]
|
|
changed = False
|
|
i = 0
|
|
for line in lines:
|
|
if line.startswith ("Default "):
|
|
# This is the system-wide default.
|
|
name = line.split (' ')[1]
|
|
if name != self.name:
|
|
# Stop it from over-riding the server default.
|
|
lines[i] = "Dest " + line[8:]
|
|
changed = True
|
|
i += 1
|
|
|
|
if changed:
|
|
f.seek (0)
|
|
f.writelines ([ line.encode('UTF-8') for line in lines ])
|
|
f.truncate ()
|
|
f.flush ()
|
|
f.seek (0)
|
|
try:
|
|
self.connection.putFile (resource, fd=f.fileno ())
|
|
except cups.HTTPError:
|
|
return False
|
|
|
|
return changed
|
|
|
|
def getPrinters(connection):
|
|
"""
|
|
Obtain a list of printers.
|
|
|
|
@param connection: CUPS connection
|
|
@type connection: CUPS.Connection object
|
|
@returns: L{Printer} list
|
|
"""
|
|
printers = connection.getPrinters()
|
|
classes = connection.getClasses()
|
|
for name, printer in printers.items():
|
|
printer = Printer(name, connection, **printer)
|
|
printers[name] = printer
|
|
if name in classes:
|
|
printer.class_members = classes[name]
|
|
printer.class_members.sort()
|
|
return printers
|
|
|
|
def parseDeviceID (id):
|
|
"""
|
|
Parse an IEEE 1284 Device ID, so that it may be indexed by field name.
|
|
|
|
@param id: IEEE 1284 Device ID, without the two leading length bytes
|
|
@type id: string
|
|
@returns: dict indexed by field name
|
|
"""
|
|
id_dict = {}
|
|
pieces = id.split(";")
|
|
for piece in pieces:
|
|
if piece.find(":") == -1:
|
|
continue
|
|
name, value = piece.split(":",1)
|
|
id_dict[name.strip ()] = value.strip()
|
|
if "MANUFACTURER" in id_dict:
|
|
id_dict.setdefault("MFG", id_dict["MANUFACTURER"])
|
|
if "MODEL" in id_dict:
|
|
id_dict.setdefault("MDL", id_dict["MODEL"])
|
|
if "COMMAND SET" in id_dict:
|
|
id_dict.setdefault("CMD", id_dict["COMMAND SET"])
|
|
for name in ["MFG", "MDL", "CMD", "CLS", "DES", "SN", "S", "P", "J"]:
|
|
id_dict.setdefault(name, "")
|
|
if id_dict["CMD"] == '':
|
|
id_dict["CMD"] = []
|
|
else:
|
|
id_dict["CMD"] = id_dict["CMD"].split(',')
|
|
return id_dict
|
|
|
|
class Device:
|
|
"""
|
|
This class represents a CUPS device.
|
|
"""
|
|
|
|
def __init__(self, uri, **kw):
|
|
"""
|
|
@param uri: device URI
|
|
@type uri: string
|
|
@param kw: device attributes
|
|
@type kw: dict
|
|
"""
|
|
self.uri = uri
|
|
self.device_class = kw.get('device-class', '')
|
|
self.info = kw.get('device-info', '')
|
|
self.make_and_model = kw.get('device-make-and-model', '')
|
|
self.id = kw.get('device-id', '')
|
|
self.location = kw.get('device-location', '')
|
|
|
|
uri_pieces = uri.split(":")
|
|
self.type = uri_pieces[0]
|
|
self.is_class = len(uri_pieces)==1
|
|
|
|
#self.id = 'MFG:HEWLETT-PACKARD;MDL:DESKJET 990C;CMD:MLC,PCL,PML;CLS:PRINTER;DES:Hewlett-Packard DeskJet 990C;SN:US05N1J00XLG;S:00808880800010032C1000000C2000000;P:0800,FL,B0;J: ;'
|
|
|
|
self.id_dict = parseDeviceID (self.id)
|
|
|
|
s = uri.find("serial=")
|
|
if s != -1 and not self.id_dict.get ('SN',''):
|
|
self.id_dict['SN'] = uri[s + 7:]
|
|
|
|
def __repr__ (self):
|
|
return "<cupshelpers.Device \"%s\">" % self.uri
|
|
|
|
def __lt__(self, other):
|
|
"""
|
|
Compare devices by order of preference.
|
|
"""
|
|
if other is None:
|
|
return True
|
|
|
|
if self.is_class != other.is_class:
|
|
if other.is_class:
|
|
return True
|
|
return False
|
|
|
|
stype = self.type
|
|
if stype == "dnssd":
|
|
if self.uri.find("._ipp") != -1:
|
|
stype = "dnssdi"
|
|
elif self.uri.find("._pdl-datastream") != -1:
|
|
stype = "dnssds"
|
|
elif self.uri.find("._printer") != -1:
|
|
stype = "dnssdl"
|
|
if stype == "usb":
|
|
if self.uri.lower().find("fax") != -1:
|
|
stype = "usbfax"
|
|
otype = other.type
|
|
if otype == "dnssd":
|
|
if other.uri.find("._ipp") != -1:
|
|
otype = "dnssdi"
|
|
elif other.uri.find("._pdl-datastream") != -1:
|
|
otype = "dnssds"
|
|
elif other.uri.find("._printer") != -1:
|
|
otype = "dnssdl"
|
|
if otype == "usb":
|
|
if other.uri.lower().find("fax") != -1:
|
|
otype = "usbfax"
|
|
|
|
if not self.is_class and (stype != otype):
|
|
# "hp"/"hpfax" before "usb" before * before "parallel" before
|
|
# "serial"
|
|
if otype == "serial":
|
|
return True
|
|
if stype == "serial":
|
|
return False
|
|
if otype == "parallel":
|
|
return True
|
|
if stype == "parallel":
|
|
return False
|
|
if otype == "hp":
|
|
return False
|
|
if stype == "hp":
|
|
return True
|
|
if otype == "hpfax":
|
|
return False
|
|
if stype == "hpfax":
|
|
return True
|
|
if otype == "dnssdi":
|
|
return False
|
|
if stype == "dnssdi":
|
|
return True
|
|
if otype == "ipps":
|
|
return False
|
|
if stype == "ipps":
|
|
return True
|
|
if otype == "ipp":
|
|
return False
|
|
if stype == "ipp":
|
|
return True
|
|
if otype == "dnssds":
|
|
return False
|
|
if stype == "dnssds":
|
|
return True
|
|
if otype == "socket":
|
|
return False
|
|
if stype == "socket":
|
|
return True
|
|
if otype == "dnssdl":
|
|
return False
|
|
if stype == "dnssdl":
|
|
return True
|
|
if otype == "lpd":
|
|
return False
|
|
if stype == "lpd":
|
|
return True
|
|
if otype == "usb":
|
|
return False
|
|
if stype == "usb":
|
|
return True
|
|
if otype == "usbfax":
|
|
return False
|
|
if stype == "usbfax":
|
|
return True
|
|
result = bool(self.id) < bool(other.id)
|
|
if not result:
|
|
result = self.info < other.info
|
|
|
|
return result
|
|
|
|
class _GetDevicesCall(object):
|
|
def call (self, connection, kwds):
|
|
if "reply_handler" in kwds:
|
|
self._client_reply_handler = kwds.get ("reply_handler")
|
|
kwds["reply_handler"] = self._reply_handler
|
|
return connection.getDevices (**kwds)
|
|
|
|
self._client_reply_handler = None
|
|
result = connection.getDevices (**kwds)
|
|
return self._reply_handler (connection, result)
|
|
|
|
def _reply_handler (self, connection, devices):
|
|
for uri, data in devices.items():
|
|
device = Device(uri, **data)
|
|
devices[uri] = device
|
|
if device.info != '' and device.make_and_model == '':
|
|
device.make_and_model = device.info
|
|
|
|
if self._client_reply_handler:
|
|
self._client_reply_handler (connection, devices)
|
|
else:
|
|
return devices
|
|
|
|
def getDevices(connection, **kw):
|
|
"""
|
|
Obtain a list of available CUPS devices.
|
|
|
|
@param connection: CUPS connection
|
|
@type connection: cups.Connection object
|
|
@returns: a list of L{Device} objects
|
|
@raise cups.IPPError: IPP Error
|
|
"""
|
|
op = _GetDevicesCall ()
|
|
return op.call (connection, kw)
|
|
|
|
def activateNewPrinter(connection, name):
|
|
"""
|
|
Set a new printer enabled, accepting jobs, and (if necessary) the
|
|
default printer.
|
|
|
|
@param connection: CUPS connection
|
|
@type connection: cups.Connection object
|
|
@param name: printer name
|
|
@type name: string
|
|
@raise cups.IPPError: IPP error
|
|
"""
|
|
connection.enablePrinter (name)
|
|
connection.acceptJobs (name)
|
|
|
|
# Set as the default if there is not already a default printer.
|
|
if connection.getDefault () is None:
|
|
connection.setDefault (name)
|
|
|
|
def copyPPDOptions(ppd1, ppd2):
|
|
"""
|
|
Copy default options between PPDs.
|
|
|
|
@param ppd1: source PPD
|
|
@type ppd1: cups.PPD object
|
|
@param ppd2: destination PPD
|
|
@type ppd2: cups.PPD object
|
|
"""
|
|
def getPPDGroupOptions(group):
|
|
options = group.options[:]
|
|
for g in group.subgroups:
|
|
options.extend(getPPDGroupOptions(g))
|
|
return options
|
|
|
|
def iteratePPDOptions(ppd):
|
|
for group in ppd.optionGroups:
|
|
for option in getPPDGroupOptions(group):
|
|
yield option
|
|
|
|
for option in iteratePPDOptions(ppd1):
|
|
if option.keyword == "PageRegion":
|
|
continue
|
|
new_option = ppd2.findOption(option.keyword)
|
|
if new_option and option.ui==new_option.ui:
|
|
value = option.defchoice
|
|
for choice in new_option.choices:
|
|
if choice["choice"]==value:
|
|
ppd2.markOption(new_option.keyword, value)
|
|
_debugprint ("set %s = %s" % (repr (new_option.keyword),
|
|
repr (value)))
|
|
|
|
def setPPDPageSize(ppd, language):
|
|
"""
|
|
Set the PPD page size according to locale.
|
|
|
|
@param ppd: PPD
|
|
@type ppd: cups.PPD object
|
|
@param language: language, as given by the first element of
|
|
locale.setlocale
|
|
@type language: string
|
|
"""
|
|
# Just set the page size to A4 or Letter, that's all.
|
|
# Use the same method CUPS uses.
|
|
size = 'A4'
|
|
letter = [ 'C', 'POSIX', 'en', 'en_US', 'en_CA', 'fr_CA' ]
|
|
for each in letter:
|
|
if language == each:
|
|
size = 'Letter'
|
|
|
|
# Use setting in /etc/papersize if available
|
|
try:
|
|
f = open ("/etc/papersize")
|
|
for line in f:
|
|
if line.startswith("#"):
|
|
continue
|
|
if line.strip().lower().startswith("a4"):
|
|
size = 'A4'
|
|
elif line.strip().lower().startswith("letter"):
|
|
size = 'Letter'
|
|
elif line.strip() != "":
|
|
break
|
|
f.close()
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
ppd.markOption ('PageSize', size)
|
|
_debugprint ("set PageSize = %s" % size)
|
|
except:
|
|
_debugprint ("Failed to set PageSize (%s not available?)" % size)
|
|
|
|
def missingExecutables(ppd):
|
|
"""
|
|
Check that all relevant executables for a PPD are installed.
|
|
|
|
@param ppd: PPD
|
|
@type ppd: cups.PPD object
|
|
@returns: string list, representing missing executables
|
|
"""
|
|
|
|
# First, a local function. How to check that something exists
|
|
# in a path:
|
|
def pathcheck (name, path="/usr/bin:/bin"):
|
|
if name == "-":
|
|
# A filter of "-" means that no filter is required,
|
|
# i.e. the device accepts the given format as-is.
|
|
return "builtin"
|
|
# Strip out foomatic '%'-style place-holders.
|
|
p = name.find ('%')
|
|
if p != -1:
|
|
name = name[:p]
|
|
if len (name) == 0:
|
|
return "true"
|
|
if name[0] == '/':
|
|
if os.access (name, os.X_OK):
|
|
_debugprint ("%s: found" % name)
|
|
return name
|
|
else:
|
|
_debugprint ("%s: NOT found" % name)
|
|
return None
|
|
if name.find ("=") != -1:
|
|
return "builtin"
|
|
if name in [ ":", ".", "[", "alias", "bind", "break", "cd",
|
|
"continue", "declare", "echo", "else", "eval",
|
|
"exec", "exit", "export", "fi", "if", "kill", "let",
|
|
"local", "popd", "printf", "pushd", "pwd", "read",
|
|
"readonly", "set", "shift", "shopt", "source",
|
|
"test", "then", "trap", "type", "ulimit", "umask",
|
|
"unalias", "unset", "wait" ]:
|
|
return "builtin"
|
|
for component in path.split (':'):
|
|
file = component.rstrip (os.path.sep) + os.path.sep + name
|
|
if os.access (file, os.X_OK):
|
|
_debugprint ("%s: found" % file)
|
|
return file
|
|
_debugprint ("%s: NOT found in %s" % (name, path))
|
|
return None
|
|
|
|
exes_to_install = []
|
|
|
|
def add_missing (exe):
|
|
# Strip out foomatic '%'-style place-holders.
|
|
p = exe.find ('%')
|
|
if p != -1:
|
|
exe = exe[:p]
|
|
|
|
exes_to_install.append (exe)
|
|
|
|
# Find a 'FoomaticRIPCommandLine' attribute.
|
|
exe = exepath = None
|
|
attr = ppd.findAttr ('FoomaticRIPCommandLine')
|
|
if attr:
|
|
# Foomatic RIP command line to check.
|
|
cmdline = attr.value.replace ('&&\n', '')
|
|
cmdline = cmdline.replace ('"', '"')
|
|
cmdline = cmdline.replace ('<', '<')
|
|
cmdline = cmdline.replace ('>', '>')
|
|
if (cmdline.find ("(") != -1 or
|
|
cmdline.find ("&") != -1):
|
|
# Don't try to handle sub-shells or unreplaced HTML entities.
|
|
cmdline = ""
|
|
|
|
# Strip out foomatic '%'-style place-holders
|
|
pipes = cmdline.split (';')
|
|
for pipe in pipes:
|
|
cmds = pipe.strip ().split ('|')
|
|
for cmd in cmds:
|
|
args = cmd.strip ().split (' ')
|
|
exe = args[0]
|
|
exepath = pathcheck (exe)
|
|
if not exepath:
|
|
add_missing (exe)
|
|
continue
|
|
|
|
# Main executable found. But if it's 'gs',
|
|
# perhaps there is an IJS server we also need
|
|
# to check.
|
|
if os.path.basename (exepath) == 'gs':
|
|
argn = len (args)
|
|
argi = 1
|
|
search = "-sIjsServer="
|
|
while argi < argn:
|
|
arg = args[argi]
|
|
if arg.startswith (search):
|
|
exe = arg[len (search):]
|
|
exepath = pathcheck (exe)
|
|
if not exepath:
|
|
add_missing (exe)
|
|
|
|
break
|
|
|
|
argi += 1
|
|
|
|
if not exepath:
|
|
# Next pipe.
|
|
break
|
|
|
|
if exepath or not exe:
|
|
# Look for '*cupsFilter' lines in the PPD and check that
|
|
# the filters are installed.
|
|
(tmpfd, tmpfname) = tempfile.mkstemp (text=True)
|
|
os.unlink (tmpfname)
|
|
ppd.writeFd (tmpfd)
|
|
os.lseek (tmpfd, 0, os.SEEK_SET)
|
|
f = os.fdopen (tmpfd, "rt", encoding="utf-8")
|
|
search = "*cupsFilter:"
|
|
for line in f:
|
|
if line.startswith (search):
|
|
line = line[len (search):].strip ().strip ('"')
|
|
try:
|
|
(mimetype, cost, exe) = line.split (' ')
|
|
except:
|
|
continue
|
|
|
|
exepath = pathcheck (exe,
|
|
config.cupsserverbindir + "/filter:"
|
|
"/usr/lib64/cups/filter")
|
|
if not exepath:
|
|
add_missing (config.cupsserverbindir + "/filter/" + exe)
|
|
|
|
return exes_to_install
|
|
|
|
def missingPackagesAndExecutables(ppd):
|
|
"""
|
|
Check that all relevant executables for a PPD are installed.
|
|
|
|
@param ppd: PPD
|
|
@type ppd: cups.PPD object
|
|
@returns: string list pair, representing missing packages and
|
|
missing executables
|
|
"""
|
|
executables = missingExecutables(ppd)
|
|
return ([], executables)
|
|
|
|
def _main():
|
|
c = cups.Connection()
|
|
#printers = getPrinters(c)
|
|
for device in getDevices(c).values():
|
|
print (device.uri, device.id_dict)
|
|
|
|
if __name__=="__main__":
|
|
_main()
|