307 lines
11 KiB
Python
307 lines
11 KiB
Python
# Gtk imports
|
|
|
|
# Python imports
|
|
import os, subprocess, hashlib, threading
|
|
|
|
from os.path import isdir, isfile, join
|
|
from os import listdir
|
|
|
|
|
|
# Application imports
|
|
from .Logger import Logger
|
|
from .. import app, config
|
|
|
|
|
|
|
|
class FileManager:
|
|
def __init__(self, _db, _Settings):
|
|
self.db = _db
|
|
self.SettingsDBObj = _Settings
|
|
self.logging = Logger().get_logger("FileManager")
|
|
|
|
self.THIS_FILE_PTH = os.path.dirname(os.path.realpath(__file__))
|
|
self.STATIC_FPTH = self.THIS_FILE_PTH + "/../static"
|
|
self.REMUX_FOLDER = self.STATIC_FPTH + "/remuxs"
|
|
self.FFMPG_THUMBNLR = self.STATIC_FPTH + "/ffmpegthumbnailer"
|
|
self.VEXTENSION = ('.mkv', '.avi', '.flv', '.mov', '.m4v', '.mpg', '.wmv', '.mpeg', '.mp4', '.webm')
|
|
self.OEXTENSION = ('.doc', '.docx', '.xls', '.xlsx', '.xlt', '.xltx', '.xlm', '.ppt', 'pptx', '.pps', '.ppsx', '.odt', '.rtf')
|
|
self.IEXTENSION = ('.png', '.jpg', '.jpeg', '.gif', '.ico', '.tga')
|
|
self.TEXTENSION = ('.txt', '.text', '.sh', '.cfg', '.conf')
|
|
self.MEXTENSION = ('.psf', '.mp3', '.ogg', '.flac', '.m4a')
|
|
self.PEXTENSION = ('.pdf')
|
|
|
|
self.dotHash = self.hashText(".")
|
|
self.dotdotHash = self.hashText("..")
|
|
self.pathParts = []
|
|
self.pathPartHashs = []
|
|
self.dirs = []
|
|
self.vids = []
|
|
self.images = []
|
|
self.files = []
|
|
|
|
self.loadPreviousPath()
|
|
self.generateLists(self.dotHash)
|
|
|
|
|
|
def generateLists(self, partHash, showHiddenFiles = False):
|
|
path = self.determinAction(partHash)
|
|
if "error" in path:
|
|
return path
|
|
|
|
self.dirs.clear()
|
|
self.vids.clear()
|
|
self.images.clear()
|
|
self.files.clear()
|
|
|
|
for f in listdir(path):
|
|
file = join(path, f)
|
|
fileHash = self.hashText(f)
|
|
|
|
if f.startswith('.') and not showHiddenFiles:
|
|
continue
|
|
|
|
if isfile(file):
|
|
if file.lower().endswith(self.VEXTENSION):
|
|
absHashImgPth = self.STATIC_FPTH + "/imgs/thumbnails/" + fileHash + ".jpg"
|
|
hashImgPth = "static/imgs/thumbnails/" + fileHash + ".jpg"
|
|
|
|
if isfile(absHashImgPth) == False:
|
|
self.generateVideoThumbnail(file, absHashImgPth)
|
|
|
|
self.vids.append([f, hashImgPth, fileHash])
|
|
elif file.lower().endswith(self.IEXTENSION):
|
|
self.images.append([f, fileHash])
|
|
else:
|
|
self.files.append([f, fileHash])
|
|
else:
|
|
self.dirs.append([f, fileHash])
|
|
|
|
self.dirs.sort()
|
|
self.vids.sort()
|
|
self.images.sort()
|
|
self.files.sort()
|
|
|
|
return "success"
|
|
|
|
|
|
def determinAction(self, partHash):
|
|
pathPart = self.returnPathPartFromHash(partHash)
|
|
isDotorDotDot = False
|
|
|
|
if self.isDotHash(partHash) or self.isDotDotHash(partHash):
|
|
isDotorDotDot = True
|
|
|
|
# If hash not in dir hashes prob bad path or data...
|
|
if not pathPart and not isDotorDotDot:
|
|
return "error"
|
|
|
|
# Pop from stack if .. and not length 1
|
|
if partHash in self.dotdotHash:
|
|
if len(self.pathParts) > 1 and len(self.pathPartHashs) > 1:
|
|
self.pathParts.pop()
|
|
self.pathPartHashs.pop()
|
|
|
|
# Append to our stacks if not . or ..
|
|
if not isDotorDotDot:
|
|
self.pathParts.append(pathPart)
|
|
self.pathPartHashs.append(partHash)
|
|
|
|
# Get path parts excluding base path...
|
|
with app.app_context():
|
|
current_path_hash = self.db.session.query(self.SettingsDBObj).filter_by(key="current_path_hash").first()
|
|
|
|
size = len(self.pathPartHashs)
|
|
if size > 0: # If 0 or less then we have 0 or 1 slot (IE base path or none)
|
|
current_path_hash.value = "/".join(self.pathPartHashs[1:size] )
|
|
|
|
current_path = self.db.session.query(self.SettingsDBObj).filter_by(key="current_path").first()
|
|
size = len(self.pathParts)
|
|
if size > 0: # If 0 or less then we have 0 or 1 slot (IE base path or none)
|
|
current_path.value = "/".join(self.pathParts[1:size])
|
|
|
|
self.db.session.commit()
|
|
|
|
return "/".join(self.pathParts)
|
|
|
|
|
|
def returnPathPartFromHash(self, partHash):
|
|
hashes = [self.dotHash, self.dotdotHash]
|
|
print(self.pathParts)
|
|
path = "/".join(self.pathParts)
|
|
pathPart = None
|
|
for f in listdir(path):
|
|
hash = self.hashText(f)
|
|
|
|
if partHash == hash:
|
|
pathPart = f
|
|
break
|
|
|
|
return pathPart
|
|
|
|
|
|
def loadPreviousPath(self):
|
|
with app.app_context():
|
|
basePath = self.db.session.query(self.SettingsDBObj).filter_by(key="base_path").first()
|
|
basePathHash = self.hashText(basePath.value)
|
|
self.pathParts.append(basePath.value)
|
|
self.pathPartHashs.append(basePathHash)
|
|
|
|
# TODO: Need to tie current path to a user or session instead of this
|
|
# current_path = self.db.session.query(self.SettingsDBObj).filter_by(key="current_path").first()
|
|
current_path = self.db.session.query(self.SettingsDBObj).filter_by(key="current_path").first()
|
|
if current_path.value and not "NULL" in current_path.value:
|
|
parts = current_path.value.split("/")
|
|
for part in parts:
|
|
self.pathParts.append(part)
|
|
|
|
current_path_hash = self.db.session.query(self.SettingsDBObj).filter_by(key="current_path_hash").first()
|
|
if current_path_hash.value and not "NULL" in current_path_hash.value:
|
|
parts = current_path_hash.value.split("/")
|
|
for part in parts:
|
|
self.pathPartHashs.append(part)
|
|
|
|
def setNewPathFromFavorites(self, path):
|
|
with app.app_context():
|
|
current_path = self.db.session.query(self.SettingsDBObj).filter_by(key="current_path").first()
|
|
current_path_hash = self.db.session.query(self.SettingsDBObj).filter_by(key="current_path_hash").first()
|
|
current_path.value = path
|
|
parts = path.split("/")
|
|
hashParts = []
|
|
|
|
while len(self.pathParts) >= 1 and len(self.pathPartHashs) >= 1:
|
|
self.pathParts.pop()
|
|
self.pathPartHashs.pop()
|
|
|
|
for part in parts:
|
|
hash = self.hashText(part)
|
|
hashParts.append(hash)
|
|
|
|
size = len(hashParts)
|
|
current_path_hash.value = "/".join(hashParts[0:size])
|
|
self.db.session.commit()
|
|
|
|
|
|
def reset_path(self):
|
|
while len(self.pathParts) > 1 and len(self.pathPartHashs) > 1:
|
|
self.pathParts.pop()
|
|
self.pathPartHashs.pop()
|
|
|
|
|
|
def isDotHash(self, hash):
|
|
return True if self.dotHash == hash else False
|
|
def isDotDotHash(self, hash):
|
|
return True if self.dotdotHash == hash else False
|
|
def getPath(self):
|
|
size = len(self.pathParts)
|
|
return "/".join(self.pathParts[1:size])
|
|
def getFullPath(self):
|
|
size = len(self.pathParts)
|
|
return "/".join(self.pathParts[0:size])
|
|
def getDirs(self):
|
|
return self.dirs
|
|
def getVids(self):
|
|
return self.vids
|
|
def getImgs(self):
|
|
return self.images
|
|
def getFiles(self):
|
|
return self.files
|
|
def getDotHash(self):
|
|
return self.dotHash
|
|
def getDotDotHash(self):
|
|
return self.dotdotHash
|
|
def hashText(self, text):
|
|
return hashlib.sha256(str.encode(text)).hexdigest()[:18]
|
|
|
|
def openFilelocally(self, file):
|
|
lowerName = file.lower()
|
|
command = []
|
|
|
|
if lowerName.endswith(self.VEXTENSION):
|
|
player = config["settings"]["media_app"]
|
|
options = config["settings"]["mplayer_options"].split()
|
|
command = [player]
|
|
|
|
if "mplayer" in player:
|
|
command += options
|
|
|
|
command += [file]
|
|
elif lowerName.endswith(self.IEXTENSION):
|
|
command = [config["settings"]["image_app"], file]
|
|
elif lowerName.endswith(self.MEXTENSION):
|
|
command = [config["settings"]["music_app"], file]
|
|
elif lowerName.endswith(self.OEXTENSION):
|
|
command = [config["settings"]["office_app"], file]
|
|
elif lowerName.endswith(self.TEXTENSION):
|
|
command = [config["settings"]["text_app"], file]
|
|
elif lowerName.endswith(self.PEXTENSION):
|
|
command = [config["settings"]["pdf_app"], file]
|
|
else:
|
|
command = [config["settings"]["file_manager_app"], file]
|
|
|
|
self.logging.debug(command)
|
|
DEVNULL = open(os.devnull, 'w')
|
|
subprocess.Popen(command, start_new_session=True, stdout=DEVNULL, stderr=DEVNULL, close_fds=True)
|
|
|
|
|
|
def remuxVideo(self, hash, file):
|
|
remux_vid_pth = self.REMUX_FOLDER + "/" + hash + ".mp4"
|
|
message = '{"path":"static/remuxs/' + hash + '.mp4"}'
|
|
|
|
self.logging.debug(remux_vid_pth)
|
|
self.logging.debug(message)
|
|
|
|
if not os.path.isfile(remux_vid_pth):
|
|
limit = config["settings"]["remux_folder_max_disk_usage"]
|
|
try:
|
|
limit = int(limit)
|
|
except Exception as e:
|
|
self.logging.debug(e)
|
|
return
|
|
|
|
usage = self.getRemuxFolderUsage(self.REMUX_FOLDER)
|
|
if usage > limit:
|
|
files = os.listdir(self.REMUX_FOLDER)
|
|
for file in files:
|
|
fp = os.path.join(self.REMUX_FOLDER, file)
|
|
os.unlink(fp)
|
|
|
|
command = ["ffmpeg", "-i", file, "-hide_banner", "-movflags", "+faststart"]
|
|
if file.endswith("mkv"):
|
|
command += ["-codec", "copy", "-strict", "-2"]
|
|
if file.endswith("avi"):
|
|
command += ["-c:v", "libx264", "-crf", "21", "-c:a", "aac", "-b:a", "192k", "-ac", "2"]
|
|
if file.endswith("wmv"):
|
|
command += ["-c:v", "libx264", "-crf", "23", "-c:a", "aac", "-strict", "-2", "-q:a", "100"]
|
|
if file.endswith("f4v") or file.endswith("flv"):
|
|
command += ["-vcodec", "copy"]
|
|
|
|
command += [remux_vid_pth]
|
|
try:
|
|
proc = subprocess.Popen(command)
|
|
proc.wait()
|
|
except Exception as e:
|
|
message = '{"message": {"type": "danger", "text":"' + str( repr(e) ) + '"}}'
|
|
self.logging.debug(message)
|
|
self.logging.debug(e)
|
|
|
|
return message
|
|
|
|
|
|
def generateVideoThumbnail(self, fullPath, hashImgPth):
|
|
try:
|
|
proc = subprocess.Popen([self.FFMPG_THUMBNLR, "-t", "65%", "-s", "300", "-c", "jpg", "-i", fullPath, "-o", hashImgPth])
|
|
proc.wait()
|
|
except Exception as e:
|
|
self.logging.debug(repr(e))
|
|
|
|
def getRemuxFolderUsage(self, start_path = "."):
|
|
total_size = 0
|
|
for dirpath, dirnames, filenames in os.walk(start_path):
|
|
for f in filenames:
|
|
fp = os.path.join(dirpath, f)
|
|
# skip if it is symbolic link
|
|
if not os.path.islink(fp):
|
|
total_size += os.path.getsize(fp)
|
|
|
|
return total_size
|