SolarFM/src/versions/solarfm-0.0.1/SolarFM/solarfm/core/mixins/signals/file_action_signals_mixin.py

376 lines
15 KiB
Python

# Python imports
import os
import time
import shlex
# Lib imports
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GObject
from gi.repository import GLib
from gi.repository import Gio
# Application imports
from ...ui.io_widget import IOWidget
class FileActionSignalsMixin:
"""docstring for FileActionSignalsMixin"""
def get_dir_size(self, sdir):
"""Get the size of a directory. Based on code found online."""
size = os.path.getsize(sdir)
for item in os.listdir(sdir):
item = os.path.join(sdir, item)
if os.path.isfile(item):
size = size + os.path.getsize(item)
elif os.path.isdir(item):
size = size + self.get_dir_size(item)
return size
def set_file_watcher(self, tab):
if tab.get_dir_watcher():
watcher = tab.get_dir_watcher()
watcher.cancel()
if settings.is_debug():
logger.debug(f"Watcher Is Cancelled: {watcher.is_cancelled()}")
cur_dir = tab.get_current_directory()
dir_watcher = Gio.File.new_for_path(cur_dir) \
.monitor_directory(Gio.FileMonitorFlags.WATCH_MOVES, Gio.Cancellable())
wid = tab.get_wid()
tid = tab.get_id()
dir_watcher.connect("changed", self.dir_watch_updates, (f"{wid}|{tid}",))
tab.set_dir_watcher(dir_watcher)
# NOTE: Too lazy to impliment a proper update handler and so just regen store and update tab.
# Use a lock system to prevent too many update calls for certain instances but user can manually refresh if they have urgency
def dir_watch_updates(self, file_monitor, file, other_file=None, eve_type=None, data=None):
if eve_type in [Gio.FileMonitorEvent.CREATED, Gio.FileMonitorEvent.DELETED,
Gio.FileMonitorEvent.RENAMED, Gio.FileMonitorEvent.MOVED_IN,
Gio.FileMonitorEvent.MOVED_OUT]:
if settings.is_debug():
logger.debug(eve_type)
if eve_type in [Gio.FileMonitorEvent.MOVED_IN, Gio.FileMonitorEvent.MOVED_OUT]:
self.update_on_soft_lock_end(data[0])
elif data[0] in self.soft_update_lock.keys():
self.soft_update_lock[data[0]]["last_update_time"] = time.time()
else:
self.soft_lock_countdown(data[0])
@threaded
def soft_lock_countdown(self, tab_widget):
self.soft_update_lock[tab_widget] = { "last_update_time": time.time()}
lock = True
while lock:
time.sleep(0.6)
last_update_time = self.soft_update_lock[tab_widget]["last_update_time"]
current_time = time.time()
if (current_time - last_update_time) > 0.6:
lock = False
self.soft_update_lock.pop(tab_widget, None)
GLib.idle_add(self.update_on_soft_lock_end, *(tab_widget,))
def update_on_soft_lock_end(self, tab_widget):
wid, tid = tab_widget.split("|")
notebook = self.builder.get_object(f"window_{wid}")
tab = self.get_fm_window(wid).get_tab_by_id(tid)
icon_grid = self.builder.get_object(f"{wid}|{tid}|icon_grid")
store = icon_grid.get_model()
_store, tab_widget_label = self.get_store_and_label_from_notebook(notebook, f"{wid}|{tid}")
tab.load_directory()
self.load_store(tab, store)
tab_widget_label.set_label(tab.get_end_of_path())
state = self.get_current_state()
if [wid, tid] in [state.wid, state.tid]:
self.set_bottom_labels(tab)
def do_file_search(self, widget, eve=None):
if not self.ctrl_down and not self.shift_down and not self.alt_down:
target = widget.get_name()
notebook = self.builder.get_object(target)
page = notebook.get_current_page()
nth_page = notebook.get_nth_page(page)
icon_grid = nth_page.get_children()[0]
wid, tid = icon_grid.get_name().split("|")
tab = self.get_fm_window(wid).get_tab_by_id(tid)
query = widget.get_text().lower()
icon_grid.unselect_all()
for i, file in enumerate(tab.get_files()):
if query and query in file[0].lower():
path = Gtk.TreePath().new_from_indices([i])
icon_grid.select_path(path)
items = icon_grid.get_selected_items()
if len(items) == 1:
icon_grid.scroll_to_path(items[-1], True, 0.5, 0.5)
def open_files(self):
state = self.get_current_state()
uris = self.format_to_uris(state.store, state.wid, state.tid, self.selected_files, True)
for file in uris:
state.tab.open_file_locally(file)
def open_with_files(self, app_info):
state = self.get_current_state()
uris = self.format_to_uris(state.store, state.wid, state.tid, self.selected_files)
state.tab.app_chooser_exec(app_info, uris)
def execute_files(self, in_terminal=False):
state = self.get_current_state()
paths = self.format_to_uris(state.store, state.wid, state.tid, self.selected_files, True)
current_dir = state.tab.get_current_directory()
command = None
for path in paths:
command = f"{shlex.quote(path)}" if not in_terminal else f"{state.tab.terminal_app} -e {shlex.quote(path)}"
state.tab.execute(shlex.split(command), start_dir=state.tab.get_current_directory())
def rename_files(self):
rename_label = self.builder.get_object("file_to_rename_label")
rename_input = self.builder.get_object("rename_fname")
state = self.get_current_state()
uris = self.format_to_uris(state.store, state.wid, state.tid, self.selected_files, True)
for uri in uris:
entry = uri.split("/")[-1]
rename_label.set_label(entry)
rename_input.set_text(entry)
response = event_system.emit_and_await("show_rename_file_menu", rename_input)
if response == "skip_edit":
continue
if response == "cancel_edit":
break
rname_to = rename_input.get_text().strip()
if rname_to:
target = f"{state.tab.get_current_directory()}/{rname_to}"
self.handle_files([uri], "rename", target)
event_system.emit("hide_rename_file_menu")
self.selected_files.clear()
def cut_files(self):
self.to_copy_files.clear()
state = self.get_current_state()
uris = self.format_to_uris(state.store, state.wid, state.tid, self.selected_files, True)
self.to_cut_files = uris
def copy_name(self):
state = self.get_current_state()
uris = self.format_to_uris(state.store, state.wid, state.tid, self.selected_files, True)
if len(uris) == 1:
file_name = uris[0].split("/")[-1]
self.set_clipboard_data(file_name)
def copy_files(self):
self.to_cut_files.clear()
state = self.get_current_state()
uris = self.format_to_uris(state.store, state.wid, state.tid, self.selected_files, True)
self.to_copy_files = uris
def paste_files(self):
wid, tid = self.fm_controller.get_active_wid_and_tid()
tab = self.get_fm_window(wid).get_tab_by_id(tid)
target = f"{tab.get_current_directory()}"
if self.to_copy_files:
self.handle_files(self.to_copy_files, "copy", target)
elif self.to_cut_files:
self.handle_files(self.to_cut_files, "move", target)
def create_files(self):
fname_field = self.builder.get_object("new_fname_field")
cancel_creation = event_system.emit_and_await("show_new_file_menu", fname_field)
if cancel_creation:
event_system.emit("hide_new_file_menu")
return
file_name = fname_field.get_text().strip()
type = self.builder.get_object("new_file_toggle_type").get_state()
wid, tid = self.fm_controller.get_active_wid_and_tid()
tab = self.get_fm_window(wid).get_tab_by_id(tid)
target = f"{tab.get_current_directory()}"
if file_name:
path = f"{target}/{file_name}"
if type == True: # Create File
self.handle_files([path], "create_file")
else: # Create Folder
self.handle_files([path], "create_dir")
event_system.emit("hide_new_file_menu")
def move_files(self, files, target):
self.handle_files(files, "move", target)
# NOTE: Gtk recommends using fail flow than pre check which is more
# race condition proof. They're right; but, they can't even delete
# directories properly. So... f**k them. I'll do it my way.
def handle_files(self, paths, action, _target_path=None):
target = None
_file = None
response = None
overwrite_all = False
rename_auto_all = False
for path in paths:
try:
if "file://" in path:
path = path.split("file://")[1]
file = Gio.File.new_for_path(path)
if _target_path:
if file.get_parent().get_path() == _target_path:
raise Exception("Parent dir of target and file locations are the same! Won't copy or move!")
if os.path.isdir(_target_path):
info = file.query_info("standard::display-name", 0, cancellable=None)
_target = f"{_target_path}/{info.get_display_name()}"
_file = Gio.File.new_for_path(_target)
else:
_file = Gio.File.new_for_path(_target_path)
else:
_file = Gio.File.new_for_path(path)
if _file.query_exists():
if not overwrite_all and not rename_auto_all:
event_system.emit("setup_exists_data", (file, _file))
response = event_system.emit_and_await("show_exists_page")
if response == "overwrite_all":
overwrite_all = True
if response == "rename_auto_all":
rename_auto_all = True
if response == "rename":
base_path = _file.get_parent().get_path()
new_name = self.builder.get_object("exists_file_field").get_text().strip()
rfPath = f"{base_path}/{new_name}"
_file = Gio.File.new_for_path(rfPath)
if response == "rename_auto" or rename_auto_all:
_file = self.rename_proc(_file)
if response == "overwrite" or overwrite_all:
type = _file.query_file_type(flags=Gio.FileQueryInfoFlags.NONE)
if type == Gio.FileType.DIRECTORY:
wid, tid = self.fm_controller.get_active_wid_and_tid()
tab = self.get_fm_window(wid).get_tab_by_id(tid)
tab.delete_file( _file.get_path() )
else:
_file.delete(cancellable=None)
if response == "skip":
continue
if response == "skip_all":
break
if _target_path:
target = _file
else:
file = _file
if action == "create_file":
file.create(flags=Gio.FileCreateFlags.NONE, cancellable=None)
continue
if action == "create_dir":
file.make_directory(cancellable=None)
continue
type = file.query_file_type(flags=Gio.FileQueryInfoFlags.NONE)
if type == Gio.FileType.DIRECTORY:
wid, tid = self.fm_controller.get_active_wid_and_tid()
tab = self.get_fm_window(wid).get_tab_by_id(tid)
fPath = file.get_path()
tPath = target.get_path()
state = True
if action == "copy":
tab.copy_file(fPath, tPath)
if action == "move" or action == "rename":
tab.move_file(fPath, tPath)
else:
io_widget = IOWidget(action, file)
if action == "copy":
file.copy_async(destination=target,
flags=Gio.FileCopyFlags.BACKUP,
io_priority=98,
cancellable=io_widget.cancle_eve,
progress_callback=io_widget.update_progress,
callback=io_widget.finish_callback)
self.builder.get_object("io_list").add(io_widget)
if action == "move" or action == "rename":
file.move_async(destination=target,
flags=Gio.FileCopyFlags.BACKUP,
io_priority=98,
cancellable=io_widget.cancle_eve,
progress_callback=None,
# NOTE: progress_callback here causes seg fault when set
callback=io_widget.finish_callback)
self.builder.get_object("io_list").add(io_widget)
except GObject.GError as e:
raise OSError(e)
self.exists_file_rename_bttn.set_sensitive(False)
def rename_proc(self, gio_file):
full_path = gio_file.get_path()
base_path = gio_file.get_parent().get_path()
file_name = os.path.splitext(gio_file.get_basename())[0]
extension = os.path.splitext(full_path)[-1]
target = Gio.File.new_for_path(full_path)
start = "-copy"
if settings.is_debug():
logger.debug(f"Path: {full_path}")
logger.debug(f"Base Path: {base_path}")
logger.debug(f'Name: {file_name}')
logger.debug(f"Extension: {extension}")
i = 2
while target.query_exists():
try:
value = file_name[(file_name.find(start)+len(start)):]
int(value)
file_name = file_name.split(start)[0]
except Exception as e:
pass
target = Gio.File.new_for_path(f"{base_path}/{file_name}-copy{i}{extension}")
i += 1
return target