# Copyright (C) 2008-2010 Adam Olsen
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
# The developers of the Exaile media player hereby grant permission
# for non-GPL compatible GStreamer and Exaile plugins to be used and
# distributed together with GStreamer and Exaile. This permission is
# above and beyond the permissions granted by the GPL license by which
# Exaile is covered. If you modify this code, you may extend this
# exception to your version of the code, but you are not obligated to
# do so. If you do not wish to do so, delete this exception statement
# from your version.
from __future__ import absolute_import
import logging
from copy import deepcopy
from xl import common, event
from xl.nls import gettext as _
from xl.trax.track import Track
from xl.trax.util import sort_tracks
from xl.trax.search import search_tracks_from_string
from time import time
logger = logging.getLogger(__name__)
class TrackHolder(object):
def __init__(self, track, key, **kwargs):
self._track = track
self._key = key
self._attrs = kwargs
def __getattr__(self, attr):
return getattr(self._track, attr)
class TrackDBIterator(object):
def __init__(self, track_iterator):
self.iter = track_iterator
def __iter__(self):
return self
def next(self):
return self.iter.next()[1]._track
[docs]class TrackDB(object):
"""
Manages a track database.
Allows you to add, remove, retrieve, search, save and load
Track objects.
:param name: The name of this :class:`TrackDB`.
:param location: Path to a file where this :class:`TrackDB`
should be stored.
:param pickle_attrs: A list of attributes to store in the
pickled representation of this object. All
attributes listed must be built-in types, with
one exception: If the object contains the phrase
'tracks' in its name it may be a list or dict
of :class:`Track` objects.
:param load_first: Set to True if this collection should be
loaded before any tracks are created.
"""
def __init__(self, name="", location="", pickle_attrs=[], loadfirst=False):
"""
Sets up the trackDB.
"""
# ensure that the DB is always loaded before any tracks are,
# otherwise internal values are not loaded and may be lost/corrupted
if loadfirst and Track._get_track_count() != 0:
raise RuntimeError(
(
"Internal error! %d tracks already loaded, "
+ "TrackDB must be loaded first!"
)
% Track._get_track_count()
)
self.name = name
self.location = location
self._dirty = False
self.tracks = {} # key is always URI of the track
self.pickle_attrs = pickle_attrs
self.pickle_attrs += ['tracks', 'name', '_key']
self._saving = False
self._key = 0
self._dbversion = 2.0
self._dbminorversion = 0
self._deleted_keys = []
if location:
self.load_from_location()
self._timeout_save()
def __iter__(self):
"""
Provide the ability to iterate over a TrackDB.
Just as with a dictionary, if tracks are added
or removed during iteration, iteration will halt
wuth a RuntimeError.
"""
track_iterator = self.tracks.iteritems()
iterator = TrackDBIterator(track_iterator)
return iterator
def __len__(self):
"""
Obtain a count of how many items are in the TrackDB
"""
return len(self.tracks)
@common.glib_wait_seconds(300)
def _timeout_save(self):
"""
Callback for auto-saving.
"""
self.save_to_location()
return True
def set_name(self, name):
"""
Sets the name of this :class:`TrackDB`
:param name: The new name.
:type name: string
"""
self.name = name
self._dirty = True
def get_name(self):
"""
Gets the name of this :class:`TrackDB`
:return: The name.
:rtype: string
"""
return self.name
def set_location(self, location):
"""
Sets the location to save to
:param location: the location to save to
"""
self.location = location
self._dirty = True
[docs] @common.synchronized
def load_from_location(self, location=None):
"""
Restores :class:`TrackDB` state from the pickled representation
stored at the specified location.
:param location: the location to load the data from
:type location: string
"""
if not location:
location = self.location
if not location:
raise AttributeError(
_("You did not specify a location to load the db from")
)
logger.debug("Loading %s DB from %s.", self.name, location)
pdata = common.open_shelf(location)
if "_dbversion" in pdata:
if int(pdata['_dbversion']) > int(self._dbversion):
raise common.VersionError("DB was created on a newer Exaile version.")
elif pdata['_dbversion'] < self._dbversion:
logger.info("Upgrading DB format....")
import shutil
shutil.copyfile(location, location + "-%s.bak" % pdata['_dbversion'])
import xl.migrations.database as dbmig
dbmig.handle_migration(
self, pdata, pdata['_dbversion'], self._dbversion
)
for attr in self.pickle_attrs:
try:
if 'tracks' == attr:
data = {}
for k in (x for x in pdata.keys() if x.startswith("tracks-")):
p = pdata[k]
tr = Track(_unpickles=p[0])
loc = tr.get_loc_for_io()
if loc not in data:
data[loc] = TrackHolder(tr, p[1], **p[2])
else:
logger.warning("Duplicate track found: %s", loc)
# presumably the second track was written because of an error,
# so use the first track found.
del pdata[k]
setattr(self, attr, data)
else:
setattr(self, attr, pdata.get(attr, getattr(self, attr)))
except Exception:
# FIXME: Do something about this
logger.exception("Exception occurred while loading %s", location)
pdata.close()
self._dirty = False
[docs] @common.synchronized
def save_to_location(self, location=None):
"""
Saves a pickled representation of this :class:`TrackDB` to the
specified location.
:param location: the location to save the data to
:type location: string
"""
if not self._dirty:
for track in self.tracks.itervalues():
if track._track._dirty:
self._dirty = True
break
if not self._dirty:
return
if not location:
location = self.location
if not location:
raise AttributeError(_("You did not specify a location to save the db"))
if self._saving:
return
self._saving = True
logger.debug("Saving %s DB to %s.", self.name, location)
try:
pdata = common.open_shelf(location)
if pdata.get('_dbversion', self._dbversion) > self._dbversion:
raise common.VersionError("DB was created on a newer Exaile.")
except Exception:
logger.exception("Failed to open music DB for writing.")
return
for attr in self.pickle_attrs:
# bad hack to allow saving of lists/dicts of Tracks
if 'tracks' == attr:
for k, track in self.tracks.iteritems():
key = "tracks-%s" % track._key
if track._track._dirty or key not in pdata:
pdata[key] = (
track._track._pickles(),
track._key,
deepcopy(track._attrs),
)
else:
pdata[attr] = deepcopy(getattr(self, attr))
pdata['_dbversion'] = self._dbversion
for key in self._deleted_keys:
key = "tracks-%s" % key
if key in pdata:
del pdata[key]
pdata.sync()
pdata.close()
for track in self.tracks.itervalues():
track._track._dirty = False
self._dirty = False
self._saving = False
def get_track_by_loc(self, loc, raw=False):
"""
returns the track having the given loc. if no such track exists,
returns None
"""
try:
return self.tracks[loc]._track
except KeyError:
return None
def get_tracks_by_locs(self, locs):
"""
returns the track having the given loc. if no such track exists,
returns None
"""
return [self.get_track_by_loc(loc) for loc in locs]
def loc_is_member(self, loc):
"""
Returns True if loc is a track in this collection, False
if it is not
"""
return loc in self.tracks
def get_count(self):
"""
Returns the number of tracks stored in this database
"""
count = len(self.tracks)
return count
[docs] def add(self, track):
"""
Adds a track to the database of tracks
:param track: The :class:`xl.trax.Track` to add
"""
self.add_tracks([track])
[docs] @common.synchronized
def add_tracks(self, tracks):
"""
Like add(), but takes a list of :class:`xl.trax.Track`
"""
locations = []
now = time()
for tr in tracks:
if not tr.get_tag_raw('__date_added'):
tr.set_tags(__date_added=now)
location = tr.get_loc_for_io()
# Don't add duplicates -- track URLs are unique
if location in self.tracks:
continue
locations += [location]
self.tracks[location] = TrackHolder(tr, self._key)
self._key += 1
if locations:
event.log_event('tracks_added', self, locations)
self._dirty = True
[docs] def remove(self, track):
"""
Removes a track from the database
:param track: the :class:`xl.trax.Track` to remove
"""
self.remove_tracks([track])
[docs] @common.synchronized
def remove_tracks(self, tracks):
"""
Like remove(), but takes a list of :class:`xl.trax.Track`
"""
locations = []
for tr in tracks:
location = tr.get_loc_for_io()
locations += [location]
self._deleted_keys.append(self.tracks[location]._key)
del self.tracks[location]
event.log_event('tracks_removed', self, locations)
self._dirty = True
def get_tracks(self):
return list(self)
def search(self, query, sort_fields=[], return_lim=-1, tracks=None, reverse=False):
"""
DEPRECATED, DO NOT USE IN NEW CODE
"""
import warnings
warnings.warn("TrackDB.search is deprecated.", DeprecationWarning)
tracks = [
x.track
for x in search_tracks_from_string(
self,
query,
case_sensitive=False,
keyword_tags=['artist', 'albumartist', 'album', 'title'],
)
]
if sort_fields:
tracks = sort_tracks(sort_fields, tracks, reverse)
if return_lim > 0:
tracks = tracks[:return_lim]
return tracks
# vim: et sts=4 sw=4