# Source code for easyplayer.storage.data
from logging import getLogger
from .base import JSONFile, HashedJSONFile
from .files import FileStorage
# Module-level logger, named after this module via ``__name__``.
log = getLogger(__name__)
class PlayerInfoData(HashedJSONFile):
    """Hashed JSON storage holding the player info.

    Only the two file names are configured here; all behaviour comes from
    the ``HashedJSONFile`` base class.
    """

    # File that keeps the hash belonging to the player info.
    hash_filename = 'info.hash'
    # File that keeps the player info itself.
    data_filename = 'info.json'
class ProgramData(HashedJSONFile):
    """Storage for the actual program data.

    The program data itself is accessed through the ``HashedJSONFile`` base
    class. Files referenced by the programs are checked through a
    ``FileStorage`` created for the same data directory (``self.fs``).
    Attributes that normal lookup does not find on this object are delegated
    to that ``FileStorage`` instance (see ``__getattr__``).
    """

    data_filename = 'programs.json'
    hash_filename = 'programs.hash'

    def __init__(self, data_dir):
        """Set up the JSON storage and a ``FileStorage`` for ``data_dir``."""
        super().__init__(data_dir)
        self.fs = FileStorage(data_dir)

    def __getattr__(self, name):
        """Delegate unknown attributes to ``self.fs``.

        Raises:
            AttributeError: if ``fs`` itself is not set yet.
        """
        # Defend against infinite recursion on an incomplete instance:
        # if 'fs' is missing, looking up self.fs would re-enter __getattr__.
        if name == 'fs':
            raise AttributeError(name)
        return getattr(self.fs, name)

    def switch(self, other):
        """Move the data file of ``other`` to my data path.

        Does nothing when the data path of ``other`` does not exist. An
        existing file at my own data path is removed before the move.
        """
        if other.data_path.exists():
            if self.data_path.exists():
                self.data_path.remove()
            other.data_path.move(self.data_path)

    def items(self):
        """Iterate over the items of all programs.

        Yields nothing when ``self.exists()`` is false. Items are yielded in
        the order they are stored; they are not sorted by ``'sequence'``.
        """
        if not self.exists():
            # A bare return ends the generator. Raising StopIteration here
            # would be turned into RuntimeError (PEP 479, Python 3.7+).
            return
        for program in self.read():
            yield from program['items']

    def program_names(self):
        """Return the names of all programs joined by ``", "``."""
        return ", ".join(program['name'] for program in self.read())

    def program_hash(self):
        """Return the stored hash of the program data (via ``get_hash``)."""
        return self.get_hash()

    def filenames(self):
        """Iterate over the file names listed in the program items.

        A file name referenced by several items is yielded several times.
        """
        for item in self.items():
            yield from item['files']

    def existing_files(self):
        """Iterate over referenced file names that ``self.fs`` reports as existing."""
        for filename in self.filenames():
            if self.fs.file_exists(filename):
                yield filename

    def newer_files(self, mtime):
        """Iterate over existing file names for which ``self.fs.newer(filename, mtime)`` is true.

        NOTE(review): presumably "newer" means modified after ``mtime`` --
        confirm against ``FileStorage.newer``.
        """
        for filename in self.existing_files():
            if self.fs.newer(filename, mtime):
                yield filename

    def unique_files(self):
        """Return the referenced file names as a list without duplicates (unordered)."""
        return list(set(self.filenames()))

    @property
    def total_files(self):
        """Number of distinct referenced file names."""
        return len(self.unique_files())

    def unique_existing_files(self):
        """Return the existing file names as a list without duplicates (unordered)."""
        return list(set(self.existing_files()))

    @property
    def n_existing_files(self):
        """Number of distinct existing file names."""
        return len(self.unique_existing_files())

    def unique_newer_files(self, mtime):
        """Return the newer file names as a list without duplicates (unordered)."""
        return list(set(self.newer_files(mtime)))

    # This was declared as a property although it needs ``mtime``; a property
    # getter only ever receives ``self``, so every access raised TypeError.
    # It is therefore a regular method now.
    def n_newer_files(self, mtime):
        """Return the number of distinct newer file names for ``mtime``."""
        return len(self.unique_newer_files(mtime))

    def all_files_exist(self):
        """Return True when every referenced file exists (also when none are referenced)."""
        return all(
            self.fs.file_exists(filename) for filename in self.filenames()
        )
class NewProgramData(ProgramData):
    """Storage for new program data, kept in ``new_programs.json``."""

    hash_filename = 'programs.hash'
    data_filename = 'new_programs.json'

    def compare_data(self, datastring):
        """Tell whether the stored hash equals the hash of ``datastring``."""
        # data path may not exist
        stored_hash = self.get_hash()
        candidate_hash = self.make_hash(datastring)
        return stored_hash == candidate_hash

    def program_hash(self):
        """Get the hash of the new program data, computed from its data string."""
        return self.make_hash(self.get_data_string())
class InitialProgramData(JSONFile):
    """Plain JSON storage for the initial data.

    Derives from ``JSONFile`` and configures a data file name only.
    """

    data_filename = 'initial_data.json'