Source code for easyplayer.download

import gc
from collections import Counter
from logging import getLogger
from enum import Enum
from path import Path
from time import time
from urllib.parse import urljoin
from circuits import Component, Worker, Event, Timer, task
from .events import (handle_download, continue_download, finish_download,
                     set_dwld_status, reload_programs)
from .remote.client import http_post, Endpoint
from .remote.files import download_file
from .storage import ProgramData, NewProgramData
from .settings import (FILE_DWL_TIMEOUT, FILE_DWL_BREAK_SHORT,
                       FILE_DWL_BREAK_LONG, FILE_DWL_BREAK_VERYLONG)
from .utils.dateutils import in_future, time_format, datetime_format, now
from .utils.hwinfo import HWInfo
from .utils.stream import MediaStream


[docs]class switch_storage(Event): """ switch current storage to the new one with downloaded files """
[docs]def communicate(song, dwld_info, token, remote_url): """ send request for media url that can be downloaded """ filename = song.filename if song else '' data={ 'filename': filename, 'dwld_info': dwld_info } success, response = http_post(str(Endpoint.MediaUrl), token, remote_url, data=data) if success and response and 'media_url' in response: media_url = response['media_url'] url = media_url if media_url.startswith('http') else urljoin(remote_url, media_url) timeout = FILE_DWL_TIMEOUT # download file from media url result, msg = download_file(url, song.path, timeout) if not result: msg = {'msg': msg, 'filename': filename} return result, msg elif not success: return False, {'msg': response, 'filename': filename} else: return True, ''
[docs]class State(Enum): stable = 1 new = 2 download = 3 suspended = 4
[docs]class NoNewProgramData(Exception): pass
[docs]class NoProgramData(Exception): pass
[docs]class DownloadManager(Component): """ handles downloading files from server """ channel = 'download' def init(self, config): self.config = config self.data_dir = Path(config['data_dir']).expanduser() self.connection = { 'token': config['token'], 'remote_url': config['remote_url'] } self.logger = getLogger(__name__) self.worker = Worker(process=True, workers=1, channel=self.channel).register(self) self.status = State.stable self.refresh = False self.storage = ProgramData(self.data_dir) self.new_storage = NewProgramData(self.data_dir) self.init_download() def init_download(self): self.dwld_info = {} self.timeout_counter = 0 self.checked_hash = [] self.failed_dwld = Counter() self.failed_hash = Counter() self.start_time = time() self.actual_start = 0 self.actual_end = 0 self.actual_size = None def started(self, *args): self.logger.info('started: %r', self) def _get_current_storage(self): storage = None if self.status == State.new: storage = self.new_storage if not storage.exists(): raise NoNewProgramData() elif self.status == State.download: storage = self.storage if not storage.exists(): raise NoProgramData() return storage def _remove_unnecessary_files(self): if self.storage.exists(): allfiles = self.storage.all_filenames existing = list(self.storage.existing_files()) unnecessary = list(set(allfiles) - set(existing)) for fn in unnecessary: self.storage.delete(fn) def _calculate_dwld_info(self, stream, song, current, total, dwld_time): self.dwld_info['current'] = current self.dwld_info['total'] = total self.dwld_info['timestamp'] = datetime_format(now()) if stream: if 'names' not in self.dwld_info: self.dwld_info['names'] = stream.names() if 'program_hash' not in self.dwld_info: self.dwld_info['program_hash'] = stream.storage.program_hash()[:9] if dwld_time > 0: size = self.actual_size or 0 speed = size / dwld_time #self.logger.info('Size: %s\ndwld_time:%s\nSpeed:%s', size, round(dwld_time,1), speed) self.dwld_info['speed'] = round(speed,1) # byte/s #self.dwld_info['playlists'] = stream.n_files_by_playlist(existing=True) if stream.total_size > 0 and speed > 0: missing_size = stream.total_size - stream.existing_size self.dwld_info['eta'] = str(round(missing_size / speed, 1)) self.dwld_info['checked_songs'] = len(self.checked_hash) def _check_hash(self, song): if song.filename not in self.checked_hash: x = song.check_hash() if x: self.checked_hash.append(song.filename) elif x is not None: # check failed song.remove() self.failed_hash[song.filename] += 1 self.download_message('failed_hashcheck', filename=song.filename) return x def switch_storage(self): self.logger.debug('switch storage') if self.status == State.new: storage = self.storage storage.switch(self.new_storage) self.logger.info('switched storage') self.status = State.download self.download_message('switched_storage')
[docs] def handle_download(self): """ download one song from program, if exists or if force is True """ self.logger.info('handling download: %s', self.status.name) try: storage = self._get_current_storage() except NoProgramData: # it makes no sense to continue without program data self.fire(finish_download()) return except NoNewProgramData: # if continue with old program self.status = State.download self.fire(handle_download()) return else: if storage is None: self.logger.info('wrong status: %s', self.status) return self.fire(set_dwld_status('downloading')) current = storage.n_newer_files(self, self.start_time) if self.refresh else storage.n_existing_files total = storage.total_files if current >= total: # all files downloaded self.fire(finish_download()) return if self.status == State.new and (current + 1 > total / 2): # more than half of files downloaded -> switch storage to start audio playing self.switch_storage() self.fire(reload_programs(), 'audio') # video player will wait till download finishes self.fire(handle_download()) return self.logger.info('downloading: %s / %s', current, total) remaining = total - current ms = MediaStream(storage) for song in ms.all_songs_by_playcount(): # skip songs that failed to download if anything else left if self.failed_dwld[song.filename] > 0 and len(self.failed_dwld.keys()) < remaining: continue if self.refresh and not song.newer(self.start_time): # remove file before downloading new one song.remove() if song.exists(): if self._check_hash(song) == False: # skip songs that failed 3x if self.failed_hash[song.filename] >= 3: continue else: # when hash check fails continue downloading self.fire(handle_download()) break else: d = self.actual_end - self.actual_start self._calculate_dwld_info(ms, song, current, total, d) self.logger.info('ETA: %s', self.dwld_info.get('eta', '?')) self.actual_start = time() self.fire(task(communicate, song, self.dwld_info, **self.connection), self.channel) self.logger.info('Downloading song: %s', song) # only one song per time break else: self.fire(finish_download()) return
def _handle_success(self, path): self.actual_size = Path(path).size self.actual_end = time() # remove files from old program if necessary if self.status == State.new: # load old stream if self.storage.exists(): ms = MediaStream(self.storage) # remove if available space under 512MB if HWInfo(self.data_dir).disk_usage() < 512: for song in ms.all_songs_by_playcount(existing=True): song.remove() if self.status != State.stable: self.fire(handle_download())
[docs] def task_success(self, event, task, response, **kw): """ returned from succesful task """ if self.channel in event.channels: success, resp = response if success: if resp: # file was successfully downloaded self._handle_success(resp) else: msg = resp['msg'] filename = resp['filename'] self.logger.warning('Download failed: %s', msg) if msg == 'TimeoutError': self._handle_timeout() else: self._handle_error(filename, msg)
[docs] def task_failure(self, event, task, error, **kw): """ returned from unhandled error in download task """ if self.channel in event.channels: self.logger.error(error[1])
def _handle_timeout(self): self.timeout_counter += 1 # if timeout occurs 3 times suspend dwld for an hour if self.timeout_counter >= 3: self.timeout_counter = 0 delay = FILE_DWL_BREAK_LONG else: delay = FILE_DWL_BREAK_SHORT self.logger.info('Download suspended for %s seconds.', delay) self.status = State.suspended self.fire(set_dwld_status('suspended')) until = time_format(in_future(delay).time()) self.download_message('download_suspended', delay=delay, until=until) Timer(delay, continue_download()).register(self) def _handle_error(self, filename, msg): self.failed_dwld[filename] += 1 self.download_message('download_error', filename=filename, msg=msg) # if error occurs 3 times suspend dwld for longer period if self.failed_dwld[filename] < 3: self.logger.debug('error: %s - continue downloading', self.failed_dwld[filename]) self.fire(handle_download()) else: self.failed_dwld[filename] = 0 delay = FILE_DWL_BREAK_VERYLONG self.logger.info('Download suspended for %s seconds.', delay) self.status = State.suspended self.fire(set_dwld_status('suspended')) until = time_format(in_future(delay).time()) self.download_message('download_suspended', delay=delay, until=until) Timer(delay, continue_download()).register(self)
[docs] def start_download(self, refresh=False, force=False): """start downloading workflow Keyword Arguments: * refresh {bool} -- redownload all files * force {bool} -- start even if suspended """ if self.status == State.stable or force and self.status == State.suspended: self.logger.info('start download%s', ': refresh' if refresh else '') self.status = State.new self.refresh = refresh self.init_download() self.fire(handle_download()) else: self.logger.info('already downloading')
[docs] def continue_download(self): """ continue download after suspnsion """ self.logger.info('Download continues after suspension.') self.status = State.new self.download_message('download_continues') self.fire(handle_download())
[docs] def finish_download(self): """ after finish actions """ self.logger.info('finish download') if self.status == State.new: self.switch_storage() # files are ready we can switch programs self.fire(reload_programs(), 'audio', 'video') # notify server of finishing download self.download_message('download_finished') self._remove_unnecessary_files() self.status = State.stable self.refresh = False self.init_download() gc.collect()
[docs] def download_message(self, message, **kwargs): """ compose message for server """ if message: storage = self.new_storage if self.new_storage.exists() else self.storage dwld_info={ 'names': storage.program_names(), 'program_hash': storage.program_hash()[:9], 'message': message, 'timestamp': datetime_format(now()), } if kwargs: dwld_info['kwargs'] = kwargs self.fire(task(communicate, None, dwld_info=dwld_info, **self.connection), self.channel)