# Copyright (C) 2008-2010 Adam Olsen
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
# The developers of the Exaile media player hereby grant permission
# for non-GPL compatible GStreamer and Exaile plugins to be used and
# distributed together with GStreamer and Exaile. This permission is
# above and beyond the permissions granted by the GPL license by which
# Exaile is covered. If you modify this code, you may extend this
# exception to your version of the code, but you are not obligated to
# do so. If you do not wish to do so, delete this exception statement
# from your version.
from copy import deepcopy
import logging
from time import time
from typing import Dict, Iterable, Iterator, List, Optional, Tuple
from xl import common, event
from xl.nls import gettext as _
from xl.trax.track import Track
logger = logging.getLogger(__name__)
class TrackHolder:
def __init__(self, track, key, **kwargs):
self._track = track
self._key = key
self._attrs = kwargs
def __getattr__(self, attr):
return getattr(self._track, attr)
class TrackDBIterator:
def __init__(self, track_iterator: Iterator[Tuple[str, TrackHolder]]):
self.iter = track_iterator
def __iter__(self):
return self
def __next__(self):
return next(self.iter)[1]._track
[docs]class TrackDB:
"""
Manages a track database.
Allows you to add, remove, retrieve, search, save and load
Track objects.
:param name: The name of this :class:`TrackDB`.
:param location: Path to a file where this :class:`TrackDB`
should be stored.
:param pickle_attrs: A list of attributes to store in the
pickled representation of this object. All
attributes listed must be built-in types, with
one exception: If the object contains the phrase
'tracks' in its name it may be a list or dict
of :class:`Track` objects.
:param load_first: Set to True if this collection should be
loaded before any tracks are created.
"""
def __init__(
self,
name: str = "",
location: str = "",
pickle_attrs: List[str] = [],
loadfirst: bool = False,
):
"""
Sets up the trackDB.
"""
# ensure that the DB is always loaded before any tracks are,
# otherwise internal values are not loaded and may be lost/corrupted
if loadfirst and Track._get_track_count() != 0:
raise RuntimeError(
(
"Internal error! %d tracks already loaded, "
+ "TrackDB must be loaded first!"
)
% Track._get_track_count()
)
self.name = name
self.location = location
self._dirty = False
self.tracks: Dict[str, TrackHolder] = {} # key is URI of the track
self.pickle_attrs = pickle_attrs
self.pickle_attrs += ['tracks', 'name', '_key']
self._saving = False
self._key = 0
self._dbversion = 2.0
self._dbminorversion = 0
self._deleted_keys = []
if location:
self.load_from_location()
self._timeout_save()
def __iter__(self):
"""
Provide the ability to iterate over a TrackDB.
Just as with a dictionary, if tracks are added
or removed during iteration, iteration will halt
with a RuntimeError.
"""
track_iterator = iter(self.tracks.items())
iterator = TrackDBIterator(track_iterator)
return iterator
def __len__(self):
"""
Obtain a count of how many items are in the TrackDB
"""
return len(self.tracks)
@common.glib_wait_seconds(300)
def _timeout_save(self):
"""
Callback for auto-saving.
"""
self.save_to_location()
return True
def set_name(self, name: str) -> None:
"""
Sets the name of this :class:`TrackDB`
:param name: The new name.
"""
self.name = name
self._dirty = True
def get_name(self) -> str:
"""
Gets the name of this :class:`TrackDB`
:return: The name.
"""
return self.name
def set_location(self, location: Optional[str]) -> None:
"""
Sets the location to save to
:param location: the location to save to
"""
self.location = location
self._dirty = True
[docs] @common.synchronized
def load_from_location(self, location: Optional[str] = None):
"""
Restores :class:`TrackDB` state from the pickled representation
stored at the specified location.
:param location: the location to load the data from
"""
if not location:
location = self.location
if not location:
raise AttributeError(
_("You did not specify a location to load the db from")
)
logger.debug("Loading %s DB from %s.", self.name, location)
pdata = common.open_shelf(location)
if "_dbversion" in pdata:
if int(pdata['_dbversion']) > int(self._dbversion):
raise common.VersionError("DB was created on a newer Exaile version.")
elif pdata['_dbversion'] < self._dbversion:
logger.info("Upgrading DB format....")
import shutil
shutil.copyfile(location, location + "-%s.bak" % pdata['_dbversion'])
import xl.migrations.database as dbmig
dbmig.handle_migration(
self, pdata, pdata['_dbversion'], self._dbversion
)
for attr in self.pickle_attrs:
try:
if 'tracks' == attr:
data = {}
for k in (x for x in pdata.keys() if x.startswith("tracks-")):
p = pdata[k]
tr = Track(_unpickles=p[0])
loc = tr.get_loc_for_io()
if loc not in data:
data[loc] = TrackHolder(tr, p[1], **p[2])
else:
logger.warning("Duplicate track found: %s", loc)
# presumably the second track was written because of an error,
# so use the first track found.
del pdata[k]
setattr(self, attr, data)
else:
setattr(self, attr, pdata.get(attr, getattr(self, attr)))
except Exception:
# FIXME: Do something about this
logger.exception("Exception occurred while loading %s", location)
pdata.close()
self._dirty = False
[docs] @common.synchronized
def save_to_location(self, location: Optional[str] = None):
"""
Saves a pickled representation of this :class:`TrackDB` to the
specified location.
:param location: the location to save the data to
"""
if not self._dirty:
for track in self.tracks.values():
if track._track._dirty:
self._dirty = True
break
if not self._dirty:
return
if not location:
location = self.location
if not location:
raise AttributeError(_("You did not specify a location to save the db"))
if self._saving:
return
self._saving = True
logger.debug("Saving %s DB to %s.", self.name, location)
try:
pdata = common.open_shelf(location)
if pdata.get('_dbversion', self._dbversion) > self._dbversion:
raise common.VersionError("DB was created on a newer Exaile.")
except Exception:
logger.exception("Failed to open music DB for writing.")
return
for attr in self.pickle_attrs:
# bad hack to allow saving of lists/dicts of Tracks
if 'tracks' == attr:
for k, track in self.tracks.items():
key = "tracks-%s" % track._key
if track._track._dirty or key not in pdata:
pdata[key] = (
track._track._pickles(),
track._key,
deepcopy(track._attrs),
)
else:
pdata[attr] = deepcopy(getattr(self, attr))
pdata['_dbversion'] = self._dbversion
for key in self._deleted_keys:
key = "tracks-%s" % key
if key in pdata:
del pdata[key]
pdata.sync()
pdata.close()
for track in self.tracks.values():
track._track._dirty = False
self._dirty = False
self._saving = False
def get_track_by_loc(self, loc: str) -> Optional[Track]:
"""
returns the track having the given loc. if no such track exists,
returns None
"""
try:
return self.tracks[loc]._track
except KeyError:
return None
def loc_is_member(self, loc: str) -> bool:
"""
Returns True if loc is a track in this collection, False
if it is not
"""
return loc in self.tracks
def get_count(self) -> int:
"""
Returns the number of tracks stored in this database
"""
return len(self.tracks)
[docs] def add(self, track: Track) -> None:
"""
Adds a track to the database of tracks
:param track: The :class:`xl.trax.Track` to add
"""
self.add_tracks([track])
[docs] @common.synchronized
def add_tracks(self, tracks: Iterable[Track]) -> None:
"""
Like add(), but takes a list of :class:`xl.trax.Track`
"""
locations = []
now = time()
for tr in tracks:
if not tr.get_tag_raw('__date_added'):
tr.set_tags(__date_added=now)
location = tr.get_loc_for_io()
# Don't add duplicates -- track URLs are unique
if location in self.tracks:
continue
# Don't add unsupported media files
if not tr.is_supported():
continue
locations += [location]
self.tracks[location] = TrackHolder(tr, self._key)
self._key += 1
if locations:
event.log_event('tracks_added', self, locations)
self._dirty = True
[docs] def remove(self, track: Track) -> None:
"""
Removes a track from the database
:param track: the :class:`xl.trax.Track` to remove
"""
self.remove_tracks([track])
[docs] @common.synchronized
def remove_tracks(self, tracks: Iterable[Track]) -> None:
"""
Like remove(), but takes a list of :class:`xl.trax.Track`
"""
locations = []
for tr in tracks:
location = tr.get_loc_for_io()
locations += [location]
self._deleted_keys.append(self.tracks[location]._key)
del self.tracks[location]
event.log_event('tracks_removed', self, locations)
self._dirty = True
def get_tracks(self) -> List[Track]:
return list(self)