Source code for xl.covers

# Copyright (C) 2008-2010 Adam Olsen
# Copyright (C) 2018 Johannes Sasongko <sasongko@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
#
#
# The developers of the Exaile media player hereby grant permission
# for non-GPL compatible GStreamer and Exaile plugins to be used and
# distributed together with GStreamer and Exaile. This permission is
# above and beyond the permissions granted by the GPL license by which
# Exaile is covered. If you modify this code, you may extend this
# exception to your version of the code, but you are not obligated to
# do so. If you do not wish to do so, delete this exception statement
# from your version.

"""
Provides the base for obtaining and storing covers, also known
as album art.
"""

from gi.repository import GLib
from gi.repository import Gio
import logging
import hashlib
import os

try:
    import cPickle as pickle
except ImportError:
    import pickle

from xl.nls import gettext as _
import xl.unicode
from xl import common, event, providers, settings, trax, xdg

logger = logging.getLogger(__name__)


# TODO: maybe this could go into common.py instead? could be
# useful in other areas.
class Cacher(object):
    """
        Simple on-disk cache.

        Each entry is stored as an individual file inside ``cache_dir``,
        named after the SHA-256 hex digest of its contents.

        Note that as entries are stored as individual files, the data
        being stored should be of significant size (several KB) or a lot
        of disk space will likely be wasted.
    """

    def __init__(self, cache_dir):
        """
            :param cache_dir: directory to use for the cache. will be
                created if it does not exist.
        """
        try:
            os.makedirs(cache_dir)
        except OSError:
            # Deliberately best-effort: the directory usually exists
            # already. Any other failure will surface on the first add().
            pass
        self.cache_dir = cache_dir

    def add(self, data):
        """
            Adds an entry to the cache.  Returns a key that can be used
            to retrieve the data from the cache.

            :param data: The data to store, as a bytestring.
            :returns: the SHA-256 hex digest of *data*, used as the key.
        """
        # FIXME: this doesn't handle hash collisions at all. With
        # 2^256 possible keys it's unlikely that we'll have a collision,
        # but we should handle it anyway.
        key = hashlib.sha256(data).hexdigest()
        path = os.path.join(self.cache_dir, key)
        with open(path, "wb") as fp:
            fp.write(data)
        return key

    def remove(self, key):
        """
            Remove an entry from the cache.  Missing entries are ignored.

            :param key: The key to remove data for.
        """
        path = os.path.join(self.cache_dir, key)
        try:
            os.remove(path)
        except OSError:
            pass

    def get(self, key):
        """
            Retrieve an entry from the cache.  Returns None if the given
            key does not exist or cannot be read.

            :param key: The key to retrieve data for.
        """
        path = os.path.join(self.cache_dir, key)
        # EAFP instead of exists()+open(): the entry may disappear between
        # the check and the read, which must still yield None.
        try:
            with open(path, "rb") as fp:
                return fp.read()
        except (IOError, OSError):
            return None


class CoverManager(providers.ProviderHandler):
    """
        Handles finding covers from various sources.

        Keeps a mapping (``self.db``) from album keys to "method:key"
        strings, persisted as ``covers.db`` inside ``location``, and an
        on-disk cache for cover data from methods with ``use_cache`` set.
    """

    DB_VERSION = 2

    def __init__(self, location):
        """
            :param location: The directory to load and store data in.
        """
        providers.ProviderHandler.__init__(self, "covers")
        self.__cache = Cacher(os.path.join(location, 'cache'))
        self.location = location
        self.methods = {}
        self.order = settings.get_option('covers/preferred_order', [])
        self.db = {'version': self.DB_VERSION}
        self.load()
        for method in self.get_providers():
            self.on_provider_added(method)

        with open(xdg.get_data_path('images', 'nocover.png'), 'rb') as f:
            self.default_cover_data = f.read()

        self.tag_fetcher = TagCoverFetcher()
        self.localfile_fetcher = LocalFileCoverFetcher()

        if settings.get_option('covers/use_tags', True):
            providers.register('covers', self.tag_fetcher)
        if settings.get_option('covers/use_localfile', True):
            providers.register('covers', self.localfile_fetcher)

        event.add_callback(self._on_option_set, 'covers_option_set')

    def _on_option_set(self, name, obj, data):
        """
            Registers or unregisters the built-in fetchers when their
            ``covers/use_*`` option changes.

            :param data: the name of the option that was set.
        """
        if data == "covers/use_tags":
            if settings.get_option("covers/use_tags"):
                providers.register('covers', self.tag_fetcher)
            else:
                providers.unregister('covers', self.tag_fetcher)
        elif data == "covers/use_localfile":
            if settings.get_option("covers/use_localfile"):
                providers.register('covers', self.localfile_fetcher)
            else:
                providers.unregister('covers', self.localfile_fetcher)

    def _get_methods(self, fixed=False):
        """
            Returns a list of Methods, sorted by preference

            :param fixed: If true, include fixed-position backends in the
                returned list.
        """
        # Methods named in the preferred order come first, then the rest.
        methods = [self.methods[name] for name in self.order if name in self.methods]
        for method in self.methods.values():
            if method not in methods:
                methods.append(method)

        nonfixed = [m for m in methods if not m.fixed]
        if not fixed:
            return nonfixed

        fixed_methods = [m for m in methods if m.fixed]
        fixed_methods.sort(key=lambda x: x.fixed_priority)
        # Fixed backends with priority <= 50 go before the configurable
        # ones, those with priority > 50 go after them.
        for i, v in enumerate(fixed_methods):
            if v.fixed_priority > 50:
                return fixed_methods[:i] + nonfixed + fixed_methods[i:]
        return fixed_methods + nonfixed

    @staticmethod
    def _get_track_key(track):
        """
            Get a unique, hashable identifier for the track's album.

            :returns: the key, or None if the track has neither a
                musicbrainz_albumid nor an album tag.
        """
        # The output is in the form
        #   u'tag1 \0 value1a \1 value1b \0 tag2 \0 value2'
        # without the spaces.
        #
        # Possible tag combinations, in order of preference:
        # * musicbrainz_albumid
        # * album albumartist [date]
        # * __compilation [date]
        # * album [artist] [date]

        def _get_pair(tag):
            value = track.get_tag_raw(tag)
            if not value:
                return None
            value = u'\1'.join(
                xl.unicode.to_unicode(v, 'utf-8', 'surrogateescape') for v in value
            )
            assert isinstance(tag, bytes)
            return tag.decode('ascii') + u'\0' + value

        albumid = _get_pair('musicbrainz_albumid')
        if albumid:
            return albumid

        album = _get_pair('album')
        if not album:
            return None

        albumartist = _get_pair('albumartist')
        if albumartist:
            dbkey = album + u'\0' + albumartist
        else:
            compilation = _get_pair('__compilation')
            if compilation:
                # compilation is directory+album, where the directory mimics
                # the role of albumartist.
                dbkey = compilation
            else:
                dbkey = album
                artist = _get_pair('artist')
                if artist:
                    dbkey += u'\0' + artist
        assert dbkey

        date = _get_pair('date')
        if date:
            dbkey += u'\0' + date
        return dbkey

    def get_db_string(self, track):
        """
            Returns the internal string used to map the cover to a track

            :param track: the track to retrieve the string for
            :type track: :class:`xl.trax.Track`
            :returns: the internal identifier string, or None if no
                cover is stored for the track
        """
        key = self._get_track_key(track)
        return self.db.get(key)

    @common.synchronized
    @common.cached(5)
    def find_covers(self, track, limit=-1, local_only=False):
        """
            Find all covers for a track

            :param track: The track to find covers for
            :param limit: maximum number of covers to return. -1=unlimited.
            :param local_only: If True, will only return results from local
                sources.
            :returns: a list of "method:key" strings, or None if *track*
                is None.
        """
        if track is None:
            return
        covers = []
        for method in self._get_methods(fixed=True):
            # Methods whose results get cached are treated as non-local.
            if local_only and method.use_cache:
                continue
            new = method.find_covers(track, limit=limit)
            covers.extend("%s:%s" % (method.name, x) for x in new)
            if limit != -1 and len(covers) >= limit:
                break
        return covers

    def set_cover(self, track, db_string, data=None):
        """
            Sets the cover for a track. This will overwrite any existing
            entry.

            :param track: The track to set the cover for
            :param db_string: the string identifying the source of the
                cover, in "method:key" format.
            :param data: The raw cover data to store for the track.  Will
                only be stored if the method has use_cache=True
        """
        name = db_string.split(":", 1)[0]
        method = self.methods.get(name)
        if method and method.use_cache and data:
            db_string = "cache:%s" % self.__cache.add(data)
        key = self._get_track_key(track)
        if key:
            self.db[key] = db_string
            self.timeout_save()
            event.log_event('cover_set', self, track)

    def remove_cover(self, track):
        """
            Remove the saved cover entry for a track, if it exists.

            If the entry pointed at cached data which no other entry
            references, the cached file is removed as well.
        """
        if track is None:
            return
        key = self._get_track_key(track)
        db_string = self.db.get(key)
        if db_string:
            del self.db[key]
            # The cache is keyed by the part after "cache:", not by the
            # whole db_string. Identical cover data hashes to the same
            # key, so keep the file while another album still uses it.
            source, sep, cache_key = db_string.partition(":")
            if source == "cache" and db_string not in self.db.values():
                self.__cache.remove(cache_key)
            self.timeout_save()
            event.log_event('cover_removed', self, track)

    def get_cover(self, track, save_cover=True, set_only=False, use_default=False):
        """
            get the cover for a given track.
            if the track has no set cover, backends are
            searched until a cover is found or we run out of backends.

            :param track: the Track to get the cover for.
            :param save_cover: if True, a set_cover call will be made
                to store the cover for later use.
            :param set_only: Only retrieve covers that have been set
                in the db.
            :param use_default: If True, returns the default cover instead
                of None when no covers are found.
        """
        if track is None:
            return self.get_default_cover() if use_default else None

        db_string = self.get_db_string(track)
        if db_string:
            cover = self.get_cover_data(db_string, use_default=use_default)
            if cover:
                return cover

        if set_only:
            return self.get_default_cover() if use_default else None

        covers = self.find_covers(track, limit=1)
        if covers:
            cover = covers[0]
            data = self.get_cover_data(cover, use_default=use_default)
            # Only remember covers that actually produced image data.
            if save_cover and data is not None and data != self.get_default_cover():
                self.set_cover(track, cover, data)
            return data

        return self.get_default_cover() if use_default else None

    def get_cover_data(self, db_string, use_default=False):
        """
            Get the raw image data for a cover.

            :param db_string: The db_string identifying the cover to get.
            :param use_default: If True, returns the default cover instead
                of None when no covers are found.
        """
        # partition() tolerates a db_string without a ":" separator; such
        # a string matches no source and yields no data.
        source, sep, data = db_string.partition(":")
        ret = None
        if source == "cache":
            ret = self.__cache.get(data)
        else:
            method = self.methods.get(source)
            if method:
                ret = method.get_cover_data(data)
        if ret is None and use_default is True:
            ret = self.get_default_cover()
        return ret

    def get_default_cover(self):
        """
            Get the raw image data for the cover to show if there is no
            cover to display.
        """
        # TODO: wrap this into get_cover_data and get_cover somehow?
        return self.default_cover_data

    def load(self):
        """
            Load the saved db

            Tries ``covers.db``, then ``covers.db.old``, then
            ``covers.db.new`` and uses the first one that yields data.
        """
        path = os.path.join(self.location, 'covers.db')
        data = None
        for loc in [path, path + ".old", path + ".new"]:
            try:
                # NOTE(review): pickle.load executes arbitrary code from
                # the file; this is only safe while covers.db is trusted.
                with open(loc, 'rb') as f:
                    data = pickle.load(f)
            except IOError:
                pass
            except EOFError:
                # Truncated file: discard it so it is not retried.
                try:
                    os.remove(loc)
                except Exception:
                    pass
            if data:
                break
        if data:
            self.db = data
        version = self.db.get('version', 1)
        if version > self.DB_VERSION:
            logger.error(
                "covers.db version (%s) higher than supported (%s); using anyway",
                version,
                self.DB_VERSION,
            )

    # NOTE(review): presumably defers/coalesces saves by 60 seconds --
    # confirm against common.glib_wait_seconds.
    @common.glib_wait_seconds(60)
    def timeout_save(self):
        self.save()

    def save(self):
        """
            Save the db

            Writes to ``covers.db.new`` first and then swaps it into
            place, keeping the previous file as ``covers.db.old`` until
            the swap has succeeded.
        """
        path = os.path.join(self.location, 'covers.db')
        try:
            with open(path + ".new", 'wb') as f:
                pickle.dump(self.db, f, common.PICKLE_PROTOCOL)
        except IOError:
            return
        try:
            os.rename(path, path + ".old")
        except OSError:
            pass  # if it doesn't exist we don't care
        os.rename(path + ".new", path)
        try:
            os.remove(path + ".old")
        except OSError:
            pass

    def on_provider_added(self, provider):
        """
            Makes *provider* available as a search method and appends it
            to the search order if it is not listed yet.
        """
        self.methods[provider.name] = provider
        if provider.name not in self.order:
            self.order.append(provider.name)

    def on_provider_removed(self, provider):
        """
            Drops *provider* from the search methods and the search order.
        """
        self.methods.pop(provider.name, None)
        if provider.name in self.order:
            self.order.remove(provider.name)

    def set_preferred_order(self, order):
        """
            Sets the preferred search order

            :param order: a list containing the order you'd like to search
                first
            :raises TypeError: if *order* is not a list or tuple
        """
        if not isinstance(order, (list, tuple)):
            raise TypeError("order must be a list or tuple")
        self.order = order
        settings.set_option('covers/preferred_order', list(order))

    def get_cover_for_tracks(self, tracks, db_strings_to_ignore):
        """
            For tracks, try to find a cover
            Basically returns the first cover found

            The db_string of the returned cover is appended to
            *db_strings_to_ignore*.

            :param tracks: list of tracks [xl.trax.Track]
            :param db_strings_to_ignore: list [str]
            :return: raw cover data as returned by get_cover_data, or
                None if no cover found
        """
        for track in tracks:
            db_string = self.get_db_string(track)
            if db_string and db_string not in db_strings_to_ignore:
                db_strings_to_ignore.append(db_string)
                return self.get_cover_data(db_string)
        return None  # No cover found
class CoverSearchMethod(object):
    """
        Base class for creating cover search methods.

        Search methods do not have to inherit from this class, it's
        intended more as a template to demonstrate the needed interface.
    """

    #: If true, cover results will be cached for faster lookup
    use_cache = True
    #: A name uniquely identifying the search method.
    name = "base"
    #: Whether the backend should have a fixed priority instead of being
    #  configurable.
    fixed = False
    #: Priority for fixed-position backends. Lower is earlier, non-fixed
    #  backends will always be 50.
    fixed_priority = 50

    def find_covers(self, track, limit=-1):
        """
            Find the covers for a given track.

            :param track: The track to find covers for.
            :param limit: Maximal number of covers to return.
            :returns: A list of strings that can be passed to
                get_cover_data.
        """
        raise NotImplementedError

    def get_cover_data(self, db_string):
        """
            Get the image data for a cover

            :param db_string: A method-dependent string that identifies the
                cover to get.
        """
        raise NotImplementedError
class TagCoverFetcher(CoverSearchMethod):
    """
        Cover source that looks for images embedded in tags.
    """

    use_cache = False
    name = "tags"
    title = _('Tags')
    cover_tags = ["cover", "coverart"]
    fixed = True
    fixed_priority = 30

    def find_covers(self, track, limit=-1):
        """
            Lists the images embedded in the first cover tag of *track*
            that contains any.

            :param track: The track to find covers for.
            :param limit: Maximal number of covers to return,
                -1 for unlimited.
            :returns: A list of "tagname:index:uri" strings that can be
                passed to get_cover_data.
        """
        covers = []
        tagname = None
        uri = track.get_loc_for_io()
        for tag in self.cover_tags:
            try:
                # Force type conversion to list, fails for None
                covers = list(track.get_tag_disk(tag))
            except (TypeError, KeyError):
                continue
            if covers:
                tagname = tag
                break
        if limit != -1:
            covers = covers[:limit]
        return [
            '{tagname}:{index}:{uri}'.format(tagname=tagname, index=index, uri=uri)
            for index, cover in enumerate(covers)
        ]

    def get_cover_data(self, db_string):
        """
            Get the image data for a cover

            :param db_string: a "tagname:index:uri" string as produced by
                find_covers.
            :returns: the ``data`` attribute of the selected tag value, or
                None if *db_string* is malformed or the image is no longer
                present.
        """
        try:
            tag, index, uri = db_string.split(':', 2)
            index = int(index)
        except ValueError:
            return None
        track = trax.Track(uri, scan=False)
        covers = track.get_tag_disk(tag)
        if not covers:
            return None
        try:
            return covers[index].data
        except IndexError:
            # Stale entry: the file now holds fewer embedded images.
            return None
class LocalFileCoverFetcher(CoverSearchMethod):
    """
        Cover source that looks for images in the same directory as the
        Track.
    """

    use_cache = False
    name = "localfile"
    title = _('Local file')
    uri_types = ['file', 'smb', 'sftp', 'nfs']
    extensions = ['.png', '.jpg', '.jpeg', '.gif']
    preferred_names = []
    fixed = True
    fixed_priority = 31

    def __init__(self):
        CoverSearchMethod.__init__(self)

        event.add_callback(self.on_option_set, 'covers_localfile_option_set')
        # Load the initial value of the preferred names.
        self.on_option_set(
            'covers_localfile_option_set', settings, 'covers/localfile/preferred_names'
        )

    def find_covers(self, track, limit=-1):
        """
            Lists image files located next to *track*.

            Files whose base name is one of ``preferred_names`` are put at
            the front of the result.

            :param track: The track to find covers for.
            :param limit: Maximal number of covers to return,
                -1 for unlimited.
            :returns: A list of URIs that can be passed to get_cover_data;
                empty if the directory cannot be read.
        """
        # TODO: perhaps should instead check to see if its mounted in
        # gio, rather than basing this on uri type. file:// should
        # always be checked, obviously.
        if track.get_type() not in self.uri_types:
            return []
        basedir = Gio.File.new_for_uri(track.get_loc_for_io()).get_parent()
        if basedir is None:
            return []
        try:
            dirinfo = basedir.query_info(
                "standard::type", Gio.FileQueryInfoFlags.NONE, None
            )
            if dirinfo.get_file_type() != Gio.FileType.DIRECTORY:
                return []
        except GLib.Error:
            return []

        covers = []
        try:
            children = basedir.enumerate_children(
                "standard::type,standard::name", Gio.FileQueryInfoFlags.NONE, None
            )
            for fileinfo in children:
                if fileinfo.get_file_type() != Gio.FileType.REGULAR:
                    continue
                gloc = basedir.get_child(fileinfo.get_name())
                filename = gloc.get_basename()
                base, ext = os.path.splitext(filename)
                if ext.lower() not in self.extensions:
                    continue
                if base in self.preferred_names:
                    covers.insert(0, gloc.get_uri())
                else:
                    covers.append(gloc.get_uri())
        except GLib.Error:
            # Same policy as for the directory check: an unreadable
            # directory simply provides no covers.
            return []

        if limit == -1:
            return covers
        return covers[:limit]

    def get_cover_data(self, db_string):
        """
            Get the image data for a cover

            :param db_string: URI of the image file.
            :returns: the file contents, or None if loading fails.
        """
        try:
            return Gio.File.new_for_uri(db_string).load_contents(None)[1]
        except GLib.Error:
            return None

    def on_option_set(self, e, settings, option):
        """
            Updates the internal settings upon option change
        """
        if option == 'covers/localfile/preferred_names':
            self.preferred_names = settings.get_option(option, ['album', 'cover'])
#: The singleton :class:`CoverManager` instance
MANAGER = CoverManager(location=xdg.get_data_home_path("covers", check_exists=False))