567 lines
22 KiB
Python
567 lines
22 KiB
Python
import bisect
|
|
import copy
|
|
import pyatspi
|
|
import time
|
|
from gi.repository import GLib
|
|
|
|
from . import cmdnames
|
|
from . import chnames
|
|
from . import debug
|
|
from . import keybindings
|
|
from . import messages
|
|
from . import input_event
|
|
from . import orca_state
|
|
from . import settings_manager
|
|
|
|
_settingsManager = settings_manager.getManager()
|
|
|
|
# define 'live' property types
|
|
LIVE_OFF = -1
|
|
LIVE_NONE = 0
|
|
LIVE_POLITE = 1
|
|
LIVE_ASSERTIVE = 2
|
|
LIVE_RUDE = 3
|
|
|
|
# Seconds a message is held in the queue before it is discarded
|
|
MSG_KEEPALIVE_TIME = 45 # in seconds
|
|
|
|
# The number of messages that are cached and can later be reviewed via
|
|
# LiveRegionManager.reviewLiveAnnouncement.
|
|
CACHE_SIZE = 9 # corresponds to one of nine key bindings
|
|
|
|
class PriorityQueue:
|
|
""" This class represents a thread **UNSAFE** priority queue where priority
|
|
is determined by the given integer priority. The entries are also
|
|
maintained in chronological order.
|
|
|
|
TODO: experiment with Queue.Queue to make thread safe
|
|
"""
|
|
def __init__(self):
|
|
self.queue = []
|
|
|
|
def enqueue(self, data, priority, obj):
|
|
""" Add a new element to the queue according to 1) priority and
|
|
2) timestamp. """
|
|
bisect.insort_left(self.queue, (priority, time.time(), data, obj))
|
|
|
|
def dequeue(self):
|
|
"""get the highest priority element from the queue. """
|
|
return self.queue.pop(0)
|
|
|
|
def clear(self):
|
|
""" Clear the queue """
|
|
self.queue = []
|
|
|
|
def purgeByKeepAlive(self):
|
|
""" Purge items from the queue that are older than the keepalive
|
|
time """
|
|
currenttime = time.time()
|
|
myfilter = lambda item: item[1] + MSG_KEEPALIVE_TIME > currenttime
|
|
self.queue = list(filter(myfilter, self.queue))
|
|
|
|
def purgeByPriority(self, priority):
|
|
""" Purge items from the queue that have a lower than or equal priority
|
|
than the given argument """
|
|
myfilter = lambda item: item[0] > priority
|
|
self.queue = list(filter(myfilter, self.queue))
|
|
|
|
def __len__(self):
|
|
""" Return the length of the queue """
|
|
return len(self.queue)
|
|
|
|
|
|
class LiveRegionManager:
|
|
def __init__(self, script):
|
|
self._script = script
|
|
# message priority queue
|
|
self.msg_queue = PriorityQueue()
|
|
|
|
self.inputEventHandlers = self._getInputEventHandlers()
|
|
self.keyBindings = self._getKeyBindings()
|
|
|
|
# This is temporary.
|
|
self.functions = [self.advancePoliteness,
|
|
self.setLivePolitenessOff,
|
|
self.toggleMonitoring,
|
|
self.reviewLiveAnnouncement]
|
|
|
|
# Message cache. Used to store up to 9 previous messages so user can
|
|
# review if desired.
|
|
self.msg_cache = []
|
|
|
|
# User overrides for politeness settings.
|
|
self._politenessOverrides = None
|
|
self._restoreOverrides = None
|
|
|
|
# last live obj to be announced
|
|
self.lastliveobj = None
|
|
|
|
# Used to track whether a user wants to monitor all live regions
|
|
# Not to be confused with the global Gecko.liveRegionsOn which
|
|
# completely turns off live region support. This one is based on
|
|
# a user control by changing politeness levels to LIVE_OFF or back
|
|
# to the bookmark or markup politeness value.
|
|
self.monitoring = True
|
|
|
|
# Set up politeness level overrides and subscribe to bookmarks
|
|
# for load and save user events.
|
|
# We are initialized after bookmarks so call the load handler once
|
|
# to get initialized.
|
|
#
|
|
self.bookmarkLoadHandler()
|
|
script.bookmarks.addSaveObserver(self.bookmarkSaveHandler)
|
|
script.bookmarks.addLoadObserver(self.bookmarkLoadHandler)
|
|
|
|
def _getInputEventHandlers(self):
|
|
handlers = {}
|
|
|
|
handlers["advanceLivePoliteness"] = \
|
|
input_event.InputEventHandler(
|
|
self.advancePoliteness,
|
|
cmdnames.LIVE_REGIONS_ADVANCE_POLITENESS)
|
|
|
|
handlers["setLivePolitenessOff"] = \
|
|
input_event.InputEventHandler(
|
|
self.setLivePolitenessOff,
|
|
cmdnames.LIVE_REGIONS_SET_POLITENESS_OFF)
|
|
|
|
handlers["monitorLiveRegions"] = \
|
|
input_event.InputEventHandler(
|
|
self.toggleMonitoring,
|
|
cmdnames.LIVE_REGIONS_MONITOR)
|
|
|
|
handlers["reviewLiveAnnouncement"] = \
|
|
input_event.InputEventHandler(
|
|
self.reviewLiveAnnouncement,
|
|
cmdnames.LIVE_REGIONS_REVIEW)
|
|
|
|
return handlers
|
|
|
|
def _getKeyBindings(self):
|
|
keyBindings = keybindings.KeyBindings()
|
|
|
|
keyBindings.add(
|
|
keybindings.KeyBinding(
|
|
"backslash",
|
|
keybindings.defaultModifierMask,
|
|
keybindings.NO_MODIFIER_MASK,
|
|
self.inputEventHandlers.get("advanceLivePoliteness")))
|
|
|
|
keyBindings.add(
|
|
keybindings.KeyBinding(
|
|
"backslash",
|
|
keybindings.defaultModifierMask,
|
|
keybindings.SHIFT_MODIFIER_MASK,
|
|
self.inputEventHandlers.get("setLivePolitenessOff")))
|
|
|
|
keyBindings.add(
|
|
keybindings.KeyBinding(
|
|
"backslash",
|
|
keybindings.defaultModifierMask,
|
|
keybindings.ORCA_SHIFT_MODIFIER_MASK,
|
|
self.inputEventHandlers.get("monitorLiveRegions")))
|
|
|
|
for key in ["F1", "F2", "F3", "F4", "F5", "F6", "F7", "F8", "F9"]:
|
|
keyBindings.add(
|
|
keybindings.KeyBinding(
|
|
key,
|
|
keybindings.defaultModifierMask,
|
|
keybindings.ORCA_MODIFIER_MASK,
|
|
self.inputEventHandlers.get("reviewLiveAnnouncement")))
|
|
|
|
return keyBindings
|
|
|
|
def reset(self):
|
|
# First we will purge our politeness override dictionary of LIVE_NONE
|
|
# objects that are not registered for this page
|
|
newpoliteness = {}
|
|
currenturi = self._script.bookmarks.getURIKey()
|
|
for key, value in self._politenessOverrides.items():
|
|
if key[0] == currenturi or value != LIVE_NONE:
|
|
newpoliteness[key] = value
|
|
self._politenessOverrides = newpoliteness
|
|
|
|
def bookmarkSaveHandler(self):
|
|
"""Bookmark save callback"""
|
|
self._script.bookmarks.saveBookmarksToDisk(self._politenessOverrides,
|
|
filename='politeness')
|
|
|
|
def bookmarkLoadHandler(self):
|
|
"""Bookmark load callback"""
|
|
# readBookmarksFromDisk() returns None on error. Just initialize to an
|
|
# empty dictionary if this is the case.
|
|
self._politenessOverrides = \
|
|
self._script.bookmarks.readBookmarksFromDisk(filename='politeness') \
|
|
or {}
|
|
|
|
def handleEvent(self, event):
|
|
"""Main live region event handler"""
|
|
politeness = self._getLiveType(event.source)
|
|
if politeness == LIVE_OFF:
|
|
return
|
|
if politeness == LIVE_NONE:
|
|
# All the 'registered' LIVE_NONE objects will be set to off
|
|
# if not monitoring. We will ignore LIVE_NONE objects that
|
|
# arrive after the user switches off monitoring.
|
|
if not self.monitoring:
|
|
return
|
|
elif politeness == LIVE_POLITE:
|
|
# Nothing to do for now
|
|
pass
|
|
elif politeness == LIVE_ASSERTIVE:
|
|
self.msg_queue.purgeByPriority(LIVE_POLITE)
|
|
elif politeness == LIVE_RUDE:
|
|
self.msg_queue.purgeByPriority(LIVE_ASSERTIVE)
|
|
|
|
message = self._getMessage(event)
|
|
if message:
|
|
if len(self.msg_queue) == 0:
|
|
GLib.timeout_add(100, self.pumpMessages)
|
|
self.msg_queue.enqueue(message, politeness, event.source)
|
|
|
|
def pumpMessages(self):
|
|
""" Main gobject callback for live region support. Handles both
|
|
purging the message queue and outputting any queued messages that
|
|
were queued up in the handleEvent() method.
|
|
"""
|
|
|
|
if len(self.msg_queue) > 0:
|
|
debug.println(debug.eventDebugLevel, "\nvvvvv PRESENT LIVE REGION MESSAGE vvvvv")
|
|
self.msg_queue.purgeByKeepAlive()
|
|
politeness, timestamp, message, obj = self.msg_queue.dequeue()
|
|
# Form output message. No need to repeat labels and content.
|
|
# TODO: really needs to be tested in real life cases. Perhaps
|
|
# a verbosity setting?
|
|
if message['labels'] == message['content']:
|
|
utts = message['content']
|
|
else:
|
|
utts = message['labels'] + message['content']
|
|
|
|
if self.monitoring:
|
|
self._script.presentMessage(utts)
|
|
else:
|
|
msg = "INFO: Not presenting message because monitoring is off"
|
|
debug.println(debug.LEVEL_INFO, msg, True)
|
|
|
|
# set the last live obj to be announced
|
|
self.lastliveobj = obj
|
|
|
|
# cache our message
|
|
self._cacheMessage(utts)
|
|
|
|
# We still want to maintain our queue if we are not monitoring
|
|
if not self.monitoring:
|
|
self.msg_queue.purgeByKeepAlive()
|
|
|
|
msg = 'LIVE REGIONS: messages in queue: %i' % len(self.msg_queue)
|
|
debug.println(debug.LEVEL_INFO, msg, True)
|
|
debug.println(debug.eventDebugLevel, "^^^^^ PRESENT LIVE REGION MESSAGE ^^^^^\n")
|
|
|
|
# See you again soon, stay in event loop if we still have messages.
|
|
return len(self.msg_queue) > 0
|
|
|
|
def getLiveNoneObjects(self):
|
|
"""Return the live objects that are registered and have a politeness
|
|
of LIVE_NONE. """
|
|
retval = []
|
|
currenturi = self._script.bookmarks.getURIKey()
|
|
for uri, objectid in self._politenessOverrides:
|
|
if uri == currenturi and isinstance(objectid, tuple):
|
|
retval.append(self._script.bookmarks.pathToObj(objectid))
|
|
return retval
|
|
|
|
def advancePoliteness(self, script, inputEvent):
|
|
"""Advance the politeness level of the given object"""
|
|
|
|
if not _settingsManager.getSetting('inferLiveRegions'):
|
|
self._script.presentMessage(messages.LIVE_REGIONS_OFF)
|
|
return
|
|
|
|
obj = orca_state.locusOfFocus
|
|
objectid = self._getObjectId(obj)
|
|
uri = self._script.bookmarks.getURIKey()
|
|
|
|
try:
|
|
# The current priority is either a previous override or the
|
|
# live property. If an exception is thrown, an override for
|
|
# this object has never occurred and the object does not have
|
|
# live markup. In either case, set the override to LIVE_NONE.
|
|
cur_priority = self._politenessOverrides[(uri, objectid)]
|
|
except KeyError:
|
|
cur_priority = self._liveStringToType(obj)
|
|
|
|
if cur_priority == LIVE_OFF or cur_priority == LIVE_NONE:
|
|
self._politenessOverrides[(uri, objectid)] = LIVE_POLITE
|
|
self._script.presentMessage(messages.LIVE_REGIONS_LEVEL_POLITE)
|
|
elif cur_priority == LIVE_POLITE:
|
|
self._politenessOverrides[(uri, objectid)] = LIVE_ASSERTIVE
|
|
self._script.presentMessage(messages.LIVE_REGIONS_LEVEL_ASSERTIVE)
|
|
elif cur_priority == LIVE_ASSERTIVE:
|
|
self._politenessOverrides[(uri, objectid)] = LIVE_RUDE
|
|
self._script.presentMessage(messages.LIVE_REGIONS_LEVEL_RUDE)
|
|
elif cur_priority == LIVE_RUDE:
|
|
self._politenessOverrides[(uri, objectid)] = LIVE_OFF
|
|
self._script.presentMessage(messages.LIVE_REGIONS_LEVEL_OFF)
|
|
|
|
|
|
def goLastLiveRegion(self):
|
|
"""Move the caret to the last announced live region and speak the
|
|
contents of that object"""
|
|
if self.lastliveobj:
|
|
self._script.utilities.setCaretPosition(self.lastliveobj, 0)
|
|
self._script.speakContents(self._script.utilities.getObjectContentsAtOffset(
|
|
self.lastliveobj, 0))
|
|
|
|
def reviewLiveAnnouncement(self, script, inputEvent):
|
|
"""Speak the given number cached message"""
|
|
|
|
msgnum = int(inputEvent.event_string[1:])
|
|
if not _settingsManager.getSetting('inferLiveRegions'):
|
|
self._script.presentMessage(messages.LIVE_REGIONS_OFF)
|
|
return
|
|
|
|
if msgnum > len(self.msg_cache):
|
|
self._script.presentMessage(messages.LIVE_REGIONS_NO_MESSAGE)
|
|
else:
|
|
self._script.presentMessage(self.msg_cache[-msgnum])
|
|
|
|
def setLivePolitenessOff(self, script, inputEvent):
|
|
"""User toggle to set all live regions to LIVE_OFF or back to their
|
|
original politeness."""
|
|
|
|
if not _settingsManager.getSetting('inferLiveRegions'):
|
|
self._script.presentMessage(messages.LIVE_REGIONS_OFF)
|
|
return
|
|
|
|
# start at the document frame
|
|
docframe = self._script.utilities.documentFrame()
|
|
# get the URI of the page. It is used as a partial key.
|
|
uri = self._script.bookmarks.getURIKey()
|
|
|
|
# The user is currently monitoring live regions but now wants to
|
|
# change all live region politeness on page to LIVE_OFF
|
|
if self.monitoring:
|
|
self._script.presentMessage(messages.LIVE_REGIONS_ALL_OFF)
|
|
self.msg_queue.clear()
|
|
|
|
# First we'll save off a copy for quick restoration
|
|
self._restoreOverrides = copy.copy(self._politenessOverrides)
|
|
|
|
# Set all politeness overrides to LIVE_OFF.
|
|
for override in self._politenessOverrides.keys():
|
|
self._politenessOverrides[override] = LIVE_OFF
|
|
|
|
# look through all the objects on the page and set/add to
|
|
# politeness overrides. This only adds live regions with good
|
|
# markup.
|
|
matches = self._script.utilities.findAllDescendants(
|
|
docframe, self.matchLiveRegion)
|
|
for match in matches:
|
|
objectid = self._getObjectId(match)
|
|
self._politenessOverrides[(uri, objectid)] = LIVE_OFF
|
|
|
|
# Toggle our flag
|
|
self.monitoring = False
|
|
|
|
# The user wants to restore politeness levels
|
|
else:
|
|
for key, value in self._restoreOverrides.items():
|
|
self._politenessOverrides[key] = value
|
|
self._script.presentMessage(messages.LIVE_REGIONS_ALL_RESTORED)
|
|
# Toggle our flag
|
|
self.monitoring = True
|
|
|
|
def generateLiveRegionDescription(self, obj, **args):
|
|
"""Used in conjunction with whereAmI to output description and
|
|
politeness of the given live region object"""
|
|
objectid = self._getObjectId(obj)
|
|
uri = self._script.bookmarks.getURIKey()
|
|
|
|
results = []
|
|
|
|
# get the description if there is one.
|
|
for relation in obj.getRelationSet():
|
|
relationtype = relation.getRelationType()
|
|
if relationtype == pyatspi.RELATION_DESCRIBED_BY:
|
|
targetobj = relation.getTarget(0)
|
|
try:
|
|
# We will add on descriptions if they don't duplicate
|
|
# what's already in the object's description.
|
|
# See http://bugzilla.gnome.org/show_bug.cgi?id=568467
|
|
# for more information.
|
|
#
|
|
description = targetobj.queryText().getText(0, -1)
|
|
if description.strip() != obj.description.strip():
|
|
results.append(description)
|
|
except NotImplemented:
|
|
pass
|
|
|
|
# get the politeness level as a string
|
|
try:
|
|
livepriority = self._politenessOverrides[(uri, objectid)]
|
|
liveprioritystr = self._liveTypeToString(livepriority)
|
|
except KeyError:
|
|
liveprioritystr = 'none'
|
|
|
|
# We will only output useful information
|
|
#
|
|
if results or liveprioritystr != 'none':
|
|
results.append(messages.LIVE_REGIONS_LEVEL % liveprioritystr)
|
|
|
|
return results
|
|
|
|
def matchLiveRegion(self, obj):
|
|
"""Predicate used to find a live region"""
|
|
attrs = self._getAttrDictionary(obj)
|
|
return 'container-live' in attrs
|
|
|
|
def _findContainer(self, obj):
|
|
isContainer = lambda x: self._getAttrDictionary(x).get('atomic')
|
|
if isContainer(obj):
|
|
return obj
|
|
|
|
return pyatspi.findAncestor(obj, isContainer)
|
|
|
|
def _getMessage(self, event):
|
|
"""Gets the message associated with a given live event."""
|
|
attrs = self._getAttrDictionary(event.source)
|
|
content = ""
|
|
labels = ""
|
|
|
|
# A message is divided into two parts: labels and content. We
|
|
# will first try to get the content. If there is None,
|
|
# assume it is an invalid message and return None
|
|
if event.type.startswith('object:children-changed:add'):
|
|
if attrs.get('container-atomic') == 'true':
|
|
content = self._script.utilities.expandEOCs(event.source)
|
|
else:
|
|
content = self._script.utilities.expandEOCs(event.any_data)
|
|
|
|
elif event.type.startswith('object:text-changed:insert'):
|
|
if attrs.get('container-atomic') != 'true':
|
|
if not "\ufffc" in event.any_data:
|
|
content = event.any_data
|
|
else:
|
|
content = self._script.utilities.expandEOCs(
|
|
event.source, event.detail1, event.detail1 + event.detail2)
|
|
else:
|
|
container = self._findContainer(event.source)
|
|
content = self._script.utilities.expandEOCs(container)
|
|
|
|
if not content:
|
|
return None
|
|
|
|
content = content.strip()
|
|
if len(content) == 1:
|
|
content = chnames.getCharacterName(content)
|
|
|
|
# Proper live regions typically come with proper aria labels. These
|
|
# labels are typically exposed as names. Failing that, descriptions.
|
|
# Looking for actual labels seems a non-performant waste of time.
|
|
name = (event.source.name or event.source.description).strip()
|
|
if name and name != content:
|
|
labels = name
|
|
|
|
# instantly send out notify messages
|
|
if attrs.get('channel') == 'notify':
|
|
utts = labels + content
|
|
self._script.presentationInterrupt()
|
|
self._script.presentMessage(utts)
|
|
return None
|
|
|
|
return {'content':[content], 'labels':[labels]}
|
|
|
|
def flushMessages(self):
|
|
self.msg_queue.clear()
|
|
|
|
def _cacheMessage(self, utts):
|
|
"""Cache a message in our cache list of length CACHE_SIZE"""
|
|
self.msg_cache.append(utts)
|
|
if len(self.msg_cache) > CACHE_SIZE:
|
|
self.msg_cache.pop(0)
|
|
|
|
def _getLiveType(self, obj):
|
|
"""Returns the live politeness setting for a given object. Also,
|
|
registers LIVE_NONE objects in politeness overrides when monitoring."""
|
|
objectid = self._getObjectId(obj)
|
|
uri = self._script.bookmarks.getURIKey()
|
|
if (uri, objectid) in self._politenessOverrides:
|
|
# look to see if there is a user politeness override
|
|
return self._politenessOverrides[(uri, objectid)]
|
|
else:
|
|
livetype = self._liveStringToType(obj)
|
|
# We'll save off a reference to LIVE_NONE if we are monitoring
|
|
# to give the user a chance to change the politeness level. It
|
|
# is done here for performance sake (objectid, uri are expensive)
|
|
if livetype == LIVE_NONE and self.monitoring:
|
|
self._politenessOverrides[(uri, objectid)] = livetype
|
|
return livetype
|
|
|
|
def _getObjectId(self, obj):
|
|
"""Returns the HTML 'id' or a path to the object is an HTML id is
|
|
unavailable"""
|
|
attrs = self._getAttrDictionary(obj)
|
|
if attrs is None:
|
|
return self._getPath(obj)
|
|
try:
|
|
return attrs['id']
|
|
except KeyError:
|
|
return self._getPath(obj)
|
|
|
|
def _liveStringToType(self, obj, attributes=None):
|
|
"""Returns the politeness enum for a given object"""
|
|
attrs = attributes or self._getAttrDictionary(obj)
|
|
try:
|
|
if attrs['container-live'] == 'off':
|
|
return LIVE_OFF
|
|
elif attrs['container-live'] == 'polite':
|
|
return LIVE_POLITE
|
|
elif attrs['container-live'] == 'assertive':
|
|
return LIVE_ASSERTIVE
|
|
elif attrs['container-live'] == 'rude':
|
|
return LIVE_RUDE
|
|
else: return LIVE_NONE
|
|
except KeyError:
|
|
return LIVE_NONE
|
|
|
|
def _liveTypeToString(self, politeness):
|
|
"""Returns the politeness level as a string given a politeness enum"""
|
|
if politeness == LIVE_OFF:
|
|
return 'off'
|
|
elif politeness == LIVE_POLITE:
|
|
return 'polite'
|
|
elif politeness == LIVE_ASSERTIVE:
|
|
return 'assertive'
|
|
elif politeness == LIVE_RUDE:
|
|
return 'rude'
|
|
elif politeness == LIVE_NONE:
|
|
return 'none'
|
|
else: return 'unknown'
|
|
|
|
def _getAttrDictionary(self, obj):
|
|
return self._script.utilities.objectAttributes(obj)
|
|
|
|
def _getPath(self, obj):
|
|
""" Returns, as a tuple of integers, the path from the given object
|
|
to the document frame."""
|
|
docframe = self._script.utilities.documentFrame()
|
|
path = []
|
|
while True:
|
|
if obj.parent is None or obj == docframe:
|
|
path.reverse()
|
|
return tuple(path)
|
|
try:
|
|
path.append(obj.getIndexInParent())
|
|
except Exception:
|
|
raise LookupError
|
|
obj = obj.parent
|
|
|
|
def toggleMonitoring(self, script, inputEvent):
|
|
if not _settingsManager.getSetting('inferLiveRegions'):
|
|
_settingsManager.setSetting('inferLiveRegions', True)
|
|
self._script.presentMessage(messages.LIVE_REGIONS_MONITORING_ON)
|
|
else:
|
|
_settingsManager.setSetting('inferLiveRegions', False)
|
|
self.flushMessages()
|
|
self._script.presentMessage(messages.LIVE_REGIONS_MONITORING_OFF)
|