import gc
from collections import Counter
from logging import getLogger
from enum import Enum
from path import Path
from time import time
from urllib.parse import urljoin
from circuits import Component, Worker, Event, Timer, task
from .events import (handle_download, continue_download, finish_download,
set_dwld_status, reload_programs)
from .remote.client import http_post, Endpoint
from .remote.files import download_file
from .storage import ProgramData, NewProgramData
from .settings import (FILE_DWL_TIMEOUT, FILE_DWL_BREAK_SHORT,
FILE_DWL_BREAK_LONG, FILE_DWL_BREAK_VERYLONG)
from .utils.dateutils import in_future, time_format, datetime_format, now
from .utils.hwinfo import HWInfo
from .utils.stream import MediaStream
class switch_storage(Event):
    """Event: switch the current storage to the new one with downloaded files."""
def communicate(song, dwld_info, token, remote_url):
    """Ask the server for a media URL and download the file it points to.

    Also used for plain status notifications, in which case ``song`` is
    ``None`` and only ``dwld_info`` is posted.

    Arguments:
        * song -- object providing ``filename`` and ``path``, or ``None``
        * dwld_info {dict} -- download/progress information sent to the server
        * token -- passed through to ``http_post``
        * remote_url {str} -- passed through to ``http_post``; also the base
          for relative media urls

    Returns:
        ``(result, payload)`` tuple:
        * the ``(result, msg)`` pair of ``download_file`` when a download was
          attempted; on failure ``msg`` is wrapped as
          ``{'msg': ..., 'filename': ...}``
        * ``(False, {'msg': response, 'filename': filename})`` when the
          request itself failed
        * ``(True, '')`` when the request succeeded and nothing was downloaded
    """
    filename = song.filename if song else ''
    data = {
        'filename': filename,
        'dwld_info': dwld_info,
    }
    success, response = http_post(str(Endpoint.MediaUrl), token, remote_url, data=data)

    if not success:
        return False, {'msg': response, 'filename': filename}

    # A download only makes sense when there is a song to store the file for;
    # without this guard a status notification (song is None) answered with a
    # 'media_url' would crash on ``song.path``.
    if song is not None and response and 'media_url' in response:
        media_url = response['media_url']
        # relative media urls are resolved against the server url
        url = media_url if media_url.startswith('http') else urljoin(remote_url, media_url)
        # download file from media url
        result, msg = download_file(url, song.path, FILE_DWL_TIMEOUT)
        if not result:
            msg = {'msg': msg, 'filename': filename}
        return result, msg

    return True, ''
class State(Enum):
    """Download workflow states used by ``DownloadManager.status``."""

    # idle: no download in progress (initial state and state after finishing)
    stable = 1
    # downloading into the new storage, before the storages were switched
    new = 2
    # downloading into the current storage (after the switch, or when no
    # new program data exists)
    download = 3
    # downloading paused after timeouts or repeated errors
    suspended = 4
class NoNewProgramData(Exception):
    """Raised when the new program storage is required but does not exist."""
class NoProgramData(Exception):
    """Raised when the current program storage is required but does not exist."""
class DownloadManager(Component):
    """Handle downloading of program files from the server.

    Files are fetched one at a time: every ``handle_download`` event picks a
    single missing song and hands it to the worker as a ``communicate`` task;
    the task result (``task_success``) triggers the next ``handle_download``.
    ``self.status`` (see ``State``) tracks where the workflow currently is.
    """

    channel = 'download'

    def init(self, config):
        """Set up storages, the worker and the download bookkeeping.

        ``config`` must provide the keys ``data_dir``, ``token`` and
        ``remote_url``.
        """
        self.config = config
        self.data_dir = Path(config['data_dir']).expanduser()
        # keyword arguments passed to every ``communicate`` task
        self.connection = {
            'token': config['token'],
            'remote_url': config['remote_url'],
        }
        self.logger = getLogger(__name__)
        self.worker = Worker(process=True, workers=1, channel=self.channel).register(self)
        self.status = State.stable
        self.refresh = False
        self.storage = ProgramData(self.data_dir)
        self.new_storage = NewProgramData(self.data_dir)
        self.init_download()

    def init_download(self):
        """Reset all per-download bookkeeping."""
        self.dwld_info = {}
        self.timeout_counter = 0
        # filenames whose hash was already verified in this download
        self.checked_hash = []
        # per-filename counters of failed downloads / failed hash checks
        self.failed_dwld = Counter()
        self.failed_hash = Counter()
        self.start_time = time()
        # start/end timestamps and size of the most recent file transfer
        self.actual_start = 0
        self.actual_end = 0
        self.actual_size = None

    def started(self, *args):
        """Log that the component was started."""
        self.logger.info('started: %r', self)

    def _get_current_storage(self):
        """Return the storage to download into for the current status.

        Returns ``None`` when the status is neither ``new`` nor ``download``.

        Raises:
            NoNewProgramData: status is ``new`` but the new storage is missing
            NoProgramData: status is ``download`` but the storage is missing
        """
        if self.status == State.new:
            if not self.new_storage.exists():
                raise NoNewProgramData()
            return self.new_storage
        if self.status == State.download:
            if not self.storage.exists():
                raise NoProgramData()
            return self.storage
        return None

    def _remove_unnecessary_files(self):
        """Delete every file the storage lists in ``all_filenames`` but not in ``existing_files()``."""
        if not self.storage.exists():
            return
        unnecessary = set(self.storage.all_filenames) - set(self.storage.existing_files())
        for filename in unnecessary:
            self.storage.delete(filename)

    def _calculate_dwld_info(self, stream, song, current, total, dwld_time):
        """Update ``self.dwld_info`` with the current progress.

        Arguments:
            * stream -- media stream of the storage being downloaded (may be falsy)
            * song -- unused, kept for interface compatibility
            * current {int} -- number of files already present
            * total {int} -- number of files in the program
            * dwld_time {float} -- duration of the last transfer in seconds;
              speed and ETA are only updated when it is positive
        """
        info = self.dwld_info
        info['current'] = current
        info['total'] = total
        info['timestamp'] = datetime_format(now())
        if stream:
            # names and hash are filled only once per download
            if 'names' not in info:
                info['names'] = stream.names()
            if 'program_hash' not in info:
                info['program_hash'] = stream.storage.program_hash()[:9]
            if dwld_time > 0:
                size = self.actual_size or 0
                speed = size / dwld_time
                info['speed'] = round(speed, 1)  # byte/s
                if stream.total_size > 0 and speed > 0:
                    missing_size = stream.total_size - stream.existing_size
                    info['eta'] = str(round(missing_size / speed, 1))
        info['checked_songs'] = len(self.checked_hash)

    def _check_hash(self, song):
        """Verify the hash of a downloaded song once per download.

        Returns ``True`` for files that were already verified; otherwise the
        result of ``song.check_hash()``. A truthy result marks the file as
        verified, ``None`` is passed through untouched, and any other falsy
        result is a failed check: the file is removed, the failure counted
        and the server notified.
        """
        if song.filename in self.checked_hash:
            return True
        verified = song.check_hash()
        if verified:
            self.checked_hash.append(song.filename)
        elif verified is not None:
            # check failed
            song.remove()
            self.failed_hash[song.filename] += 1
            self.download_message('failed_hashcheck', filename=song.filename)
        return verified

    def switch_storage(self):
        """Switch the current storage to the new one (only in status ``new``)."""
        self.logger.debug('switch storage')
        if self.status == State.new:
            self.storage.switch(self.new_storage)
            self.logger.info('switched storage')
            self.status = State.download
            self.download_message('switched_storage')

    def handle_download(self):
        """Download the next missing song of the program, one file per call.

        Fires ``finish_download`` when nothing is left to download and
        re-fires ``handle_download`` whenever the state changed in a way that
        requires a fresh pass.
        """
        self.logger.info('handling download: %s', self.status.name)
        try:
            storage = self._get_current_storage()
        except NoProgramData:
            # it makes no sense to continue without program data
            self.fire(finish_download())
            return
        except NoNewProgramData:
            # continue with old program
            self.status = State.download
            self.fire(handle_download())
            return
        if storage is None:
            self.logger.info('wrong status: %s', self.status)
            return

        self.fire(set_dwld_status('downloading'))
        if self.refresh:
            # NOTE(review): passing ``self`` explicitly to a bound method looks
            # like a mistake (cf. ``song.newer(self.start_time)`` below) --
            # confirm against the signature of ``n_newer_files``.
            current = storage.n_newer_files(self, self.start_time)
        else:
            current = storage.n_existing_files
        total = storage.total_files

        if current >= total:
            # all files downloaded
            self.fire(finish_download())
            return
        if self.status == State.new and (current + 1 > total / 2):
            # more than half of files downloaded -> switch storage to start audio playing
            self.switch_storage()
            self.fire(reload_programs(), 'audio')  # video player will wait till download finishes
            self.fire(handle_download())
            return

        self.logger.info('downloading: %s / %s', current, total)
        remaining = total - current
        ms = MediaStream(storage)
        for song in ms.all_songs_by_playcount():
            # skip songs that failed to download if anything else left
            if self.failed_dwld[song.filename] > 0 and len(self.failed_dwld) < remaining:
                continue
            if self.refresh and not song.newer(self.start_time):
                # remove file before downloading new one
                song.remove()
            if song.exists():
                hash_ok = self._check_hash(song)
                if hash_ok is not None and not hash_ok:
                    # skip songs that failed 3x
                    if self.failed_hash[song.filename] >= 3:
                        continue
                    # when hash check fails continue downloading
                    self.fire(handle_download())
                    break
            else:
                duration = self.actual_end - self.actual_start
                self._calculate_dwld_info(ms, song, current, total, duration)
                self.logger.info('ETA: %s', self.dwld_info.get('eta', '?'))
                self.actual_start = time()
                self.fire(task(communicate, song, self.dwld_info,
                               **self.connection), self.channel)
                self.logger.info('Downloading song: %s', song)
                # only one song per time
                break
        else:
            # loop ended without starting a download or a re-check
            self.fire(finish_download())

    def _handle_success(self, path):
        """Record the finished transfer and trigger the next download."""
        self.actual_size = Path(path).size
        self.actual_end = time()
        # remove files from old program if necessary
        if self.status == State.new:
            # load old stream
            if self.storage.exists():
                ms = MediaStream(self.storage)
                # remove if available space under 512MB
                if HWInfo(self.data_dir).disk_usage() < 512:
                    for song in ms.all_songs_by_playcount(existing=True):
                        song.remove()
        if self.status != State.stable:
            self.fire(handle_download())

    def task_success(self, event, task, response, **kw):
        """Handle the result returned by a finished ``communicate`` task.

        ``response`` is the ``(success, payload)`` tuple returned by
        ``communicate``; events from other channels are ignored.
        """
        if self.channel not in event.channels:
            return
        success, resp = response
        if success:
            if resp:
                # file was successfully downloaded
                self._handle_success(resp)
            return

        msg = resp['msg']
        filename = resp['filename']
        self.logger.warning('Download failed: %s', msg)
        if not filename:
            # A status notification (sent without a song) failed. Treating it
            # as a download failure would send another notification that can
            # fail the same way, and could even suspend/restart an idle
            # manager, so it is only logged.
            return
        if msg == 'TimeoutError':
            self._handle_timeout()
        else:
            self._handle_error(filename, msg)

    def task_failure(self, event, task, error, **kw):
        """Log an unhandled error raised inside a download task."""
        if self.channel in event.channels:
            self.logger.error(error[1])

    def _suspend(self, delay):
        """Suspend downloading for ``delay`` seconds and schedule its continuation."""
        self.logger.info('Download suspended for %s seconds.', delay)
        self.status = State.suspended
        self.fire(set_dwld_status('suspended'))
        until = time_format(in_future(delay).time())
        self.download_message('download_suspended', delay=delay, until=until)
        Timer(delay, continue_download()).register(self)

    def _handle_timeout(self):
        """Suspend after a timeout; every third timeout uses the long break."""
        self.timeout_counter += 1
        if self.timeout_counter >= 3:
            self.timeout_counter = 0
            delay = FILE_DWL_BREAK_LONG
        else:
            delay = FILE_DWL_BREAK_SHORT
        self._suspend(delay)

    def _handle_error(self, filename, msg):
        """Count a failed download; retry, or suspend after the third failure."""
        self.failed_dwld[filename] += 1
        self.download_message('download_error', filename=filename, msg=msg)
        if self.failed_dwld[filename] < 3:
            self.logger.debug('error: %s - continue downloading', self.failed_dwld[filename])
            self.fire(handle_download())
            return
        # error occurred 3 times: suspend download for a longer period
        self.failed_dwld[filename] = 0
        self._suspend(FILE_DWL_BREAK_VERYLONG)

    def start_download(self, refresh=False, force=False):
        """Start the downloading workflow.

        Keyword Arguments:
            * refresh {bool} -- redownload all files
            * force {bool} -- start even if suspended
        """
        can_start = self.status == State.stable or (force and self.status == State.suspended)
        if not can_start:
            self.logger.info('already downloading')
            return
        self.logger.info('start download%s', ': refresh' if refresh else '')
        self.status = State.new
        self.refresh = refresh
        self.init_download()
        self.fire(handle_download())

    def continue_download(self):
        """Continue the download after a suspension."""
        self.logger.info('Download continues after suspension.')
        self.status = State.new
        self.download_message('download_continues')
        self.fire(handle_download())

    def finish_download(self):
        """Finalize the download: switch storage, notify players and server, clean up."""
        self.logger.info('finish download')
        if self.status == State.new:
            self.switch_storage()
        # files are ready we can switch programs
        self.fire(reload_programs(), 'audio', 'video')
        # notify server of finishing download
        self.download_message('download_finished')
        self._remove_unnecessary_files()
        self.status = State.stable
        self.refresh = False
        self.init_download()
        gc.collect()

    def download_message(self, message, **kwargs):
        """Compose a status message and send it to the server.

        Arguments:
            * message {str} -- message identifier; nothing is sent when empty
            * kwargs -- optional details, sent under the ``kwargs`` key
        """
        if not message:
            return
        # describe the new program when it exists, the current one otherwise
        storage = self.new_storage if self.new_storage.exists() else self.storage
        dwld_info = {
            'names': storage.program_names(),
            'program_hash': storage.program_hash()[:9],
            'message': message,
            'timestamp': datetime_format(now()),
        }
        if kwargs:
            dwld_info['kwargs'] = kwargs
        # no song: the task only posts the message
        self.fire(task(communicate, None,
                       dwld_info=dwld_info,
                       **self.connection), self.channel)