Source code for napari.utils.events.containers._evented_dict

"""MutableMapping that emits events when altered."""

from collections.abc import Mapping, Sequence
from typing import Optional, Union

from napari.utils.events.containers._dict import _K, _T, TypedMutableMapping
from napari.utils.events.event import EmitterGroup, Event
from napari.utils.events.types import SupportsEvents


[docs] class EventedDict(TypedMutableMapping[_K, _T]): """Mutable dictionary that emits events when altered. This class is designed to behave exactly like builtin ``dict``, but will emit events before and after all mutations (addition, removal, and changing). Parameters ---------- data : Mapping, optional Dictionary to initialize the class with. basetype : type of sequence of types, optional Type of the element in the dictionary. Events ------ changing (key: K) emitted before an item at ``key`` is changed changed (key: K, old_value: T, value: T) emitted when item at ``key`` is changed from ``old_value`` to ``value`` adding (key: K) emitted before an item is added to the dictionary with ``key`` added (key: K, value: T) emitted after ``value`` was added to the dictionary with ``key`` removing (key: K) emitted before ``key`` is removed from the dictionary removed (key: K, value: T) emitted after ``key`` was removed from the dictionary updated (key, K, value: T) emitted after ``value`` of ``key`` was changed. Only implemented by subclasses to give them an option to trigger some update after ``value`` was changed and this class did not register it. This can be useful if the ``basetype`` is not an evented object. """ events: EmitterGroup def __init__( self, data: Optional[Mapping[_K, _T]] = None, basetype: Union[type[_T], Sequence[type[_T]]] = (), ) -> None: _events = { 'changing': None, 'changed': None, 'adding': None, 'added': None, 'removing': None, 'removed': None, 'updated': None, } # For inheritance: If the mro already provides an EmitterGroup, add... if hasattr(self, 'events') and isinstance(self.events, EmitterGroup): self.events.add(**_events) else: # otherwise create a new one self.events = EmitterGroup( source=self, auto_connect=False, **_events ) super().__init__(data, basetype) def __setitem__(self, key: _K, value: _T) -> None: old = self._dict.get(key) if value is old or value == old: return if old is None: self.events.adding(key=key) super().__setitem__(key, value) self.events.added(key=key, value=value) self._connect_child_emitters(value) else: self.events.changing(key=key) super().__setitem__(key, value) self.events.changed(key=key, old_value=old, value=value) def __delitem__(self, key: _K) -> None: self.events.removing(key=key) self._disconnect_child_emitters(self[key]) item = self._dict.pop(key) self.events.removed(key=key, value=item) def _reemit_child_event(self, event: Event) -> None: """An item in the dict emitted an event. Re-emit with key""" if not hasattr(event, 'key'): event.key = self.key(event.source) # re-emit with this object's EventEmitter self.events(event) def _disconnect_child_emitters(self, child: _T) -> None: """Disconnect all events from the child from the re-emitter.""" if isinstance(child, SupportsEvents): child.events.disconnect(self._reemit_child_event) def _connect_child_emitters(self, child: _T) -> None: """Connect all events from the child to be re-emitted.""" if isinstance(child, SupportsEvents): # make sure the event source has been set on the child if child.events.source is None: child.events.source = child child.events.connect(self._reemit_child_event)
[docs] def key(self, value: _T) -> Optional[_K]: """Return first instance of value.""" for k, v in self._dict.items(): if v is value or v == value: return k return None